#!/usr/bin/python
# Bosch Home Automation <> MQTT Gateway

import requests, json, time
|
|
import paho.mqtt.client as mqtt
|
|
|
|
# Silence urllib3's InsecureRequestWarning: the HTTPS requests in this file
# are sent with verify=False.
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
class BSH:
    """Gateway that mirrors a Bosch Smart Home Controller (SHC) onto MQTT.

    Constructing an instance connects to the MQTT broker, reads rooms,
    devices, automation rules and scenarios from the controller, publishes
    the service states of every device as retained MQTT messages and
    subscribes to one trigger topic per scenario.  ``loop()`` then long-polls
    the controller and republishes every device for which an event arrives.

    NOTE: every controller request is sent with ``verify=False``, i.e. the
    controller's TLS certificate is not validated.
    """

    # BSH settings
    IP_SHC = "192.168.178.20"
    baseurl = 'https://' + IP_SHC + ':8444'
    url = baseurl + '/remote/json-rpc'
    # Keys of a service "state" object that are forwarded to MQTT when present.
    states = ["childLock", "value", "position", "armActivationDelayTime", "alarmActivationDelayTime",
              "walkState", "petImmunityState", "quality", "faults", "triggers", "actuators",
              "remainingTimeUntilArmed", "state", "path", "enabled", "devices", "runningStartTime",
              "runningEndTime", "operationMode", "setpointTemperature",
              "setpointTemperatureForLevelComfort", "setpointTemperatureForLevelEco",
              "schedule", "ventilationMode", "low", "boostMode", "summerMode", "on", "calibrated",
              "movingTimeTopToBottom", "movingTimeBottomToTop", "level", "operationState",
              "deviceId", "switchState", "powerConsumption", "energyConsumption", "delay"]

    # Mosquitto settings
    client_name = "BSH_Mosquitto_Client"
    host_name = "192.168.178.76"
    mqtt_port = 1884

    # HTTP session used for the controller requests issued from the main thread.
    session = requests.Session()

    # Raw lists as returned by the controller, and the same items keyed by id.
    # The Dic* tables are replaced by per-instance dicts in __init__.
    AutomationRules = {}
    DicAutomationRules = {}

    Devices = {}
    DicDevices = {}

    Scenarios = {}
    DicScenarios = {}

    Rooms = {}
    DicRooms = {}

    # paho-mqtt 2.x requires the callback API version as first argument;
    # 1.x does not know it.  Only on_message is used here, and its signature
    # is the same in both API versions.
    if hasattr(mqtt, "CallbackAPIVersion"):
        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_name)
    else:
        client = mqtt.Client(client_name)

    def getURLHeaders(self):
        """Return the HTTP headers sent with every controller request."""
        return {'Content-Type': 'application/json'}

    def getCertificate(self):
        """Return the (certificate file, key file) pair passed as ``cert``."""
        return ("./example-cert.pem", "./example-key.pem")

    def subscribe(self):
        """Send an ``RE/subscribe`` JSON-RPC call and return its ``result``.

        ``loop()`` hands the returned value to ``poll()`` as the poll id.
        """
        payload = [
            {
                "jsonrpc": "2.0",
                "method": "RE/subscribe",
                "id": "randomid",
                "params": ["com/bosch/sh/remote/*", None],
            }
        ]
        return self.shc_request(payload)

    def poll(self, pollId):
        """Send an ``RE/longPoll`` JSON-RPC call for ``pollId`` and return its ``result``.

        The second JSON-RPC parameter (2) is, per the original author's note,
        the number of seconds the long poll is kept open.
        """
        payload = [
            {
                "jsonrpc": "2.0",
                "method": "RE/longPoll",
                "id": "randomid",
                "params": [pollId, 2],
            }
        ]
        return self.shc_request(payload)

    def shc_request(self, payload):
        """POST a JSON-RPC ``payload`` to the controller and return the first reply's ``result``.

        Raises ``KeyError`` when the first reply carries no ``result`` member.
        """
        # Uses the shared session (like the GET helpers) so the connection to
        # the controller can be reused between polls.
        response = self.session.post(self.url, data=json.dumps(payload),
                                     headers=self.getURLHeaders(),
                                     verify=False, cert=self.getCertificate())
        return json.loads(response.text)[0]['result']

    def publishDebugMsg(self, msg):
        """Publish ``msg`` retained on ``bsh/debug`` and print it."""
        self.client.publish("bsh/debug", msg, 0, True)
        print(msg)

    def _getJson(self, path):
        """GET ``baseurl + path`` from the controller and return the decoded JSON body."""
        response = self.session.get(self.baseurl + path, verify=False,
                                    cert=self.getCertificate(),
                                    headers=self.getURLHeaders())
        return json.loads(response.content)

    def getDevices(self):
        """Return the decoded reply of ``/smarthome/devices``."""
        return self._getJson("/smarthome/devices")

    def getAutomationRules(self):
        """Return the decoded reply of ``/smarthome/automation/rules``."""
        return self._getJson("/smarthome/automation/rules")

    def getScenarios(self):
        """Return the decoded reply of ``/smarthome/scenarios``."""
        return self._getJson("/smarthome/scenarios")

    def getRooms(self):
        """Return the decoded reply of ``/smarthome/rooms``."""
        return self._getJson("/smarthome/rooms")

    def getTopicBase(self, dev):
        """Build the MQTT topic prefix for device ``dev``.

        The prefix is ``bsh/<room name>/<device name>/`` when the device has a
        ``roomId`` that is present in ``DicRooms`` and ``bsh/<device name>/``
        otherwise.  Spaces in the device name become ``_`` and every ``-`` is
        removed from the whole prefix.
        """
        device_part = dev["name"].replace(" ", "_")
        # An unknown roomId falls back to the room-less form instead of raising.
        room = self.DicRooms.get(dev.get("roomId"))
        if room is not None:
            topic = "bsh/" + room["name"] + "/" + device_part + "/"
        else:
            topic = "bsh/" + device_part + "/"
        return topic.replace("-", "")

    def publishDevice(self, id):
        """Fetch the services of a device and publish their states to MQTT.

        ``id`` is the device dictionary (not just its id string); the
        parameter name is kept for backward compatibility.  For every service
        that has a ``state``, each key listed in ``states`` is published
        retained under ``<topic base><state @type>/<key>`` as JSON.
        """
        device = id
        topicbase = self.getTopicBase(device)
        services = self._getJson("/smarthome/devices/" + device["id"] + "/services")
        for service in services:
            if "state" not in service:
                continue
            state = service["state"]
            for key in self.states:
                if key not in state:
                    continue
                topic = topicbase + state["@type"] + "/" + key
                message = json.dumps(state[key], indent=4, sort_keys=False)
                self.client.publish(topic, message, 0, True)
                self.publishDebugMsg(" Publishing topic: " + topic + " Payload: " + message)

    def publishBSHStatus(self):
        """Publish the current state of every device in ``Devices``."""
        for device in self.Devices:
            self.publishDevice(device)

    @staticmethod
    def _scenarioTopic(scenario):
        """Return the MQTT trigger topic that belongs to ``scenario``."""
        return "bsh/scenarios/" + scenario["name"].replace(" ", "_") + "/trigger"

    @staticmethod
    def _findError(body):
        """Return the dict carrying ``errorCode`` found in a reply ``body``, or None.

        Accepts an empty body, a plain JSON object, and the list-wrapped
        ``[{"result": {...}}]`` shape the previous implementation expected.
        """
        if not body or not body.strip():
            return None
        try:
            data = json.loads(body)
        except ValueError:
            return None
        if isinstance(data, list) and data:
            data = data[0]
            if isinstance(data, dict) and "result" in data:
                data = data["result"]
        if isinstance(data, dict) and "errorCode" in data:
            return data
        return None

    def onMqttMessage(self, client, userdata, message):
        """MQTT ``on_message`` callback: trigger the scenario addressed by the topic.

        Only the payload ``ON`` is acted upon.  The topic is reset to ``OFF``
        (retained) and the scenario whose trigger topic equals the message
        topic is triggered on the controller.
        """
        if message.payload.decode("utf-8") != "ON":
            return
        self.client.publish(message.topic, "OFF", 0, True)
        for scenario in self.Scenarios:
            if self._scenarioTopic(scenario) != message.topic:
                continue
            self.publishDebugMsg(" Scenario triggered: " + scenario["name"])
            url = self.baseurl + "/smarthome/scenarios/" + scenario["id"] + "/triggers"
            # Deliberately a standalone request, as before: this callback is
            # invoked from the MQTT client's network loop, so the session used
            # by the polling code is not shared with it.
            response = requests.post(url, data="", headers=self.getURLHeaders(),
                                     verify=False, cert=self.getCertificate())
            # The reply may be empty or a plain JSON object; indexing it as
            # [0]['result'] raised inside the callback.
            error = self._findError(response.text)
            if error is not None:
                self.publishDebugMsg("Error occured during Scenario Call:")
                self.publishDebugMsg(error["errorCode"])
            elif not response.ok:
                self.publishDebugMsg("Error occured during Scenario Call:")
                self.publishDebugMsg("HTTP " + str(response.status_code))

    def subscribeToScenariosRequests(self):
        """Subscribe to the trigger topic of every scenario.

        Each trigger topic is first set to ``OFF`` (retained) and then
        subscribed with QoS 0; ``onMqttMessage`` handles incoming messages.
        """
        # Install the handler before the first subscription is made.
        self.client.on_message = self.onMqttMessage
        for scenario in self.Scenarios:
            topic = self._scenarioTopic(scenario)
            self.publishDebugMsg("Adding subscribe: " + topic)
            self.client.publish(topic, "OFF", 0, True)
            self.client.subscribe(topic, 0)

    def _indexById(self, items):
        """Log ``id:name`` for every item and return the items keyed by their id."""
        index = {}
        for item in items:
            self.publishDebugMsg(item["id"] + ":" + item["name"])
            index[item["id"]] = item
        return index

    def __init__(self):
        """Connect to MQTT, load the controller inventory and publish the initial state."""
        self.client.connect(self.host_name, self.mqtt_port)
        self.publishDebugMsg("Connecting to MQTT Server")
        self.publishDebugMsg("Setting up the Environment...")

        # The lookup tables are created per instance so that they are not
        # shared through the mutable class-level dicts.
        self.publishDebugMsg("Getting Room information")
        self.Rooms = self.getRooms()
        self.publishDebugMsg(str(len(self.Rooms)) + " rooms found:")
        self.DicRooms = self._indexById(self.Rooms)

        self.publishDebugMsg("Getting Devices")
        self.Devices = self.getDevices()
        self.publishDebugMsg(str(len(self.Devices)) + " Devices found:")
        self.DicDevices = {}
        for device in self.Devices:
            room = self.DicRooms.get(device.get("roomId"))
            if room is not None:
                self.publishDebugMsg(device["id"] + ":" + device["name"] + ":" + room["name"])
            else:
                self.publishDebugMsg(device["id"] + ":" + device["name"])
            self.DicDevices[device["id"]] = device

        self.publishDebugMsg("Getting Automation Rules")
        self.AutomationRules = self.getAutomationRules()
        self.publishDebugMsg(str(len(self.AutomationRules)) + " Automation rules found:")
        self.DicAutomationRules = self._indexById(self.AutomationRules)

        self.publishDebugMsg("Getting Senarios")
        self.Scenarios = self.getScenarios()
        self.publishDebugMsg(str(len(self.Scenarios)) + " Scenarios found:")
        self.DicScenarios = self._indexById(self.Scenarios)

        self.publishDebugMsg("Posting initial states to MQTT Server")
        self.publishBSHStatus()

        self.publishDebugMsg("Subscribe to Service Requests")
        self.subscribeToScenariosRequests()

    def loop(self):
        """Subscribe to controller events and forward them to MQTT; never returns.

        Each cycle performs one long poll, republishes every device an event
        refers to, and then runs the MQTT network loop for 5 seconds so that
        incoming scenario triggers are processed before the next poll.
        """
        self.publishDebugMsg("Subscribing to Bosch Smart Home System...")
        pollId = self.subscribe()
        self.publishDebugMsg(pollId)
        self.publishDebugMsg("Starting Polling...")
        while True:
            result = self.poll(pollId)
            for event in result or []:
                device = None
                if "deviceId" in event:
                    device = self.DicDevices.get(event["deviceId"])
                if device is not None:
                    self.publishDebugMsg("status change for: " + device["name"])
                    self.publishDevice(device)
                else:
                    # Anything that cannot be mapped to a known device is
                    # dumped to the debug topic.
                    self.publishDebugMsg("Unknown message received:")
                    self.publishDebugMsg('\n------------------------------------------------------ %d -\n' % len(result))
                    self.publishDebugMsg(json.dumps(event, indent=4, sort_keys=False))
            # Restart long polling every 5 seconds; the MQTT network loop
            # only runs during this pause.
            self.client.loop_start()
            time.sleep(5)
            self.client.loop_stop()
|
|
|
|
|
|
|
|
# Script entry: constructing BSH runs the complete start-up sequence, and
# loop() then keeps polling in an endless ``while True`` loop.
BoschSmartHome = BSH()
BoschSmartHome.loop()
|