Last active
August 14, 2023 13:05
-
-
Save asardaes/714b8c1db0c4020f5fde9865b95fc398 to your computer and use it in GitHub Desktop.
Flink set serialization for POJOs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.typeutils.TypeSerializer; | |
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; | |
import org.apache.flink.api.common.typeutils.base.ListSerializer; | |
import org.apache.flink.core.memory.DataInputView; | |
import org.apache.flink.core.memory.DataOutputView; | |
import java.io.IOException; | |
import java.util.LinkedHashSet; | |
import java.util.Objects; | |
import java.util.Set; | |
/** | |
* See {@link ListSerializer}. | |
* | |
* @param <T> The type of the elements contained in the set. | |
*/ | |
public class SetSerializer<T> extends TypeSerializer<Set<T>> { | |
private static final long serialVersionUID = 1L; | |
private final TypeSerializer<T> elementSerializer; | |
public SetSerializer(TypeSerializer<T> elementSerializer) { | |
this.elementSerializer = elementSerializer; | |
} | |
public TypeSerializer<T> getElementSerializer() { | |
return elementSerializer; | |
} | |
@Override | |
public Set<T> createInstance() { | |
return new LinkedHashSet<>(); | |
} | |
@Override | |
public int getLength() { | |
return -1; | |
} | |
@Override | |
public boolean isImmutableType() { | |
return false; | |
} | |
@Override | |
public TypeSerializer<Set<T>> duplicate() { | |
TypeSerializer<T> duplicateElement = elementSerializer.duplicate(); | |
return duplicateElement == elementSerializer | |
? this | |
: new SetSerializer<>(duplicateElement); | |
} | |
@Override | |
public Set<T> copy(Set<T> from) { | |
Set<T> set = new LinkedHashSet<>(); | |
if (elementSerializer.isImmutableType()) { | |
set.addAll(from); | |
} else { | |
for (T element : from) { | |
set.add(elementSerializer.copy(element)); | |
} | |
} | |
return set; | |
} | |
@Override | |
public Set<T> copy(Set<T> from, Set<T> reuse) { | |
return copy(from); | |
} | |
@Override | |
public void serialize(Set<T> record, DataOutputView target) throws IOException { | |
target.writeInt(record.size()); | |
for (T element : record) { | |
elementSerializer.serialize(element, target); | |
} | |
} | |
@Override | |
public Set<T> deserialize(DataInputView source) throws IOException { | |
final int size = source.readInt(); | |
final Set<T> set = new LinkedHashSet<>(); | |
for (int i = 0; i < size; i++) { | |
set.add(elementSerializer.deserialize(source)); | |
} | |
return set; | |
} | |
@Override | |
public Set<T> deserialize(Set<T> reuse, DataInputView source) throws IOException { | |
return deserialize(source); | |
} | |
@Override | |
public void copy(DataInputView source, DataOutputView target) throws IOException { | |
final int num = source.readInt(); | |
target.writeInt(num); | |
for (int i = 0; i < num; i++) { | |
elementSerializer.copy(source, target); | |
} | |
} | |
@Override | |
public TypeSerializerSnapshot<Set<T>> snapshotConfiguration() { | |
return new SetSerializerSnapshot<>(this); | |
} | |
@Override | |
public boolean equals(Object obj) { | |
return obj == this || ( | |
obj != null && | |
obj.getClass() == getClass() && | |
elementSerializer.equals(((SetSerializer<?>) obj).elementSerializer) | |
); | |
} | |
@Override | |
public int hashCode() { | |
return Objects.hash(SetSerializer.class, elementSerializer); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; | |
import org.apache.flink.api.common.typeutils.TypeSerializer; | |
import java.util.Set; | |
/** | |
* See {@link org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot ListSerializerSnapshot}. | |
*/ | |
public class SetSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<Set<T>, SetSerializer<T>> { | |
private static final int CURRENT_VERSION = 1; | |
public SetSerializerSnapshot() { | |
super(SetSerializer.class); | |
} | |
public SetSerializerSnapshot(SetSerializer<T> serializerInstance) { | |
super(serializerInstance); | |
} | |
@Override | |
protected int getCurrentOuterSnapshotVersion() { | |
return CURRENT_VERSION; | |
} | |
@Override | |
protected TypeSerializer<?>[] getNestedSerializers(SetSerializer<T> outerSerializer) { | |
return new TypeSerializer<?>[]{outerSerializer.getElementSerializer()}; | |
} | |
@Override | |
protected SetSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { | |
@SuppressWarnings("unchecked") | |
TypeSerializer<T> elementSerializer = (TypeSerializer<T>) nestedSerializers[0]; | |
return new SetSerializer<>(elementSerializer); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.ExecutionConfig; | |
import org.apache.flink.api.common.typeinfo.TypeInformation; | |
import org.apache.flink.api.common.typeutils.TypeSerializer; | |
import java.util.Set; | |
/** | |
* See {@link org.apache.flink.api.java.typeutils.ListTypeInfo ListTypeInfo}. | |
* | |
* @param <T> The type of the elements contained in the set. | |
*/ | |
public class SetTypeInfo<T> extends TypeInformation<Set<T>> { | |
private static final long serialVersionUID = 1L; | |
private final TypeInformation<T> elementTypeInfo; | |
public SetTypeInfo(TypeInformation<T> elementTypeInfo) { | |
this.elementTypeInfo = elementTypeInfo; | |
} | |
@Override | |
public boolean isBasicType() { | |
return false; | |
} | |
@Override | |
public boolean isTupleType() { | |
return false; | |
} | |
@Override | |
public int getArity() { | |
return 0; | |
} | |
@Override | |
public int getTotalFields() { | |
return 1; | |
} | |
@SuppressWarnings("unchecked") | |
@Override | |
public Class<Set<T>> getTypeClass() { | |
return (Class<Set<T>>) (Class<?>) Set.class; | |
} | |
@Override | |
public boolean isKeyType() { | |
return false; | |
} | |
@Override | |
public TypeSerializer<Set<T>> createSerializer(ExecutionConfig config) { | |
TypeSerializer<T> elementTypeSerializer = elementTypeInfo.createSerializer(config); | |
return new SetSerializer<>(elementTypeSerializer); | |
} | |
@Override | |
public String toString() { | |
return "Set<" + elementTypeInfo + '>'; | |
} | |
@Override | |
public boolean equals(Object obj) { | |
if (obj == this) { | |
return true; | |
} else if (obj instanceof SetTypeInfo) { | |
final SetTypeInfo<?> other = (SetTypeInfo<?>) obj; | |
return other.canEqual(this) && elementTypeInfo.equals(other.elementTypeInfo); | |
} else { | |
return false; | |
} | |
} | |
@Override | |
public int hashCode() { | |
return 31 * elementTypeInfo.hashCode() + 1; | |
} | |
@Override | |
public boolean canEqual(Object obj) { | |
return obj != null && obj.getClass() == getClass(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment