Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
def doGetSchemas(input: SCollection[java.lang.Long], bq: BQConnector)
: SideInput[Map[TableReference, Iterable[TableSchema]]] = {
.withName("Set windowing")
trigger = Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()),
accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES))
// Make sure we trigger a new pane
.groupBy[java.lang.Long](_ => 1L)
.withName("Retrieve schemas")
.flatMap { n =>
val t0 = new Instant"Refreshing schemas...")
val schemas = bq.getTables(project, dataset).map(t => (t, bq.getTableSchema(t)))
.withName("To side input")
it should "refresh schemas" in {
val s = testStreamOf[java.lang.Long]
.addElements(TimestampedValue.of(1L: java.lang.Long,
.addElements(TimestampedValue.of(2L: java.lang.Long,
runWithContext { sc =>
val stream = sc.testStream(s)
// The connector is mocked to return schemas for two tables on the first request and three on the second
val side = Provider.doGetSchemas(stream, connector)
val out = stream.withGlobalWindow(WindowOptions(
trigger = Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()),
accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
allowedLateness = Duration.ZERO))
// Make sure we trigger a new pane
.groupBy[Long](_ => 1L)
.flatMap { case (_, ctx) =>
out should inEarlyGlobalWindowPanes {
// This assertion fails. It finds 3 elements, with the final status, on the early pane (just one)
// This assertion fails as well: it finds 6 elements (5 expected)
out should haveSize(5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.