Skip to content

Instantly share code, notes, and snippets.

@datajoely
Created September 3, 2021 08:24
Show Gist options
  • Save datajoely/f4d026dfcea3ad4b1fd9e44a30da56fd to your computer and use it in GitHub Desktop.
Save datajoely/f4d026dfcea3ad4b1fd9e44a30da56fd to your computer and use it in GitHub Desktop.
Expiring HTTP dataset
"""
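This module provides custom Kedro datasets: an HTTP request dataset and a wrapper
that caches its response on disk until a configurable expiry has passed.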
"""
import hashlib
import json
import logging
from pathlib import Path
from typing import Any, Dict, Optional, Union
from urllib.parse import urlparse
import humanize
import pandas as pd
import requests
from kedro.io import AbstractDataSet, DataSetError, MemoryDataSet, PickleLocalDataSet
class HTTPRequestDataSet(AbstractDataSet):
    """
    This class uses the requests library to call an API endpoint and return the response.
    """

    @staticmethod
    def _clean_dict(some_dict: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Return ``some_dict`` unchanged when it is a dictionary, otherwise an empty dictionary.
        """
        return some_dict if isinstance(some_dict, dict) else {}

    @staticmethod
    def _format_request_kwargs(request_kwargs: Dict[str, Any],
                               creds: Dict[str, Any]) -> Dict[str, Any]:
        """
        Substitute credential placeholders into the nested string values of ``request_kwargs``.

        Only string values inside dictionary options (e.g. ``headers`` or ``params``) are
        treated as ``str.format`` templates. Every other value (numbers, booleans, tuples,
        top-level strings, ...) is passed through untouched.
        """
        formatted = {}
        for name, option in request_kwargs.items():
            if isinstance(option, dict):
                formatted[name] = {
                    key: value.format(**creds) if isinstance(value, str) else value
                    for key, value in option.items()
                }
            else:
                formatted[name] = option
        return formatted

    def __init__(self, url: str, credentials: Optional[Dict[str, Any]] = None,
                 str_kwargs: Optional[Dict[str, Any]] = None, request_type: str = 'GET',
                 request_kwargs: Optional[Dict[str, Any]] = None):
        """
        The constructor initialises the class attributes and formats the API request url

        Args:
            url: The API URL, optionally containing ``str.format`` placeholders
            credentials: The (optional) values substituted into ``url`` and into the nested
                string values of ``request_kwargs``
            str_kwargs: Any additional values substituted into ``url``
            request_type: Define if GET or POST request (case-insensitive)
            request_kwargs: The (optional) ``kwargs`` dictionary forwarded to
                ``requests.get()`` / ``requests.post()``

        Raises:
            ValueError: if ``request_type`` is neither GET nor POST
        """
        method = request_type.upper()
        # Raise explicitly: an ``assert`` would be stripped when running with ``python -O``.
        if method not in ('GET', 'POST'):
            raise ValueError(
                "request_type must be 'GET' or 'POST', got {!r}".format(request_type)
            )
        self.request_type = method

        formatted_url_args = self._clean_dict(str_kwargs)
        formatted_creds = self._clean_dict(credentials)
        formatted_request_kwargs = self._clean_dict(request_kwargs)

        # Use the cleaned credentials so that ``credentials=None`` does not break formatting.
        self.request_kwargs = self._format_request_kwargs(formatted_request_kwargs,
                                                          formatted_creds)
        self.url = url.format(**formatted_creds, **formatted_url_args)
        # The raw credentials are kept as given (possibly ``None``).
        self.api_attrs = credentials

    def _load(self) -> requests.Response:
        """
        Send the configured request and return the ``requests.Response`` object
        if a HTTP 200 response is returned.

        Returns:
            HTTP Response object

        Raises:
            DataSetError if the request cannot be sent or a non-200 status is returned
        """
        send = requests.get if self.request_type == 'GET' else requests.post
        try:
            # The request itself must sit inside the ``try``: this is where
            # ``MissingSchema`` and the other ``requests`` errors are raised.
            response = send(url=self.url, **self.request_kwargs)
        except requests.exceptions.RequestException as error:
            raise DataSetError(str(error)) from error

        if response.status_code != 200:
            raise DataSetError(
                'Unable to make requests {} code returned in response'.format(
                    response.status_code
                )
            )
        return response

    def _save(self, data: Any) -> MemoryDataSet:
        """
        The save function does not persist anything; it simply wraps ``data``
        in a ``MemoryDataSet`` object and returns it
        """
        return MemoryDataSet(data=data)

    def _describe(self) -> Dict[str, Any]:
        """
        The describe function will provide information about the current object
        """
        # Return the URL itself, not a one-element set containing it.
        return dict(url=self.url)

    def exists(self) -> bool:
        """
        HTTP request can never exist before being called
        """
        return False
class ExpiringHTTPRequestDataSet(AbstractDataSet):
    """
    This class wraps a ``HTTPRequestDataSet`` and caches its response in a pickle file.
    A new request is only made when no cached version exists or the cached version
    is older than the configured expiry.
    """

    def __init__(self, dataset: Union[HTTPRequestDataSet, Dict],
                 folder_path: str,
                 expiry: str,
                 credentials: Optional[Dict[str, Any]] = None):
        """
        This constructor handles the creation of the expiring request dataset

        Args:
            dataset: Either a ready ``HTTPRequestDataSet`` or a dictionary of keyword
                arguments used to build one
            folder_path: The folder in which the cached pickle file is stored
            expiry: The maximum age of the cache before the request is made again,
                in any format accepted by ``pd.to_timedelta``
            credentials: The credentials passed to ``HTTPRequestDataSet`` when
                ``dataset`` is given as a dictionary

        Raises:
            ValueError: if ``dataset`` is neither a dictionary nor a ``HTTPRequestDataSet``
        """
        self.expiry = pd.to_timedelta(expiry)

        if isinstance(dataset, dict):
            self._dataset = HTTPRequestDataSet(**dataset, credentials=credentials)
        elif isinstance(dataset, HTTPRequestDataSet):
            self._dataset = dataset
        else:
            raise ValueError("Invalid YAML definition provided")

        self._folder_path = folder_path
        self._name = self._hash_attrs()
        self._host = urlparse(self._dataset.url).netloc
        # Join with pathlib so an empty folder does not resolve to the filesystem root.
        self._full_path = Path(self._folder_path) / f'{self._name}.pkl'
        self._pickle = PickleLocalDataSet(filepath=self._full_path)

    def _exists(self) -> bool:
        """
        This function checks if the cache file has already been persisted.
        """
        return self._full_path.exists()

    def _load(self) -> Any:
        """
        Return the cached data while it is still within the expiry window, otherwise
        load from the wrapped dataset, persist the result and return it.
        """
        cached = self._exists()

        # The file DOES exist AND is still within the expiry window: load saved copy.
        if cached and self._age_seconds <= self.expiry.total_seconds():
            logging.debug('Loading "%s" from cache', self._host)
            # NOTE: this unpickles the local cache file; only point ``folder_path``
            # at a location that untrusted parties cannot write to.
            return self._pickle.load()

        if cached:
            # The file DOES exist, but it HAS EXPIRED.
            logging.info('Refreshing "%s" after expiry', self._host)
        else:
            # The file DOESN'T exist.
            logging.info('Brand new request for "%s"', self._host)

        # In both remaining cases: load from scratch + save.
        data = self._dataset.load()
        self._save(data=data)
        return data

    def _save(self, data: Any) -> None:
        """
        This function persists the data in the cache pickle file
        """
        self._pickle.save(data)

    def _describe(self) -> Dict[str, Any]:
        """
        This function provides a description of the current object
        """
        return dict(expiry=self.expiry, host=self._host)

    def get_created_date(self) -> pd.Timestamp:
        """
        Return the modification time of the cache file as a timezone-naive UTC timestamp,
        or the start of unix time if the file does not exist.
        """
        if self._exists():
            # ``st_mtime`` is seconds since the epoch, so the result is naive UTC.
            return pd.to_datetime(self._full_path.stat().st_mtime, unit='s')
        return pd.to_datetime('1970-01-01')

    def _hash_attrs(self) -> str:
        """
        Since the name of the catalog entry is not known by the dataset object,
        an MD5 hash of the wrapped dataset's attributes is used in its place
        """
        attributes = json.dumps(self._dataset.__dict__, sort_keys=True)
        return hashlib.md5(attributes.encode('utf8')).hexdigest()

    @staticmethod
    def _get_now() -> pd.Timestamp:
        """
        This convenience function provides the current time as a timezone-naive
        UTC timestamp, so it can be compared with ``get_created_date``.
        """
        # ``pd.to_datetime('now')`` yields UTC or local time depending on the pandas
        # version; ask for UTC explicitly and drop the tz to match the naive mtime.
        return pd.Timestamp.now(tz='UTC').tz_localize(None)

    @property
    def _age_delta(self) -> pd.Timedelta:
        """
        Return the time elapsed since the cache file was last written,
        or a zero timedelta if there is no cache file
        """
        if self._exists():
            return self._get_now() - self.get_created_date()
        return pd.to_timedelta(0)

    @property
    def _age_seconds(self) -> int:
        """
        Return the age of the cache in whole seconds
        """
        return int(self._age_delta.total_seconds())

    @property
    def _age_human(self) -> str:
        """
        Return the age of the cache in human friendly format
        """
        return humanize.naturaldelta(self._age_delta)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment