Skip to content

Instantly share code, notes, and snippets.

@sandboxws
Created December 29, 2018 05:34
Show Gist options
  • Save sandboxws/c61130605c011125d6206eaa7f7e9643 to your computer and use it in GitHub Desktop.
Save sandboxws/c61130605c011125d6206eaa7f7e9643 to your computer and use it in GitHub Desktop.
TableRow Apache Beam Coder
package com.sandboxws.beam.coders;
import com.google.gson.Gson;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
* PostgreSQL TableRow to HashMap Coder.
*
* @author Ahmed El.Hussaini
*/
@SuppressWarnings({ "serial", "unchecked" })
public class TableRowCoder extends AtomicCoder<HashMap<String, Object>> {
private static final TableRowCoder INSTANCE = new TableRowCoder();
private static final TypeDescriptor<HashMap<String, Object>> TYPE_DESCRIPTOR =
new TypeDescriptor<HashMap<String, Object>>() { };
public static TableRowCoder of() {
return INSTANCE;
}
private TableRowCoder() {
}
@Override
public void encode(HashMap<String, Object> value, OutputStream outStream) throws IOException {
encode(value, outStream, Context.NESTED);
}
@Override
public void encode(HashMap<String, Object> row, OutputStream outStream, Context context) throws IOException {
Gson gson = new Gson();
StringUtf8Coder.of().encode(gson.toJson(row), outStream, context);
}
@Override
public HashMap<String, Object> decode(InputStream inStream) throws IOException {
return decode(inStream, Context.NESTED);
}
@Override
public HashMap<String, Object> decode(InputStream inStream, Context context) throws IOException {
String strValue = StringUtf8Coder.of().decode(inStream, context);
Gson gson = new Gson();
return gson.fromJson(strValue, HashMap.class);
}
@Override
protected long getEncodedElementByteSize(HashMap<String, Object> row) throws Exception {
Gson gson = new Gson();
return StringUtf8Coder.of().getEncodedElementByteSize(gson.toJson(row));
}
@Override
public void verifyDeterministic() throws NonDeterministicException {
throw new NonDeterministicException(this,
"HashMap<String, Object> can hold arbitrary instances, which may be non-deterministic.");
}
@Override
public TypeDescriptor<HashMap<String, Object>> getEncodedTypeDescriptor() {
return TYPE_DESCRIPTOR;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment