Skip to content

Instantly share code, notes, and snippets.

@richzw
Created October 24, 2019 15:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save richzw/afa4a2b97e939124c30265426de09c51 to your computer and use it in GitHub Desktop.
Save richzw/afa4a2b97e939124c30265426de09c51 to your computer and use it in GitHub Desktop.
// Read text lines from the input location. watchForNewFiles keeps polling for newly
// appearing files (every 2 minutes here) and Watch.Growth.never() means polling never stops,
// which makes `lines` an unbounded PCollection.
PCollection<String> lines = pipeline.apply("readDataFromGCS",
        TextIO.read().from("gcs path")
                .watchForNewFiles(Duration.standardMinutes(2), Watch.Growth.never()));

// Turn each raw line into (key, attribute map). ParseAndFilterFn is defined elsewhere;
// presumably the key is the user id and the map carries at least "level" and "ts",
// since those are the entries read further down. TODO confirm against ParseAndFilterFn.
PCollection<KV<String, Map<String, String>>> filter_event =
        lines.apply("ParseAndFilterFn", ParDo.of(new ParseAndFilterFn()));

// Assign elements to 3-minute fixed windows and emit a pane 2 minutes (processing time)
// after the first element of each pane.
//
// The trigger is wrapped in Repeatedly.forever(...): a bare
// AfterProcessingTime.pastFirstElementInPane() trigger fires once and then finishes, after
// which any further elements arriving for that window are dropped. Beam's GroupByKey (used
// by Combine.perKey below) flags such finishing triggers as unsafe because they can lose data.
// With discardingFiredPanes(), every pane contains only the elements that arrived since the
// previous firing, so one window may produce more than one output per key.
// The fully qualified class name is used so that no additional import is required.
PCollection<KV<String, Map<String, String>>> minute_window_events = filter_event.apply("MinuteFixwindow",
        Window.<KV<String, Map<String, String>>>into(FixedWindows.of(Duration.standardMinutes(3)))
                .triggering(org.apache.beam.sdk.transforms.windowing.Repeatedly.forever(
                        AfterProcessingTime
                                .pastFirstElementInPane()
                                .plusDelayOf(Duration.standardMinutes(2))))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.standardMinutes(1))
);

minute_window_events
        // Per key and pane, reduce all attribute maps to a single one using MaxFn.
        .apply("GroupByUserId", Combine.perKey(new MaxFn()))
        // Build the outgoing message text as the KV key; the "ts" entry is carried as the value.
        .apply("AssembleUserMsg", MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via((KV<String, Map<String, String>> kv) ->
                        KV.of(String.format("userid:%s,level:%s,ts:%s", kv.getKey(), kv.getValue().get("level"), kv.getValue().get("ts")),
                                kv.getValue().get("ts"))))
        // Keep only the message text (the KV key); the value is discarded here.
        .apply("ConvertToString", MapElements.into(TypeDescriptors.strings())
                .via((KV<String, String> kv) -> kv.getKey()))
        // Publish every message string to the Pub/Sub topic named by TOPICSTR.
        .apply("WriteToPubSub",
                PubsubIO.writeStrings()
                        .to(TOPICSTR)
        );
/**
 * Combine function that reduces a collection of attribute maps to the single
 * {@code "level"} / {@code "ts"} pair belonging to the highest level seen.
 *
 * <p>Levels are stored as strings and compared after parsing them as a float and
 * truncating to {@code int} (so {@code "3.0"} and {@code "3.7"} compare as equal).
 * On a tie the entry that was seen first is kept.
 *
 * <p>The first entry that carries a level is always accepted, regardless of its value.
 * Comparing against an implicit default of {@code "0"} would never record levels that are
 * zero or negative and would leave the result without {@code "level"} and {@code "ts"}.
 *
 * <p>Maps without a {@code "level"} entry (for example, empty accumulators) are ignored.
 * A level that is present but not parseable as a number still raises
 * {@link NumberFormatException}.
 */
static class MaxFn extends Combine.CombineFn<Map<String, String>, Map<String, String>, Map<String, String>> {
    private static final String LEVEL_KEY = "level";
    private static final String TS_KEY = "ts";

    /** Returns a new, empty accumulator (no level recorded yet). */
    @Override
    public Map<String, String> createAccumulator() {
        return new HashMap<>();
    }

    /**
     * Folds one input map into the accumulator.
     *
     * @param mutableAccumulator the accumulator to update in place
     * @param input the element to consider; ignored when it is {@code null} or has no level
     * @return the same accumulator instance, updated if {@code input} had a higher level
     */
    @Override
    public Map<String, String> addInput(Map<String, String> mutableAccumulator, Map<String, String> input) {
        keepHigherLevel(mutableAccumulator, input);
        return mutableAccumulator;
    }

    /**
     * Merges several partial accumulators into a fresh one holding the highest level among them.
     *
     * @param accumulators the partial results to merge; they are not modified
     * @return a new map with the winning {@code "level"} / {@code "ts"} pair, or an empty map
     *     when none of the accumulators carries a level
     */
    @Override
    public Map<String, String> mergeAccumulators(Iterable<Map<String, String>> accumulators) {
        Map<String, String> merged = new HashMap<>();
        for (Map<String, String> next : accumulators) {
            keepHigherLevel(merged, next);
        }
        return merged;
    }

    /** Returns the accumulator itself as the final result. */
    @Override
    public Map<String, String> extractOutput(Map<String, String> accumulator) {
        return accumulator;
    }

    /**
     * Copies the level and timestamp of {@code candidate} into {@code best} when
     * {@code best} has no level yet or the candidate's level is strictly higher.
     */
    private static void keepHigherLevel(Map<String, String> best, Map<String, String> candidate) {
        if (candidate == null) {
            return;
        }
        String candidateLevel = candidate.get(LEVEL_KEY);
        if (candidateLevel == null) {
            // Nothing to compare: empty accumulator or an element without a level.
            return;
        }
        String bestLevel = best.get(LEVEL_KEY);
        // Strict comparison keeps the first-seen entry on ties.
        if (bestLevel == null || toLevel(candidateLevel) > toLevel(bestLevel)) {
            best.put(LEVEL_KEY, candidateLevel);
            best.put(TS_KEY, candidate.get(TS_KEY));
        }
    }

    /** Parses a level string as a float and truncates it to an int for comparison. */
    private static int toLevel(String raw) {
        return (int) Float.parseFloat(raw);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment