Hamburger Hamburger

Heating and Ventilation Control

/HvcMqtt

Keine Erläuterungen gefunden.

import  os.path
import  queue
from paho.mqtt import  client as  mqtt_client
import  json
import  time
import  threading

q = queue.Queue() #queue for  data exchange

mqtt_connect=False
def  on_connect(client, userdata, flags, rc):
    if rc == 0:
        if True: #debug
            print("Connection to broker successfull")
        global mqtt_connect
        mqtt_connect = True   
    else:
        print("Connection to broker failed")
    
def  on_message(client, userdata, message):
    """
    Callback function for  MQTT client,
    feeds all received messages into the queue.
    """
    global q #queue for  data exchange
    if False:
    #if True: #debug
        print("message received " ,str(message.payload.decode("utf-8")))
        print("message topic=",message.topic)
        print("message qos=",message.qos)
        print("message retain flag=",message.retain)

    q.put(message)
    

def  fillQueue():
    """
    For testing purpose only,
    fill the queue by some meaningfull values
    """
    global q
    msg = {}
    now = datetime.now()
    msg["today"]   = now.strftime("%Y-%m-%d")
    msg["hour"]    = now.strftime("%H")
    msg["minutes"] = now.strftime("%M")
    msg["seconds"] = now.strftime("%S")
    q.put(msg)


    
class HvcMqtt:
    def  __init__(self, debug=False):
        """
        Initialize and configure the MQTT client,
        put all sensors into the dictionary.
        """
        global q
        self.debug = debug
        self.logPath = "./LogDir/"
        #self.q = queue.Queue()
        self.broker ="localhost" #alternatively use IP = 192.168.178.39
        self.port = 1883
        self.topic = "zigbee2mqtt/"
        #compose list of sensors here:
        self.sensor = {}
        self.sensor["S01"] = "S01"
        self.sensor["S02"] = "S02"
        self.sensor["S03"] = "S03"
        self.sensor["S04"] = "S04"
        self.sensor["S05"] = "S05"
        self.sensor["S06"] = "S06"
        self.sensor["S07"] = "S07"
        self.sensor["S08"] = "S08"
        self.sensor["S09"] = "S09"
        self.sensor["S10"] = "S10"
        
        self.vars = ["T", "phi", "p"]
        for  v in self.vars:
            self.actual[v] = {}
            self.last[v] = {}
            for  s in self.sensor:
                self.actual[v][s]=-999
                self.last[v][s]=-999
                self.state[s]=-1
     



    def  subscribe2MQTT(self):
        """
        Create MQTT client  
        and subscribe it to sensorIDs
        """
        global mqtt_connect
        mqtt_topic = []
        mqtt_topic.append( ("zigbee2mqtt/bridge/request/networkmap",0) ) #read Zigbee map
        for  i,s in enumerate(self.sensor):
            thisTopic = self.topic + self.sensor[s]
            mqtt_topic.append( (thisTopic,0) ) #QOS=0

        if self.debug:
            print("creating new instance")
        client = mqtt_client.Client("TemperatureSensors")
        #attach function to callback
        client.on_connect= on_connect
        #attach function to callback
        client.on_message= on_message  
        if self.debug:
            print("connecting to broker:", self.broker)                    
        client.connect(self.broker, self.port)
        client.loop_start()
        #Wait for  connection
        while mqtt_connect != True: 
            if self.debug:
                print("waiting for  mqtt ....")
            time.sleep(0.1)

        if self.debug:
            print("Subscribing to sensor topics", mqtt_topic)
        client.subscribe(mqtt_topic)

    
    def  initialize(self):
        """
        create MQTT clients, subscribe to sensors, keep listening
        """
        if self.debug:
            print("initialize MQTT listener")
            
        worker = threading.Thread( target=self.subscribe2MQTT() )
        worker.setDaemon(True)
        worker.start()
        
        return
    
    def  genZigbeeMap(self):
        client = mqtt_client.Client("ZigbeeMap")                   
        client.connect(self.broker, self.port)
        #mqtt_topic = "zigbee2mqtt/bridge/networkmap/graphviz"
        mqtt_topic = "zigbee2mqtt/bridge/request/networkmap"
        message = "{'type': 'graphviz', 'routes': False}"
        # -C 1 >${file}routes.dot &

        client.publish(mqtt_topic,message)
        
        
    #receive and process sensor values
    def  processQueue(self):
        """
        Process values in queue,
        message.topic indicates the sensor received
        message.payloud is binary and needs to be decoded.
        The resulting string is a json object,
        that needs to be converted to dictionary
        """
        global q
        while not q.empty():
            message = q.get()
            if message is None:
                continue
            
            try:
                topic = message.topic
                msgStr = message.payload.decode("utf-8")
                data  = json.loads(msgStr)
                sensorID = topic[topic.find("/")+1:]
                if self.debug:
                    print("data received from queue:")
                    print(sensorID)
                    print("data",data)
                    
                for  s in self.sensor:
                    if self.sensor[s] == str(sensorID):
                        thisSensor = s

                #update values for  thisSensor
                self.actual["T"][thisSensor]   = data["temperature"]
                self.actual["p"][thisSensor]   = data["pressure"]
                self.actual["phi"][thisSensor] = data["humidity"]
                self.state[thisSensor]         = data["battery"]
            except:
                if self.debug:
                    print("message from queue:",message)
                continue
        
        return self.actual, self.state

    
    def  writeLog(self, today, hour, minute, epsilon=1E-3):
        """
        On change, append values to logfiles.
        Since different variables do not change synchronously, split logfiles to type of variables,
        in case of pressure, the mean of all sensors might be sufficient?
        """    
        #check for  updated values
        anyChange = False
        change = {}
        for  v in self.vars:
            change[v] = False
            for  s in self.sensor:
                evalChange = (self.actual[v][s]-self.last[v][s]) / self.last[v][s]
                if abs(evalChange) > epsilon:
                    change[v] = True
                    self.last[v][s] = self.actual[v][s]
            if self.debug:
                print(v,"changed?\t",change[v])

        #in case of change, write to log
        file = self.logPath + today
        for  v in self.vars:
            if change[v]:
                anyChange = True
                thisFile = file + "_" + v + ".dat"
                if os.path.exists(thisFile):
                    #append to file
                    fp = open(thisFile, 'a', encoding='utf-8')
                else:
                    #create file with header
                    fp = open(thisFile, 'w', encoding='utf-8')
                    myStr = " #"
                    for  s in self.sensor:
                        myStr = myStr +  "\t" + s
                    fp.write(myStr+"\n")
                    
                myStr = hour +":"+ minute
                for  s in self.sensor:
                    myStr = myStr +  "\t" + '{0:5.2f}'.format(self.actual[v][s])

                fp.write(myStr+"\n")
                fp.close()
                
        return anyChange
               
 
#persistant data structures
HvcMqtt.actual = {}
HvcMqtt.last   = {}
HvcMqtt.state  = {}



if __name__ == "__main__":
    """
    Testing the threads, queue and MQTT.
    Put some equivalent to you main programm.
    """
    from datetime import   datetime

    
    myMqtt = HvcMqtt(True)
    myMqtt.initialize()
    #subscribe to Zigbee map
    #threading.Thread(
    #myMqtt.readZigbeeMap() 
    #generate Zigbee map
    print("generate Zigbee map")
    myMqtt.genZigbeeMap()
    #exit()
    
    def  secondContainer():
        
        #fillQueue() #for  testing purpose only:
        return

    def  minuteContainer(myMqtt):
        now    = datetime.now()
        today  = now.strftime("%Y-%m-%d")
        hour   = now.strftime("%H")
        minute = now.strftime("%M")
        vars, state = myMqtt.processQueue()
        #safe to logfile or control something...
        myMqtt.writeLog(today, hour, minute)
        print(str(hour)+":"+str(minute), vars["T"], vars["phi"], vars["p"], state)
        return

    secCount = 0
    def  mainLoop():
        global secCount
        secCount= secCount+1
        #call itself to loop forever, each 1 second
        threading.Timer(1, mainLoop).start()
        secondContainer()
        if secCount >= 60: # each 60 sec
            secCount = 0
            minuteContainer(myMqtt)

    mainLoop() 

python

1/HvcLightControl.py
2/HvcHCSR04ultrasonic.py
3/HvcPV.py
4/HvcMotorDriver.py
5/HvcRollerShutter.py
6/manGenMqttMap.py
7/HvcReadSPI.py
8/HvcMqtt.py
9/HvcTables.py
10/HvcMain.py
11/HvcSetGPIO.py
12/HvcWifiRelay.py
13/HvcOperationMode.py
14/HvcControl.py
15/HvcRaw2phys.py
16/HvcWeather.py
17/HvcOneWire.py
18/makeDoc.py
19/HvcFronius.py
20/EnergyManager.py
21/HvcSendI2C.py

php

1/HV_colorMap.php
2/HV_Admin_Login.php
3/HV_readOperationState.php
4/HV_setParameters.php
5/HV_config.php
6/EM_handleJSON.php
7/index.php
8/readFilenames.php
9/HV_restart.php
10/HV_moveGate.php
11/HV_showLog.php
12/HV_RollerShutter.php
13/EM_editParameter.php
14/HV_serviceLog.php
15/HV_H2Olevel.php
16/HV_TempCal.php
17/HV_Fronius.php
18/EM_plot.php
19/readNamedData.php
20/HV_composeH2Oplot.php
21/HVdoc.php
22/HV_showWeatherForecast.php
23/HV_showHouse.php

Der gesamte Sourcecode darf gemäß GNU General Public License weiterverbreitet werden.