Skip to content

Instantly share code, notes, and snippets.

@damiondoesthings
Created February 28, 2024 15:03
Show Gist options
Save damiondoesthings/b5d16d22d18f37675a8a76e318c1bc8c to your computer and use it in GitHub Desktop.
from datetime import datetime, timedelta, timezone
from typing import Generator
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.testing import run_main, cluster_main
from bytewax.inputs import StatefulSourcePartition, FixedPartitionedSource
import bytewax.operators.window as w
# Number of timestamps in the single batch emitted by the source; it is the
# value handed to NumbersInput when the dataflow is built.
BATCH_SIZE = 100_000
# Number of times the "flat-map" step re-emits each incoming timestamp, so
# the steps after it see BATCH_SIZE * BATCH_COUNT items in total.
BATCH_COUNT = 10
class _NumberSource(StatefulSourcePartition):
    """Source partition that emits one batch of event timestamps, then ends.

    The single batch holds ``i`` timezone-aware UTC datetimes spaced exactly
    one second apart.  When the partition is polled again after that batch,
    it prints the wall-clock time elapsed since construction and the
    underlying generator finishes, so ``next_batch`` raises ``StopIteration``.
    """

    def __init__(self, i: int) -> None:
        """Set up the lazy batch generator and record the start time.

        Args:
            i: Number of timestamps to place in the single emitted batch.
        """
        self.i = i
        # Creating the generator runs none of its body; the batch is only
        # built on the first next_batch() call.
        self.records = self._record_gen()
        # Reference point for the "Duration: ..." line printed when the
        # generator is resumed after its only batch.
        self.start = datetime.now()

    def close(self) -> None:
        """Do nothing; this partition holds no resources to release."""
        pass

    def next_awake(self) -> datetime | None:
        """Return ``None``: no specific wake-up time is requested.

        NOTE(review): presumably this lets the runtime poll again right
        away -- confirm against the bytewax source API documentation.
        """
        return None

    def _record_gen(self) -> Generator[list[datetime], None, None]:
        """Yield the one batch of timestamps, then report the elapsed time.

        Yields:
            A list of ``self.i`` UTC datetimes, the k-th being ``k`` seconds
            after the moment the batch was built.
        """
        # Read the clock once so that every timestamp shares the same base
        # and the spacing is exactly one second, instead of calling
        # datetime.now() again for each element.
        base = datetime.now(tz=timezone.utc)
        yield [base + timedelta(seconds=offset) for offset in range(self.i)]
        # Only reached when the generator is resumed after the batch above
        # has been handed out, i.e. on the second next_batch() call.
        stop = datetime.now()
        print(f"Duration: {stop - self.start}", flush=True)

    def next_batch(self, *args, **kwargs) -> list[datetime]:
        """Return the next batch of timestamps.

        Any positional or keyword arguments are accepted and ignored.

        Raises:
            StopIteration: once the single batch has already been returned
                (propagated from the exhausted generator).
        """
        return next(self.records)

    def snapshot(self) -> None:
        """Return ``None``; no resume state is kept for this partition."""
        return None
class NumbersInput(FixedPartitionedSource):
    """Fixed-partition source that exposes one partition named ``"single"``.

    The partition it builds is a ``_NumberSource`` created with the same
    ``i`` this input was constructed with.
    """

    def __init__(self, i: int) -> None:
        """Remember ``i``, the value forwarded to every built partition."""
        self.i = i

    def list_parts(self) -> list[str]:
        """Return the names of the available partitions (always one)."""
        partition_names = ["single"]
        return partition_names

    def build_part(self, *args, **kwargs) -> _NumberSource:
        """Build the partition; all passed arguments are ignored."""
        size = self.i
        return _NumberSource(size)
def _event_time(item):
    """Return the item itself; every item in this flow is its own timestamp."""
    return item


def _repeat_item(item):
    """Return an iterator that produces ``item`` BATCH_COUNT times."""
    return (item for _ in range(BATCH_COUNT))


def _constant_key(_item):
    """Route every item to the same key, "x"."""
    return "x"


def _initial_state():
    """Build the (empty) accumulator for a new window."""
    return None


def _keep_state(state, _item):
    """Fold step that ignores the item and returns the state unchanged."""
    return state


def _reject_all(_item):
    """Filter predicate that drops every item."""
    return False


# Event-time clock reading the timestamp straight from the item, configured
# with a zero wait_for_system_duration.
clock_config = w.EventClockConfig(
    dt_getter=_event_time,
    wait_for_system_duration=timedelta(seconds=0),
)
# One-minute tumbling windows aligned to 2022-01-01T00:00:00 UTC.
window = w.TumblingWindow(
    align_to=datetime(2022, 1, 1, tzinfo=timezone.utc),
    length=timedelta(minutes=1),
)

flow = Dataflow("bench")

# Each step is called directly with its upstream stream instead of being
# chained through Stream.then(); step ids and arguments are unchanged.
timestamps = op.input("in", flow, NumbersInput(BATCH_SIZE))
repeated = op.flat_map("flat-map", timestamps, _repeat_item)
keyed = op.key_on("key-on", repeated, _constant_key)
windowed = w.fold_window(
    "fold-window",
    keyed,
    clock_config,
    window,
    _initial_state,
    _keep_state,
)
# Nothing passes this filter, so the stdout sink receives no items.
discarded = op.filter("filter_all", windowed, _reject_all)
op.output("stdout", discarded, StdOutSink())
# First pass: execute the flow through run_main.
print("Running as single process")
run_main(flow)

# Second pass: execute the same flow object through cluster_main, with a
# one-entry address list and this process as proc_id 0.
cluster_addresses = ["localhost:9999"]
print("Running as single process in cluster mode")
cluster_main(flow, addresses=cluster_addresses, proc_id=0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment