Skip to content

Instantly share code, notes, and snippets.

@mattfysh
Created March 13, 2024 01:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattfysh/183538147bcdcd53638f2ed780f7cfd2 to your computer and use it in GitHub Desktop.
Save mattfysh/183538147bcdcd53638f2ed780f7cfd2 to your computer and use it in GitHub Desktop.
bytewax KafkaBuilder
from typing import Dict, List
from bytewax import operators as op
from bytewax.dataflow import Dataflow, Stream
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.kafka.serde import SchemaDeserializer
class KeyDeserializer(SchemaDeserializer[bytes, str]):
def de(self, data):
return data.decode()
class KafkaBuilder:
brokers: List[str]
conf: Dict[str, str]
flow: Dataflow
def __init__(self, brokers: str, auth: str, flow: Dataflow):
self.brokers = brokers.split(",")
username, password = auth.split(":")
self.conf = {
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": username,
"sasl.password": password,
"security.protocol": "sasl_plaintext",
}
self.flow = flow
def build_input(self, id: str, topic: str, val_deserializer: SchemaDeserializer):
stream_input = kop.input(
f"inp_${id}",
self.flow,
brokers=self.brokers,
topics=[topic],
add_config=self.conf,
)
deser_input = kop.deserialize(
f"deser_{id}",
stream_input.oks,
key_deserializer=KeyDeserializer(),
val_deserializer=val_deserializer,
)
return deser_input.oks.then(op.key_on, f"key_{id}", lambda x: x.key).then(
op.map_value, f"map_{id}", lambda x: x.value
)
def build_output(self, up: Stream, id: str, topic: str):
return kop.output(
f"out_{id}",
up,
brokers=self.brokers,
topic=topic,
add_config=self.conf,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment