Created
February 26, 2024 16:03
-
-
Save fferegrino/1842ff274384337880dd214835463c94 to your computer and use it in GitHub Desktop.
Bytewax join example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from datetime import datetime, timezone, timedelta | |
import bytewax.operators.window as op_w | |
from bytewax.bytewax import EventClockConfig, SessionWindow | |
from bytewax.dataflow import Dataflow | |
import bytewax.operators as op | |
from bytewax.connectors.files import FileSource | |
from bytewax.connectors.stdio import StdOutSink | |
# Dataflow with two line-oriented file inputs that are joined further down.
# NOTE(review): the flow id "wordcount_eg" looks like a leftover from another
# example; it is kept as-is because the id is runtime-visible text.
flow = Dataflow("wordcount_eg")
input0 = op.input("input0", flow, FileSource("w0.jsonl"))
input1 = op.input("input1", flow, FileSource("w1.jsonl"))

# Reference instant (UTC) that per-record "offset" seconds are added to.
base_date = datetime(2024, 2, 1, tzinfo=timezone.utc)
def jsonize(line):
    """Parse one JSON line and replace its ``offset`` with an absolute ``ts``.

    Args:
        line: A JSON document encoding an object. It may carry an ``offset``
            key holding a number of seconds relative to ``base_date``.

    Returns:
        The decoded dict with ``offset`` removed and ``ts`` set to
        ``base_date`` plus ``offset`` seconds. A missing offset counts as 0.
    """
    record = json.loads(line)
    # pop() so the raw offset does not travel downstream next to "ts".
    offset_seconds = int(record.pop("offset", 0))
    record["ts"] = base_date + timedelta(seconds=offset_seconds)
    return record
# Decode both raw line streams into dicts carrying a "ts" datetime.
map0 = op.map("p0", input0, jsonize)
map1 = op.map("p1", input1, jsonize)

# Event-time clock: each item's timestamp is read from its "ts" field.
clock = EventClockConfig(
    dt_getter=lambda x: x["ts"],
    wait_for_system_duration=timedelta(seconds=60),
)
# Session windows configured with a 60 second gap.
windower = SessionWindow(gap=timedelta(seconds=60))

# Key both streams by the record's "key" field.
keyed0 = op.key_on("key0", map0, lambda m: m["key"])
keyed1 = op.key_on("key1", map1, lambda m: m["key"])

# Windowed join of the two keyed streams; the sides are named "w0" and "w1".
merge_named = op_w.join_window_named(
    "merge_named",
    clock,
    windower,
    w0=keyed0,
    w1=keyed1,
)
def prepare_named(thing):
    """Turn one windowed-join result into a flat, printable dict.

    Args:
        thing: A ``(key, (window, objects))`` tuple. ``window`` exposes
            ``open_time`` and ``close_time`` datetimes; ``objects`` maps the
            join side names (``"w0"``, ``"w1"``) to the joined records.

    Returns:
        A new dict with the same entries as ``objects``, where the datetime
        ``ts`` of every present side is rendered as ``HH:MM:SS``, followed by
        ``w_open`` and ``w_close`` holding the formatted window bounds.
    """
    time_format = "%H:%M:%S"
    _key, (window, objects) = thing

    # Work on copies so the item received from upstream is left untouched.
    prepared = dict(objects)
    for side in ("w0", "w1"):
        if side not in prepared:
            continue
        record = dict(prepared[side])
        ts = record.get("ts")
        if isinstance(ts, datetime):
            record["ts"] = ts.strftime(time_format)
        prepared[side] = record

    prepared["w_open"] = window.open_time.strftime(time_format)
    prepared["w_close"] = window.close_time.strftime(time_format)
    return prepared
def filter_unmatched(object):
    """Tell whether a joined result contains both sides of the join.

    Args:
        object: Mapping (or other container) that holds the side names
            ``"w0"`` and ``"w1"`` as keys when the respective side is present.

    Returns:
        ``True`` when both ``"w0"`` and ``"w1"`` are present, else ``False``.
    """
    # NOTE: the parameter name shadows the ``object`` builtin; it is kept so
    # the function's signature stays unchanged for existing callers.
    return all(side in object for side in ("w0", "w1"))
# Format each joined window, keep only results where both sides are present,
# and write them to standard output.
prepared = op.map("prepare", merge_named, prepare_named)
filtered = op.filter("filtered", prepared, filter_unmatched)
op.output("out", filtered, StdOutSink())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{"key": "a", "offset": 59} | |
{"key": "b", "offset": 0} | |
{"key": "c", "offset": 10} | |
{"key": "d", "offset": 0} | |
{"key": "e", "offset": 0} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{"key": "a", "offset": 61} | |
{"key": "b", "offset": 59} | |
{"key": "c", "offset": 0} | |
{"key": "d", "offset": 10} | |
{"key": "e", "offset": 0} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment