Jim Crist-Harif (jcrist)

@jcrist
jcrist / structs.py
Created August 20, 2020 17:25
Python struct class
import typing
import inspect
def _extract_attributes(bases, attrs):
    arg_fields = {}
    kwarg_fields = {}
    existing_slots = set()
    # Walk up the bases, validating and merging defaults
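
The preview cuts off before the class machinery itself, but the helper above suggests the intended usage: annotated fields without defaults become required arguments, fields with defaults become optional ones. A hedged sketch of that usage (the Struct base class and field names here are assumptions, not taken from the gist):

class Point(Struct):
    x: int       # no default -> collected into arg_fields (required)
    y: int = 0   # has a default -> collected into kwarg_fields (optional)

p = Point(1, y=2)  # generated __init__ built from the collected fields
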
@jcrist
jcrist / bench.py
Created September 6, 2020 15:50
Quickle Structs benchmark
In [3]: from typing import NamedTuple
In [4]: from quickle import Struct
In [5]: from dataclasses import dataclass
In [6]: class PointTuple(NamedTuple):
   ...:     x: int
   ...:     y: int
   ...:
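
The session preview cuts off after the NamedTuple definition; given the quickle and dataclasses imports above, the comparable definitions presumably look roughly like this (a sketch continuing the same two-field point shape):

In [7]: class PointStruct(Struct):
   ...:     x: int
   ...:     y: int
   ...:

In [8]: @dataclass
   ...: class PointDataclass:
   ...:     x: int
   ...:     y: int
   ...:
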
@jcrist
jcrist / kubernetes_job_environment.py
Last active December 16, 2020 17:57
Prefect Example using KubernetesJobEnvironment
from prefect import Flow
from prefect.environments.execution import KubernetesJobEnvironment
from prefect.environments.storage import Docker
with Flow("kubernetes-example") as flow:
    # Add tasks to flow here...

# Run on Kubernetes using a custom job specification
# This was needed to do even simple things like increase
# the job resource limits
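
The preview stops before the configuration lines these comments refer to; with the Prefect 1.x environments API they presumably looked roughly like the following (the job spec filename and registry URL are placeholders, not from the gist):

flow.environment = KubernetesJobEnvironment(job_spec_file="job_spec.yaml")

# Store the flow in a Docker image
flow.storage = Docker(registry_url="my-registry.example.com")
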
@jcrist
jcrist / kubernetes_run_config.py
Created December 16, 2020 17:54
Prefect Example using KubernetesRun
from prefect import Flow
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker
with Flow("kubernetes-example") as flow:
    # Add tasks to flow here...

# Run on Kubernetes with a custom resource configuration
flow.run_config = KubernetesRun(cpu_request=2, memory_request="4Gi")

# Store the flow in a docker image
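
The storage line following that last comment is cut off in the preview; it presumably resembles the following (registry URL and image name are placeholders):

flow.storage = Docker(registry_url="my-registry.example.com", image_name="kubernetes-example")
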
@jcrist
jcrist / dask_cloudprovider.py
Created December 16, 2020 18:05
Prefect Example using DaskExecutor with dask-cloudprovider
from prefect import Flow
from prefect.executors import DaskExecutor
with Flow("daskcloudprovider-example") as flow:
    # Add tasks to flow here...

# Execute this flow on a Dask cluster deployed on AWS Fargate
flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"image": "prefecthq/prefect", "n_workers": 5},
)
@jcrist
jcrist / ecs_with_dask_cloudprovider.py
Last active December 17, 2021 19:48
Prefect Examples with RunConfigs and Executors
from prefect import Flow
from prefect.storage import S3
from prefect.run_configs import ECSRun
from prefect.executors import DaskExecutor
with Flow("example") as flow:
    ...

flow.storage = S3("my-flows")
flow.run_config = ECSRun()  # Run job on ECS instead of locally
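
The preview ends before the executor configuration the gist title implies; a plausible continuation, mirroring the dask-cloudprovider gist above (cluster options here are illustrative, not from the gist):

# Execute the flow's tasks on a temporary Fargate-backed Dask cluster
flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"image": "prefecthq/prefect", "n_workers": 5},
)
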
@jcrist
jcrist / dask_resource_manager_example.py
Created February 5, 2021 20:46
An example of using Dask with a Prefect `resource_manager`
"""
For workloads where most of the grunt work is *driven* by Prefect, but done
using some external system like Dask, it makes more sense to use Prefect to
drive Dask rather than running Prefect inside Dask.

If you want your Prefect Flow to start up a Dask cluster, you'll want to ensure
all resources are still cleaned up properly, even in the case of Flow failure.
To do this, you can make use of a `prefect.resource_manager`. This mirrors the
`contextmanager` pattern you may be familiar with in Python, but makes it work
with Prefect tasks. See
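
The docstring's preview ends here. A minimal sketch of the pattern it describes, assuming the Prefect 1.x `resource_manager` API and a local `dask.distributed` cluster (the task and class names are illustrative):

from dask.distributed import Client
from prefect import Flow, resource_manager, task


@resource_manager
class DaskCluster:
    """Create a temporary Dask cluster for the duration of the flow run."""

    def __init__(self, n_workers=4):
        self.n_workers = n_workers

    def setup(self):
        # Start the cluster and hand a client to downstream tasks
        return Client(n_workers=self.n_workers)

    def cleanup(self, client):
        # Runs even if tasks inside the block fail, so the cluster
        # is always torn down
        client.close()


@task
def inc(client, x):
    # Illustrative task that submits work to the Dask cluster
    return client.submit(lambda v: v + 1, x).result()


with Flow("dask-example") as flow:
    with DaskCluster(n_workers=4) as client:
        result = inc(client, 1)
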
@jcrist
jcrist / client.py.diff
Created February 9, 2021 15:18
Garnet.ai client diff from dask-gateway 0.8.0
diff --git a/dask_gateway/client.py b/garnet/client.py
index db044b9..6f35ea1 100644
--- a/dask_gateway/client.py
+++ b/garnet/client.py
@@ -27,25 +27,25 @@ from .utils import format_template, cancel_task
del comm
-__all__ = ("Gateway", "GatewayCluster", "GatewayClusterError", "GatewayServerError")
+__all__ = ("Garnet", "GarnetCluster", "GarnetClusterError", "GarnetServerError")
@jcrist
jcrist / bench.py
Created July 17, 2021 03:46
Benchmark for msgspec JSON encoding
import argparse
import json
import lzma
import os
import timeit
import urllib.request
import msgspec
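
Only the imports survive in the preview. A minimal sketch of what a timeit-based msgspec JSON benchmark along these lines might look like (the real gist presumably downloads and lzma-decompresses a dataset via urllib.request; the payload below is a stand-in):

import timeit

import msgspec

# Stand-in payload instead of the downloaded dataset
data = [{"id": i, "name": f"item-{i}", "tags": ["a", "b"]} for i in range(1000)]

enc = msgspec.json.Encoder()
dec = msgspec.json.Decoder()
payload = enc.encode(data)

print("encode:", timeit.timeit(lambda: enc.encode(data), number=1000))
print("decode:", timeit.timeit(lambda: dec.decode(payload), number=1000))
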
@jcrist
jcrist / bench.py
Created January 31, 2022 20:58
A (naive) benchmark comparing pydantic & msgspec performance
"""
This benchmark is a modified version of the benchmark available at
https://github.com/samuelcolvin/pydantic/tree/master/benchmarks to support
benchmarking msgspec.

The benchmark measures the time to JSON encode/decode `n` random objects
matching a specific schema. It compares the time required for both
serialization _and_ schema validation.
"""