Skip to content

Instantly share code, notes, and snippets.

View tuulos's full-sized avatar

Ville Tuulos tuulos

View GitHub Profile
import { spawnSync, ChildProcessWithoutNullStreams } from "child_process";
import { readFileSync } from 'fs';
import tmp from 'tmp';
tmp.setGracefulCleanup();
function waitFlow(flowname: string,
runID: string,
resolve: (reason?: any) => void,
reject: (value?: any) => void) {
from metaflow import metaflow_runner
from tempfile import NamedTemporaryFile
import ast
# FIXME: the runner shouldn't require nest_asyncio in notebooks
import nest_asyncio
nest_asyncio.apply()
def get_cell():
from IPython import get_ipython
from metaflow import FlowSpec, step
from functools import wraps
def log_to_sentry(x):
    """Print *x* to standard output, prefixed with the tag ``'sentry'``.

    NOTE(review): presumably a placeholder for a real Sentry client call;
    as written it only prints and contacts no external service.

    Args:
        x: Any printable object to report.

    Returns:
        None.
    """
    print('sentry', x)
def sentry_logger(f):
@wraps(f)
def func(self):
@tuulos
tuulos / send_event.py
Created September 27, 2023 04:52
send events to OBP
import os
import json
from subprocess import check_call
import click
def assume_role(role_arn):
import boto3
sts_client = boto3.client('sts')
@tuulos
tuulos / resumingflow.py
Created August 4, 2023 21:25
demonstrates resumable processing
from metaflow import FlowSpec, step, retry, S3, current
from functools import wraps
import pickle
import random
PREFIX = "resumable-processing"
class resumable_processing:
def __init__(self, process="list", results="output"):
@tuulos
tuulos / dump_data.py
Created May 12, 2023 18:56
Export Metaflow tasks to a CSV
from metaflow import namespace, Metaflow, Run
def fmt(t):
    """Format *t* as a ``YYYY-MM-DDTHH:MM:SSZ`` timestamp string.

    Args:
        t: An object with a ``strftime`` method (e.g. ``datetime.datetime``).

    Returns:
        The formatted timestamp as a string.
    """
    # The trailing 'Z' is a literal character in the format string; no
    # timezone conversion is performed here, so the caller is responsible
    # for passing a value that is already in UTC if 'Z' is to be accurate.
    return t.strftime('%Y-%m-%dT%H:%M:%SZ')
print('flow,run,step,task,created,finished,user,runtime,pod_id,pod_name')
namespace(None)
for flow in Metaflow():
for run in flow:
if run.successful:
import random
from metaflow import FlowSpec, step, S3, Flow, Parameter, profile, kubernetes, conda, conda_base
# change columns according to your schema (or remove column list to load all)
COLUMNS = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime']
# group parquet files into 1 GB batches
def shard_data(src, batch_size=1_000_000_000):
with S3() as s3:
objs = s3.list_recursive([src])
@tuulos
tuulos / s3dir.py
Created March 10, 2023 06:43
Sync full directories to/from S3
import os
from metaflow import S3
def put_dir(local_root, s3root):
root = os.path.abspath(local_root)
objs = []
for p, _, files in os.walk(root):
for f in files:
path = os.path.join(p, f)
key = os.path.relpath(path, start=root)
@tuulos
tuulos / config_train.py
Created February 4, 2023 00:52
Train a model with a config file using Metaflow
from metaflow import FlowSpec, step, IncludeFile
def dataset_wine():
    """Load scikit-learn's bundled wine dataset.

    Returns:
        Whatever ``sklearn.datasets.load_wine(return_X_y=True)`` returns;
        per the scikit-learn documentation this is a ``(data, target)``
        pair rather than a ``Bunch`` object.
    """
    # Imported inside the function so that scikit-learn is only required
    # when this dataset is actually requested.
    from sklearn import datasets

    return datasets.load_wine(return_X_y=True)
def model_knn(train_data, train_labels):
from sklearn.neighbors import KNeighborsClassifier
model = KNeighborsClassifier()
model.fit(train_data, train_labels)
import sys
def baseline(k):
    """Count occurrences of the digit ``1`` in the decimal forms of 0..k.

    Brute-force reference implementation: every integer in the inclusive
    range ``0..k`` is rendered as a string and its ``'1'`` characters are
    counted.

    Args:
        k: Upper bound of the range (inclusive). A negative ``k`` yields an
            empty range and therefore a count of 0.

    Returns:
        The total number of ``'1'`` digits written across all the integers.
    """
    return sum(str(i).count('1') for i in range(k + 1))
def modulo(k):
if k == 0: