Skip to content

Instantly share code, notes, and snippets.

Created March 7, 2022 01:27
Show Gist options
  • Save zzstoatzz/544b5b8e4b1d88c7769d1101ec10de20 to your computer and use it in GitHub Desktop.
Save zzstoatzz/544b5b8e4b1d88c7769d1101ec10de20 to your computer and use it in GitHub Desktop.
from chessdotcom import get_player_game_archives
from io import StringIO
from prefect import flow, task
from prefect.deployments import DeploymentSpec
from prefect.tasks import task_input_hash
from typing import List, Tuple
import boto3, chess.pgn as pgn, pandas as pd, requests
class Game(pgn.Game):
    """One chess game parsed from PGN text and flattened into a one-row DataFrame.

    NOTE(review): the ``pgn.Game`` base initializer is not invoked here, so
    only the attributes assigned in ``__init__`` exist on an instance.
    """

    def __init__(self, pgn_str: str):
        """Parse ``pgn_str`` and build the tabular view of the game.

        Args:
            pgn_str: PGN text containing a single game.

        Raises:
            ValueError: if no game could be read from ``pgn_str``.
        """
        parsed = pgn.read_game(StringIO(pgn_str))
        if parsed is None:
            # read_game signals "nothing to read" with None; fail with a
            # clear message instead of an AttributeError on ``.__dict__``.
            raise ValueError("no game could be parsed from the given PGN text")

        # Attribute dict of the parsed game (headers, variations, ...).
        self.game_obj = parsed.__dict__

        # Replace the variation nodes with the text of the first variation;
        # a game without any moves has no variations, so store an empty string.
        variations = self.game_obj['variations']
        self.game_obj['variations'] = str(variations[0]) if variations else ''

        # One row whose columns are the PGN headers, plus a 'pgn' column
        # holding the move text computed above.
        self.df = pd.json_normalize(dict(self.game_obj['headers']))
        self.df['pgn'] = self.game_obj['variations']
def alreadyStored(bucket: str, username: str) -> List[str]:
    """List the S3 object keys stored under ``username``, without extensions.

    Args:
        bucket: name of the S3 bucket to inspect.
        username: key prefix to list objects under.

    Returns:
        Each matching key truncated at its first ``'.'``; an empty list when
        the listing response carries no ``'Contents'`` entry.
    """
    client = boto3.client('s3')
    response = client.list_objects_v2(Bucket=bucket, Prefix=username)
    # A listing with no matching objects has no 'Contents' key, so treat a
    # missing key as "nothing stored yet" rather than raising KeyError.
    # NOTE(review): a single list_objects_v2 call returns one page of keys;
    # confirm that is enough for the expected number of stored months.
    stored_objects = response.get('Contents', [])
    return [entry['Key'].split('.')[0] for entry in stored_objects]
def get_games(url: str) -> Tuple[Game, ...]:
    """Download the games listed at ``url`` and parse each one.

    Args:
        url: endpoint whose JSON body has a ``'games'`` list; every entry
            must carry a ``'pgn'`` string.

    Returns:
        A tuple with one ``Game`` per entry, in response order.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    print(f"GET {url}")
    response = requests.get(url)
    # Surface HTTP failures directly instead of as a confusing KeyError or
    # JSON decoding error further down.
    response.raise_for_status()
    raw_games = response.json()['games']
    # Materialize the games so the result matches the declared tuple type
    # and can be iterated more than once.
    return tuple(Game(game['pgn']) for game in raw_games)
def load_games(games: Tuple[Game], base_path: str) -> None:
    """Concatenate the games' DataFrames and write them as one parquet file.

    The first value of the ``'Date'`` column is split on ``'.'`` into three
    parts; the first two are used as year and month in the output path.

    Args:
        games: games whose ``df`` attributes are concatenated.
        base_path: location placed after ``s3://`` in the destination path.
    """
    frames = [game.df for game in games]
    df = pd.concat(frames)

    # The first row's date decides which year/month file is written.
    first_date = df['Date'].iloc[0]
    year, month, _ = first_date.split('.')
    print(f'storing games from month {month} of year {year}...')

    filepath = f's3://{base_path}/games/{year}/{month}.parquet.gzip'
    df.to_parquet(filepath, compression='gzip')
@flow(name="Store users' games", version="1.0.0")
def orca(S3_bucket: str) -> None:
    """Fetch every not-yet-stored month of games per user and store it in S3.

    Args:
        S3_bucket: name of the bucket that holds the stored games.
    """
    for username in ['n80n8']:
        print(f"Checking for games on from: {username}")

        # list of URLs to GET months of games from
        archive_urls = get_player_game_archives(username=username).archives

        # only get_games if they're not already stored; a set keeps the
        # membership test cheap for every archive URL
        stored_games = set(alreadyStored(S3_bucket, username))
        new_urls = [
            url for url in archive_urls
            # the part of the URL after 'player/' is compared against the
            # extension-less keys already stored
            if url.split('player/')[-1] not in stored_games
        ]

        if not new_urls:
            print(f'No new months of games to load for {username}!')
            # nothing to fetch for this user; skip the "Fetching" message
            continue

        print(f'Fetching {len(new_urls)} new months of games from {username}..')
        new_user_games = [get_games(url) for url in new_urls]

        for month in new_user_games:
            # NOTE(review): the loop body was missing from the reviewed
            # source. This call is reconstructed so that the written keys
            # ('<username>/games/<year>/<month>...') match what
            # alreadyStored lists under the username prefix -- confirm.
            load_games(month, f'{S3_bucket}/{username}')
# Deployment definition for the "Store users' games" flow.
# NOTE(review): the reviewed source only showed the keyword arguments; the
# enclosing DeploymentSpec(...) call is restored here -- confirm that no
# other arguments (e.g. a flow or flow location) were lost with it.
DeploymentSpec(
    name='PGN ETL',
    flow_name="Store users' games",
    parameters={"S3_bucket": "nate-demo-bucket"},
    tags=["chess", "funsies"],
)
if __name__ == "__main__":
    # NOTE(review): the guarded statement was missing from the reviewed
    # source; running the flow against the bucket named in the deployment
    # parameters is assumed -- confirm.
    orca(S3_bucket="nate-demo-bucket")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment