Roast My Flow
@mattfysh · Last active April 6, 2023

import hashlib
from datetime import datetime, timedelta

import pandas as pd
import requests
from deltalake.writer import write_deltalake
from prefect import flow, task
from prefect.blocks.system import JSON
from prefect.task_runners import SequentialTaskRunner

def hash_url_pattern(pattern: str):
    h = hashlib.md5()
    h.update(pattern.encode())
    return h.hexdigest()

def hash_task_cache(context, parameters):
    # The cache key depends only on the URL pattern, so repeated runs for
    # the same pattern within the cache window reuse the previous result
    return hash_url_pattern(parameters['url_pattern'])

def download_capture(cdx_line: str):
    # A default-format CDX line is:
    #   urlkey timestamp original mimetype statuscode digest length
    # so tokens[1] is the capture timestamp and tokens[2] the original URL
    tokens = cdx_line.split()
    if len(tokens) < 3:
        return None, None
    ts = tokens[1]
    url = tokens[2]
    # The "id_" flag requests the raw capture, without the Wayback Machine
    # replay banner injected into the page
    raw = requests.get(f'http://web.archive.org/web/{ts}id_/{url}')
    row = {
        'time': datetime.strptime(ts, '%Y%m%d%H%M%S'),
        'req.url': url,
        'res.status': raw.status_code,
        'res.body': raw.text,
    }
    return int(ts), row

@task(cache_key_fn=hash_task_cache, cache_expiration=timedelta(hours=1))
def search_archive(url_pattern: str, from_ts: int):
    search_url = f'http://web.archive.org/cdx/search/cdx?url={url_pattern}&from={from_ts}'
    r = requests.get(search_url)
    data = []
    for line in r.text.split('\n'):
        ts, row = download_capture(line)
        if row is not None:
            from_ts = max(from_ts, ts)
            data.append(row)
    if len(data) > 0:
        # Advance the watermark one past the latest capture seen,
        # so the next run does not re-download it
        from_ts += 1
        df = pd.DataFrame(data)
        write_deltalake('storage/table', df, mode='append')
    return from_ts

@flow(task_runner=SequentialTaskRunner())
def get_traffic(url_pattern: str):
    pattern_hash = hash_url_pattern(url_pattern)
    state_slug = f'get-traffic-state-{pattern_hash}'
    try:
        state = JSON.load(state_slug)
    except ValueError:
        # No saved state for this pattern yet: start from the beginning of 2022
        state = JSON(value={'from_ts': '2022'})
    # ts as an int64 is too large for JSON, so it is un/marshalled
    # to and from a string here
    from_ts = int(state.value['from_ts'])
    next_ts = search_archive(url_pattern, from_ts)
    state.value['from_ts'] = str(next_ts)
    state.save(state_slug, overwrite=True)
    return next_ts

if __name__ == '__main__':
    print(get_traffic('en.wikipedia.org/wiki/Data_engineering'))
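
Once the flow has run at least once, the captures land in the local Delta table and can be read back for inspection. A minimal sketch, assuming the storage/table path used above (the column selection is illustrative):

from deltalake import DeltaTable

# Load the Delta table written by search_archive and sort captures by time
df = DeltaTable('storage/table').to_pandas()
print(df[['time', 'req.url', 'res.status']].sort_values('time'))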