Skip to content

Instantly share code, notes, and snippets.

@calonso
Created May 29, 2018 20:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save calonso/87d392a65079a66db75a78cb8d80ea98 to your computer and use it in GitHub Desktop.
Save calonso/87d392a65079a66db75a78cb8d80ea98 to your computer and use it in GitHub Desktop.
/**
 * Builds a side input that maps every table reference reported by `bq` to its schema(s).
 *
 * The input is placed in the global window with a repeating processing-time trigger
 * (`AfterProcessingTime.pastFirstElementInPane()`) in discarding mode, and all elements are
 * grouped under a single constant key. For every group emitted downstream the tables are
 * listed through `bq.getTables` and each schema is fetched through `bq.getTableSchema`;
 * the element values themselves are ignored.
 *
 * NOTE: `project`, `dataset` and `log` are resolved from the enclosing scope, which is not
 * part of this snippet.
 *
 * @param input stream whose elements only serve as "refresh" ticks
 * @param bq    connector used to list the tables and to fetch each table's schema
 * @return a multi-map side input keyed by table reference
 */
def doGetSchemas(input: SCollection[java.lang.Long], bq: BQConnector)
    : SideInput[Map[TableReference, Iterable[TableSchema]]] = {
  input
    .withName("Set windowing")
    .withGlobalWindow(WindowOptions(
      trigger = Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()),
      accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES))
    // Make sure we trigger a new pane: every element is grouped under the same constant key.
    .groupBy[java.lang.Long](_ => 1L)
    .withName("Retrieve schemas")
    .flatMap { _ =>
      // The grouped values are irrelevant; each emitted group just prompts a re-read.
      log.info("Refreshing schemas...")
      bq.getTables(project, dataset).map(t => (t, bq.getTableSchema(t)))
    }
    .withName("To side input")
    .asMultiMapSideInput
}
it should "refresh schemas" in {
  val oneMinute = Duration.standardMinutes(1)

  // Two elements, with processing time advanced by one minute between them.
  val events = testStreamOf[java.lang.Long]
    .advanceWatermarkTo(ZeroTime)
    .addElements(TimestampedValue.of(1L: java.lang.Long, ZeroTime.plus(1)))
    .advanceProcessingTime(oneMinute)
    .advanceWatermarkTo(ZeroTime.plus(oneMinute))
    .addElements(TimestampedValue.of(2L: java.lang.Long, ZeroTime.plus(oneMinute).plus(1)))
    .advanceWatermarkToInfinity()

  runWithContext { sc =>
    val stream = sc.testStream(events)

    // The connector is mocked to return schemas for two tables on the first request
    // and for three tables on the second one.
    val schemasSide = Provider.doGetSchemas(stream, connector)

    // Same repeating processing-time trigger as the side input, with zero allowed lateness.
    val windowing = WindowOptions(
      trigger = Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()),
      accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
      allowedLateness = Duration.ZERO)

    val observed = stream
      .withGlobalWindow(windowing)
      // Make sure we trigger a new pane
      .groupBy[Long](_ => 1L)
      .withSideInputs(schemasSide)
      .flatMap((_, ctx) => ctx(schemasSide).toList)
      .toSCollection

    // Reported as failing: the single early pane contains 3 elements (the final state)
    // instead of the expected 2.
    observed should inEarlyGlobalWindowPanes {
      haveSize(2)
    }

    // Reported as failing as well: 6 elements are found instead of 5.
    observed should haveSize(5)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment