Last active
April 6, 2023 05:38
-
-
Save mattfysh/916827271255fea30c5b2407189d76a1 to your computer and use it in GitHub Desktop.
Roast My Flow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
from datetime import datetime, timedelta | |
from prefect import flow, task | |
from prefect.blocks.system import JSON | |
from prefect.task_runners import SequentialTaskRunner | |
import pandas as pd | |
import requests | |
from deltalake.writer import write_deltalake | |
def hash_url_pattern(pattern: str) -> str:
    """Return the hex MD5 digest of *pattern*, used as a stable cache/state key."""
    return hashlib.md5(pattern.encode()).hexdigest()
def hash_task_cache(context, parameters):
    """Prefect cache-key fn: key the task run on the `url_pattern` parameter only."""
    pattern = parameters['url_pattern']
    return hash_url_pattern(pattern)
def download_capture(cdx_line: str):
    """Fetch the archived body for one Wayback CDX index line.

    A CDX line is whitespace-separated; field 1 is the 14-digit capture
    timestamp and field 2 is the original URL.

    Returns:
        (timestamp_as_int, row_dict) on success, or (None, None) for a
        blank or malformed line (fewer than 3 fields).
    """
    tokens = cdx_line.split()
    if len(tokens) < 3:
        # Blank trailing lines from the CDX response land here.
        return None, None
    ts = tokens[1]
    url = tokens[2]
    # `id_` asks Wayback for the raw capture body without the toolbar chrome.
    # Fix: add a timeout so a stalled archive.org connection can't hang forever.
    raw = requests.get(f'http://web.archive.org/web/{ts}id_/{url}', timeout=30)
    row = {
        'time': datetime.strptime(ts, '%Y%m%d%H%M%S'),
        'req.url': url,
        'res.status': raw.status_code,
        'res.body': raw.text,
    }
    return int(ts), row
@task(cache_key_fn=hash_task_cache, cache_expiration=timedelta(hours=1))
def search_archive(url_pattern: str, from_ts: int):
    """Download all Wayback captures of *url_pattern* at or after *from_ts*.

    Queries the CDX index, fetches each capture body, appends the rows to
    the local Delta table, and returns the watermark to use as `from_ts`
    on the next run.

    Raises:
        requests.HTTPError: if the CDX query itself fails.
    """
    search_url = f'http://web.archive.org/cdx/search/cdx?url={url_pattern}&from={from_ts}'
    # Fix: timeout so a stalled CDX request can't hang the flow run.
    r = requests.get(search_url, timeout=30)
    # Fix: surface HTTP errors instead of silently parsing an error page.
    r.raise_for_status()
    data = []
    for line in r.text.splitlines():
        ts, row = download_capture(line)
        if row is not None:
            from_ts = max(from_ts, ts)
            data.append(row)
    if len(data) > 0:
        # The CDX `from` parameter is inclusive, so advance one past the
        # newest capture seen to avoid re-downloading it next run.
        from_ts += 1
        # Write only when there are new rows; appending an empty frame is
        # pointless and can fail schema inference.
        df = pd.DataFrame(data)
        write_deltalake('storage/table', df, mode='append')
    return from_ts
@flow(task_runner=SequentialTaskRunner)
def get_traffic(url_pattern: str):
    """Incrementally pull Wayback captures for *url_pattern*.

    The high-water timestamp is persisted in a Prefect JSON block whose
    slug is keyed by a hash of the pattern, so repeated runs fetch only
    captures newer than the last run.

    Returns:
        The new watermark timestamp (int) to be persisted for the next run.
    """
    # Fix: renamed local so it no longer shadows the builtin `hash`.
    pattern_hash = hash_url_pattern(url_pattern)
    state_slug = f'get-traffic-state-{pattern_hash}'
    try:
        state = JSON.load(state_slug)
    except ValueError:
        # First run for this pattern: start from the beginning of 2022.
        state = JSON(value={'from_ts': '2022'})
    # ts as an int64 is too large for json, and is un/marshalled here
    from_ts = int(state.value['from_ts'])
    next_ts = search_archive(url_pattern, from_ts)
    state.value['from_ts'] = str(next_ts)
    state.save(state_slug, overwrite=True)
    # Fix: the __main__ caller prints this; previously the flow returned None.
    return next_ts
if __name__ == '__main__':
    # Fix: guard the entry point so importing this module doesn't trigger a flow run.
    print(get_traffic('en.wikipedia.org/wiki/Data_engineering'))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment