Skip to content

Instantly share code, notes, and snippets.

@ashokc
Created September 14, 2021 23:52
Show Gist options
  • Save ashokc/d3a75e6c2516b5fc39ab3e383a177800 to your computer and use it in GitHub Desktop.
Save ashokc/d3a75e6c2516b5fc39ab3e383a177800 to your computer and use it in GitHub Desktop.
// Source stage: read Avro-encoded vertex records from Kafka and convert each one
// into a KV<label, RawVertex> element.
// NOTE(review): KafkaIO.read() emits KafkaRecord<K, V> elements unless
// withoutMetadata() is applied; presumably EmitRawVertex accepts KafkaRecord — confirm.
PCollection<KV<String, RawVertex>> vertices = pipeline
    .apply(KafkaIO.<String, GenericRecord>read() // Read from a kafka topic
...
        // key is the vertex label: "A" , "B" or "C"
        .withKeyDeserializer(StringDeserializer.class)
        // value is an Avro object whose schema is looked up in the schema registry
        // under the subject topic + "-value"
        .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(schema_registry,topic+"-value"))
        // the Kafka record create time becomes the event timestamp used by Beam
        // windows & watermark; Duration.ZERO is the out-of-order delay handed to that policy
        .withCreateTime(Duration.ZERO))
    .apply(ParDo.of(new EmitRawVertex())); // PCollection<KV<String, RawVertex>>
// Windowing strategy for the raw vertices, built once and then applied:
//  - fixed windows of 1 second;
//  - late arrivals are admitted for 2*runTime seconds after the window ends, and a
//    final pane is always fired when the window closes (FIRE_ALWAYS);
//  - a pane is fired repeatedly, each time at least 15 elements have arrived;
//  - panes accumulate, so every firing also contains the earlier arrivals,
//    which improves the downstream estimation.
Window<KV<String, RawVertex>> vertexWindowing = Window
    .<KV<String, RawVertex>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .withAllowedLateness(Duration.standardSeconds(2*runTime), Window.ClosingBehavior.FIRE_ALWAYS)
    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(15)))
    .accumulatingFiredPanes();

// Same element type as the input; only the windowing/triggering changes.
PCollection<KV<String, RawVertex>> windowedVertices = vertices.apply(vertexWindowing);
// Final stage: group the windowed raw vertices by label, estimate one vertex per group,
// regroup the estimates by time, build a triangle from each time group, serialize it
// to JSON and write it to Elasticsearch.
windowedVertices
.apply(GroupByKey.<String, RawVertex>create()) // PCollection<KV<String, Iterable<RawVertex>>>
// Emit the vertex estimated (averaged or interpolated) at mid-point.
// Key is switched from vertex label to time
.apply(ParDo.of(new EmitEstimatedVertex(vertex_estimation_strategy))) // PCollection<KV<Long, EstimatedVertex>>
.apply(GroupByKey.<Long, EstimatedVertex>create()) // PCollection<KV<Long, Iterable<EstimatedVertex>>>
// Define a triangle and compute metrics
.apply(ParDo.of(new EmitTriangle())) // PCollection<Triangle>
.apply(ParDo.of(new EmitJsonTriangle())) // PCollection<String>
.apply(ElasticsearchIO.write() // write to Elasticsearch
.withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration
// hosts come from esHosts; the target index and the API key are read from props
// (entries "es_triangle_index" and "es_api_key")
.create(esHosts, props.getProperty("es_triangle_index"))
.withApiKey(props.getProperty("es_api_key"))
...
)) ;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment