Skip to content

Instantly share code, notes, and snippets.

@maxiimilian
Created May 21, 2024 14:50
Show Gist options
  • Save maxiimilian/f6d0d5081a310cc102266f8089d86105 to your computer and use it in GitHub Desktop.
Save maxiimilian/f6d0d5081a310cc102266f8089d86105 to your computer and use it in GitHub Desktop.
"""
Download data from KNMI API.
Copyright (C) 2024 Maximilian Pierzyna, except when indicated otherwise
in the docstring of the functions.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import concurrent.futures
import pathlib
import sys
from typing import List, Tuple, Union
import requests
# Set API key (personal KNMI Open Data API token; must be filled in before
# running — see https://developer.dataplatform.knmi.nl/).
API_KEY = ""
# Define available datasets (name, version) tuples as used by the KNMI API.
# NOTE(review): "UNVAL" presumably denotes unvalidated/near-real-time CESAR
# (Cabauw) products — confirm against the KNMI dataset catalogue.
DS_TOWER_UNVAL = ("cesar_tower_meteo_la1_t10", "v1.2")
DS_SURFACE_UNVAL = ("cesar_surface_meteo_la1_t10", "v1.0")
DS_FLUX_UNVAL = ("cesar_surface_flux_la1_t10", "v1.0")
DS_CLOUD = ("cesar_nubiscope_cldcov_la1_t10", "v1.0")
DS_RAD = ("cesar_surface_radiation_la1_t10", "v1.0")
class OpenDataAPI:
    """Thin wrapper to access the KNMI Open Data API.

    From KNMI tutorial: https://developer.dataplatform.knmi.nl/open-data-api#example-last
    """

    def __init__(self, api_token: str):
        # Base URL of the KNMI Open Data REST API (v1).
        self.base_url = "https://api.dataplatform.knmi.nl/open-data/v1"
        # The API expects the raw token in the Authorization header.
        self.headers = {"Authorization": api_token}

    def __get_data(self, url, params=None):
        """GET ``url`` with the auth header and return the decoded JSON payload."""
        return requests.get(url, headers=self.headers, params=params).json()

    def list_files(self, dataset_name: str, dataset_version: str, begin: str, end: str, **params):
        """Return the full list of file records between ``begin`` and ``end``.

        ``begin``/``end`` are date strings (e.g. "20211221"); they are turned
        into filenames because the API filters lexicographically on filenames.
        Extra keyword arguments (e.g. ``nextPageToken``) are passed through to
        the API query.
        """
        # Translate begin and end dates into filenames for API filtering
        params = {
            **params,
            "begin": f"{dataset_name}_{dataset_version}_{begin}.nc",
            "end": f"{dataset_name}_{dataset_version}_{end}.nc",
        }
        # Query API and get file list
        res = self.__get_data(
            f"{self.base_url}/datasets/{dataset_name}/versions/{dataset_version}/files",
            params=params,
        )
        try:
            files = res["files"]
        except KeyError:
            # Surface the raw response (usually an API error message) before re-raising.
            print(res)
            raise
        # If result is truncated, submit again with nextPageToken to retrieve all data
        if res["isTruncated"]:
            files += self.list_files(dataset_name, dataset_version, begin, end,
                                     nextPageToken=res["nextPageToken"])
        # Return full file list
        return files

    def get_file_url(self, dataset_name: str, dataset_version: str, filename: str):
        """Return the API response holding the temporary download URL for ``filename``.

        Bug fix: ``filename`` was never interpolated into the endpoint path
        (it contained a literal placeholder), so the parameter was unused and
        every request targeted a non-existent resource. The path is now an
        f-string that includes ``filename``.
        """
        return self.__get_data(
            f"{self.base_url}/datasets/{dataset_name}/versions/{dataset_version}/files/{filename}/url"
        )
def download_file(url: str, local_dst: pathlib.Path, chunk_size: int = 16384) -> None:
    """Stream the resource at ``url`` into ``local_dst`` in ``chunk_size``-byte chunks.

    Source: https://stackoverflow.com/questions/16694907/download-large-file-in-python-with-requests.
    """
    with requests.get(url, stream=True) as response:
        # Fail fast on HTTP errors before touching the local filesystem.
        response.raise_for_status()
        with local_dst.open("wb") as sink:
            for block in response.iter_content(chunk_size=chunk_size):
                sink.write(block)
def download_dataset_subset(api: OpenDataAPI, dataset_name: str, dataset_version: str, begin: str, end: str,
                            output_path: pathlib.Path):
    """Download all files of a dataset between ``begin`` and ``end`` into ``output_path``.

    Files already present in ``output_path`` are skipped. URL retrieval and
    downloads run in thread pools; per-file download failures are reported
    without aborting the rest of the batch.
    """
    # Retrieve file list
    print("Retrieving file list...")
    files = api.list_files(dataset_name, dataset_version, begin, end)
    filenames = [f["filename"] for f in files]
    print(f"-> {len(filenames)} files.")
    # Check if files are already downloaded
    filenames = [f for f in filenames if not (output_path / f).exists()]
    if len(filenames) == 0:
        print("No new files to download. ")
        print("-> Done.")
        return
    # Get download urls
    print("Retrieving download urls for each file...")
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = [
            executor.submit(api.get_file_url, dataset_name=dataset_name, dataset_version=dataset_version, filename=f)
            for f in filenames
        ]
        file_urls = [f.result()["temporaryDownloadUrl"] for f in futures]
    print("-> Done")
    # Download files
    print("Downloading files...")
    # Bug fix: the except clause previously wrapped executor.submit(), which
    # does not raise worker exceptions, and the futures' results were never
    # inspected — so failed downloads were silently swallowed. Each future's
    # result is now checked so errors are actually reported.
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        future_to_name = {}
        for f, u in zip(filenames, file_urls):
            print(f"Downloading {f}...")
            future_to_name[executor.submit(download_file, url=u, local_dst=output_path / f)] = f
        for fut in concurrent.futures.as_completed(future_to_name):
            f = future_to_name[fut]
            try:
                fut.result()
            except Exception as e:
                print(f"Error downloading {f}: {e}")
    print(f"-> Done. Check {output_path}.")
if __name__ == "__main__":
    # Datasets to fetch, chosen from the available (name, version) tuples above.
    selected_datasets = [
        DS_TOWER_UNVAL,
        DS_SURFACE_UNVAL,
        DS_FLUX_UNVAL,
        DS_RAD,
    ]
    # (begin, end, datasets) campaign periods to download.
    periods_datasets = [
        ("20211221", "20211224", selected_datasets),
        ("20220418", "20220423", selected_datasets),
        ("20220726", "20220729", selected_datasets),
        ("20221007", "20221010", selected_datasets),
    ]
    # Ensure the output directory exists.
    output = pathlib.Path("knmi_data")
    output.mkdir(exist_ok=True)
    # Query the API and download every dataset for every period.
    api = OpenDataAPI(API_KEY)
    for begin, end, period_datasets in periods_datasets:
        for ds_name, ds_version in period_datasets:
            download_dataset_subset(api, ds_name, ds_version, begin=begin, end=end, output_path=output)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment