Dr. Arne Jachens

Get Weather Forecast

For the heating control, you might be interested in the outside weather. Instead of own sensors, I utilize a public weather forecast service.
Especially, if you intend to control the heating in dependency of the todays sun shine, you are interested in the sun shine prediction.
The German weather service DWD offers weather prediction data for many towns for free.

My /etc/crontab to execute the script to update the weather forecast 4 times per day looks like:

SHELL=/bin/sh
PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin

# m h dom mon dow user  command
16 4,10,16,22 * * *     www-data    ( cd /var/www/html/HVC &&  ./HvcWeather.py > /tmp/cron.log  2>&1 )

Python interface

#!/usr/local/bin/python3.8


"""
/usr/bin/wget –no-check-certificate https://opendata.dwd.de/weather/local_foreca
sts/mos/MOSMIX_L/single_stations/L648/kml/MOSMIX_L_LATEST_L648.kmz -O /var/www/h
tml/HVC/altenstadt.kmz
cd /var/www/html/HVC/ && /usr/bin/unzip altenstadt.kmz

for  f in /var/www/html/HVC/MOSMIX*; do
  mv "$f" /var/www/html/HVC/altenstadt.kml
done
"""
import  os
import  xml.etree.ElementTree as  ET
from math import  exp
import  copy

class HvcWeather:
    """
    Instead of ambient sensors, utilize some public weather forecast to have an estimate of the sun shine today.
    In Germany, you get the data for  you local region from the Deutscher Wetterdienst:
    https://opendata.dwd.de/weather/local_forecasts/mos/MOSMIX_L/single_stations/L648/kml/MOSMIX_L_LATEST_L648.kmz

    You may run this regularly by adding the line:
    sudo nano /etc/crontab
    16 4,10,16,22 * * *     www-data    ( cd /var/www/html/HVC &&  ./HvcWeather.py > /tmp/cron.log  2>&1 )


    Dr. Arne Jachens
    2020-06-22
    """
    
    def  __init__(self,source):
        """
        Select the variables, you are interested in.
        The KML file contains much more information then we need,
        let's find the relevant data...
        Data content specification:
        https://opendata.dwd.de/weather/lib/MetElementDefinition.xml
        TTT   Temperature 2m above surface, K
        Td    Dewpoint 2m above surface, K
        FX3   Maximum wind gust within the last hour, m/s
        RR1c  Rain-Equivalent during the last 1 hours kg/m2
        RRS3c Snow-Rain-Equivalent during the last 3 hours kg/m2, nur im Winter
        Neff  Effective cloud cover, % (0..100)
        PPPP  Surface pressure, Pa
        Rad1h Global Irradiance, kJ/m2
        FXh55 Probability of wind gusts >= 55kn within the last 12 hours, % (0..100)
        SunD  Yesterdays total sunshine duration, s
        SunD3 Sunshine duration during the last three hours, s
        To test the stuff, set debug = True.
        """
        self.debug = False
        self.source=source
        self.vars = ['PPPP','TTT','Td','FX3','RR1c','Neff','Rad1h','FXh55','SunD3']

        
    def  pull(self):
        """
        Utilize wget to retrieve the actual weather forecast.
        """
        #utilize wget
        cmd = "/usr/bin/wget –no-check-certificate " + self.source
        myLocation = self.source[ self.source.rfind("_")+1: ]
        cmd = cmd + " -O " + myLocation
        os.system(cmd)

        #extract archive
        cmd = "/usr/bin/unzip " + myLocation
        os.system(cmd)

        #rename the file
        kmlfile = myLocation[:myLocation.find(".")] + ".kml"
        cmd = "mv MOSMIX*.kml " + kmlfile
        print(cmd)
        os.system(cmd)

        return kmlfile

    
    """
    Download the document specifications and put them to http://localhost/HVC/ :
    /usr/bin/wget http://www.opengis.net/kml/2.2 -O ogckml22.xsd
    /usr/bin/wget https://opendata.dwd.de/weather/lib/pointforecast_dwd_extension_V1_0.xsd
    """
    def  parse(self,kmlFile):
        """
        The KML is a specific XML format, use etree to extract the content.
        The variables of interest are extrected, some units changed,
        and finally derived humidity data is computed.
        """
        namespaces = {'kml': 'http://www.opengis.net/kml/2.2', 'dwd': 'https://opendata.dwd.de/weather/lib/pointforecast_dwd_extension_V1_0.xsd'}
        #namespaces = {'kml': './ogckml22.xsd', 'dwd': './pointforecast_dwd_extension_V1_0.xsd'}
        if os.path.exists(kmlFile):
            xmlTree = ET.parse(kmlFile)
            xmlRoot = xmlTree.getroot()
        else:
            print (fileName+' does not exist')
            quit()         

        #time steps for  the prediction
        tag = 'kml:Document/kml:ExtendedData/dwd:ProductDefinition/dwd:ForecastTimeSteps/dwd:TimeStep'
        timeStepsXml = xmlRoot.findall(tag,namespaces)
        timeSteps = []
        for  ts in timeStepsXml:
            thisText = ts.text
            timeSteps.append(thisText[:thisText.find(".")])
            
        #select all Forcast elements
        tag = 'kml:Document/kml:Placemark/kml:ExtendedData'
        ExtendedData = xmlRoot.findall(tag,namespaces)

        #put data sets of interest in 'data' structure
        data={}
        for  v in self.vars:
            for  ed in ExtendedData:
                thisXML = ed.find("./dwd:Forecast[@dwd:elementName='"+v+"']",namespaces)
                data[v] = thisXML.find("./dwd:value",namespaces).text.split()

        #split weather forecast in days an do some basic conversions
        lastDate=""
        weather = {}
#        for  i,t in enumerate(timeSteps):
        for  i in range(len(timeSteps)-1):
            t = timeSteps[i]
            date = t[:t.find("T")]
            time = t[t.find("T")+1:]
            time = time[:time.rfind(":")]  # HH:MM
            if date!=lastDate:
                lastDate=date
                weather[date] = {}

            weather[date][time] = {}
            for  v in self.vars:
                if v=="TTT" and data[v][i]!="-":
                    TAmb = float(data[v][i])-273.15
                    weather[date][time]['TAmb'] = float(TAmb)                   # C
                elif v=="Td" and data[v][i]!="-":
                    Tdp = float(data[v][i])-273.15
                    weather[date][time][v] = float(Tdp)                         # C
                elif v=="PPPP" and data[v][i]!="-":
                    p = float(data[v][i])/1000.0
                    weather[date][time]['p'] = p                                # kPa
                elif v=="Neff" and data[v][i]!="-":
                    weather[date][time][v] = float(data[v][i])/100.0            # 0...1
                elif v=="Rad1h" and data[v][i]!="-":
                    weather[date][time]['I'] = float(data[v][i])/3.6            # W/m2
                elif v=="RR1c" and data[v][i]!="-":
                    weather[date][time]['rain'] = float(data[v][i])             # mm/m2
                elif v=="FX3" and data[v][i]!="-":
                    weather[date][time]['windMax'] = float(data[v][i])          # m/s
                elif v=="Neff" and data[v][i]!="-":
                    weather[date][time]['clouds'] = float(data[v][i])           # 0...1
                elif v=="FXh55":
                    if data[v][i]=="-":
                        weather[date][time]['storm'] = 0.0
                    else:
                        weather[date][time]['storm'] = float(data[v][i])/100.0  # 0...1
                elif v=="SunD3" and data[v][i+1]!="-":
                    weather[date][time]['sun'] = float(data[v][i+1])/3.0/3600.0 # 0...1
                elif data[v][i]!="-":
                    weather[date][time][v] = float(data[v][i])
                    
            #derived data
            HA = HumidAir()
            rhoAir = HA.densityAir(TAmb+273.15,p*1000.0)
            phi    = HA.relHumidity(Tdp+273.15,TAmb+273.15)
            rhoW   = HA.absHumidity(TAmb+273.15,phi)
            x      = HA.specHumidity(rhoW,rhoAir)
            weather[date][time]['phi'] = phi*100.0                               # 0...100%
            weather[date][time]['x']   = x*1000.0                                # g/kg 
                
                    
        return weather

    
    def  read(self,today):
        """
        Read weather data of today, 
        to replace updated data.
        """
        myWeather={}
        myWeather[today]={}
        month = today[0:7]
        fileName = month+"/"+"weather_"+today+".log"
        if os.path.exists(fileName):
            with open(fileName,"r") as  fid:
                lines = fid.readlines()
                
            varNames = lines[0].split()
            varNames[0] = varNames[0][1:]
            for  i,l in enumerate(lines):
                if i>0:
                    values = l.split()
                    hour = values[0]
                    myWeather[today][hour]={}
                    for  j,v in enumerate(values):
                        if j>0:
                            myWeather[today][hour][varNames[j]] = float(v)
            
        else:
            print("file "+fileName+" not found!")

        return myWeather

      
    def  write(self,weather):  
        """
        write weather data to files,
        one per day
        """
        message = ""
        dates =  list(weather.keys())
        dates.sort()
        try:
            varNames = list(weather[dates[0]]['23:00'].keys())
            varNames.sort()
        except:
            print("tryed to access ",dates[0])

        for  i,d in enumerate(dates):
            if i<7:
                today = dates[i]
                todaysWeather = weather[today]
                msg = self.__writeTodaysWeather(today,todaysWeather,varNames)
                message = message + msg
                
        return message
    
                        
    def  __writeTodaysWeather(self,today,weather,varNames):
        message = ""
        month = today[0:7]
        #create logDirectory, if it does not exist yet
        try:
            os.mkdir(month)
            cmd = "chown arne:www-data "+month
            os.system(cmd)
            cmd = "chmod g+w "+month
            os.system(cmd)
            message = message + "directory "+month+" created \n"
        except FileExistsError:
            pass

        fileName = month+"/"+"weather_"+today+".log"
        with open(fileName,"w") as  fid:
            string = " #time"
            for  v in varNames:
                string = string + "\t"+v
                
            fid.write(string+"\n")
            myTimes = list(weather.keys())
            myTimes.sort()
            for  i,t in enumerate(myTimes):
                if t in weather:
                    string = myTimes[i]
                    for  v in varNames:
                        string = string + ('\t{0:.2f}'.format(float(weather[t][v])))
                        
                else:
                    print(t," hour not in weather???")
                        
                fid.write(string+"\n")

        return message

        

    def  merge(self,oldWeather,newWeather):
        """
        To keep the history of weather data, 
        replace updated values for  today.
        """
        today = list(oldWeather.keys())[0]
        allHours = list(oldWeather[today].keys())
        allHours.sort()
        try:
            varNames = list(oldWeather[today]['23:00'].keys())
            varNames.sort()
        except:
            print("ERROR in merge",today)
            print(oldWeather[today]['23:00'])
            exit()
            
        if self.debug:
            print("HvcWeather.merge ",today)
            print(varNames)
            print(allHours)
            
        for  h in allHours:
            #take actual values if existing
            if h in newWeather[today]:
                #keep new data
                pass
            else:
                #conserve history
                newWeather[today][h] = copy.deepcopy(oldWeather[today][h])
                
                if self.debug:
                    print(h," conserved ",newWeather[today][h])
                    
        if self.debug:
            allDays = list(newWeather.keys())
            print(allDays)
            #print("------------------")
            #for  h in allHours:
            #    print(h,newWeather[today][h])
            
        return newWeather


    def  actual(self,today,hour,minutes):
        """
        To have actual data, interpolate hourly values
        """
        weather =  self.read(today)
        allHours = list(weather[today].keys())
        varNames = list(weather[today]['23:00'].keys())
        
        varNames.sort()
        if self.debug:
            print("HvcWeather.actual ",today,hour,minutes)
            print(varNames)
            print(weather[today])
            
        lastWeather = weather[today]["00:00"]
        actWeather  = weather[today]["00:00"]
        hourFlt = float(hour)
        if hourFlt == 23.0:
            actWeather = weather[today]["23:00"]
        else:
            for  h in allHours:
                nextWeather = weather[today][h]
                hFlt = float(h[0:h.find(":")])
                if hFlt>hourFlt:
                    w = float(minutes)/60.0
                    actWeather = {}
                    for  v in varNames:
                        actWeather[v] = (1.0-w)*float(nextWeather[v]) + w*float(lastWeather[v])
                    break;
                else:
                    lastWeather = nextWeather


        return actWeather

  
class HumidAir:      
    """
    Physical properties of humid air
    """
    def  __init__(self):
        self.Ttr=273.16
        self.pTr=611.657
        self.Rair=287.06
        self.Rw=461.52
        
    def  pSat(self,T):
        k1=17.2799
        k2=4102.99
        k3=237.431
        theta = T - self.Ttr;
        p = self.pTr * exp(k1 - k2/(theta + k3));
        return p
        
    def  densityAir(self,T,p):
        rhoAir = p/self.Rair/T
        return rhoAir

    def  relHumidity(self,Tdp,T):
        phi = self.pSat(Tdp)/self.pSat(T)
        return phi
            
    def  absHumidity(self,T,phi):
        rhoW = self.pSat(T)*phi/self.Rw/T
        return rhoW

    def  specHumidity(self,rhoW,rhoAir):
        x = rhoW/rhoAir
        return x

                
if __name__ == "__main__":
    """
    # test HumidAir
    HA = HumidAir()
    TAmb = 20+273.15
    Tdp  = 10+273.15
    p    =102.23
    rhoAir = HA.densityAir(TAmb,p*1000.0)
    phi    = HA.relHumidity(Tdp,TAmb)
    rhoW   = HA.absHumidity(TAmb,phi)
    x      = HA.specHumidity(rhoW,rhoAir)
    print('rhoAir',rhoAir)
    print('phi',phi)
    print('x',x)
    exit()
    """
    
    source = "https://opendata.dwd.de/weather/local_forecasts/mos/MOSMIX_L/single_stations/L648/kml/MOSMIX_L_LATEST_L648.kmz"

    
    debug=0
    
    wf = HvcWeather(source)
    if debug:
        from datetime import  datetime
        kmlfile = "L648.kml"
    else:
        kmlfile = wf.pull()
    
    weather = wf.parse(kmlfile)
    allDates = list(weather.keys())
    allDates.sort()
    if debug:
        print("allDates:",allDates)

    if debug:
        print("1. write weather")
        #wf.write(weather)
        today = allDates[0]
    else:
        today = allDates[0]
        
    print("merge data of ",today)
    todaysWeather = wf.read(today)
    #weather=todaysWeather
    weather = wf.merge(todaysWeather,weather)
    #print("2. write weather")
    print(wf.write(weather))
    
    if debug:
        now = datetime.now()
        today   = now.strftime("%Y-%m-%d")
        hour    = now.strftime("%H")
        minutes = now.strftime("%M")
        aW = wf.actual(today,hour,minutes)
        print("actual weather:")
        print(aW)

Usage of weather data

The provided kml file contains purely the prediction data. To archive the weather data, I write a logfile per day and spend some effort to merge the new prediction with the conserved past data.

Instead of an actual sensor, I interpolate the weather to the current point in time.
Additionaly I plot the weather prediction:

weather.png