Skip to content

Instantly share code, notes, and snippets.

View tuulos's full-sized avatar

Ville Tuulos tuulos

View GitHub Profile
@tuulos
tuulos / s3dir.py
Created March 10, 2023 06:43
Sync full directories to/from S3
import os
from metaflow import S3
def put_dir(local_root, s3root):
root = os.path.abspath(local_root)
objs = []
for p, _, files in os.walk(root):
for f in files:
path = os.path.join(p, f)
key = os.path.relpath(path, start=root)
@tuulos
tuulos / send_event.py
Created September 27, 2023 04:52
send events to OBP
import os
import json
from subprocess import check_call
import click
def assume_role(role_arn):
import boto3
sts_client = boto3.client('sts')
from metaflow import FlowSpec, step
class HelloWorldFlow(FlowSpec):
@step
def start(self):
print("This is start step")
import time
print("<<BEGIN>> 10")
for i in range(10):
@tuulos
tuulos / resumingflow.py
Created August 4, 2023 21:25
demonstrates resumable processing
from metaflow import FlowSpec, step, retry, S3, current
from functools import wraps
import pickle
import random
PREFIX = "resumable-processing"
class resumable_processing:
def __init__(self, process="list", results="output"):
@tuulos
tuulos / dump_data.py
Created May 12, 2023 18:56
Export Metaflow tasks to a CSV file
from metaflow import namespace, Metaflow, Run
def fmt(t):
    """Format *t* as an ISO-8601 style timestamp with a ``Z`` (UTC) suffix.

    Timezone-aware datetimes are converted to UTC first, so the literal
    ``Z`` in the output is accurate for them. Naive datetimes carry no
    offset information and are formatted unchanged.

    NOTE(review): for naive values the ``Z`` suffix is only correct if the
    caller already supplies UTC times -- confirm against the data source.

    Args:
        t: A ``datetime`` (or any object exposing a compatible ``strftime``).

    Returns:
        A string such as ``2023-05-12T18:56:00Z``.
    """
    # Local import: the surrounding script does not import datetime itself.
    from datetime import datetime, timezone

    # utcoffset() is None for naive datetimes; only aware ones can be
    # normalized without guessing a source timezone.
    if isinstance(t, datetime) and t.utcoffset() is not None:
        t = t.astimezone(timezone.utc)
    return t.strftime('%Y-%m-%dT%H:%M:%SZ')
print('flow,run,step,task,created,finished,user,runtime,pod_id,pod_name')
namespace(None)
for flow in Metaflow():
for run in flow:
if run.successful:
import random
from metaflow import FlowSpec, step, S3, Flow, Parameter, profile, kubernetes, conda, conda_base
# change columns according to your schema (or remove column list to load all)
COLUMNS = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime']
# group parquet files as 1GB batches
def shard_data(src, batch_size=1_000_000_000):
with S3() as s3:
objs = s3.list_recursive([src])
@tuulos
tuulos / config_train.py
Created February 4, 2023 00:52
Train a model with a config file using Metaflow
from metaflow import FlowSpec, step, IncludeFile
def dataset_wine():
    """Load scikit-learn's bundled wine dataset.

    Returns:
        Whatever ``load_wine(return_X_y=True)`` yields; per the scikit-learn
        documentation this is a ``(data, target)`` pair.
    """
    # Imported inside the function, so scikit-learn is only required when
    # this loader is actually called.
    from sklearn.datasets import load_wine

    return load_wine(return_X_y=True)
def model_knn(train_data, train_labels):
from sklearn.neighbors import KNeighborsClassifier
model = KNeighborsClassifier()
model.fit(train_data, train_labels)
import sys
def baseline(k):
    """Count occurrences of the digit ``1`` in the integers ``0..k``.

    Brute-force approach: every number in the inclusive range is rendered
    as a decimal string and its ``'1'`` characters are counted.

    Args:
        k: Inclusive upper bound of the range.

    Returns:
        The total number of ``1`` digits written across ``0, 1, ..., k``.
        A negative ``k`` gives an empty range and therefore ``0``.
    """
    return sum(str(i).count('1') for i in range(k + 1))
def modulo(k):
if k == 0:
@tuulos
tuulos / profileflow.py
Created February 2, 2021 05:32
Profile memory in Metaflow
from metaflow import FlowSpec, step
from functools import wraps
def profile_memory(f):
    """Decorator that records the memory usage of a single-argument method.

    The decorated method is run through ``memory_profiler.memory_usage`` and
    the resulting measurements are stored on the instance as
    ``self.mem_usage``.

    The wrapper accepts only ``self`` and returns ``None``; whatever ``f``
    itself returns is discarded.

    Args:
        f: A method taking only ``self``.

    Returns:
        The wrapping function, carrying ``f``'s metadata via ``wraps``.
    """
    @wraps(f)
    def func(self):
        # Lazy import: memory_profiler is only needed once the decorated
        # method actually executes, not when the decorator is applied.
        from memory_profiler import memory_usage

        # The (callable, args, kwargs) tuple makes memory_usage invoke
        # f(self) itself while sampling; interval and timeout are passed
        # through exactly as given here.
        self.mem_usage = memory_usage((f, (self,), {}), timeout=6000000, interval=1)

    return func
from metaflow import FlowSpec, step, Parameter, resources, conda_base, profile
@conda_base(python='3.8.3', libraries={'scikit-learn': '0.24.1'})
class ManyKmeansFlow(FlowSpec):
num_docs = Parameter('num-docs', help='Number of documents', default=1000000)
@resources(memory=4000)
@step
def start(self):