Skip to content

Instantly share code, notes, and snippets.

View tuulos's full-sized avatar

Ville Tuulos tuulos

View GitHub Profile
@tuulos
tuulos / send_event.py
Created September 27, 2023 04:52
send events to OBP
import os
import json
from subprocess import check_call
import click
def assume_role(role_arn):
import boto3
sts_client = boto3.client('sts')
@tuulos
tuulos / resumingflow.py
Created August 4, 2023 21:25
demonstrates resumable processing
from metaflow import FlowSpec, step, retry, S3, current
from functools import wraps
import pickle
import random
PREFIX = "resumable-processing"
class resumable_processing:
def __init__(self, process="list", results="output"):
@tuulos
tuulos / dump_data.py
Created May 12, 2023 18:56
export Metaflow tasks in a CSV
from metaflow import namespace, Metaflow, Run
def fmt(t):
    """Format a datetime-like object as ``YYYY-MM-DDTHH:MM:SSZ``.

    The trailing ``Z`` is a literal character in the format string; no
    timezone conversion happens here.
    NOTE(review): callers presumably pass UTC timestamps -- confirm.
    """
    return t.strftime('%Y-%m-%dT%H:%M:%SZ')
print('flow,run,step,task,created,finished,user,runtime,pod_id,pod_name')
namespace(None)
for flow in Metaflow():
for run in flow:
if run.successful:
import random
from metaflow import FlowSpec, step, S3, Flow, Parameter, profile, kubernetes, conda, conda_base
# change columns according to your schema (or remove column list to load all)
COLUMNS = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime']
# group parquet files into 1GB batches
def shard_data(src, batch_size=1_000_000_000):
with S3() as s3:
objs = s3.list_recursive([src])
@tuulos
tuulos / s3dir.py
Created March 10, 2023 06:43
Sync full directories to/from S3
import os
from metaflow import S3
def put_dir(local_root, s3root):
root = os.path.abspath(local_root)
objs = []
for p, _, files in os.walk(root):
for f in files:
path = os.path.join(p, f)
key = os.path.relpath(path, start=root)
@tuulos
tuulos / config_train.py
Created February 4, 2023 00:52
Train a model with a config file using Metaflow
from metaflow import FlowSpec, step, IncludeFile
def dataset_wine():
    """Load the scikit-learn wine dataset.

    Returns whatever ``datasets.load_wine(return_X_y=True)`` returns, i.e.
    the data and target as separate objects rather than a single bundle.
    """
    # Imported inside the function so that merely importing this module
    # does not require scikit-learn to be installed.
    from sklearn import datasets

    return datasets.load_wine(return_X_y=True)
def model_knn(train_data, train_labels):
from sklearn.neighbors import KNeighborsClassifier
model = KNeighborsClassifier()
model.fit(train_data, train_labels)
import sys
def baseline(k):
    """Count occurrences of the digit ``1`` in the decimal forms of 0..k.

    Every integer from 0 through ``k`` (inclusive) is converted to its
    decimal string and the ``'1'`` characters are tallied. For negative
    ``k`` the range is empty and the result is 0.
    """
    return sum(str(i).count('1') for i in range(k + 1))
def modulo(k):
if k == 0:
from metaflow import FlowSpec, step, Parameter, resources, conda_base, profile
@conda_base(python='3.8.3', libraries={'scikit-learn': '0.24.1'})
class ManyKmeansFlow(FlowSpec):
num_docs = Parameter('num-docs', help='Number of documents', default=1000000)
@resources(memory=4000)
@step
def start(self):
@tuulos
tuulos / magicdir.py
Created February 5, 2022 00:57
magic dir
from metaflow import FlowSpec, step
from functools import wraps
from functools import wraps
dir = 'mydir'
def magicdir(f):
artifact = 'magicdir'
@wraps(f)
from metaflow import FlowSpec, step, Parameter, IncludeFile, catch
import math, time, uuid, datetime, random, string, sys
from decimal import Decimal
import requests
class CustomClass():
def __str__(self):
return 'a' * int(1024**2)