Last active
February 23, 2024 04:48
-
-
Save mattfysh/ce7fe7f9315fc27e0b682a133bb1de32 to your computer and use it in GitHub Desktop.
Custom bytewax operators
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from copy import deepcopy | |
from typing import Optional, Any, TypeVar, Tuple | |
from bytewax.dataflow import operator | |
from bytewax import operators as op | |
from bytewax.operators import KeyedStream | |
# Flags returned as the second element of each `UnaryLogic` callback result.
# NOTE(review): per the bytewax `UnaryLogic` contract this boolean presumably
# tells the runtime whether to throw away the per-key logic -- confirm against
# the installed bytewax version.
RETAIN: bool = False
DISCARD: bool = True

# Value types carried by the event stream and the reference stream.
E = TypeVar("E")
R = TypeVar("R")
class _EnrichJoinLogic(op.UnaryLogic):
    """Per-key logic that pairs each event with the latest reference value.

    Items arrive as ``(name, value)`` tuples where ``name`` is either
    ``"reference"`` or ``"event"``:

    * a ``"reference"`` item replaces the stored reference value, or asks
      for the logic to be discarded when its value is ``None``;
    * an ``"event"`` item is emitted as ``(event, reference)`` when a
      reference value is stored, and otherwise emits nothing and asks for
      the logic to be discarded.
    """

    def __init__(self, step_id: str, state: Optional[Any] = None):
        """Create the logic.

        Args:
            step_id: Id of the step this logic belongs to; used in error
                messages.
            state: Previously snapshotted reference value, or ``None`` when
                there is nothing to resume from.
        """
        self.step_id = step_id
        # Latest reference value seen; `None` means "no reference yet".
        self.state = state

    def on_item(self, _now, name_value):
        """Handle one tagged item.

        Args:
            _now: Unused.
            name_value: ``(name, value)`` tuple as described on the class.

        Returns:
            ``(items_to_emit, discard_flag)``.

        Raises:
            ValueError: If ``name`` is neither ``"reference"`` nor
                ``"event"``. Previously this case fell through and returned
                ``None``, which is not a valid callback result.
        """
        name, value = name_value
        if name == "reference":
            if value is None:
                # A `None` reference clears the stored value for this key.
                return [], DISCARD
            self.state = value
            return [], RETAIN
        if name == "event":
            if self.state is None:
                # No reference to enrich with: drop the event and keep no state.
                return [], DISCARD
            return [(value, self.state)], RETAIN
        raise ValueError(
            f"{self.step_id}: unexpected item name {name!r}; "
            "expected 'reference' or 'event'"
        )

    def on_notify(self, _s):
        """Emit nothing on notification and keep the logic."""
        return [], RETAIN

    def on_eof(self):
        """Emit nothing at end of input and keep the logic."""
        return [], RETAIN

    def notify_at(self):
        """Return ``None``: this logic never requests a notification."""
        return None

    def snapshot(self):
        """Return a deep copy of the stored reference value."""
        # Copy so later mutation of the live value cannot alter the snapshot.
        return deepcopy(self.state)
@operator
def enrich_join(
    step_id, *, event: KeyedStream[E], reference: KeyedStream[R]
) -> KeyedStream[Tuple[E, R]]:
    """Join each keyed event with the latest reference value for its key.

    Both streams are tagged with their origin, merged into one stream, and
    fed through `_EnrichJoinLogic`, which emits ``(event, reference)``
    tuples.

    Args:
        step_id: Id for this operator step.
        event: Keyed stream of events to enrich.
        reference: Keyed stream of reference values.

    Returns:
        Keyed stream of ``(event, reference)`` tuples.
    """

    def tag_event(v):
        return ("event", v)

    def tag_reference(v):
        return ("reference", v)

    def build_logic(_now, resume_state):
        return _EnrichJoinLogic(step_id, resume_state)

    tagged_events = op.map_value("name_event", event, tag_event)
    tagged_refs = op.map_value("name_reference", reference, tag_reference)

    # The reference stream is passed to the merge first, intending reference
    # messages to be seen before the events they may apply to.
    # NOTE(review): confirm that `merge` argument order gives this guarantee.
    merged = op.merge("merge", tagged_refs, tagged_events)
    return op.unary("enrich_join", merged, build_logic)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from bytewax.dataflow import operator | |
from bytewax import operators as op | |
@operator
def filter_map_value(step_id, up, mapper):
    """Map the value of each ``(key, value)`` item, dropping ``None`` results.

    Args:
        step_id: Id for this operator step.
        up: Upstream of ``(key, value)`` items.
        mapper: Called with each value; returns the new value, or ``None``
            to drop the item.

    Returns:
        Stream of ``(key, mapped_value)`` items for which ``mapper`` did not
        return ``None``.
    """

    def shim_mapper(k_v):
        k, v = k_v
        mapped = mapper(v)
        # `filter_map` only drops an item when the whole result is `None`.
        # Returning `(k, None)` would let the item through, so the mapper's
        # `None` is propagated here instead of being wrapped in a tuple.
        if mapped is None:
            return None
        return k, mapped

    return op.filter_map("filter_map", up, shim_mapper)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment