Skip to content

Instantly share code, notes, and snippets.

@mattfysh
Last active February 23, 2024 04:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattfysh/ce7fe7f9315fc27e0b682a133bb1de32 to your computer and use it in GitHub Desktop.
Save mattfysh/ce7fe7f9315fc27e0b682a133bb1de32 to your computer and use it in GitHub Desktop.
Custom bytewax operators
from copy import deepcopy
from typing import Optional, Any, TypeVar, Tuple
from bytewax.dataflow import operator
from bytewax import operators as op
from bytewax.operators import KeyedStream
# Flags used as the second element of the `(emitted_items, flag)` tuples
# returned by the logic callbacks in this file.
# NOTE(review): presumably these mirror bytewax's "is this logic complete?"
# boolean (True -> drop the per-key logic and its state) -- confirm against
# the UnaryLogic documentation for the bytewax version in use.
DISCARD = True
RETAIN = False

# Type variables for the `enrich_join` annotations: R is the value type of
# the reference stream, E the value type of the event stream.
R = TypeVar("R")
E = TypeVar("E")
class _EnrichJoinLogic(op.UnaryLogic):
    """Per-key logic pairing each event with the latest reference value.

    Incoming items are ``(name, value)`` tuples where ``name`` is either
    ``"reference"`` or ``"event"``. The most recent non-``None`` reference
    value is kept as state; each event is emitted as ``(event, reference)``
    using that state.
    """

    def __init__(self, step_id: str, state: Optional[Any] = None):
        """Store the step id and the (possibly resumed) reference value.

        Args:
            step_id: Id of the owning step; used in error messages.
            state: Previously snapshotted reference value, or ``None`` when
                no reference has been seen yet.
        """
        self.step_id = step_id
        self.state = state

    def on_item(self, _now, name_value):
        """Handle one tagged item.

        Args:
            _now: Unused.
            name_value: ``(name, value)`` tuple, ``name`` being
                ``"reference"`` or ``"event"``.

        Returns:
            ``(emitted_items, flag)`` where ``flag`` is ``RETAIN`` or
            ``DISCARD``.

        Raises:
            ValueError: If ``name`` is not one of the two known tags.
        """
        name, value = name_value

        if name == "reference":
            if value is None:
                # A ``None`` reference acts as a tombstone: nothing is
                # emitted and the logic is flagged for discard.
                return [], DISCARD
            self.state = value
            return [], RETAIN

        if name == "event":
            if self.state is None:
                # No reference known for this key: the event is dropped.
                return [], DISCARD
            return [(value, self.state)], RETAIN

        # Previously this fell through and returned ``None`` implicitly,
        # which does not match the tuple shape every other branch returns.
        raise ValueError(
            f"{self.step_id}: unexpected item tag {name!r}; "
            "expected 'reference' or 'event'"
        )

    def on_notify(self, _s):
        """Emit nothing on notification and keep the logic."""
        return [], RETAIN

    def on_eof(self):
        """Emit nothing at end of input and keep the logic."""
        return [], RETAIN

    def notify_at(self):
        """Return ``None``: this logic never asks for a notification."""
        return None

    def snapshot(self):
        """Return a deep copy of the current reference value."""
        # Copy so later mutation of the live value cannot alter the snapshot.
        return deepcopy(self.state)
@operator
def enrich_join(
    step_id, *, event: KeyedStream[E], reference: KeyedStream[R]
) -> KeyedStream[Tuple[E, R]]:
    """Join each keyed event with the latest reference value for its key.

    Both inputs are tagged with their origin, merged into a single keyed
    stream and fed through ``_EnrichJoinLogic``, which emits
    ``(event, reference)`` pairs.

    Args:
        step_id: Id for this step; passed on to the logic instances.
        event: Keyed stream of events to enrich.
        reference: Keyed stream of reference values. A ``None`` value is
            handled by the logic as a request to discard its state.

    Returns:
        The stream produced by the underlying ``op.unary`` step.
    """

    def tag_event(v):
        return ("event", v)

    def tag_reference(v):
        return ("reference", v)

    def build_logic(_now, resume_state):
        return _EnrichJoinLogic(step_id, resume_state)

    tagged_events = op.map_value("name_event", event, tag_event)
    tagged_refs = op.map_value("name_reference", reference, tag_reference)

    # The reference stream is passed to the merge first, with the intent
    # that reference updates are seen before the events they apply to.
    # NOTE(review): confirm whether op.merge gives any ordering guarantee
    # based on argument position.
    combined = op.merge("merge", tagged_refs, tagged_events)
    return op.unary("enrich_join", combined, build_logic)
from bytewax.dataflow import operator
from bytewax import operators as op
@operator
def filter_map_value(step_id, up, mapper):
    """Map the value of each ``(key, value)`` item, dropping ``None`` results.

    Args:
        step_id: Id for this step.
        up: Upstream of ``(key, value)`` 2-tuples.
        mapper: Called with each value. Returning ``None`` drops the item;
            any other result replaces the value under the same key.

    Returns:
        The stream produced by the underlying ``op.filter_map`` step.
    """

    def shim_mapper(k_v):
        k, v = k_v
        mapped = mapper(v)
        if mapped is None:
            # Return a bare None here. A ``(k, None)`` tuple is itself not
            # None, so ``op.filter_map`` would let it through and nothing
            # would ever be filtered.
            return None
        return k, mapped

    return op.filter_map("filter_map", up, shim_mapper)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment