Skip to content

Instantly share code, notes, and snippets.

@bfabry
Created January 30, 2017 23:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bfabry/8df6cbf7a955484f388a44289896c480 to your computer and use it in GitHub Desktop.
Save bfabry/8df6cbf7a955484f388a44289896c480 to your computer and use it in GitHub Desktop.
package clojure_dataflow;
import clojure.java.api.Clojure;
import clojure.lang.Symbol;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CustomCoder;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Default coder for all Clojure data structures. Not used for coding KV keys since it's not
 * deterministic.
 *
 * <p>Encoding and decoding are delegated to the {@code fast-encode-stream} and
 * {@code fast-decode-stream} vars of the {@code clojure-dataflow.nippy} namespace. If invoking
 * one of those vars throws {@link IllegalStateException}, the namespace is required through
 * {@code CljDoFnWithContext.synchronizedRequire} and the call is retried exactly once; a second
 * failure is propagated to the caller.
 */
public class NippyCoder extends CustomCoder<Object> {
  private static final Logger LOG = LoggerFactory.getLogger(NippyCoder.class);

  /** Clojure namespace that provides the stream encode/decode functions. */
  private static final String NIPPY_NS = "clojure-dataflow.nippy";

  /**
   * Creates a new coder instance.
   *
   * @return a fresh {@code NippyCoder}
   */
  public static NippyCoder of() {
    return new NippyCoder();
  }

  /** Requires the nippy namespace via {@code CljDoFnWithContext.synchronizedRequire}. */
  private static void requireNippyNamespace() {
    Symbol ns = (Symbol) Clojure.var("clojure.core", "symbol").invoke(NIPPY_NS);
    CljDoFnWithContext.synchronizedRequire(ns);
  }

  /**
   * Writes {@code o} to {@code out} using {@code clojure-dataflow.nippy/fast-encode-stream}.
   *
   * @param out stream that receives the encoded value
   * @param o value to encode
   * @throws IllegalStateException if the call still fails after the namespace has been required
   */
  private void clojurePrint(OutputStream out, Object o) {
    try {
      Clojure.var(NIPPY_NS, "fast-encode-stream").invoke(out, o);
    } catch (IllegalStateException e) {
      // Presumably the var is unbound because the namespace has not been loaded yet.
      // Retry only once: an unbounded retry would end in StackOverflowError and hide the
      // real cause whenever the require does not fix the problem.
      LOG.debug("fast-encode-stream failed; requiring {} and retrying once", NIPPY_NS, e);
      requireNippyNamespace();
      Clojure.var(NIPPY_NS, "fast-encode-stream").invoke(out, o);
    }
  }

  /**
   * Reads one value from {@code in} using {@code clojure-dataflow.nippy/fast-decode-stream}.
   *
   * @param in stream to decode from
   * @return the decoded value
   * @throws IllegalStateException if the call still fails after the namespace has been required
   */
  private Object clojureRead(InputStream in) {
    try {
      return Clojure.var(NIPPY_NS, "fast-decode-stream").invoke(in);
    } catch (IllegalStateException e) {
      // Same single-retry policy as clojurePrint.
      LOG.debug("fast-decode-stream failed; requiring {} and retrying once", NIPPY_NS, e);
      requireNippyNamespace();
      return Clojure.var(NIPPY_NS, "fast-decode-stream").invoke(in);
    }
  }

  /**
   * Encodes {@code o} onto {@code out}.
   *
   * @param o value to encode
   * @param out destination stream
   * @param context coder context; not used by this coder
   * @throws IOException declared by the coder contract
   */
  public void encode(Object o, OutputStream out, Coder.Context context) throws IOException {
    clojurePrint(out, o);
  }

  /**
   * Decodes a single value from {@code in}.
   *
   * @param in source stream
   * @param context coder context; not used by this coder
   * @return the decoded value
   * @throws IOException declared by the coder contract
   */
  public Object decode(InputStream in, Coder.Context context) throws IOException {
    return clojureRead(in);
  }

  /**
   * Always rejects this coder as non-deterministic.
   *
   * @throws NonDeterministicException always
   */
  public void verifyDeterministic() throws NonDeterministicException {
    throw new NonDeterministicException(this, "nippy is not deterministic");
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment