Skip to content

Instantly share code, notes, and snippets.

@bfabry
Created January 30, 2017 23:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bfabry/f89aa19eac563b83840839a6add02829 to your computer and use it in GitHub Desktop.
Save bfabry/f89aa19eac563b83840839a6add02829 to your computer and use it in GitHub Desktop.
package clojure_dataflow;
import clojure.java.api.Clojure;
import clojure.lang.IFn;
import clojure.lang.Symbol;
import com.google.cloud.dataflow.sdk.transforms.DoFnWithContext;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import java.util.Map;
/**
 * Java shim that lets Clojure code run as a {@link DoFnWithContext}.
 *
 * <p>This class exists because DoFn relies on overriding abstract methods, whereas
 * DoFnWithContext relies on overriding <em>and annotating</em> methods. Since annotating
 * methods from Clojure is a pain, the annotated methods are declared here and forward to
 * the functions {@code start-bundle} and {@code process-element} in the
 * {@code clojure-dataflow.pardo} namespace.
 *
 * <p>NOTE(review): earlier documentation said to extend this class via {@code proxy} and
 * override its abstract methods, but the class declares no abstract methods -- confirm the
 * intended usage with the Clojure callers.
 *
 * @param <InputT> element type consumed by this function
 * @param <OutputT> element type produced by this function
 */
public class CljDoFnWithContext<InputT, OutputT> extends DoFnWithContext<InputT, OutputT> {
  // Opaque values handed to the constructor; they are never inspected here, only forwarded
  // to the Clojure functions.
  Object name;
  Object inputCoder;
  Object wrappingCall;
  Object cljCall;

  // Return value of the most recent start-bundle call; forwarded to every process-element call.
  Object startBundleResult;

  // Stack trace of the constructing thread, captured at construction time.
  Object creationStack;

  // The clojure-dataflow.pardo/process-element var; resolved in startBundle.
  IFn processElementFn;

  /**
   * Calls {@code clojure.core/require} on the given namespace while holding this class's
   * monitor, so calls made through this method never overlap.
   *
   * @param namespace symbol naming the namespace to load
   */
  public static synchronized void synchronizedRequire(Symbol namespace) {
    Clojure.var("clojure.core", "require").invoke(namespace);
  }

  /**
   * Stores the values that will later be forwarded to the Clojure functions and records the
   * current thread's stack trace.
   *
   * @param aname forwarded to the Clojure side as {@code name}
   * @param ainputCoder forwarded to the Clojure side as {@code inputCoder}
   * @param awrappingCall forwarded to the Clojure side as {@code wrappingCall}
   * @param acljCall forwarded to the Clojure side as {@code cljCall}
   */
  public CljDoFnWithContext(
      Object aname, Object ainputCoder, Object awrappingCall, Object acljCall) {
    this.name = aname;
    this.inputCoder = ainputCoder;
    this.wrappingCall = awrappingCall;
    this.cljCall = acljCall;
    this.creationStack = Thread.currentThread().getStackTrace();
  }

  /**
   * Resolves the Clojure entry points, loading {@code clojure-dataflow.pardo} first if needed,
   * then invokes {@code start-bundle} and keeps its result for later
   * {@code process-element} calls.
   *
   * @param context the bundle context, passed through to {@code start-bundle}
   */
  @StartBundle
  public void startBundle(Context context) {
    processElementFn = Clojure.var("clojure-dataflow.pardo", "process-element");
    IFn startBundleFn = Clojure.var("clojure-dataflow.pardo", "start-bundle");

    // Require unless BOTH vars we are about to call are bound. Testing only one of them can
    // observe a namespace that another thread is still loading (one var defined, the other
    // not yet) and would then invoke an unbound var. Going through synchronizedRequire in
    // that case blocks until the in-progress synchronized load has finished.
    boolean entryPointsBound =
        ((clojure.lang.Var) processElementFn).isBound()
            && ((clojure.lang.Var) startBundleFn).isBound();
    if (!entryPointsBound) {
      synchronizedRequire(
          (Symbol) Clojure.var("clojure.core", "symbol").invoke("clojure-dataflow.pardo"));
    }

    startBundleResult = startBundleFn.invoke(context, name, inputCoder, wrappingCall, cljCall);
  }

  /**
   * Forwards one element to {@code clojure-dataflow.pardo/process-element}, together with the
   * constructor values, the creation stack trace and the current bundle's
   * {@code start-bundle} result.
   *
   * @param context the per-element process context
   * @param window the window the element belongs to
   */
  @ProcessElement
  public void processElement(ProcessContext context, BoundedWindow window) {
    processElementFn.invoke(
        context, window, name, inputCoder, wrappingCall, cljCall, creationStack,
        startBundleResult);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment