Skip to content

Instantly share code, notes, and snippets.

View tuulos's full-sized avatar

Ville Tuulos tuulos

View GitHub Profile
from metaflow import FlowSpec, Parameter, step, JSONType, Task, current
from datetime import datetime
from functools import wraps
from pydantic import BaseModel
class Config(BaseModel):
    """Validated configuration record.

    Fields:
        id:   required integer identifier.
        name: display name, defaults to 'John Doe'.
    """

    id: int
    # Annotated explicitly: a bare `name = 'John Doe'` is pydantic-v1-only
    # (pydantic v2 raises PydanticUserError on non-annotated fields).
    # The annotation is behavior-identical under v1, which inferred `str`.
    name: str = 'John Doe'
def validate(f):
import { spawnSync, ChildProcessWithoutNullStreams } from "child_process";
import { readFileSync } from 'fs';
import tmp from 'tmp';
tmp.setGracefulCleanup();
function waitFlow(flowname: string,
runID: string,
resolve: (reason?: any) => void,
reject: (value?: any) => void) {
from metaflow import metaflow_runner
from tempfile import NamedTemporaryFile
import ast
# FIXME: the runner shouldn't require nest_asyncio in notebooks
import nest_asyncio
nest_asyncio.apply()
def get_cell():
from IPython import get_ipython
from metaflow import FlowSpec, step
from functools import wraps
def log_to_sentry(x):
    """Stand-in for real Sentry reporting: echo the payload to stdout."""
    tag = 'sentry'
    print(tag, x)
def sentry_logger(f):
@wraps(f)
def func(self):
@tuulos
tuulos / send_event.py
Created September 27, 2023 04:52
send events to OBP
import os
import json
from subprocess import check_call
import click
def assume_role(role_arn):
import boto3
sts_client = boto3.client('sts')
@tuulos
tuulos / resumingflow.py
Created August 4, 2023 21:25
demonstrates resumable processing
from metaflow import FlowSpec, step, retry, S3, current
from functools import wraps
import pickle
import random
PREFIX = "resumable-processing"
class resumable_processing:
def __init__(self, process="list", results="output"):
@tuulos
tuulos / dump_data.py
Created May 12, 2023 18:56
export Metaflow tasks in a CSV
from metaflow import namespace, Metaflow, Run
def fmt(t):
    """Render a datetime as an ISO-8601-style timestamp with a 'Z' suffix.

    NOTE(review): the 'Z' is appended unconditionally — assumes *t* is
    already in UTC; confirm against the caller.
    """
    stamp = t.strftime('%Y-%m-%dT%H:%M:%SZ')
    return stamp
print('flow,run,step,task,created,finished,user,runtime,pod_id,pod_name')
namespace(None)
for flow in Metaflow():
for run in flow:
if run.successful:
import random
from metaflow import FlowSpec, step, S3, Flow, Parameter, profile, kubernetes, conda, conda_base
# change columns according to your schema (or remove column list to load all)
COLUMNS = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime']
# group parquet files as 1GB batches
def shard_data(src, batch_size=1_000_000_000):
with S3() as s3:
objs = s3.list_recursive([src])
@tuulos
tuulos / s3dir.py
Created March 10, 2023 06:43
Sync full directories to/from S3
import os
from metaflow import S3
def put_dir(local_root, s3root):
root = os.path.abspath(local_root)
objs = []
for p, _, files in os.walk(root):
for f in files:
path = os.path.join(p, f)
key = os.path.relpath(path, start=root)
@tuulos
tuulos / config_train.py
Created February 4, 2023 00:52
Train a model with a config file using Metaflow
from metaflow import FlowSpec, step, IncludeFile
def dataset_wine():
    """Load the scikit-learn wine dataset as a (features, labels) pair."""
    # Imported at call time so sklearn is only loaded when the dataset
    # is actually requested.
    from sklearn import datasets

    features, labels = datasets.load_wine(return_X_y=True)
    return features, labels
def model_knn(train_data, train_labels):
from sklearn.neighbors import KNeighborsClassifier
model = KNeighborsClassifier()
model.fit(train_data, train_labels)