Skip to content

Instantly share code, notes, and snippets.

@asardaes
Last active August 14, 2023 13:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save asardaes/714b8c1db0c4020f5fde9865b95fc398 to your computer and use it in GitHub Desktop.
Save asardaes/714b8c1db0c4020f5fde9865b95fc398 to your computer and use it in GitHub Desktop.
Flink set serialization for POJOs
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
/**
 * A {@link TypeSerializer} for {@link Set}s, modelled on {@link ListSerializer}.
 *
 * <p>The binary layout is the element count written as an {@code int}, followed by every
 * element written with the element serializer in the set's iteration order. Sets produced by
 * this serializer (new instances, copies and deserialized values) are {@link LinkedHashSet}s.
 *
 * @param <T> The type of the elements contained in the set.
 */
public class SetSerializer<T> extends TypeSerializer<Set<T>> {

    private static final long serialVersionUID = 1L;

    /** Serializer for the individual elements; validated as non-null by the constructor. */
    private final TypeSerializer<T> elementSerializer;

    /**
     * Creates a set serializer that delegates element handling to the given serializer.
     *
     * @param elementSerializer The serializer for the elements of the set.
     * @throws NullPointerException if {@code elementSerializer} is {@code null}.
     */
    public SetSerializer(TypeSerializer<T> elementSerializer) {
        // Fail fast here instead of with an anonymous NPE in duplicate()/copy()/equals() later.
        this.elementSerializer = Objects.requireNonNull(elementSerializer, "elementSerializer");
    }

    /**
     * Returns the serializer used for the elements of the set.
     *
     * @return The element serializer passed to the constructor.
     */
    public TypeSerializer<T> getElementSerializer() {
        return elementSerializer;
    }

    /**
     * Creates a new, empty set.
     *
     * @return An empty {@link LinkedHashSet}.
     */
    @Override
    public Set<T> createInstance() {
        return new LinkedHashSet<>();
    }

    /**
     * Returns the serialized length of the type.
     *
     * @return Always {@code -1}, the {@link TypeSerializer} convention for variable-length data.
     */
    @Override
    public int getLength() {
        return -1;
    }

    /**
     * Reports whether the serialized type is immutable.
     *
     * @return Always {@code false}.
     */
    @Override
    public boolean isImmutableType() {
        return false;
    }

    /**
     * Duplicates this serializer by duplicating the element serializer.
     *
     * @return {@code this} if the element serializer's duplicate is the very same instance,
     *     otherwise a new {@code SetSerializer} wrapping the duplicated element serializer.
     */
    @Override
    public TypeSerializer<Set<T>> duplicate() {
        TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
        return duplicateElement == elementSerializer
                ? this
                : new SetSerializer<>(duplicateElement);
    }

    /**
     * Copies the given set into a new {@link LinkedHashSet}, preserving iteration order.
     *
     * <p>Elements are shared when the element serializer reports an immutable type and are
     * copied one by one through the element serializer otherwise.
     *
     * @param from The set to copy.
     * @return A new set holding the (shared or copied) elements of {@code from}.
     */
    @Override
    public Set<T> copy(Set<T> from) {
        if (elementSerializer.isImmutableType()) {
            // The copy constructor sizes the table for the source and adds all elements.
            return new LinkedHashSet<>(from);
        }
        final Set<T> set = new LinkedHashSet<>(initialCapacityFor(from.size()));
        for (T element : from) {
            set.add(elementSerializer.copy(element));
        }
        return set;
    }

    /**
     * Copies the given set; the {@code reuse} instance is ignored.
     *
     * @param from The set to copy.
     * @param reuse Unused.
     * @return The result of {@link #copy(Set)}.
     */
    @Override
    public Set<T> copy(Set<T> from, Set<T> reuse) {
        return copy(from);
    }

    /**
     * Writes the size of the set followed by each of its elements.
     *
     * @param record The set to write.
     * @param target The output to write to.
     * @throws IOException if writing to {@code target} or serializing an element fails.
     */
    @Override
    public void serialize(Set<T> record, DataOutputView target) throws IOException {
        target.writeInt(record.size());
        for (T element : record) {
            elementSerializer.serialize(element, target);
        }
    }

    /**
     * Reads an element count and then that many elements into a new {@link LinkedHashSet}.
     *
     * @param source The input to read from.
     * @return The set of deserialized elements.
     * @throws IOException if reading from {@code source} or deserializing an element fails.
     */
    @Override
    public Set<T> deserialize(DataInputView source) throws IOException {
        final int size = source.readInt();
        // Deliberately not pre-sized: the count comes from the stream, so it is not used to
        // drive an up-front allocation.
        final Set<T> set = new LinkedHashSet<>();
        for (int i = 0; i < size; i++) {
            set.add(elementSerializer.deserialize(source));
        }
        return set;
    }

    /**
     * Deserializes a set; the {@code reuse} instance is ignored.
     *
     * @param reuse Unused.
     * @param source The input to read from.
     * @return The result of {@link #deserialize(DataInputView)}.
     * @throws IOException if reading from {@code source} fails.
     */
    @Override
    public Set<T> deserialize(Set<T> reuse, DataInputView source) throws IOException {
        return deserialize(source);
    }

    /**
     * Copies one serialized set from {@code source} to {@code target} without materializing it:
     * the element count is forwarded and each element is copied by the element serializer.
     *
     * @param source The input to read from.
     * @param target The output to write to.
     * @throws IOException if reading or writing fails.
     */
    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        final int num = source.readInt();
        target.writeInt(num);
        for (int i = 0; i < num; i++) {
            elementSerializer.copy(source, target);
        }
    }

    /**
     * Creates the configuration snapshot of this serializer.
     *
     * @return A new {@code SetSerializerSnapshot} built from this instance.
     */
    @Override
    public TypeSerializerSnapshot<Set<T>> snapshotConfiguration() {
        return new SetSerializerSnapshot<>(this);
    }

    /**
     * Two set serializers are equal when they are of exactly the same class and their element
     * serializers are equal.
     */
    @Override
    public boolean equals(Object obj) {
        return obj == this || (
                obj != null &&
                        obj.getClass() == getClass() &&
                        elementSerializer.equals(((SetSerializer<?>) obj).elementSerializer)
        );
    }

    @Override
    public int hashCode() {
        return Objects.hash(SetSerializer.class, elementSerializer);
    }

    /**
     * Computes an initial capacity that lets a hash set take {@code expectedSize} elements
     * without rehashing under the default load factor of 0.75.
     */
    private static int initialCapacityFor(int expectedSize) {
        // Math.max keeps the default capacity for small sets and absorbs int overflow of "+ 1".
        return Math.max((int) (expectedSize / 0.75f) + 1, 16);
    }
}
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import java.util.Set;
/**
 * Configuration snapshot for {@link SetSerializer}; its only nested serializer is the set's
 * element serializer.
 *
 * <p>See {@link org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot ListSerializerSnapshot}.
 *
 * @param <T> The type of the elements contained in the set.
 */
public class SetSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<Set<T>, SetSerializer<T>> {

    /** Version number reported for the outer (set-level) part of the snapshot. */
    private static final int CURRENT_VERSION = 1;

    /** Nullary constructor; hands the {@link SetSerializer} class to the superclass. */
    public SetSerializerSnapshot() {
        super(SetSerializer.class);
    }

    /**
     * Creates a snapshot from an existing serializer.
     *
     * @param serializerInstance The serializer to snapshot.
     */
    public SetSerializerSnapshot(SetSerializer<T> serializerInstance) {
        super(serializerInstance);
    }

    /** Returns the current version of the outer snapshot, {@link #CURRENT_VERSION}. */
    @Override
    protected int getCurrentOuterSnapshotVersion() {
        return CURRENT_VERSION;
    }

    /**
     * Exposes the element serializer of the given set serializer as the single nested serializer.
     *
     * @param outerSerializer The set serializer being snapshotted.
     * @return A one-element array holding the element serializer.
     */
    @Override
    protected TypeSerializer<?>[] getNestedSerializers(SetSerializer<T> outerSerializer) {
        final TypeSerializer<T> element = outerSerializer.getElementSerializer();
        return new TypeSerializer<?>[] {element};
    }

    /**
     * Rebuilds a set serializer around the first nested serializer.
     *
     * @param nestedSerializers The nested serializers; index 0 is used as the element serializer.
     * @return A new {@link SetSerializer} for that element serializer.
     */
    @Override
    protected SetSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
        // The cast mirrors getNestedSerializers, which stores a TypeSerializer<T> at index 0.
        @SuppressWarnings("unchecked")
        final TypeSerializer<T> restoredElement = (TypeSerializer<T>) nestedSerializers[0];
        return new SetSerializer<>(restoredElement);
    }
}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import java.util.Set;
/**
 * {@link TypeInformation} for {@link Set}s whose elements are described by another
 * {@code TypeInformation}; serializers are created as {@link SetSerializer}s.
 *
 * <p>See {@link org.apache.flink.api.java.typeutils.ListTypeInfo ListTypeInfo}.
 *
 * @param <T> The type of the elements contained in the set.
 */
public class SetTypeInfo<T> extends TypeInformation<Set<T>> {

    private static final long serialVersionUID = 1L;

    /** Type information of the set's elements; validated as non-null by the constructor. */
    private final TypeInformation<T> elementTypeInfo;

    /**
     * Creates the type information for sets of the given element type.
     *
     * @param elementTypeInfo The type information of the elements.
     * @throws NullPointerException if {@code elementTypeInfo} is {@code null}.
     */
    public SetTypeInfo(TypeInformation<T> elementTypeInfo) {
        // Fail fast instead of with an anonymous NPE in createSerializer()/equals()/hashCode().
        this.elementTypeInfo = java.util.Objects.requireNonNull(elementTypeInfo, "elementTypeInfo");
    }

    /**
     * Returns the type information for the elements contained in the set.
     *
     * @return The element type information passed to the constructor.
     */
    public TypeInformation<T> getElementTypeInfo() {
        return elementTypeInfo;
    }

    /** Returns {@code false}: a set is not reported as a basic type. */
    @Override
    public boolean isBasicType() {
        return false;
    }

    /** Returns {@code false}: a set is not reported as a tuple type. */
    @Override
    public boolean isTupleType() {
        return false;
    }

    /** Returns {@code 0} as the arity of this type. */
    @Override
    public int getArity() {
        return 0;
    }

    /** Returns {@code 1}: the set is reported as a single field. */
    @Override
    public int getTotalFields() {
        return 1;
    }

    /**
     * Returns the class of the described type.
     *
     * @return {@code Set.class}, cast to the generic type.
     */
    @SuppressWarnings("unchecked")
    @Override
    public Class<Set<T>> getTypeClass() {
        // Class literals cannot carry type arguments, hence the double cast.
        return (Class<Set<T>>) (Class<?>) Set.class;
    }

    /** Returns {@code false}: sets described by this type are not usable as keys. */
    @Override
    public boolean isKeyType() {
        return false;
    }

    /**
     * Creates a {@link SetSerializer} whose element serializer is obtained from the element
     * type information with the same configuration.
     *
     * @param config The execution configuration forwarded to the element type information.
     * @return A new serializer for sets of the element type.
     */
    @Override
    public TypeSerializer<Set<T>> createSerializer(ExecutionConfig config) {
        TypeSerializer<T> elementTypeSerializer = elementTypeInfo.createSerializer(config);
        return new SetSerializer<>(elementTypeSerializer);
    }

    /** Returns {@code "Set<" + elementTypeInfo + '>'}. */
    @Override
    public String toString() {
        return "Set<" + elementTypeInfo + '>';
    }

    /**
     * Two instances are equal when the other object is a {@code SetTypeInfo} that accepts this
     * instance via {@link #canEqual(Object)} and both have equal element type information.
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        } else if (obj instanceof SetTypeInfo) {
            final SetTypeInfo<?> other = (SetTypeInfo<?>) obj;
            return other.canEqual(this) && elementTypeInfo.equals(other.elementTypeInfo);
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return 31 * elementTypeInfo.hashCode() + 1;
    }

    /** Returns {@code true} only for non-null objects of exactly this class. */
    @Override
    public boolean canEqual(Object obj) {
        return obj != null && obj.getClass() == getClass();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment