DWD weather data preprocessor
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
dwdhist.py: Preprocessor tool for DWD (German weather service) historical open climate data.
"""

__author__ = "Hartmut Seichter"
__copyright__ = "Copyright 2024, Hartmut Seichter"
__license__ = "GPL"
__version__ = "0.0.1"
__maintainer__ = __author__
__email__ = "hartmut@technotecture.com"
__status__ = "Production"

import pandas as pd
from zipfile import ZipFile
from io import BytesIO
import glob
import requests
from bs4 import BeautifulSoup
import argparse
import os

data_folder = 'data'
data_url = 'https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/water_equiv/historical/'

def download_file(url, prefix='.'):
    # create the cache directory, including missing parents
    os.makedirs(prefix, exist_ok=True)
    local_filename = os.path.join(prefix, url.split('/')[-1])
    # NOTE the stream=True parameter below: download in chunks instead of all at once
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(local_filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    return local_filename
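
# Example call (sketch; the filename shown is hypothetical):
#   download_file(data_url + 'some_station_hist.zip', os.path.join(data_folder, 'cache'))
# stores the file under data/cache/ and returns its local path.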

def cache_files(url):
    # builds a local cache of the raw data files to process
    # fetch the directory listing
    r = requests.get(url)
    r.raise_for_status()
    data = r.text
    # load the listing into the scraper
    soup = BeautifulSoup(data, features='lxml')
    # list of files there (ignore the backlink to the parent directory)
    file_links = [link.get('href') for link in soup.find_all('a')
                  if link.get('href') and not link.get('href').startswith('..')]
    # check our cache of data
    cached_files = glob.glob(os.path.join(data_folder, 'cache', '*'))
    # only the names
    cached_filenames = [os.path.basename(fname) for fname in cached_files]
    # get missing files (don't touch the cache ;))
    missing_files = list(set(file_links) - set(cached_filenames))
    # check cache
    if len(missing_files) == 0:
        print('cache is valid')
    else:
        # because there are usually thousands of files ...
        for idx, f in enumerate(missing_files):
            print('caching {}/{}'.format(idx + 1, len(missing_files)), f)
            download_file(url + f, os.path.join(data_folder, 'cache'))

def read_meta_txt(meta) -> pd.DataFrame:
    # reads the meta information for the historical weather data;
    # the header of the included .txt file is malformed, so it needs fixing first:
    # read only the header row, whitespace-delimited, to recover the column names
    cols = pd.read_csv(meta, sep=r'\s+', nrows=0, index_col=False, encoding='ISO-8859-1')
    # the meta file is fixed width, but the first and second row are not, hence we skip them
    df = pd.read_fwf(meta, skiprows=[0, 1], header=None, encoding='ISO-8859-1')
    # recover the column descriptions from the header read above
    df = df.set_axis(cols.columns.to_list(), axis=1)
    # clean up: parse the date columns into proper datetimes
    df['von_datum'] = pd.to_datetime(df['von_datum'], format='%Y%m%d')
    df['bis_datum'] = pd.to_datetime(df['bis_datum'], format='%Y%m%d')
    return df
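
# Shape of the station description file (illustrative sketch, values hypothetical):
# a whitespace-delimited header row, a dashed separator row, then fixed-width rows,
# roughly
#
#   Stations_id von_datum bis_datum ...
#   ----------- --------- --------- ...
#   00001       19370101  19860630  ...
#
# which is why the column names are read separately before the fixed-width parse.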

def read_hist_zip(filename) -> pd.DataFrame:
    # opens a zip file from the historical list and extracts the data product
    with ZipFile(filename, 'r') as z:
        for n in z.namelist():
            if n.startswith('produkt'):
                bio = BytesIO(z.read(n))
                df = pd.read_csv(bio, delimiter=';', index_col=False)
                # clean up ...
                df = df[df.SH_TAG >= 5]  # only days with min. 5 cm of snow
                df['MESS_DATUM'] = pd.to_datetime(df['MESS_DATUM'], format='%Y%m%d')  # proper date
                return df
    raise ValueError('no produkt file found in {}'.format(filename))
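
# Note: each 'tageswerte_Wa_*_hist.zip' archive holds a ';'-separated 'produkt*' file;
# read_hist_zip only relies on its MESS_DATUM (date) and SH_TAG (snow depth) columns,
# all other columns are passed through unchanged.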

def parse_all_and_merge():
    # get all data files from the cache
    hist_files = glob.glob(os.path.join(data_folder, 'cache', 'tageswerte_Wa_*_hist.zip'))
    if len(hist_files) == 0:
        print('run the caching process first!')
        return
    print('files with historical data:', len(hist_files))
    # accumulate all frames and concatenate once (ignore_index avoids overwriting per index)
    frames = [read_hist_zip(fname) for fname in hist_files]
    accum = pd.concat(frames, ignore_index=True)
    accum = accum.sort_values(by='MESS_DATUM')
    print(accum)
    accum.to_csv(os.path.join(data_folder, 'combined.csv'), index=False)

def merge_and_store():
    # merges the cached data into overview files
    # station file describing each weather observatory's name, position, and other metadata
    stationen = read_meta_txt(os.path.join(data_folder, 'cache', 'Wa_Tageswerte_Beschreibung_Stationen.txt'))
    stationen.to_csv(os.path.join(data_folder, 'stationen.csv'), index=False)
    # merge all historical data
    parse_all_and_merge()

if __name__ == '__main__':
    ap = argparse.ArgumentParser(description='A preprocessor for DWD historical weather data')
    ap.add_argument('--cache', help='check and refresh the cached data', action='store_true')
    ap.add_argument('--merge', help='merge cached data into single files', action='store_true')
    args = ap.parse_args()
    if args.cache:
        cache_files(data_url)
    elif args.merge:
        merge_and_store()
    else:
        ap.print_help()
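
# Typical invocation (usage sketch, not part of the original gist):
#   python dwdhist.py --cache   # mirror the DWD file listing into data/cache
#   python dwdhist.py --merge   # write data/stationen.csv and data/combined.csv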