Skip to content

Instantly share code, notes, and snippets.

@fferegrino
Created February 26, 2024 16:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fferegrino/1842ff274384337880dd214835463c94 to your computer and use it in GitHub Desktop.
Save fferegrino/1842ff274384337880dd214835463c94 to your computer and use it in GitHub Desktop.
Bytewax join example
import json
from datetime import datetime, timezone, timedelta
import bytewax.operators.window as op_w
from bytewax.bytewax import EventClockConfig, SessionWindow
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.connectors.files import FileSource
from bytewax.connectors.stdio import StdOutSink
# Epoch that jsonize adds each event's "offset" (in seconds) to.
base_date = datetime(2024, 2, 1, tzinfo=timezone.utc)

flow = Dataflow("wordcount_eg")
# One file-backed input per join side: step "input0" reads w0.jsonl and
# step "input1" reads w1.jsonl (created in that order).
input0, input1 = (
    op.input(f"input{side}", flow, FileSource(f"w{side}.jsonl"))
    for side in range(2)
)
def jsonize(line, base=None):
    """Parse one JSON line into an event dict carrying an absolute ``ts``.

    The optional ``offset`` field (seconds, default 0) is removed from the
    parsed object and replaced by ``ts = base + timedelta(seconds=offset)``.

    Args:
        line: A single JSON object encoded as text.
        base: Datetime the offset is measured from. When omitted (as when the
            function is passed to ``op.map``), the module-level ``base_date``
            is used, looked up at call time.

    Returns:
        The parsed dict, without ``offset`` and with a ``ts`` datetime added.
    """
    event = json.loads(line)
    if base is None:
        base = base_date
    offset = event.pop("offset", 0)
    # int() accepts numeric strings such as "10" and truncates fractional
    # numeric offsets toward zero.
    event["ts"] = base + timedelta(seconds=int(offset))
    return event
def _event_time(event):
    """Return the event-time timestamp stored under ``"ts"``."""
    return event["ts"]


def _event_key(event):
    """Return the join key stored under ``"key"``."""
    return event["key"]


# Parse each raw JSON line of both inputs into an event dict.
map0 = op.map("p0", input0, jsonize)
map1 = op.map("p1", input1, jsonize)

# Event-time clock reading each event's "ts"; waits up to 60 s of system time.
clock = EventClockConfig(
    dt_getter=_event_time,
    wait_for_system_duration=timedelta(seconds=60),
)
# Session windows separated by a 60 s gap.
windower = SessionWindow(gap=timedelta(seconds=60))

# Key both streams by the event's "key" field so matching keys are joined.
keyed0 = op.key_on("key0", map0, _event_key)
keyed1 = op.key_on("key1", map1, _event_key)

# Windowed join; the sides are named by the keyword arguments "w0" and "w1".
merge_named = op_w.join_window_named(
    "merge_named", clock, windower, w0=keyed0, w1=keyed1
)
def prepare_named(thing):
    """Flatten one named-join result into a printable dict.

    Args:
        thing: ``(key, (window, objects))``. ``window`` must expose
            ``open_time``/``close_time`` datetimes; ``objects`` is a dict that
            maps side names (``"w0"``/``"w1"``) to the joined event dicts.

    Returns:
        A NEW dict with the same entries (and order) as ``objects``. In it,
        each present side's event is a copy whose ``ts`` datetime is rendered
        as ``HH:MM:SS``, and ``w_open``/``w_close`` hold the window bounds in
        the same format. ``objects`` and its event dicts are not modified.
    """
    _key, (window, objects) = thing
    time_format = "%H:%M:%S"
    # Build copies instead of editing in place: ``objects`` and its event
    # dicts come from the join operator and may be shared with upstream
    # state, so mutating them could leak string timestamps into other outputs.
    prepared = dict(objects)
    for side in ("w0", "w1"):
        if side not in objects:
            continue
        event = dict(objects[side])
        # Only format real datetimes; an already-formatted string is kept.
        if isinstance(event["ts"], datetime):
            event["ts"] = event["ts"].strftime(time_format)
        prepared[side] = event
    prepared["w_open"] = window.open_time.strftime(time_format)
    prepared["w_close"] = window.close_time.strftime(time_format)
    return prepared
def filter_unmatched(object):
    """Report whether a join result contains both the ``w0`` and ``w1`` sides.

    Returns ``True`` only when both side names are present in *object*;
    results missing either side yield ``False`` and are dropped by the filter.
    """
    # NOTE(review): the parameter name shadows the ``object`` builtin; it is
    # kept as-is so any keyword callers keep working.
    return all(side in object for side in ("w0", "w1"))
# Render timestamps as HH:MM:SS strings and attach the window bounds.
prepared = op.map("prepare", merge_named, prepare_named)
# Drop join results that are missing either side.
filtered = op.filter("filtered", prepared, filter_unmatched)
# Print every fully matched result to standard output.
op.output("out", filtered, StdOutSink())
{"key": "a", "offset": 59}
{"key": "b", "offset": 0}
{"key": "c", "offset": 10}
{"key": "d", "offset": 0}
{"key": "e", "offset": 0}
{"key": "a", "offset": 61}
{"key": "b", "offset": 59}
{"key": "c", "offset": 0}
{"key": "d", "offset": 10}
{"key": "e", "offset": 0}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment