Skip to content

Instantly share code, notes, and snippets.

@mattfysh
mattfysh / enrich_join.py
Last active February 23, 2024 04:48
Custom bytewax operators
from copy import deepcopy
from typing import Optional, Any, TypeVar, Tuple
from bytewax.dataflow import operator
from bytewax import operators as op
from bytewax.operators import KeyedStream
RETAIN = False
DISCARD = True
E = TypeVar("E")
@mattfysh
mattfysh / 1-cleaning-notes.md
Last active October 20, 2023 05:26
arroyo prototype
  • `CAST(extract_json(...)[1] AS TEXT)` does not work
    • It fails with: internal error: entered unreachable code: invalid cast from Utf8 to Utf8
    • When targeting string output, use `extract_json_string` as a workaround; all other output types use `CAST`

output

Field Value
json_parsed_value "{"string":"abc","number":123,"number_float": 123.456,"boolean":true,"null":null,"string_null":"null"}"
json_parsed_json_string ""abc""
@mattfysh
mattfysh / artist.py
Created October 3, 2023 23:43
de proto
import httpx
from urllib.parse import urlparse, parse_qs
from prefect import flow, task
@task(
version="0.2.0",
cache_key_fn=lambda ctx, args: f"{args['id']} v{ctx.task.version}",
)
def collate_soundcloud_tracks(id):
path = f"/matt/soundcloud-dev/rest/Artist?id={id}"
@mattfysh
mattfysh / benthos.yaml
Last active September 3, 2023 11:34
benthos batching
input:
generate:
count: 1
mapping: '[{"x": 1}, {"x": 2, "y": "a"}]'
pipeline:
processors:
- try:
- unarchive:
format: json_array
@mattfysh
mattfysh / benthos.yaml
Last active August 22, 2023 01:00
Benthos + Consul
input:
sequence:
inputs:
- generate:
count: 1
processors:
- http:
verb: GET
url: ${! env("CONSUL_ENDPOINT") }/v1/kv/${! env("CONSUL_KV_KEY") }
headers:
input:
redis_streams:
url: redis://redis:6379
streams: [foo_stream]
body_key: body
pipeline:
processors:
- unarchive:
format: json_array
// Worker
export default {
  /**
   * Worker entry point: hands the incoming request and the environment
   * bindings to handleRequest and resolves with whatever it produces.
   */
  async fetch(request, env) {
    const response = await handleRequest(request, env);
    return response;
  },
};
async function handleRequest(request, env) {
if (request.method !== 'POST') {
@mattfysh
mattfysh / resource.ts
Last active May 19, 2023 05:47
Pulumi auto-hierarchy
export abstract class ProjectResource extends pulumi.ComponentResource {
protected abstract name: string
protected addResource<RT, ResourceArgs>(
name: string,
Resource: new (
name: string,
args: ResourceArgs,
opts: pulumi.CustomResourceOptions
) => RT,
@mattfysh
mattfysh / flow.py
Last active April 6, 2023 05:38
Roast My Flow
import hashlib
from datetime import datetime, timedelta
from prefect import flow, task
from prefect.blocks.system import JSON
from prefect.task_runners import SequentialTaskRunner
import pandas as pd
import requests
from deltalake.writer import write_deltalake
@mattfysh
mattfysh / genzod.ts
Last active December 8, 2023 09:32
import fs from 'node:fs/promises'
import path from 'node:path'
import rimraf from 'rimraf'
import { glob } from 'glob'
import { generate } from 'ts-to-zod'
import ts from 'typescript'
import { v4 as uuidv4 } from 'uuid'
const OUTDIR = 'packages/types/gen'
const { factory } = ts