Skip to content

Instantly share code, notes, and snippets.

Created September 3, 2021 08:24
Show Gist options
  • Save datajoely/f4d026dfcea3ad4b1fd9e44a30da56fd to your computer and use it in GitHub Desktop.
Save datajoely/f4d026dfcea3ad4b1fd9e44a30da56fd to your computer and use it in GitHub Desktop.
Expiring HTTP dataset
"""This module provides custom Kedro datasets."""
import hashlib
import json
import logging
from pathlib import Path
from typing import Any, Dict, Optional, Union
from urllib.parse import urlparse

import humanize
import pandas as pd
import requests
from kedro.io import AbstractDataSet, DataSetError, MemoryDataSet, PickleLocalDataSet
class HTTPRequestDataSet(AbstractDataSet):
    """Dataset that calls an HTTP endpoint with ``requests`` and returns the response.

    The URL and the string values nested inside ``request_kwargs`` are treated
    as ``str.format`` templates and are filled in once, at construction time.
    """

    # Class-level on purpose: instance attributes are left exactly as before
    # (``request_type``, ``request_kwargs``, ``url``, ``api_attrs``) because
    # ``ExpiringHTTPRequestDataSet`` hashes ``__dict__`` to name its cache file.
    _SUPPORTED_REQUEST_TYPES = ('GET', 'POST')

    @staticmethod
    def _clean_dict(some_dict: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Return ``some_dict`` unchanged if it is a dict, otherwise an empty dict."""
        return some_dict if isinstance(some_dict, dict) else {}

    @staticmethod
    def _fill_templates(request_kwargs: Dict[str, Any],
                        substitutions: Dict[str, Any]) -> Dict[str, Any]:
        """Fill ``str.format`` placeholders in the nested string values of ``request_kwargs``.

        Only string values that sit inside a dict-valued entry (for example
        ``headers``) are formatted. Every other value (numbers, tuples, top-level
        strings such as a JSON body) is passed through untouched, so literal
        braces outside nested dicts never need escaping.
        """
        filled = {}
        for name, value in request_kwargs.items():
            if isinstance(value, dict):
                filled[name] = {
                    key: item.format(**substitutions) if isinstance(item, str) else item
                    for key, item in value.items()
                }
            else:
                filled[name] = value
        return filled

    def __init__(self, url: str, credentials: Optional[Dict[str, Any]] = None,
                 str_kwargs: Optional[Dict[str, Any]] = None, request_type: str = 'GET',
                 request_kwargs: Optional[Dict[str, Any]] = None):
        """Initialise the class attributes and format the API request URL.

        Args:
            url: The API URL; may contain ``{placeholders}`` filled from
                ``credentials`` and ``str_kwargs``.
            credentials: The (optional) values needed to connect to the service.
            str_kwargs: Any additional values used to fill placeholders in ``url``.
            request_type: ``'GET'`` or ``'POST'`` (case-insensitive).
            request_kwargs: The (optional) keyword arguments forwarded to
                ``requests.get()`` / ``requests.post()``. String values inside
                dict-valued entries may contain ``{placeholders}`` filled from
                ``credentials``.

        Raises:
            ValueError: If ``request_type`` is neither GET nor POST.
        """
        normalised_type = request_type.upper()
        # Explicit raise rather than ``assert``: asserts are stripped under ``python -O``.
        if normalised_type not in self._SUPPORTED_REQUEST_TYPES:
            raise ValueError(
                'request_type must be one of {}, got {!r}'.format(
                    self._SUPPORTED_REQUEST_TYPES, request_type
                )
            )
        self.request_type = normalised_type

        formatted_url_args = self._clean_dict(str_kwargs)
        formatted_creds = self._clean_dict(credentials)
        formatted_request_kwargs = self._clean_dict(request_kwargs)

        # Use the cleaned credentials so ``credentials=None`` does not blow up
        # on ``**None`` when request_kwargs are supplied.
        self.request_kwargs = self._fill_templates(formatted_request_kwargs, formatted_creds)
        self.url = url.format(**formatted_creds, **formatted_url_args)
        self.api_attrs = credentials

    def _load(self) -> requests.Response:
        """Send the configured request and return the response.

        Returns:
            The ``requests.Response`` object when the status code is 200.

        Raises:
            DataSetError: If the status code is not 200, or if ``requests``
                rejects the URL with ``MissingSchema``.
        """
        send = requests.get if self.request_type == 'GET' else requests.post
        try:
            response = send(url=self.url, **self.request_kwargs)
        except requests.exceptions.MissingSchema as error:
            raise DataSetError(error) from error

        if response.status_code == 200:
            return response
        raise DataSetError(
            'Unable to make requests {} code returned in response'.format(response.status_code)
        )

    def _save(self, data: Any) -> MemoryDataSet:
        """Wrap ``data`` in a ``MemoryDataSet`` and return it; nothing is sent to the API."""
        return MemoryDataSet(data=data)

    def _describe(self) -> Dict[str, Any]:
        """Return a description of the current object (its formatted URL)."""
        return dict(url=self.url)

    def exists(self) -> bool:
        """Always return ``False``: an HTTP response does not exist before it is requested."""
        return False
class ExpiringHTTPRequestDataSet(AbstractDataSet):
    """Wrap an ``HTTPRequestDataSet`` with an on-disk pickle cache that expires.

    On load, the wrapped request is only re-issued when no cached pickle
    exists or the cached pickle is older than ``expiry``; otherwise the
    cached copy is returned.
    """

    def __init__(self, dataset: Union[HTTPRequestDataSet, Dict],
                 folder_path: str,
                 expiry: str,
                 credentials: Optional[Dict[str, Any]] = None):
        """Create the expiring request dataset.

        Args:
            dataset: Either an ``HTTPRequestDataSet`` instance or a dict of
                keyword arguments used to build one.
            folder_path: The folder in which the cache pickle is stored.
            expiry: The maximum age of the cache before the request is
                re-issued; anything ``pd.to_timedelta`` accepts.
            credentials: The credentials passed to ``HTTPRequestDataSet`` when
                ``dataset`` is given as a dict.

        Raises:
            ValueError: If ``dataset`` is neither a dict nor an
                ``HTTPRequestDataSet``.
        """
        self.expiry = pd.to_timedelta(expiry)

        if isinstance(dataset, dict):
            self._dataset = HTTPRequestDataSet(**dataset, credentials=credentials)
        elif isinstance(dataset, HTTPRequestDataSet):
            self._dataset = dataset
        else:
            raise ValueError("Invalid YAML definition provided")

        self._folder_path = folder_path
        self._name = self._hash_attrs()
        self._host = urlparse(self._dataset.url).netloc
        self._full_path = Path(self._folder_path) / f'{self._name}.pkl'
        self._pickle = PickleLocalDataSet(filepath=self._full_path)

    def _exists(self) -> bool:
        """Return ``True`` if the cache pickle has already been persisted."""
        return self._full_path.exists()

    def _refresh(self) -> Any:
        """Load from the wrapped dataset, persist the result to the cache and return it."""
        data = self._dataset.load()
        # ``save`` is the inherited public entry point; it is expected to
        # delegate to ``_save`` below (Kedro AbstractDataSet contract).
        self.save(data)
        return data

    def _load(self) -> Any:
        """Return the cached data if it is still fresh, otherwise re-request and re-cache."""
        # If the file DOESN'T exist, load from scratch + save
        if not self._exists():
            logging.info('Brand new request for "%s"', self._host)
            return self._refresh()

        # If the file DOES exist, but it HAS EXPIRED, load from scratch + save
        if self._age_seconds > self.expiry.total_seconds():
            logging.info('Refreshing "%s" after expiry', self._host)
            return self._refresh()

        # If the file DOES exist AND still within the expiry WINDOW, load saved
        # NOTE(review): this unpickles a local file; the cache folder must be trusted.
        logging.debug('Loading "%s" from cache', self._host)
        return self._pickle.load()

    def _save(self, data: Any) -> None:
        """Persist ``data`` in the cache pickle."""
        self._pickle.save(data)

    def _describe(self) -> Dict[str, Any]:
        """Return a description of the current object (expiry and target host)."""
        return dict(expiry=self.expiry, host=self._host)

    def get_created_date(self) -> pd.Timestamp:
        """Return when the cache file was last written, as a naive UTC timestamp.

        Returns the start of Unix time if the cache file does not exist.
        """
        try:
            modified = self._full_path.stat().st_mtime
        except FileNotFoundError:
            # EAFP: avoids a race between an existence check and ``stat()``.
            return pd.to_datetime('1970-01-01')
        # Epoch seconds converted with ``unit='s'`` yield a naive timestamp in UTC.
        return pd.to_datetime(modified, unit='s')

    def _hash_attrs(self) -> str:
        """Return an MD5 hex digest of the wrapped dataset's instance attributes.

        Since the name of the catalog entry is not known by the dataset
        object, this hash is used as the cache file name instead. MD5 is used
        purely as a fingerprint here, not for security.
        """
        attributes = json.dumps(vars(self._dataset), sort_keys=True)
        return hashlib.md5(attributes.encode('utf8')).hexdigest()

    @staticmethod
    def _get_now() -> pd.Timestamp:
        """Return the current time as a naive UTC timestamp.

        It must share a clock with ``get_created_date`` (naive UTC).
        ``pd.to_datetime('now')`` is local time on pandas >= 2.0, which would
        skew the computed age by the machine's UTC offset.
        """
        return pd.Timestamp.now(tz='UTC').tz_localize(None)

    @property
    def _age_delta(self) -> pd.Timedelta:
        """Return the age of the cache since it was last written (zero if absent)."""
        if self._exists():
            return self._get_now() - self.get_created_date()
        return pd.to_timedelta(0)

    @property
    def _age_seconds(self) -> int:
        """Return the age of the cache in whole seconds."""
        return int(self._age_delta.total_seconds())

    @property
    def _age_human(self) -> str:
        """Return the age of the cache in human friendly format."""
        return humanize.naturaldelta(self._age_delta)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment