Heating and Ventilation Control
/HvcMqtt
Keine Erläuterungen gefunden.
import json
import os.path
import queue
import threading
import time
from datetime import datetime

from paho.mqtt import client as mqtt_client

q = queue.Queue()  # queue for data exchange between the MQTT callback and processQueue()
mqtt_connect = False  # set to True by on_connect() once the broker reported rc == 0

_TRACE_MESSAGES = False  # set to True to print every message received by on_message()


def on_connect(client, userdata, flags, rc):
    """
    Callback function for MQTT client, records the result of the connection attempt.
    A return code rc of 0 sets the module-level flag mqtt_connect to True,
    any other value only prints a failure notice.
    """
    global mqtt_connect
    if rc == 0:
        print("Connection to broker successfull")
        mqtt_connect = True
    else:
        print("Connection to broker failed")


def on_message(client, userdata, message):
    """
    Callback function for MQTT client, feeds all received messages into the queue.
    """
    if _TRACE_MESSAGES:
        print("message received ", str(message.payload.decode("utf-8")))
        print("message topic=", message.topic)
        print("message qos=", message.qos)
        print("message retain flag=", message.retain)
    q.put(message)


def fillQueue():
    """
    For testing purpose only, fill the queue with some meaningful values.
    Puts a dictionary holding the current date and time (as strings) into the queue.
    """
    now = datetime.now()
    msg = {
        "today": now.strftime("%Y-%m-%d"),
        "hour": now.strftime("%H"),
        "minutes": now.strftime("%M"),
        "seconds": now.strftime("%S"),
    }
    q.put(msg)


class HvcMqtt:
    """
    MQTT listener for the sensors: subscribes to the sensor topics, keeps the
    latest value per variable and sensor, and appends changed values to logfiles.
    """

    # persistent data structures, defined on the class and therefore shared
    # by all instances:
    actual = {}  # actual[variable][sensor] -> latest received value
    last = {}    # last[variable][sensor]   -> value at the last detected change
    state = {}   # state[sensor]            -> latest "battery" value

    def __init__(self, debug=False):
        """
        Configure the MQTT connection parameters and put all sensors into the dictionary.
        All values start at -999 (state at -1) until a message has been processed.

        debug: if True, print progress information to stdout
        """
        self.debug = debug
        self.logPath = "./LogDir/"
        self.broker = "localhost"  # alternatively use IP = 192.168.178.39
        self.port = 1883
        self.topic = "zigbee2mqtt/"
        # compose list of sensors here: sensor key -> topic suffix below self.topic
        self.sensor = {
            "S01": "S01",
            "S02": "S02",
            "S03": "S03",
            "S04": "S04",
            "S05": "S05",
            "S06": "S06",
            "S07": "S07",
            "S08": "S08",
            "S09": "S09",
            "S10": "S10",
        }
        self.vars = ["T", "phi", "p"]
        for v in self.vars:
            self.actual[v] = {}
            self.last[v] = {}
            for s in self.sensor:
                self.actual[v][s] = -999
                self.last[v][s] = -999
                self.state[s] = -1

    def subscribe2MQTT(self):
        """
        Create MQTT client and subscribe it to the sensor topics.
        Blocks until on_connect() has set mqtt_connect, so it is meant to be
        run in a worker thread (see initialize()).
        """
        # NOTE(review): zigbee2mqtt presumably answers on
        # "zigbee2mqtt/bridge/response/networkmap"; this subscribes to the
        # request topic -- confirm which one is intended.
        mqtt_topic = [("zigbee2mqtt/bridge/request/networkmap", 0)]  # read Zigbee map
        for name in self.sensor.values():
            mqtt_topic.append((self.topic + name, 0))  # QOS=0

        if self.debug:
            print("creating new instance")
        # keyword form: in paho-mqtt 2.x the first positional parameter is
        # presumably callback_api_version -- TODO confirm against installed version
        client = mqtt_client.Client(client_id="TemperatureSensors")
        # attach functions to callbacks
        client.on_connect = on_connect
        client.on_message = on_message

        if self.debug:
            print("connecting to broker:", self.broker)
        client.connect(self.broker, self.port)
        client.loop_start()

        # wait for connection
        while not mqtt_connect:
            if self.debug:
                print("waiting for mqtt ....")
            time.sleep(0.1)

        if self.debug:
            print("Subscribing to sensor topics", mqtt_topic)
        client.subscribe(mqtt_topic)

    def initialize(self):
        """
        Start a daemon thread that creates the MQTT client, subscribes to the
        sensors and keeps listening.
        """
        if self.debug:
            print("initialize MQTT listener")
        # pass the bound method itself; calling it here would run the blocking
        # subscribe in the caller and hand Thread a target of None
        worker = threading.Thread(target=self.subscribe2MQTT, daemon=True)
        worker.start()
        return

    def genZigbeeMap(self):
        """
        Publish a request for the Zigbee network map (type graphviz, without routes).
        """
        client = mqtt_client.Client(client_id="ZigbeeMap")
        client.connect(self.broker, self.port)
        #mqtt_topic = "zigbee2mqtt/bridge/networkmap/graphviz"
        mqtt_topic = "zigbee2mqtt/bridge/request/networkmap"
        # the payload must be valid JSON (double quotes, lowercase false)
        message = json.dumps({"type": "graphviz", "routes": False})
        client.publish(mqtt_topic, message)

    # receive and process sensor values
    def processQueue(self):
        """
        Process values in queue. message.topic indicates the sensor,
        message.payload is binary and needs to be decoded. The resulting string
        is a json object that needs to be converted to a dictionary.

        Messages that cannot be decoded, lack an expected key, or belong to a
        topic that is not in self.sensor are skipped.

        Returns the tuple (self.actual, self.state).
        """
        # topic suffix -> sensor key; for duplicate suffixes the last key wins
        sensorByName = {name: key for key, name in self.sensor.items()}

        while not q.empty():
            message = q.get()
            if message is None:
                continue
            try:
                topic = message.topic
                msgStr = message.payload.decode("utf-8")
                data = json.loads(msgStr)
                sensorID = topic[topic.find("/")+1:]
                if self.debug:
                    print("data received from queue:")
                    print(sensorID)
                    print("data", data)

                # look the sensor up per message, so that an unknown topic can
                # never be booked onto the sensor of the previous message
                thisSensor = sensorByName[str(sensorID)]

                # update values for thisSensor
                self.actual["T"][thisSensor] = data["temperature"]
                self.actual["p"][thisSensor] = data["pressure"]
                self.actual["phi"][thisSensor] = data["humidity"]
                self.state[thisSensor] = data["battery"]
            except (AttributeError, UnicodeDecodeError, ValueError, KeyError, TypeError):
                # not an MQTT message, not JSON, unknown sensor or missing key
                if self.debug:
                    print("message from queue:", message)
                continue
        return self.actual, self.state

    def writeLog(self, today, hour, minute, epsilon=1E-3):
        """
        On change, append values to logfiles. Since different variables do not
        change synchronously, logfiles are split by type of variable;
        in case of pressure, the mean of all sensors might be sufficient?

        today:   date string, used as prefix of the logfile name
        hour:    hour string, written as first column together with minute
        minute:  minute string
        epsilon: relative change (with respect to the last logged value)
                 above which a value counts as changed

        Returns True if at least one variable changed and was written.
        """
        # check for updated values
        change = {}
        for v in self.vars:
            change[v] = False
            for s in self.sensor:
                actual = self.actual[v][s]
                last = self.last[v][s]
                # same test as |actual-last| / |last| > epsilon, but without
                # the division, so a last value of 0 does not raise
                if abs(actual - last) > epsilon * abs(last):
                    change[v] = True
                    self.last[v][s] = actual
            if self.debug:
                print(v, "changed?\t", change[v])

        # in case of change, write to log
        prefix = self.logPath + today
        for v in self.vars:
            if not change[v]:
                continue
            thisFile = prefix + "_" + v + ".dat"
            logDir = os.path.dirname(thisFile)
            if logDir:
                os.makedirs(logDir, exist_ok=True)
            isNew = not os.path.exists(thisFile)
            # mode 'a' creates the file if needed and appends otherwise
            with open(thisFile, 'a', encoding='utf-8') as fp:
                if isNew:
                    # new file starts with a header naming the sensor columns
                    fp.write(" #" + "".join("\t" + s for s in self.sensor) + "\n")
                columns = "".join(
                    "\t" + '{0:5.2f}'.format(self.actual[v][s]) for s in self.sensor
                )
                fp.write(hour + ":" + minute + columns + "\n")
        return any(change.values())


if __name__ == "__main__":
    # Testing the threads, queue and MQTT.
    # Put some equivalent into your main program.
    myMqtt = HvcMqtt(True)
    myMqtt.initialize()

    #subscribe to Zigbee map
    #threading.Thread(
    #myMqtt.readZigbeeMap()

    # generate Zigbee map
    print("generate Zigbee map")
    myMqtt.genZigbeeMap()
    #exit()

    def secondContainer():
        """Work to be done every second (currently nothing)."""
        #fillQueue()    #for testing purpose only:
        return

    def minuteContainer(myMqtt):
        """Work to be done every minute: process the queue, write the log, print values."""
        now = datetime.now()
        today = now.strftime("%Y-%m-%d")
        hour = now.strftime("%H")
        minute = now.strftime("%M")

        values, state = myMqtt.processQueue()
        # save to logfile or control something...
        myMqtt.writeLog(today, hour, minute)
        print(str(hour)+":"+str(minute), values["T"], values["phi"], values["p"], state)
        return

    secCount = 0

    def mainLoop():
        """Re-arm a 1 second timer on itself and dispatch the second/minute work."""
        global secCount
        secCount = secCount + 1

        # call itself to loop forever, each 1 second
        threading.Timer(1, mainLoop).start()

        secondContainer()
        if secCount >= 60:  # each 60 sec
            secCount = 0
            minuteContainer(myMqtt)

    mainLoop()
python
php
Der gesamte Sourcecode darf gemäß GNU General Public License weiterverbreitet werden.