#!/usr/bin/python #Bosch Home Automation <> MQTT Gateway import requests, json, time import paho.mqtt.client as mqtt #Suppress http request warning import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class BSH: #BSH Settings IP_SHC = "192.168.178.20" baseurl = 'https://' + IP_SHC + ':8444' url = baseurl + '/remote/json-rpc' states = ["childLock","value","position","armActivationDelayTime","alarmActivationDelayTime", "walkState","petImmunityState","quality","faults","triggers","actuators","remainingTimeUntilArmed", "state","path","enabled","devices","path","runningStartTime","runningEndTime","operationMode", "setpointTemperature","setpointTemperatureForLevelComfort","setpointTemperatureForLevelEco", "schedule","ventilationMode","low","boostMode","summerMode","on","calibrated","movingTimeTopToBottom", "movingTimeBottomToTop","level","operationState","deviceId","switchState","operationMode", "powerConsumption","energyConsumption","delay"] #Mosquitto Settings client_name = "BSH_Mosquitto_Client" host_name = "192.168.178.36" session = requests.Session() AutomationRules = {} DicAutomationRules = {} Devices = {} DicDevices = {} Scenarios = {} DicScenarios = {} Rooms = {} DicRooms = {} client = mqtt.Client(client_name) #get URL Headers def getURLHeaders(self): return {'Content-Type' : 'application/json'} def getCertificate(self): return ("./example-cert.pem","./example-key.pem") #subscribe to Long Polling def subscribe(self): payload = [ { "jsonrpc":"2.0", "method":"RE/subscribe", "id":"randomid", "params": ["com/bosch/sh/remote/*", None] } ] return self.shc_request(payload) #Start Long Polling and keep it 2 seconds open def poll(self,pollId): payload = [ { "jsonrpc":"2.0", "method":"RE/longPoll", "id":"randomid", "params": [pollId, 2] } ] return self.shc_request(payload) #Post Polling data to SHC def shc_request(self,payload): postRequest = requests.post(self.url, data=json.dumps(payload), headers = self.getURLHeaders(), verify = False, cert = self.getCertificate()) return json.loads(postRequest.text)[0]['result'] #Sent MQTT Debugging Message def publishDebugMsg(self,msg): self.client.publish("bsh/debug",msg,0,True) #Get all BSH Decives def getDevices(self): getRequest = self.session.get("https://"+self.IP_SHC+":8444/smarthome/devices", verify=False, cert = self.getCertificate(), headers = self.getURLHeaders()) return json.loads(getRequest.content) #Get all BSH automation rules def getAutomationRules(self): getRequest = self.session.get("https://"+self.IP_SHC+":8444/smarthome/automation/rules", verify=False, cert = self.getCertificate(), headers = self.getURLHeaders()) return json.loads(getRequest.content) #Get all BSH scenarios def getScenarios(self): getRequest = self.session.get("https://"+self.IP_SHC+":8444/smarthome/scenarios", verify=False, cert = self.getCertificate(), headers = self.getURLHeaders()) return json.loads(getRequest.content) #Get all BSH rooms def getRooms(self): getRequest = self.session.get("https://"+self.IP_SHC+":8444/smarthome/rooms", verify=False, cert = self.getCertificate(), headers = self.getURLHeaders()) contents = getRequest.content return json.loads(contents) #Generate a topic name for a device def getTopicBase(self,dev): topic = "" if "roomId" in dev: topic = "bsh/"+self.DicRooms[dev["roomId"]]["name"]+"/"+dev["name"].replace(" ","_")+"/" else: topic = "bsh/"+dev["name"].replace(" ","_")+"/" return topic.replace("-","") #update message for a single device def publishDevice(self,id): topicbase = self.getTopicBase(id) getRequest = self.session.get("https://"+self.IP_SHC+":8444/smarthome/devices/"+id["id"]+"/services", verify=False, cert = self.getCertificate(), headers = self.getURLHeaders()) data = json.loads(getRequest.content) for id in data: if "state" in id: for s in self.states: if s in id["state"]: topic = topicbase+id["state"]["@type"]+"/"+s payload = id["state"][s] self.client.publish(topic,json.dumps(payload, indent=4, sort_keys=False),0,True) self.publishDebugMsg(" Publishing topic: "+ topic + " Payload: "+ json.dumps(payload, indent=4, sort_keys=False)) #publish the initial status messages def publishBSHStatus(self): for x in self.Devices: self.publishDevice(x) #react to MQTT scenario requests: def onMqttMessage(self, client, userdata, message): #currently only react on trigger="ON" if (str(message.payload.decode("utf-8")) == "ON"): self.client.publish(message.topic,"OFF", 0,True) for s in self.Scenarios: topic = "bsh/scenarios/"+s["name"].replace(" ","_")+"/trigger" if (topic == message.topic): self.publishDebugMsg(" Scenario triggered: "+s["name"]) url = self.baseurl + "/smarthome/scenarios/"+s["id"]+"/triggers" postRequest = requests.post(url, data="", headers = self.getURLHeaders(), verify = False, cert = self.getCertificate()) data = json.loads(postRequest.text)[0]['result'] if "errorCode" in data: self.publishDebugMsg("Error occured during Scenario Call:") self.publishDebugMsg(data["errorCode"]) #subscribe to message to trigger scenarios def subscribeToScenariosRequests(self): for s in self.Scenarios: topic = "bsh/scenarios/"+s["name"].replace(" ","_")+"/trigger" self.publishDebugMsg("Adding subscribe: "+topic) self.client.publish(topic,"OFF", 0,True) self.client.subscribe(topic,0) self.client.on_message = self.onMqttMessage #set up the system: login to MQTT + get all needed data from the BSH def __init__(self): self.client.connect(self.host_name) self.publishDebugMsg("Connecting to MQTT Server") self.publishDebugMsg("Setting up the Environment...") self.publishDebugMsg("Getting Room information") self.Rooms = self.getRooms() self.publishDebugMsg(str(len(self.Rooms))+" rooms found:") for x in self.Rooms: self.publishDebugMsg(x["id"]+":"+x["name"]) self.DicRooms.update({x["id"]:x}) self.publishDebugMsg("Getting Devices") self.Devices = self.getDevices() self.publishDebugMsg (str(len(self.Devices))+" Devices found:") for x in self.Devices: if "roomId" in x: self.publishDebugMsg(x["id"]+":"+x["name"]+":"+self.DicRooms[x["roomId"]]["name"]) else: self.publishDebugMsg(x["id"]+":"+x["name"]) self.DicDevices.update({x["id"]:x}) self.publishDebugMsg("Getting Automation Rules") self.AutomationRules = self.getAutomationRules() self.publishDebugMsg(str(len(self.AutomationRules))+" Automation rules found:") for x in self.AutomationRules: self.publishDebugMsg(x["id"]+":"+x["name"]) self.DicAutomationRules.update({x["id"]:x}) self.publishDebugMsg("Getting Senarios") self.Scenarios = self.getScenarios() self.publishDebugMsg (str(len(self.Scenarios))+" Scenarios found:") for x in self.Scenarios: self.publishDebugMsg(x["id"]+":"+x["name"]) self.DicScenarios.update({x["id"]:x}) self.publishDebugMsg("Posting initial states to MQTT Server") self.publishBSHStatus() self.publishDebugMsg("Subscribe to Service Requests") self.subscribeToScenariosRequests() def loop(self): self.publishDebugMsg("Subscribing to Bosch Smart Home System...") pollId = self.subscribe() self.publishDebugMsg(pollId) self.publishDebugMsg("Starting Polling...") #Restart Long Polling every 5 seconds while True: result = self.poll(pollId) if result != []: for e in result: if "deviceId" in e: if e["deviceId"] in self.DicDevices: self.publishDebugMsg("status change for: "+self.DicDevices[e["deviceId"]]["name"]) self.publishDevice(self.DicDevices[e["deviceId"]]) else: self.publishDebugMsg("Unknown message received:") self.publishDebugMsg('\n------------------------------------------------------ %d -\n' % len(result)) self.publishDebugMsg(json.dumps(e, indent=4, sort_keys=False)) self.client.loop_start() time.sleep(5) self.client.loop_stop() BoschSmartHome = BSH() BoschSmartHome.loop()