@seichter
Created January 13, 2024 18:43
DWD weather data preprocessor
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
dwdhist.py: Preprocessor tool for DWD (German weather service) historical open climate data.
"""
__author__ = "Hartmut Seichter"
__copyright__ = "Copyright 2024, Hartmut Seichter"
__license__ = "GPL"
__version__ = "0.0.1"
__maintainer__ = __author__
__email__ = "hartmut@technotecture.com"
__status__ = "Production"

import argparse
import glob
import os
from io import BytesIO
from zipfile import ZipFile

import pandas as pd
import requests
from bs4 import BeautifulSoup

data_folder = 'data'
data_url = 'https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/water_equiv/historical/'


def download_file(url, prefix='.'):
    # create the cache directory (including parent folders) if it does not exist yet
    os.makedirs(prefix, exist_ok=True)
    local_filename = os.path.join(prefix, url.split('/')[-1])
    # NOTE the stream=True parameter below: the file is downloaded in chunks
    # instead of being held in memory as a whole
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(local_filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    return local_filename


def cache_files(url):
    # creates a local cache of the data files to process
    # fetch the directory listing
    r = requests.get(url)
    data = r.text
    # load the listing into the scraper
    soup = BeautifulSoup(data, features='lxml')
    # list of files there (ignore the backlink)
    file_links = [link.get('href') for link in soup.find_all('a') if not link.get('href').startswith('..')]
    # check our cache of data
    cached_files = glob.glob(os.path.join(data_folder, 'cache', '*'))
    # only the names
    cached_filenames = [os.path.basename(fname) for fname in cached_files]
    # get the missing files (don't touch the cache ;))
    missing_files = list(set(file_links) - set(cached_filenames))
    if len(missing_files) == 0:
        print('cache is valid')
    else:
        # because there are usually thousands of files ...
        for idx, f in enumerate(missing_files):
            print('caching {}/{}'.format(idx + 1, len(missing_files)), f)
            download_file(''.join([url, f]), os.path.join(data_folder, 'cache'))


def read_meta_txt(meta) -> pd.DataFrame:
    # reads the station meta information for the historical weather data;
    # the header of the included .txt file is malformed, so it has to be recovered first:
    # read only the header row, delimited by whitespace
    cols = pd.read_csv(meta, sep=r'\s+', nrows=0, index_col=False, encoding='ISO-8859-1')
    # the meta file itself is fixed width, but the first and second row are not, hence skip them
    df = pd.read_fwf(meta, skiprows=[0, 1], header=None, encoding='ISO-8859-1')
    # recover the column names from the header read above
    df = df.set_axis(cols.columns.to_list(), axis=1)
    # clean up: parse proper datetimes for the observation period of each station
    df['von_datum'] = pd.to_datetime(df['von_datum'], format='%Y%m%d')
    df['bis_datum'] = pd.to_datetime(df['bis_datum'], format='%Y%m%d')
    return df
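
# Quick check of the station meta data (a sketch; assumes --cache has been run
# so that the description file is present in data/cache):
#
#   stationen = read_meta_txt('data/cache/Wa_Tageswerte_Beschreibung_Stationen.txt')
#   print(stationen[['von_datum', 'bis_datum']].head())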


def read_hist_zip(filename) -> pd.DataFrame:
    # opens a zip file from the historical data set and reads the contained 'produkt' table
    with ZipFile(filename, 'r') as z:
        for n in z.namelist():
            if n.startswith('produkt'):
                d = z.read(n)
                bio = BytesIO(d)
                df = pd.read_csv(bio, delimiter=';', index_col=False)
                # clean up ... (copy to avoid a SettingWithCopyWarning)
                df = df[df.SH_TAG >= 5].copy()  # only days with at least 5 cm of snow
                df['MESS_DATUM'] = pd.to_datetime(df['MESS_DATUM'], format='%Y%m%d')  # proper date
                return df
    # no 'produkt' file found in the archive
    return pd.DataFrame()
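
# Quick check of a single station archive (a sketch; the filename below is a
# placeholder, use any cached file matching tageswerte_Wa_*_hist.zip):
#
#   df = read_hist_zip('data/cache/tageswerte_Wa_XXXXX_hist.zip')
#   print(df[['MESS_DATUM', 'SH_TAG']].head())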


def parse_all_and_merge():
    # accumulation data frame for all stations
    accum = pd.DataFrame()
    # get all data files from the cache
    hist_files = glob.glob(os.path.join(data_folder, 'cache', 'tageswerte_Wa_*_hist.zip'))
    if len(hist_files) == 0:
        print('run the caching process first (--cache)!')
        return
    print('files with historical data', len(hist_files))
    for fname in hist_files:
        df = read_hist_zip(fname)
        if accum.empty:
            accum = df
        else:
            accum = pd.concat([accum, df], ignore_index=True)  # otherwise rows would overwrite per index
    accum = accum.sort_values(by='MESS_DATUM')
    print(accum)
    accum.to_csv(os.path.join(data_folder, 'combined.csv'), index=False)


def merge_and_store():
    # merges the cached data into overview files
    # station description file with the observatories' names and positions (and other meta data)
    stationen = read_meta_txt(os.path.join(data_folder, 'cache', 'Wa_Tageswerte_Beschreibung_Stationen.txt'))
    stationen.to_csv(os.path.join(data_folder, 'stationen.csv'), index=False)
    # merge all historical data
    parse_all_and_merge()


if __name__ == '__main__':
    ap = argparse.ArgumentParser()
    ap.description = 'A preprocessor for DWD historical weather data'
    ap.add_argument('--cache', help='download missing data files into the local cache', action='store_true')
    ap.add_argument('--merge', help='merge the cached data into single CSV files', action='store_true')
    args = ap.parse_args()
    if args.cache:
        cache_files(data_url)
    elif args.merge:
        merge_and_store()
    else:
        ap.print_help()
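
# Example usage (a sketch; assumes the script is saved as dwdhist.py and run from a
# directory where a 'data/' folder may be created):
#
#   python dwdhist.py --cache   # mirror missing files from the DWD listing into data/cache
#   python dwdhist.py --merge   # write data/stationen.csv and data/combined.csv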