Skip to content

Instantly share code, notes, and snippets.

@lauralorenz
lauralorenz / dag.py
Created February 2, 2017 17:53
Example of a self-healing Airflow sensor polling an external async job
"""
Run a daily backup of all our GraphStory instances using the GraphStory alpha API.
Run every day at 23:00 UTC
"""
import admin # noqa
from airflow import DAG
import datetime
from datadive import settings
from datadive.neo4j_backups.sense import GraphStoryExportSensor
@lauralorenz
lauralorenz / 1-helloworld.py
Last active April 6, 2020 03:21
Project Earth Livestream Code Samples
from random import randint
from time import sleep
import prefect
from prefect import Task, task
class AddTask(Task):
    """Custom Prefect Task subclass.

    NOTE(review): the gist preview appears truncated here — ``default`` is
    accepted by ``__init__`` but never stored or used in the visible lines;
    confirm the rest of the class against the full gist before relying on it.
    """

    def __init__(self, default: int, *args, **kwargs):
        # Forward positional/keyword configuration straight to Task.__init__;
        # ``default`` is presumably consumed by later (not visible) lines.
        super().__init__(*args, **kwargs)
@lauralorenz
lauralorenz / pydata_denver_demo.py
Created April 15, 2020 23:43
PyData Denver basic ETL flow
import requests
import json
from collections import namedtuple
from contextlib import closing
import sqlite3
from prefect import task, Flow
## extract
@task
import time
from prefect import task, Flow
from prefect.engine.executors import DaskExecutor
@task
def times():
    """Return the constant list of durations [1, 5, 10] used by the flow."""
    durations = [1, 5, 10]
    return durations
from prefect import Flow, task, Parameter
@task
def get_s3_keys(bucket: str):
    """Placeholder task: list the object keys in *bucket*.

    NOTE(review): this is pseudo-code from the gist preview —
    ``list_of_keys`` is never defined, so calling this as-is raises
    NameError. A real implementation would build the list with boto3
    (e.g. ``list_objects_v2``) as the comment indicates.
    """
    # get a list_of_keys from your s3 bucket with boto3
    return list_of_keys
@task
def process_s3_key(key: str):
from prefect import Flow, task, Parameter
@task
def get_s3_keys(bucket: str):
    """Placeholder task: fetch object keys from *bucket*.

    NOTE(review): pseudo-code — ``list_of_keys`` is undefined in the
    visible snippet (NameError if executed); the intended boto3 call is
    only described by the comment below.
    """
    # get a list_of_keys from your s3 bucket with boto3
    return list_of_keys
@task
def process_s3_key(key: str):
@lauralorenz
lauralorenz / example_diamond_flow.py
Last active September 10, 2021 08:23
A diamond-shaped Prefect flow using mapping
from prefect import task, Flow
import datetime
import random
@task
def inc(x):
    """Return *x* plus one."""
    successor = x + 1
    return successor
@lauralorenz
lauralorenz / dask_flow_1and2.py
Created June 26, 2020 18:27
Configuring a flow to use a distributed Dask scheduler you've already started
from dask_flow import flow
from prefect.engine.executors import DaskExecutor
# Point the executor at an already-running distributed Dask scheduler,
# then execute the imported flow on that cluster.
executor = DaskExecutor(address="10.255.253.2:8786") # change this to yours
flow.run(executor=executor)
@lauralorenz
lauralorenz / config.yml
Created June 26, 2020 18:42
Basic config for Dask Helm for Prefect flows
# Dask Helm chart values: size the worker pool used to run Prefect flows.
# NOTE: indentation reconstructed — the scraped source had it stripped,
# which made the YAML invalid; structure follows the standard Dask Helm
# chart values schema (worker.replicas / worker.resources / worker.env).
worker:
  replicas: 4            # number of Dask worker pods
  resources:
    limits:              # hard ceiling per worker pod
      cpu: 1
      memory: 3G
    requests:            # guaranteed allocation per worker pod
      cpu: 1
      memory: 3G
  env:                   # worker env vars — truncated in the gist preview
@lauralorenz
lauralorenz / dask_flow_3.py
Created June 26, 2020 18:47
Using Prefect Cloud and just the Kubernetes Agent
from dask_flow import flow
from prefect.environments.storage import Docker
# Package the flow into a Docker image pushed to this GCR registry, then
# register it under the 'execution-layer-demo' Prefect Cloud project so the
# Kubernetes Agent can pick up scheduled runs.
flow.storage = Docker(registry_url="gcr.io/execution-layer-demo/flows")
flow.register('execution-layer-demo')