Hamburger Hamburger

Heating and Ventilation Control

/HvcFronius

Keine Erläuterungen gefunden.

"""
Read data from Fronius inverter.
Infinite loop controlls execution of cyclic tasks.
Dr. Arne Jachens
2024-08-25

To automatically start the HVC at boot time, do:
sudo cp HVCfronius.service /etc/systemd/system/HVCfronius.service
sudo chmod +x /etc/systemd/system/HVCfronius.service
sudo systemctl enable HVCfronius.service
sudo systemctl start HVCfronius.service
sudo systemctl status HVCfronius.service

Then you may do: sudo service HVCfronius start/stop


alternatively, edit 
sudo /etc/rc.local
and add the line:
 cd /var/www/html/HVC && sudo -u arne python3.8 HvcFronius.py& 

"""
import  time
import  datetime
import  os
import  requests
from pathlib import  Path
import  logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class HvcFronius:
    def  __init__(self, host="192.168.178.55", path="./tmp/"):
        self.host = host
        self.path = path
        self.mean = {}
        self.counter = 0
        self.powerData = {}
        self.today = datetime.date.today()
        self.minuteCounter = 0
        self.readStatusCounter = 0
        self.powerFlow = None
        self.inverterData = None

    def  readFroniusPower(self, endpoint="solar_api/v1/GetPowerFlowRealtimeData.fcgi", retries=3):
        """
        Fetch power flow data from a Fronius inverter using its REST API.

        :param endpoint: API endpoint to fetch power flow data
        :param retries: Number of retry attempts in case of failure
        """
        url = f"http://{self.host}/{endpoint}"

        for  attempt in range(retries):
            try:
                response = requests.get(url, timeout=10)
                response.raise_for_status()  # Raises an HTTPError if the status is 4xx, 5xx

                self.powerFlow = response.json()
                logging.info("Successfully fetched power flow data.")
                return

            except requests.exceptions.RequestException as  e:
                logging.warning(f"Attempt {attempt + 1} failed: {e}")
                time.sleep(5)  # Wait before retrying

        logging.error("Failed to fetch power flow data after several attempts.")
        self.powerFlow = None

    def  parsePower(self):
        """
        Parse and extract relevant power flow data.
        """
        if not self.powerFlow:
            logging.warning("No power flow data to parse.")
            return

        try:
            siteData = self.powerFlow.get('Body', {}).get('Data', {}).get('Site', {})
            if not siteData:
                raise ValueError("Invalid power flow data format.")
            
            self.powerData = {
                "consumption": siteData.get('P_Load', 'N/A'),
                "gridFeedIn": siteData.get('P_Grid', 'N/A'),
                "pvGeneration": siteData.get('P_PV', 'N/A'),
                "batteryPower": siteData.get('P_Akku', 'N/A'),
                "autonomy": siteData.get('rel_Autonomy', 'N/A'),
                "selfConsumption": siteData.get('rel_SelfConsumption', 'N/A')
            }
            logging.info("Power data parsed successfully.")

        except Exception as  e:
            logging.error(f"Error parsing power data: {e}")
            self.powerData = {}

    def  readFroniusStatus(self, endpoint="solar_api/v1/GetInverterRealtimeData.cgi", deviceId=1, retries=3):
        """
        Fetch status data from a Fronius inverter using its REST API.

        :param endpoint: API endpoint to fetch inverter data (default is for  realtime data)
        :param deviceId: ID of the inverter device (default is 1)
        :param retries: Number of retry attempts in case of failure
        """
        url = f"http://{self.host}/{endpoint}?Scope=Device&DeviceId={deviceId}&DataCollection=CommonInverterData"

        for  attempt in range(retries):
            try:
                response = requests.get(url, timeout=10)
                response.raise_for_status()

                self.inverterData = response.json()
                logging.info("Successfully fetched inverter status data.")
                return

            except requests.exceptions.RequestException as  e:
                logging.warning(f"Attempt {attempt + 1} failed: {e}")
                time.sleep(5)

        logging.error("Failed to fetch inverter status data after several attempts.")
        self.inverterData = None

    def  accumulateData(self, mode):
        """
        Accumulate data for  calculating means.

        :param mode: Operation mode (reset, mean, accumulate)
        """
        if mode == "reset":
            self.counter = 1
            self.mean = {key: self.powerData[key] for  key in self.powerData.keys()}
        elif mode == "mean":
            for  key in self.mean.keys():
                if isinstance( self.mean[key], (int,float) ):
                    self.mean[key] = self.mean[key] / max(1, self.counter)
        elif mode == "accumulate":
            self.counter += 1
            for  key in self.mean.keys():
                if isinstance(  self.powerData[key], (int,float) ):
                    self.mean[key] += self.powerData[key]

    def  parseFroniusStatus(self):
        """
        Parse inverterData to HTML.
        :return: HTML formatted block
        """
        html = None
        if self.inverterData:
            data = self.inverterData
            if 'Body' in data and 'Data' in data['Body']:
                html = self._dictToHtml(data['Body']['Data'])
                
        return html

    def  _dictToHtml(self, data):
        """
        Converts a nested dictionary to an HTML table format.

        :param data: Dictionary containing the inverter data
        :return: A string representing the HTML table
        """
        html = "<table border='1' cellpadding='5' cellspacing='0'>\n"
        html += "    <thead>\n"
        html += "        <tr>\n"
        html += "            <th>Parameter</th>\n"
        html += "            <th>Value</th>\n"
        html += "            <th>Unit</th>\n"
        html += "        </tr>\n"
        html += "    </thead>\n"
        html += "    <tbody>\n"
        
        for  key, value in data.items():
            if isinstance(value, dict):
                if 'Value' in value and 'Unit' in value:
                    if key == "TOTAL_ENERGY":
                        html += f"        <tr>\n"
                        html += f"            <td style='font-weight:bold;'>{key}</td>\n"
                        html += f"            <td style='font-weight:bold;'>{value['Value']}</td>\n"
                        html += f"            <td style='font-weight:bold;'>{value['Unit']}</td>\n"
                        html += f"        </tr>\n"
                    else:
                        html += f"        <tr>\n"
                        html += f"            <td>{key}</td>\n"
                        html += f"            <td>{value['Value']}</td>\n"
                        html += f"            <td>{value['Unit']}</td>\n"
                        html += f"        </tr>\n"
                else:
                    html += f"        <tr>\n"
                    html += f"            <td>{key}</td>\n"
                    html += "            <td>\n"
                    html += "                <table border='0' cellpadding='3' cellspacing='0'>\n"
                    for  sub_key, sub_value in value.items():
                        html += f"                    <tr><td>{sub_key}</td><td>{sub_value}</td></tr>\n"
                    html += "                </table>\n"
                    html += "            </td>\n"
                    html += "            <td>N/A</td>\n"
                    html += "        </tr>\n"
            else:
                html += f"        <tr>\n"
                html += f"            <td>{key}</td>\n"
                html += f"            <td>{value}</td>\n"
                html += "            <td>N/A</td>\n"
                html += "        </tr>\n"
                
        html += "    </tbody>\n"
        html += "</table>"

        return html
    
    def  writeLog(self, today):
        """
        Add latest mean values to logfile, stored in the given path.
        If today > last date, initialize a new logfile.
        """
        month = self.today.strftime('%Y-%m')
        logDir = Path(self.path) / f"{month}"
        logDir.mkdir(parents=True, exist_ok=True)
        logFile = logDir / f"power_{today}.dat"
        fileExists = logFile.is_file()
        
        with open(logFile, "a") as  f:
            if not fileExists:
                header = " #time\t" + "\t".join(self.mean.keys()) + "\n"
                f.write(header)
            
            timestamp = datetime.datetime.now().strftime("%H:%M")
            #line = timestamp + "," + ",".join([str(self.mean.get(key, 'N/A')) for  key in self.mean.keys()]) + "\n"
            #line = timestamp + "," + ",".join([f"{self.mean[key]:.1f}" if isinstance(self.mean.get(key, 'N/A'), (int, float)) else str(self.mean.get(key, 'N/A')) for  key in self.mean.keys()]) + "\n"
            line = timestamp + "\t" + "\t".join([f"{self.mean.get(key, 'N/A'):.1f}" if isinstance(self.mean.get(key, 'N/A'), (int, float)) else str(self.mean.get(key, 'N/A')) for  key in ['consumption', 'gridFeedIn', 'pvGeneration']]) + "\n"

            f.write(line)

    def  main(self):
        """
        Main loop:
        - Each minute, read actual power values and accumulate mean.
        - Every 5 minutes, calculate the mean and add it to the logfile.
        - At each start of a new day, create a new logfile.
        """
        while True:
            self.readFroniusPower()
            self.parsePower()
            
            currentDay = datetime.date.today()
            if currentDay > self.today:
                self.today = currentDay
                self.accumulateData("reset")
                self.writeLog(self.today)
                self.minuteCounter = 0
                self.readStatusCounter = 0
            
            if self.minuteCounter == 0:
                self.accumulateData("reset")
            else:
                self.accumulateData("accumulate")
                
            self.minuteCounter += 1
            
            if self.minuteCounter >= 5:
                self.readStatusCounter += 1
                self.accumulateData("mean")
                self.writeLog(self.today)
                self.accumulateData("reset")
                self.minuteCounter = 0

            if self.readStatusCounter >= 15:
                self.readFroniusStatus()
                html = self.parseFroniusStatus()
                if html:
                    with open("./FroniusStatus.html", "w") as  statusFile:
                        statusFile.write(html)
                self.readStatusCounter = 0
                
            time.sleep(60)  # Sleep for  1 minute

if __name__ == "__main__":
    host = "192.168.178.55"
    path = "./LogDir/"
    today = datetime.date.today()

    FI = HvcFronius(host, path)
    logging.info(f"Check output in: {os.path.join(path, f'YYY-MM/power_{today}.dat')}")
    FI.main()

python

1/HvcLightControl.py
2/HvcHCSR04ultrasonic.py
3/HvcPV.py
4/HvcMotorDriver.py
5/HvcRollerShutter.py
6/manGenMqttMap.py
7/HvcReadSPI.py
8/HvcMqtt.py
9/HvcTables.py
10/HvcMain.py
11/HvcSetGPIO.py
12/HvcWifiRelay.py
13/HvcOperationMode.py
14/HvcControl.py
15/HvcRaw2phys.py
16/HvcWeather.py
17/HvcOneWire.py
18/makeDoc.py
19/HvcFronius.py
20/EnergyManager.py
21/HvcSendI2C.py

php

1/HV_colorMap.php
2/HV_Admin_Login.php
3/HV_readOperationState.php
4/HV_setParameters.php
5/HV_config.php
6/EM_handleJSON.php
7/index.php
8/readFilenames.php
9/HV_restart.php
10/HV_moveGate.php
11/HV_showLog.php
12/HV_RollerShutter.php
13/EM_editParameter.php
14/HV_serviceLog.php
15/HV_H2Olevel.php
16/HV_TempCal.php
17/HV_Fronius.php
18/EM_plot.php
19/readNamedData.php
20/HV_composeH2Oplot.php
21/HVdoc.php
22/HV_showWeatherForecast.php
23/HV_showHouse.php

Der gesamte Sourcecode darf gemäß GNU General Public License weiterverbreitet werden.