Created
September 14, 2021 23:52
-
-
Save ashokc/d3a75e6c2516b5fc39ab3e383a177800 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
PCollection<KV<String, RawVertex>> vertices = pipeline | |
.apply(KafkaIO.<String, GenericRecord>read() // Read from a kafka topic | |
... | |
// key is the vertex label: "A" , "B" or "C" | |
.withKeyDeserializer(StringDeserializer.class) | |
// value is an Avro object | |
.withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(schema_registry,topic+"-value") | |
.withCreateTime(Duration.ZERO) // event createtime is used by Beam windows & watermark | |
.apply(ParDo.of(new EmitRawVertex())); // PCollection<KV<String, RawVertex>> | |
PCollection<KV<String, RawVertex>> windowedVertices = vertices | |
// 1 second windows | |
.apply(Window.<KV<String, RawVertex>>into(FixedWindows.of(Duration.standardSeconds(1))) | |
// keep the window open for a while to let in late arrivals | |
.withAllowedLateness(Duration.standardSeconds(2*runTime), Window.ClosingBehavior.FIRE_ALWAYS) | |
// trigger after every time 15 new vertices come into a window | |
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(15))) | |
// Keep the earlier arrivals for improved estimations | |
.accumulatingFiredPanes()) ; // PCollection<KV<String, RawVertex>> | |
windowedVertices | |
.apply(GroupByKey.<String, RawVertex>create()) // PCollection<KV<String, <Iterable<RawVertex>>> | |
// Emit the vertex estimated (averaged or interpolated) at mid-point. | |
// Key is switched from vertex label to time | |
.apply(ParDo.of(new EmitEstimatedVertex(vertex_estimation_strategy))) // PCollection<KV<Long, EstimatedVertex>>. | |
.apply(GroupByKey.<Long, EstimatedVertex>create()) // PCollection<KV<Long, <Iterable<EstimatedVertex>>> | |
// Define a triangle and compute metrics | |
.apply(ParDo.of(new EmitTriangle())) // PCollection<Triangle>> | |
.apply(ParDo.of(new EmitJsonTriangle())) // PCollection<String> | |
.apply(ElasticsearchIO.write() // write to Elasticsearch | |
.withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration | |
.create(esHosts, props.getProperty("es_triangle_index")) | |
.withApiKey(props.getProperty("es_api_key")) | |
... | |
)) ; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment