Skip to content

Instantly share code, notes, and snippets.

@mxm
Created February 24, 2015 17:09
Show Gist options
  • Save mxm/d1929b4b69dda87d5c37 to your computer and use it in GitHub Desktop.
Save mxm/d1929b4b69dda87d5c37 to your computer and use it in GitHub Desktop.
Custom Flink Serializer
package org.apache.flink.mytests.customserializer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;
import java.io.IOException;
import java.util.Arrays;
/**
 * Example Flink batch job showing a user-defined type that controls its own binary
 * serialization by implementing {@link Value}.
 */
public class CustomSerializer {

    /**
     * A mutable vector of {@code double}s that serializes itself through
     * {@link #write(DataOutputView)} and {@link #read(DataInputView)}.
     *
     * <p>Wire format: a 4-byte {@code int} length followed by that many 8-byte doubles.
     * A length of {@code -1} encodes a vector whose backing array is {@code null}
     * (for example one created with the no-arg constructor and never populated).
     */
    public static class Vector implements Value {

        /** Length marker written for a vector that has no backing array. */
        private static final int NULL_LENGTH = -1;

        // The contents are (de)serialized explicitly by write/read below.
        private transient double[] doubleValues;

        /**
         * Creates an empty vector whose backing array is {@code null} until {@link #read}
         * is called. NOTE(review): Flink presumably instantiates Value types through this
         * constructor before calling read; confirm for the Flink version in use.
         */
        public Vector() {
        }

        /**
         * Wraps the given array without copying it, so later mutations through this vector
         * are visible in the caller's array and vice versa.
         *
         * @param doubleValues the backing array (may be {@code null})
         */
        public Vector(double[] doubleValues) {
            this.doubleValues = doubleValues;
        }

        /**
         * Returns the element at {@code position}.
         *
         * @throws ArrayIndexOutOfBoundsException if {@code position} is out of range
         */
        public double getElement(int position) {
            return doubleValues[position];
        }

        /**
         * Overwrites the element at {@code position} with {@code value}.
         *
         * @throws ArrayIndexOutOfBoundsException if {@code position} is out of range
         */
        public void setElement(double value, int position) {
            doubleValues[position] = value;
        }

        /**
         * Multiplies every element in place by an integer factor.
         * Equivalent to {@link #multiply(double)} with the factor widened to {@code double}.
         */
        public void multiply(int factor) {
            multiply((double) factor);
        }

        /** Multiplies every element in place by {@code factor}. */
        public void multiply(double factor) {
            for (int i = 0; i < doubleValues.length; i++) {
                doubleValues[i] *= factor;
            }
        }

        /**
         * Writes the length followed by each element. A {@code null} backing array is
         * written as length {@code -1} with no elements instead of throwing.
         */
        @Override
        public void write(DataOutputView out) throws IOException {
            if (doubleValues == null) {
                out.writeInt(NULL_LENGTH);
                return;
            }
            out.writeInt(doubleValues.length);
            for (double value : doubleValues) {
                out.writeDouble(value);
            }
        }

        /**
         * Replaces this vector's contents with data produced by {@link #write}.
         *
         * @throws IOException if the stream fails or contains an invalid (negative) length
         */
        @Override
        public void read(DataInputView in) throws IOException {
            int length = in.readInt();
            if (length == NULL_LENGTH) {
                this.doubleValues = null;
                return;
            }
            if (length < 0) {
                // Corrupt input: report it as an I/O problem, not NegativeArraySizeException.
                throw new IOException("Invalid vector length in serialized data: " + length);
            }
            // Decode into a fresh array so a failed read never leaves a half-filled vector.
            double[] array = new double[length];
            for (int i = 0; i < length; i++) {
                array[i] = in.readDouble();
            }
            this.doubleValues = array;
        }

        @Override
        public String toString() {
            return "Vector{" +
                    "doubleValues=" + Arrays.toString(doubleValues) +
                    '}';
        }
    }

    /** Builds 1024 sample vectors of 128 doubles, doubles every element, and prints them. */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // create some sample data
        Vector[] vectorList = new Vector[1024];
        for (int v = 0; v < vectorList.length; v++) {
            double[] arr = new double[128];
            for (int i = 0; i < arr.length; i++) {
                arr[i] = i * 1.23 * v;
            }
            vectorList[v] = new Vector(arr);
        }

        // create data set
        DataSet<Vector> source = env.fromElements(vectorList);

        // multiply all vectors by 2 (mutates and returns the incoming record)
        DataSet<Vector> ds = source.map(new MapFunction<Vector, Vector>() {
            private static final long serialVersionUID = -1511665386949403921L;

            @Override
            public Vector map(Vector value) throws Exception {
                value.multiply(2);
                return value;
            }
        });

        ds.print();
        // NOTE(review): in newer Flink versions print() executes the job eagerly, and a
        // following execute() with no new sinks throws; confirm for the Flink version in use.
        env.execute();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment