Skip to content

Instantly share code, notes, and snippets.

@ytaras
Created September 3, 2019 14:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ytaras/3fc0c9a11dedbfe82b806e8adf5dcbaf to your computer and use it in GitHub Desktop.
Save ytaras/3fc0c9a11dedbfe82b806e8adf5dcbaf to your computer and use it in GitHub Desktop.
package org.apache.beam.examples.usercount.transformexample;
import com.google.cloud.firestore.Firestore;
import com.google.cloud.firestore.FirestoreOptions;
import com.google.cloud.firestore.WriteBatch;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.beam.examples.usercount.domain.ProgressEvent;
import org.apache.beam.sdk.transforms.DoFn;
/**
 * Pass-through {@link DoFn} that mirrors every {@link ProgressEvent} into Firestore.
 *
 * <p>Elements are buffered per bundle and written with a {@link WriteBatch} to the document
 * {@code users/{userId}/movie_progress/{titleId}}. Each input element is also emitted unchanged
 * to the output, so the transform can sit in the middle of a pipeline.
 *
 * <p>A Firestore client is created in {@link #start()} and closed in {@link #finish()}, i.e. once
 * per bundle.
 * NOTE(review): if bundles turn out to be small, moving client creation to {@code @Setup} /
 * {@code @Teardown} would avoid re-creating the client for every bundle — confirm against the
 * runner in use before changing the lifecycle.
 */
public class FirestoreOutput extends DoFn<ProgressEvent, ProgressEvent> {

  /** Number of buffered events that triggers an intermediate flush inside a bundle. */
  private static final int MAX_BATCH_SIZE = 200;

  // Both fields are per-bundle runtime state that is (re)built in start(); they are marked
  // transient so they are never part of the serialized form of this DoFn.
  private transient List<ProgressEvent> mutations;
  private transient Firestore client;

  /** Prepares an empty buffer and a fresh Firestore client for the bundle. */
  @StartBundle
  public void start() {
    this.mutations = new ArrayList<>();
    this.client = FirestoreOptions.getDefaultInstance()
        .toBuilder()
        .build()
        .getService();
  }

  /**
   * Buffers the element for a later batched write and forwards it downstream.
   *
   * @param element the event to persist and re-emit
   * @param outputReceiver receiver the unchanged element is emitted to
   * @throws ExecutionException if an intermediate batch commit fails
   * @throws InterruptedException if the thread is interrupted while waiting for a commit
   */
  @ProcessElement
  public void process(@Element ProgressEvent element, OutputReceiver<ProgressEvent> outputReceiver)
      throws ExecutionException, InterruptedException {
    this.mutations.add(element);
    if (this.mutations.size() >= MAX_BATCH_SIZE) {
      this.flush();
    }
    // The element is emitted regardless of whether it has been committed yet; anything still
    // buffered here is written by a later flush or by finish().
    outputReceiver.output(element);
  }

  /**
   * Writes any remaining buffered events and releases the Firestore client.
   *
   * <p>The client is closed even when the final flush fails, so a failing bundle does not leak
   * it. If both the flush and the close fail, the flush failure is thrown and the close failure
   * is attached to it as a suppressed exception.
   *
   * @throws Exception if the final commit fails or the client cannot be closed
   */
  @FinishBundle
  public void finish() throws Exception {
    Exception flushFailure = null;
    try {
      if (!this.mutations.isEmpty()) {
        this.flush();
      }
    } catch (Exception e) {
      flushFailure = e;
      throw e;
    } finally {
      // Drop the reference first so the field never points at a closed client.
      Firestore closing = this.client;
      this.client = null;
      try {
        closing.close();
      } catch (Exception closeFailure) {
        if (flushFailure == null) {
          throw closeFailure;
        }
        // Keep the flush failure as the primary error; record the close failure alongside it.
        flushFailure.addSuppressed(closeFailure);
      }
    }
  }

  /**
   * Commits all buffered events in a single Firestore batch and blocks until the commit
   * completes, then starts a new empty buffer.
   *
   * <p>If the commit fails the buffer is left untouched.
   *
   * @throws ExecutionException if the batch commit fails
   * @throws InterruptedException if the thread is interrupted while waiting for the commit
   */
  private void flush() throws ExecutionException, InterruptedException {
    WriteBatch batch = this.client.batch();
    for (ProgressEvent mutation : this.mutations) {
      Map<String, Object> doc = mutation.asFirestore();
      batch.set(
          this.client.collection("users").document(mutation.getUserId())
              .collection("movie_progress").document(mutation.getTitleId()),
          doc);
    }
    batch.commit().get();
    this.mutations = new ArrayList<>();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment