Hamburger Hamburger

Heating and Ventilation Control

/HvcWifiRelay

Keine Erläuterungen gefunden.

import  urllib.request
import  os.path

class HvcWifiRelay:
    """Switch a fixed set of WiFi relays over HTTP and log their states.

    Every relay is addressed by a short key (e.g. ``"OG1"``) that maps to the
    base URL of the device in ``self.IPs``.  The commanded state of each
    relay is kept in the class-level dictionaries ``actual`` and ``last``,
    so it is shared by all instances of this class.
    """

    # Persistent class-level state, shared by all instances:
    #   actual -- most recently commanded state per relay key (1 = on, 0 = off)
    #   last   -- state per relay key as seen by the previous writeLog() call
    actual = {}
    last = {}

    def __init__(self, debug=False):
        """Set up the relay address table and reset the shared relay state.

        debug -- when true, switch() prints every request and its outcome.
        """
        self.debug = debug
        # Relay key -> base URL of the device.  The insertion order defines
        # the column order of the log file written by writeLog().
        self.IPs = {
            # "r01": "http://192.168.xxx.xxx",  (template for a further relay)
            "OG1": "http://192.168.178.124",  # hallway
            "OG2": "http://192.168.178.125",  # bathroom
            "OG3": "http://192.168.178.129",  # children's room
            "OG4": "http://192.168.178.121",  # utility room
            "OG5": "http://192.168.178.123",  # bedroom
            "EG1": "http://192.168.178.126",  # living room
            "EG2": "http://192.168.178.128",  # entrance hall
            "EG3": "http://192.168.178.127",  # toilet / vestibule
            "EG4": "http://192.168.178.120",  # kitchen
            "LC1": "http://192.168.178.122",  # outdoor lighting
        }
        self.logPath = "./LogDir/"
        # Placeholder until the first switch() call, which replaces it with a
        # dictionary {relay key: success flag}.  The list form is kept so that
        # code inspecting the attribute before the first switch() call sees
        # the same value as before.
        self.WiFiResults = [False]
        # Constructing an instance resets the shared state of all its relays.
        for k in self.IPs:
            HvcWifiRelay.actual[k] = 0
            HvcWifiRelay.last[k] = 0

    def switch(self, state):
        """Switch every relay in ``self.IPs`` according to ``state``.

        state -- dictionary keyed like ``self.IPs``.  A relay is switched on
                 when its entry is "on", 1 or True; any other value, and a
                 missing entry, switches the relay off.

        The commanded state is stored in ``HvcWifiRelay.actual`` before the
        request is sent, i.e. regardless of whether the request succeeds.

        Returns the dictionary ``self.WiFiResults``, which maps each relay
        key to True when the device answered with a body containing "on" or
        "off", and to False otherwise (request failed, timed out, or the
        answer had unexpected content).
        """
        self.WiFiResults = {k: False for k in self.IPs}  # all failed so far

        for k, baseUrl in self.IPs.items():
            turnOn = state.get(k) in ("on", 1, True)
            HvcWifiRelay.actual[k] = 1 if turnOn else 0
            url = baseUrl + "/relay/0?turn=" + ("on" if turnOn else "off")

            if self.debug:
                print(f"Attempting to send request to {url}")

            try:
                with urllib.request.urlopen(url, timeout=5) as response:
                    html = response.read().decode('utf-8')
            except Exception as e:
                # Best effort per relay: one unreachable device must not stop
                # the remaining relays from being switched.
                if self.debug:
                    print(f"Failed to switch relay {k}: {e}")
                continue

            if "on" in html or "off" in html:
                self.WiFiResults[k] = True  # relay answered as expected
                if self.debug:
                    print(f"Response received: {html}")
            elif self.debug:
                print(f"Unexpected response content for  relay {k}: {html}")

        return self.WiFiResults

    def writeLog(self, today, hour, minute, relayDict=None, doWrite=False):
        """Append the current relay states to the log file of the day.

        A data line is written when the commanded state of at least one relay
        differs from the state seen by the previous call, or when ``doWrite``
        is true.  The file ``<logPath><today>_relay.dat`` is created on first
        use (including its directory) and then starts with a header line
        listing the relay keys.

        today     -- date string used as prefix of the file name
        hour      -- hour of the log entry
        minute    -- minute of the log entry
        relayDict -- optional dictionary used to build a second header line;
                     when it cannot be resolved for every relay key, a
                     warning is printed and that header line is omitted
        doWrite   -- force a data line even without a state change

        Returns True when a relay state changed since the previous call.
        """
        if relayDict is None:
            relayDict = {}
        keys = list(self.IPs)

        # Detect changes and remember the current state for the next call.
        anyChange = False
        for k in keys:
            if HvcWifiRelay.last[k] != HvcWifiRelay.actual[k]:
                anyChange = True
                HvcWifiRelay.last[k] = HvcWifiRelay.actual[k]

        if anyChange or doWrite:
            logFile = self.logPath + today + "_relay.dat"
            logDir = os.path.dirname(logFile)
            if logDir:
                os.makedirs(logDir, exist_ok=True)
            isNewFile = not os.path.exists(logFile)

            # Mode 'a' creates the file when it does not exist yet.
            with open(logFile, 'a', encoding='utf-8') as fp:
                if isNewFile:
                    fp.write(" #" + "".join("\t" + k for k in keys) + "\n")
                    # Second header line: invert relayDict completely first,
                    # then assemble the whole line, so that a failed lookup
                    # leaves no partial line in the file.
                    # NOTE(review): the lookup direction (relayDict[k], then
                    # inverse[k], both with relay keys) is kept as originally
                    # written -- confirm the relayDict schema with the caller.
                    try:
                        relayDictInv = {relayDict[k]: k for k in keys}
                        header = " #" + "".join(
                            "\t" + relayDictInv[k] for k in keys)
                    except (LookupError, TypeError):
                        print("Warning: relayDict not set")
                    else:
                        fp.write(header + "\n")

                values = "".join(
                    "\t" + '{0:1.0f}'.format(HvcWifiRelay.actual[k])
                    for k in keys)
                fp.write(f"{hour}:{minute}{values}\n")

        return anyChange


if __name__ == "__main__":
    import time
    from datetime import datetime

    # Demo: toggle relay "OG4" on and off, one switching operation per period.
    period = 30  # seconds between two switching operations
    next_call = time.time()
    myRelay = HvcWifiRelay(True)

    # 10 switching operations = 5 on/off cycles
    for i in range(10):
        # Even passes switch on, odd passes switch off.  switch() treats every
        # relay that is missing from 'state' as "off", so all other relays are
        # switched off on each pass.
        state = {"OG4": "off" if i % 2 == 1 else "on"}
        myRelay.switch(state)

        now = datetime.now()
        today = now.strftime("%Y-%m-%d")
        hour = now.strftime("%H")
        minute = now.strftime("%M")
        myRelay.writeLog(today, hour, minute)

        # Keep a fixed raster of 'period' seconds.  switch() may take longer
        # than one period when relays time out (5 s each), and time.sleep()
        # raises ValueError for a negative duration, so clamp at zero.
        next_call += period
        time.sleep(max(0.0, next_call - time.time()))

python

1/HvcLightControl.py
2/HvcHCSR04ultrasonic.py
3/HvcPV.py
4/HvcMotorDriver.py
5/HvcRollerShutter.py
6/manGenMqttMap.py
7/HvcReadSPI.py
8/HvcMqtt.py
9/HvcTables.py
10/HvcMain.py
11/HvcSetGPIO.py
12/HvcWifiRelay.py
13/HvcOperationMode.py
14/HvcControl.py
15/HvcRaw2phys.py
16/HvcWeather.py
17/HvcOneWire.py
18/makeDoc.py
19/HvcFronius.py
20/EnergyManager.py
21/HvcSendI2C.py

php

1/HV_colorMap.php
2/HV_Admin_Login.php
3/HV_readOperationState.php
4/HV_setParameters.php
5/HV_config.php
6/EM_handleJSON.php
7/index.php
8/readFilenames.php
9/HV_restart.php
10/HV_moveGate.php
11/HV_showLog.php
12/HV_RollerShutter.php
13/EM_editParameter.php
14/HV_serviceLog.php
15/HV_H2Olevel.php
16/HV_TempCal.php
17/HV_Fronius.php
18/EM_plot.php
19/readNamedData.php
20/HV_composeH2Oplot.php
21/HVdoc.php
22/HV_showWeatherForecast.php
23/HV_showHouse.php

Der gesamte Sourcecode darf gemäß GNU General Public License weiterverbreitet werden.