-
-
Save calonso/87d392a65079a66db75a78cb8d80ea98 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Builds a multi-map side input of table schemas that is re-fetched each time
 * a pane of `input` fires.
 *
 * `input` is placed in the global window with a repeating processing-time
 * trigger and discarding panes, then grouped under a single constant key.
 * For each grouped pane the tables of `project`/`dataset` (both defined
 * outside this method) are listed through `bq` and paired with their schema.
 *
 * @param input stream used only as a refresh signal; element values are ignored
 * @param bq    connector used to list tables and to fetch each table's schema
 * @return a side input mapping each table reference to the schemas emitted for it
 */
def doGetSchemas(input: SCollection[java.lang.Long], bq: BQConnector)
  : SideInput[Map[TableReference, Iterable[TableSchema]]] = {
  input
    .withName("Set windowing")
    .withGlobalWindow(WindowOptions(
      trigger = Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()),
      accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES))
    // Make sure we trigger a new pane: every element is grouped under the same key.
    .groupBy[java.lang.Long](_ => 1L)
    .withName("Retrieve schemas")
    .flatMap { _ =>
      // The grouped elements are not inspected; a fired pane is just the cue to refresh.
      log.info("Refreshing schemas...")
      bq.getTables(project, dataset).map(t => (t, bq.getTableSchema(t)))
    }
    .withName("To side input")
    .asMultiMapSideInput
}
it should "refresh schemas" in {
  // Two elements, one minute apart in both event time and processing time,
  // so that each one is expected to land in its own pane.
  val oneMinute = Duration.standardMinutes(1)
  val firstTick = TimestampedValue.of(1L: java.lang.Long, ZeroTime.plus(1))
  val secondTick = TimestampedValue.of(2L: java.lang.Long, ZeroTime.plus(oneMinute).plus(1))

  val events = testStreamOf[java.lang.Long]
    .advanceWatermarkTo(ZeroTime)
    .addElements(firstTick)
    .advanceProcessingTime(oneMinute)
    .advanceWatermarkTo(ZeroTime.plus(oneMinute))
    .addElements(secondTick)
    .advanceWatermarkToInfinity()

  runWithContext { sc =>
    val stream = sc.testStream(events)

    // The connector is mocked to return schemas for two tables on the first
    // request and three on the second.
    val side = Provider.doGetSchemas(stream, connector)

    // Main-input windowing for this test: fire repeatedly on processing time,
    // discard fired panes, no allowed lateness.
    val paneOptions = WindowOptions(
      trigger = Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()),
      accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
      allowedLateness = Duration.ZERO)

    val observed = stream
      .withGlobalWindow(paneOptions)
      // Make sure we trigger a new pane
      .groupBy[Long](_ => 1L)
      .withSideInputs(side)
      .flatMap { case (_, sideCtx) =>
        // Emit whatever the side input holds at the moment this pane is processed.
        sideCtx(side).toList
      }
      .toSCollection

    observed should inEarlyGlobalWindowPanes {
      // Known failure (author's note): 3 elements, i.e. the final state, are
      // found on the early pane, and there is just one such pane.
      haveSize(2)
    }

    // Known failure (author's note): 6 elements are found instead of 5.
    observed should haveSize(5)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment