Skip to content

Instantly share code, notes, and snippets.

View zzstoatzz's full-sized avatar

nate nowack zzstoatzz

View GitHub Profile
##### Frequency identification
# imports
from IPython.display import HTML
from FREQhelpers import createDash
import sys, time
# Inputs for the animation / dynamics (one binding per line)
resolution = 0.15
duration = 30  # seconds
T = 2  # seconds per cycle
freqs = [5, 10]
signal = False
@zzstoatzz
zzstoatzz / spec.json
Created April 26, 2021 00:56
airbyte-spec
{
"documentationUrl": "https://docs.airbyte.io",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Smartsheets Source Spec",
"type": "object",
"required": ["access_token", "spreadsheet_id"],
"additionalProperties": false,
"properties": {
"access_token": {
@zzstoatzz
zzstoatzz / source.py
Created April 26, 2021 01:22
airbyte source connector pseudo class definition
# main class definition
class SourceSmartsheets(Source):
def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
# implement "check" that your credentials allow a good connection to your data source
# e.g. Smartsheets api call on my smartsheet of interest -> status 200
return AirbyteConnectionStatus(status=Status.FAILED)
def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
# create a JSONschema catalog for your source based on column names and types
from prefect import Flow, Parameter, task
from prefect.backend import get_key_value
from prefect.engine import signals
from prefect.engine.results import S3Result
from prefect.run_configs import KubernetesRun
from prefect.storage.s3 import S3
from prefect.tasks.secrets import PrefectSecret
import pandas as pd
import requests
from chessdotcom import get_player_game_archives
from io import StringIO
from prefect import flow, task
from prefect.deployments import DeploymentSpec
from prefect.tasks import task_input_hash
from typing import List, Tuple
import boto3, chess.pgn as pgn, pandas as pd, requests
# https://python-chess.readthedocs.io/en/latest/pgn.html
class Game(pgn.Game):
from chessdotcom import get_player_game_archives
from io import StringIO
from typing import List, Tuple
import boto3, chess.pgn as pgn, pandas as pd, requests
# https://python-chess.readthedocs.io/en/latest/pgn.html
class Game(pgn.Game):
def __init__(self: object, pgn_str: str):
self.game_obj = pgn.read_game(StringIO(pgn_str)).__dict__
self.game_obj['variations'] = str(self.game_obj['variations'][0])
Event Site Date Round White Black Result BlackElo CurrentPosition ECO ECOUrl EndDate EndTime Link StartTime Termination TimeControl Timezone UTCDate UTCTime WhiteElo pgn
Live Chess Chess.com 2021.12.03 - glen_bishop n80n8 1-0 414 4Rk2/ppp3p1/7p/5pN1/2n5/2b5/P4PPP/4R1K1 b - - C51 https://www.chess.com/openings/Giuoco-Piano-Game-Evans-Accepted-McDonnell-Defense-6.d4 2021.12.03 18:41:27 https://www.chess.com/game/live/32295537927 18:32:34 glen_bishop won by checkmate 600 UTC 2021.12.03 18:32:34 1226 1. e4 { [%clk 0:10:00] } 1... e5 { [%clk 0:10:00] } 2. Nf3 { [%clk 0:09:55.7] } 2... Nc6 { [%clk 0:09:54.8] } 3. Bc4 { [%clk 0:09:48] } 3... Bc5 { [%clk 0:09:35.6] } 4. b4 { [%clk 0:09:43.8] } 4... Bxb4 { [%clk 0:09:28.3] } 5. c3 { [%clk 0:09:35.5] } 5... Bc5 { [%clk 0:09:25.8] } 6. d4 { [%clk 0:09:33.2] } 6... Bb6 { [%clk 0:09:13.2] } 7. dxe5 { [%clk 0:09:07.2] } 7... Nge7 { [%clk 0:09:01] } 8. O-O { [%clk 0:08:56.7] } 8... d5 { [%clk 0:08:54.2] } 9. exd5 { [%clk 0:08:49.4] } 9... Na5 { [%clk 0:08:43.5] } 10. Bb5+ {
from prefect.tasks import task_input_hash
@task(cache_key_fn=task_input_hash)
def get_games(url: str) -> List[Game]:
    """Fetch the JSON document at *url* and wrap each game's PGN in a ``Game``.

    The task's cache key is computed by ``task_input_hash``.

    Args:
        url: Address of a JSON document with a top-level ``"games"`` list
            whose entries each carry a ``"pgn"`` string.

    Returns:
        One ``Game`` per entry of ``"games"``, in the order received.

    Raises:
        requests.HTTPError: If the server responds with a 4xx/5xx status.
        requests.Timeout: If the server does not respond within the timeout.
    """
    logger = get_run_logger()
    logger.info(f"GET {url}")
    # Bound the wait: without a timeout a stalled connection blocks forever.
    response = requests.get(url, timeout=30)
    # Surface HTTP failures directly instead of as a KeyError on 'games'
    # (or a JSON decode error) raised from an error payload.
    response.raise_for_status()
    return [Game(game['pgn']) for game in response.json()['games']]
from chessdotcom import get_player_game_archives
from io import StringIO
from prefect import flow, task, get_run_logger
from typing import List, Tuple
import boto3, chess.pgn as pgn, pandas as pd, requests
# https://python-chess.readthedocs.io/en/latest/pgn.html
class Game(pgn.Game):
def __init__(self: object, pgn_str: str):
self.game_obj = pgn.read_game(StringIO(pgn_str)).__dict__
❯ python flow.py
10:56:13.218 | INFO | prefect.engine - Created flow run 'marvellous-serval' for flow "Store chess.com users' games"
10:56:13.218 | INFO | Flow run 'marvellous-serval' - Using task runner 'ConcurrentTaskRunner'
Checking for games on chess.com from: n80n8
10:56:13.522 | INFO | Flow run 'marvellous-serval' - Created task run 'get_games-d2192cf5-0' for task 'get_games'
10:56:13.558 | INFO | Flow run 'marvellous-serval' - Created task run 'get_games-d2192cf5-1' for task 'get_games'
GET https://api.chess.com/pub/player/n80n8/games/2021/11
/Users/nate/opt/miniconda3/envs/chess/lib/python3.7/site-packages/prefect/client.py:1196: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
"No default storage has been set on the server. "