Skip to content

Instantly share code, notes, and snippets.

View dewaldabrie's full-sized avatar

dewaldabrie

  • Cover Genius
  • Sydney, Australia
View GitHub Profile
from coroflow import Pipeline, Node
import logging
from .data_collector import agg_data_deserialise, get_symbols, agg_data_update, agg_data_serialize
def main_sync():
# Create Pipeline with a node for each function passed in.
# Data flows like so:
# get_symbols -> symbol_data -> agg_data_update
"""
Find list of ASX ETF symbols at asxetfs.com
"""
import os
import time
import logging
import pickle
import csv
import pandas as pd
"""
Find list of ASX ETF symbols at asxetfs.com
"""
import os
from coroflow import Pipeline, Node
import time
import logging
import pickle
import csv
import pandas as pd
@dewaldabrie
dewaldabrie / async_queued_pipeline.py
Created November 10, 2020 23:24
Async Queued Pipeline
import asyncio
import time
tasks = []
async def func(input_q, target_qs, task_id, param):
print(f"{task_id}: Initialised with param: {param}")
async def func_inner(input_q, target_qs, inpt):
print(f"{task_id}: Recieved input: {inpt}")
@dewaldabrie
dewaldabrie / async_generator_pipeline.py
Last active November 11, 2020 00:35
Asynchronous Generator Pipeline
import asyncio
import time
tasks = []
def func(targets, task_id, param):
print(f"{task_id}: Initialised with param: {param}")
async def func_inner(targets, inpt):
await asyncio.sleep(1) # simulated IO delay
@dewaldabrie
dewaldabrie / synchronous_generator_pipeline.py
Last active June 10, 2021 02:49
Synchronous Generator Pipeline
import time
def func(targets, task_id, param=None):
print(f"{task_id}: Initialised with param: {param}")
while True:
inpt = (yield)
print(f"{task_id}: Received input: {inpt}")
time.sleep(1) # simulated IO delay
for target in targets:
print(f"{task_id}: T1 sending {inpt}")
@dewaldabrie
dewaldabrie / bq_stored_proc.sql
Last active February 27, 2022 08:56
BigQuery Stored Procedure Example
CREATE OR REPLACE PROCEDURE ledger.make_ledger_entries() BEGIN
DECLARE unique_item_names ARRAY<STRING>;
DECLARE item_idx INT64 DEFAULT 0;
DECLARE current_table_name STRING;
-- Create some fake staging data for illustration
-- In reality, this could be ingested from a federated data source
CREATE OR REPLACE TEMPORARY TABLE new_purchases AS (
SELECT *
@dewaldabrie
dewaldabrie / bq_cte.sql
Created September 30, 2020 01:57
BigQuery CTE Example
WITH basket_1 AS (
SELECT *
FROM UNNEST([
STRUCT("apple" AS item_name, 2 AS quantity),
("banana", 0),
("pear", 4)
])
),
basket_2 AS (
SELECT *
@dewaldabrie
dewaldabrie / bq_vars.sql
Created September 30, 2020 01:34
BigQuery Variables Example
DECLARE start_time TIMESTAMP;
DECLARE stop_time TIMESTAMP;
SET start_time = TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY);
SET stop_time = TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 5 HOUR);
SELECT id
FROM my_dataset.my_table
WHERE created_at BETWEEN start_time and stop_time;
@dewaldabrie
dewaldabrie / bq_js_persitent_fun_with_ext_source.sql
Last active September 30, 2020 01:22
BigQuery JS UDF Examples
-- Example of a persistent function that uses libraries stored in Google Cloud Storage
-- Taken from https://cloud.google.com/bigquery/docs/reference/standard-sql/user-defined-functions
CREATE OR REPLACE FUNCTION dataset.myFunc(a FLOAT64, b STRING)
RETURNS STRING
LANGUAGE js
OPTIONS (
library=["gs://my-bucket/path/to/lib1.js", "gs://my-bucket/path/to/lib2.js"]
)
AS
"""