Skip to content

Instantly share code, notes, and snippets.

View cicdw's full-sized avatar

Chris White cicdw

View GitHub Profile
@cicdw
cicdw / prefect_coiled_demo.ipynb
Last active December 7, 2020 21:27
Outline of Prefect + Coiled demo
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
@cicdw
cicdw / custom_trigger.py
Last active May 15, 2020 05:13
Custom Prefect Trigger
def aggregation_skipped_trigger(upstream_states: Dict[Edge, State]) -> bool:
"""
Custom trigger which trigger fails only if the aggregation task
specifically was skipped.
"""
# upstream_states is a dictionary of _all_ upstream dependencies
# but this task will only care about the aggregation task specifically
agg_state = [
state
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
@cicdw
cicdw / result_interface.py
Last active April 6, 2020 20:59
Proposed Result interface
"""
The new Prefect Result interface is a mechanism for interacting with / retrieving data from possibly third party locations
in a secure manner. Results also provide convenient hooks for caching.
"""
class Result:
def __init__(self, **kwargs):
"""
Initialize the class.
"""
@cicdw
cicdw / provide_scheduled_start.py
Created October 26, 2019 22:42
Example of updating inputs to provide a scheduled start time
# if you wish to pass a start time other than immediately
# you can provide an ISO formatted timestamp for when you
# want this run to begin
from datetime import datetime, timedelta
# actually run the flow in two hours
inputs['scheduledStartTime'] = (datetime.utcnow() + timedelta(hours=2)).isoformat()
@cicdw
cicdw / provide_idem_key.py
Created October 26, 2019 22:38
Example of updating inputs to provide an Idempotency Key
# if you wish to prevent duplicate runs from being created, you may also
# provide an idempotency key
# for example, the eventTime
inputs['idempotencyKey'] = event['Records'][0]['eventTime']
@cicdw
cicdw / provide_params.py
Created October 26, 2019 22:36
Snippet of providing parmeter values
# if you wish to pass information about the triggering event as a parameter,
# simply add that to the inputs dictionary under the parameters key,
# whose value should be a dictionary of PARAMETER_NAME -> PARAMETER_VALUE
# pass the full event
inputs['parameters'] = dict(event=event)
# or just the bucket name
inputs['parameters'] = dict(bucket_name=event['Records'][0]['s3']['bucket']['name'])
@cicdw
cicdw / prefect_lambda.py
Last active October 28, 2019 17:26
A template for an AWS Lambda Function which triggers Prefect Flow Runs
import json
import os
import urllib.parse
import urllib.request
print("Loading function")
def lambda_handler(event, context):
@cicdw
cicdw / sql_create.py
Created October 13, 2019 19:19
Creates a SQLite Table for storing SSH attempts
from prefect.tasks.database.sqlite import SQLiteScript
create_script = "CREATE TABLE IF NOT EXISTS SSHATTEMPTS (timestamp TEXT, username TEXT, port INTEGER, city TEXT, country TEXT, latitude REAL, longitude REAL)"
create_table = SQLiteScript(
db="ssh.db", script=create_script, name="Create Database and Table"
)
from prefect import task, Flow
@task
def create_list():
return [1, 1, 2, 3]
@task
def add_one(x):
return x + 1