Dr. Arne JachensDr. Arne Jachens

Heating and Ventilation Control

HvcMqtt

Keine Erläuterungen gefunden.

import  os.path
import  queue
from paho.mqtt import  client as  mqtt_client
import  json
import  time
import  threading

q = queue.Queue() #queue for  data exchange

mqtt_connect=False
def  on_connect(client, userdata, flags, rc):
    if rc == 0:
        if True: #debug
            print("Connection to broker successfull")
        global mqtt_connect
        mqtt_connect = True   
    else:
        print("Connection to broker failed")
    
def  on_message(client, userdata, message):
    """
    Callback function for  MQTT client,
    feeds all received messages into the queue.
    """
    global q #queue for  data exchange
    if False:
    #if True: #debug
        print("message received " ,str(message.payload.decode("utf-8")))
        print("message topic=",message.topic)
        print("message qos=",message.qos)
        print("message retain flag=",message.retain)

    q.put(message)
    

def  fillQueue():
    """
    For testing purpose only,
    fill the queue by some meaningfull values
    """
    global q
    msg = {}
    now = datetime.now()
    msg["today"]   = now.strftime("%Y-%m-%d")
    msg["hour"]    = now.strftime("%H")
    msg["minutes"] = now.strftime("%M")
    msg["seconds"] = now.strftime("%S")
    q.put(msg)


    
class HvcMqtt:
    def  __init__(self, debug=False):
        """
        Initialize and configure the MQTT client,
        put all sensors into the dictionary.
        """
        global q
        self.debug = debug
        self.logPath = "./LogDir/"
        #self.q = queue.Queue()
        self.broker ="localhost" #alternatively use IP = 192.168.178.39
        self.port = 1883
        self.topic = "zigbee2mqtt/"
        #compose list of sensors here:
        self.sensor = {}
        self.sensor["S01"] = "S01"
        self.sensor["S02"] = "S02"
        self.sensor["S03"] = "S03"
        self.sensor["S04"] = "S04"
        self.sensor["S05"] = "S05"
        self.sensor["S06"] = "S06"
        self.sensor["S07"] = "S07"
        self.sensor["S08"] = "S08"
        self.sensor["S09"] = "S09"
        self.sensor["S10"] = "S10"
        
        self.vars = ["T", "phi", "p"]
        for  v in self.vars:
            self.actual[v] = {}
            self.last[v] = {}
            for  s in self.sensor:
                self.actual[v][s]=-999
                self.last[v][s]=-999
                self.state[s]=-1
     



    def  subscribe2MQTT(self):
        """
        Create MQTT client  
        and subscribe it to sensorIDs
        """
        global mqtt_connect
        mqtt_topic = []
        mqtt_topic.append( ("zigbee2mqtt/bridge/request/networkmap",0) ) #read Zigbee map
        for  i,s in enumerate(self.sensor):
            thisTopic = self.topic + self.sensor[s]
            mqtt_topic.append( (thisTopic,0) ) #QOS=0

        if self.debug:
            print("creating new instance")
        client = mqtt_client.Client("TemperatureSensors")
        #attach function to callback
        client.on_connect= on_connect
        #attach function to callback
        client.on_message= on_message  
        if self.debug:
            print("connecting to broker:", self.broker)                    
        client.connect(self.broker, self.port)
        client.loop_start()
        #Wait for  connection
        while mqtt_connect != True: 
            if self.debug:
                print("waiting for  mqtt ....")
            time.sleep(0.1)

        if self.debug:
            print("Subscribing to sensor topics", mqtt_topic)
        client.subscribe(mqtt_topic)

    
    def  initialize(self):
        """
        create MQTT clients, subscribe to sensors, keep listening
        """
        if self.debug:
            print("initialize MQTT listener")
            
        worker = threading.Thread( target=self.subscribe2MQTT() )
        worker.setDaemon(True)
        worker.start()
        
        return
    
    def  genZigbeeMap(self):
        client = mqtt_client.Client("ZigbeeMap")                   
        client.connect(self.broker, self.port)
        #mqtt_topic = "zigbee2mqtt/bridge/networkmap/graphviz"
        mqtt_topic = "zigbee2mqtt/bridge/request/networkmap"
        message = "{'type': 'graphviz', 'routes': False}"
        # -C 1 >${file}routes.dot &

        client.publish(mqtt_topic,message)
        
        
    #receive and process sensor values
    def  processQueue(self):
        """
        Process values in queue,
        message.topic indicates the sensor received
        message.payloud is binary and needs to be decoded.
        The resulting string is a json object,
        that needs to be converted to dictionary
        """
        global q
        while not q.empty():
            message = q.get()
            if message is None:
                continue
            
            try:
                topic = message.topic
                msgStr = message.payload.decode("utf-8")
                data  = json.loads(msgStr)
                sensorID = topic[topic.find("/")+1:]
                if self.debug:
                    print("data received from queue:")
                    print(sensorID)
                    print("data",data)
                    
                for  s in self.sensor:
                    if self.sensor[s] == str(sensorID):
                        thisSensor = s

                #update values for  thisSensor
                self.actual["T"][thisSensor]   = data["temperature"]
                self.actual["p"][thisSensor]   = data["pressure"]
                self.actual["phi"][thisSensor] = data["humidity"]
                self.state[thisSensor]         = data["battery"]
            except:
                if self.debug:
                    print("message from queue:",message)
                continue
        
        return self.actual, self.state

    
    def  writeLog(self, today, hour, minute, epsilon=1E-3):
        """
        On change, append values to logfiles.
        Since different variables do not change synchronously, split logfiles to type of variables,
        in case of pressure, the mean of all sensors might be sufficient?
        """    
        #check for  updated values
        anyChange = False
        change = {}
        for  v in self.vars:
            change[v] = False
            for  s in self.sensor:
                evalChange = (self.actual[v][s]-self.last[v][s]) / self.last[v][s]
                if abs(evalChange) > epsilon:
                    change[v] = True
                    self.last[v][s] = self.actual[v][s]
            if self.debug:
                print(v,"changed?\t",change[v])

        #in case of change, write to log
        file = self.logPath + today
        for  v in self.vars:
            if change[v]:
                anyChange = True
                thisFile = file + "_" + v + ".dat"
                if os.path.exists(thisFile):
                    #append to file
                    fp = open(thisFile, 'a', encoding='utf-8')
                else:
                    #create file with header
                    fp = open(thisFile, 'w', encoding='utf-8')
                    myStr = " #"
                    for  s in self.sensor:
                        myStr = myStr +  "\t" + s
                    fp.write(myStr+"\n")
                    
                myStr = hour +":"+ minute
                for  s in self.sensor:
                    myStr = myStr +  "\t" + '{0:5.2f}'.format(self.actual[v][s])

                fp.write(myStr+"\n")
                fp.close()
                
        return anyChange
               
 
#persistant data structures
HvcMqtt.actual = {}
HvcMqtt.last   = {}
HvcMqtt.state  = {}



if __name__ == "__main__":
    """
    Testing the threads, queue and MQTT.
    Put some equivalent to you main programm.
    """
    from datetime import   datetime

    
    myMqtt = HvcMqtt(True)
    myMqtt.initialize()
    #subscribe to Zigbee map
    #threading.Thread(
    #myMqtt.readZigbeeMap() 
    #generate Zigbee map
    print("generate Zigbee map")
    myMqtt.genZigbeeMap()
    #exit()
    
    def  secondContainer():
        
        #fillQueue() #for  testing purpose only:
        return

    def  minuteContainer(myMqtt):
        now    = datetime.now()
        today  = now.strftime("%Y-%m-%d")
        hour   = now.strftime("%H")
        minute = now.strftime("%M")
        vars, state = myMqtt.processQueue()
        #safe to logfile or control something...
        myMqtt.writeLog(today, hour, minute)
        print(str(hour)+":"+str(minute), vars["T"], vars["phi"], vars["p"], state)
        return

    secCount = 0
    def  mainLoop():
        global secCount
        secCount= secCount+1
        #call itself to loop forever, each 1 second
        threading.Timer(1, mainLoop).start()
        secondContainer()
        if secCount >= 60: # each 60 sec
            secCount = 0
            minuteContainer(myMqtt)

    mainLoop() 

Index of Library

1EnergyManager.py
2HvcControl.py
3HvcHCSR04ultrasonic.py
4HvcLightControl.py
5HvcMain.py
6HvcMotorDriver.py
7HvcMqtt.py
8HvcOneWire.py
9HvcOperationMode.py
10HvcRaw2phys.py
11HvcReadSPI.py
12HvcSendI2C.py
13HvcSetGPIO.py
14HvcTables.py
15HvcWeather.py
16HvcWifiRelay.py
17makeDoc.py

Der gesamte Sourcecode darf gemäß GNU General Public License weiterverbreitet werden.