/**
 * Entry point. The visible code loads Google credentials from a JSON file in
 * the current working directory and then assembles a Beam pipeline: read text
 * from a GCS location, parse and filter each line, window the events, combine
 * them per key with {@code MaxFn}, format one message per key and pass the
 * result to {@code PubSubSink}.
 *
 * NOTE(review): several statements below are incomplete in this view (missing
 * terminators, closing parentheses and closing braces). The comments describe
 * only what is visible; confirm against the full file.
 *
 * @param args command-line arguments, passed to {@code PipelineOptionsFactory.fromArgs}
 * @throws IOException declared by the signature
 */
public static void main(String[] args) throws IOException {
// Scope set handed to createScoped() below.
// NOTE(review): nothing is added to this set in the visible code, so it is empty when used.
Set<String> scopeList = new HashSet<>();
// NOTE(review): credential is assigned below but is not read anywhere in the visible lines.
GoogleCredentials credential = null;
try {
// Absolute, normalized path of the process's current working directory.
String curDir = Paths.get(".").toAbsolutePath().normalize().toString();
// NOTE(review): this stream is not closed in the visible code.
// NOTE(review): the file name is spelled "secrete.json" — confirm that is the intended name.
FileInputStream credFile = new FileInputStream(curDir + "/secrete.json");
credential = GoogleCredentials.fromStream(credFile).createScoped(scopeList);
} catch (Exception ex) {
// Any failure while loading the credentials is reported with a fixed message only;
// the exception itself is not printed or rethrown.
// NOTE(review): the closing brace of this catch block is not visible in this view.
System.out.println("Catch exception on read credential file...");
// NOTE(review): this statement has no terminator here; the rest of the options builder chain is not visible.
GCSToPubSubOptions options = PipelineOptionsFactory.fromArgs(args)
Pipeline pipeline = Pipeline.create(options);
// Hard-coded GCS bucket that the input file pattern is built from.
String sourcePath = "gs://bucket-test";
// Step "readDataFromGCS": a file pattern ending in "/fileprefix*.log.gz" is watched for new
// files, with a 2-minute Duration and Watch.Growth.never() as the watch arguments.
// NOTE(review): the expression after the step name starts with '+', so the read transform
// and the left-hand operand of the concatenation are not visible.
PCollection<String> lines = pipeline.apply("readDataFromGCS", + "/fileprefix*.log.gz")
.watchForNewFiles(Duration.standardMinutes(2), Watch.Growth.never()));
// Each line is handed to ParseAndFilterFn, which produces KV<String, Map<String, String>> elements.
PCollection<KV<String, Map<String, String>>> filter_event = lines.apply("ParseAndFilterFn", ParDo.of(new ParseAndFilterFn()));
// Fixed windows of 3 minutes (the step is named "MinuteFixwindow").
// NOTE(review): the closing of this apply(...) call is not visible.
PCollection<KV<String, Map<String, String>>> minute_window_events = filter_event.apply("MinuteFixwindow",
Window.<KV<String, Map<String, String>>>into(FixedWindows.of(Duration.standardMinutes(3)))
// Combine the windowed maps per key with MaxFn.
minute_window_events.apply("GroupByUserId", Combine.perKey(new MaxFn()))
// Map each combined entry to a KV<String, String> whose key is the text
// "uid:<key>,le:<le>,ts:<ts>" built from the entry's key and its "le" and "ts" map values.
// NOTE(review): the value expression of this KV.of(...) call is not visible.
.apply("AssembleUserMsg", MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
.via((KV<String, Map<String, String>> kv) ->
KV.of(String.format("uid:%s,le:%s,ts:%s", kv.getKey(), kv.getValue().get("le"), kv.getValue().get("ts")),
// Stamp is constructed with a null attribute map.
.apply("ConvertToPubSubMsg", ParDo.of(new Stamp(null)))
.apply(new PubSubSink());
// Execute the pipeline and wait until it finishes running.;
// NOTE(review): no call that runs the pipeline is visible after the comment above.
/**
 * Composite transform that hands a collection of {@code PubsubMessage}s to a
 * {@code PubsubUnboundedSink} and returns the {@code PDone} produced by
 * applying it.
 */
private static class PubSubSink extends PTransform<PCollection<PubsubMessage>, PDone> { // Void
// Builds the sink and applies it to the input collection.
public PDone expand(PCollection<PubsubMessage> input) {
// NOTE(review): the constructor call below is cut off in this view — its arguments are not
// visible, so the sink's configuration cannot be described from here.
PubsubUnboundedSink sink = new PubsubUnboundedSink(
return input.apply(sink);
private static class SimpleStamp extends DoFn<KV<String, String>, PubsubMessage> {
public void processElement(@Element KV<String, String> elem, OutputReceiver<PubsubMessage> out) {
out.output(new PubsubMessage(elem.getKey().getBytes(StandardCharsets.UTF_8), new HashMap<>()));
/**
 * Converts each key/value pair into a {@code PubsubMessage} whose payload is
 * the UTF-8 encoding of the pair's key and whose attributes are the map
 * supplied at construction. The message is emitted with a timestamp built
 * from the pair's value.
 */
private static class Stamp extends DoFn<KV<String, String>, PubsubMessage> {
// Attribute map attached to every emitted message. It is stored exactly as passed in,
// so it is null when the caller passes null.
private final Map<String, String> attributes;
// NOTE(review): the body of this no-arg constructor is not visible in this view. Because
// 'attributes' is final it has to be assigned here — confirm what value is intended.
private Stamp() {
// Stores the given attribute map without copying or null-checking it.
private Stamp(Map<String, String> attributes) {
this.attributes = attributes;
// NOTE(review): no @ProcessElement annotation is visible on this method — confirm it is present.
public void processElement(@Element KV<String, String> elem, OutputReceiver<PubsubMessage> out) {
// NOTE(review): elem.getValue() is a String, so the Instant below is built from text.
// Presumably this is Joda-Time's Instant(Object), which parses strings as ISO-8601 text, not
// as an epoch number — confirm the format of the value produced upstream.
out.outputWithTimestamp(new PubsubMessage(elem.getKey().getBytes(StandardCharsets.UTF_8), attributes),
new Instant(elem.getValue()));
static class ParseAndFilterFn extends DoFn<String, KV<String, Map<String, String>>> {
public void processElement(@Element String element, OutputReceiver<KV<String, Map<String, String>>> out) {
try {
element = element.trim();
HashMap m = gson.fromJson(element, HashMap.class);
String ts = String.valueOf(m.get("ts"));
String uid = (String) m.getOrDefault("uid", "");
String le = Double.toString((Double) m.getOrDefault("le", 0));
if (!uid.equals("") && !le.equals("0")) {
Instant instant = Instant.parse(ts);
Map<String, String> map = new HashMap<>();
map.put("uid", uid);
map.put("ts", String.valueOf(instant.getMillis() / 1000));
map.put("le", le);
out.output(KV.of(uid, map));
} catch (Exception e) {
System.out.println("Parse json exception of ParseAndFilterFn:" + e.toString());
static class MaxFn extends Combine.CombineFn<Map<String, String>, Map<String, String>, Map<String, String>> {
public Map<String, String> createAccumulator() {
return new HashMap<>();
public Map<String, String> addInput(Map<String, String> mutableAccumulator, Map<String, String> input) {
int le = (int) Float.parseFloat((input.get("le")));
if (le > (int) Float.parseFloat(mutableAccumulator.getOrDefault("le", "0"))) {
mutableAccumulator.put("le", input.get("le"));
mutableAccumulator.put("ts", input.get("ts"));
return mutableAccumulator;
public Map<String, String> mergeAccumulators(Iterable<Map<String, String>> accumulators) {
HashMap<String, String> m = new HashMap<>();
for (Map<String, String> next : accumulators) {
String le = m.getOrDefault("le", "0");
String next_le = next.getOrDefault("le", "0");
if ((int) Float.parseFloat(le) < (int) Float.parseFloat(next_le)) {
m.put("le", next.get("le"));
m.put("ts", next.get("ts"));
return m;
public Map<String, String> extractOutput(Map<String, String> accumulator) {
return accumulator;
