Skip to content

Instantly share code, notes, and snippets.

@bremac
Created September 17, 2019 21:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bremac/1b3365bc0257dfbd33bcd0b7a7627c00 to your computer and use it in GitHub Desktop.
Save bremac/1b3365bc0257dfbd33bcd0b7a7627c00 to your computer and use it in GitHub Desktop.
How to write high-availability tests that do not require the HA backend to keep checkpoints in memory
package com.sightmachine.streaming.ha;
import com.google.common.collect.MapMaker;
import java.io.ObjectStreamException;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.framework.qual.DefaultQualifier;
/**
 * Base class whose subclasses try to deserialize back into the very instance that was serialized.
 *
 * <p>Every instance is given a random {@link UUID} and is recorded in a registry that holds its
 * values weakly. When a serialized copy is read back, {@link #readResolve()} looks the UUID up:
 * if the original instance is still referenced somewhere, that original is returned in place of
 * the freshly deserialized copy; if it has been garbage collected, the copy takes its place.
 *
 * <p>This helps when testing objects that are serialized and deserialized, such as checkpoints
 * and operators in Flink jobs. Such objects normally come back as brand-new instances, which
 * makes it hard to keep data in memory. Inheriting from {@code IdentityDeserializable} lets them
 * resolve to the same instance for as long as the test holds at least one reference to it.
 */
@DefaultQualifier(NonNull.class)
public class IdentityDeserializable implements Serializable {
  // Registry of the live instances of every class inheriting from IdentityDeserializable,
  // keyed by UUID. Values are weak so the registry alone never keeps an instance alive.
  private static final ConcurrentMap<UUID, IdentityDeserializable> INSTANCES =
      new MapMaker().weakValues().makeMap();

  // Universally unique identifier for the object; part of the serialized form.
  private final UUID uuid = UUID.randomUUID();

  public IdentityDeserializable() {
    // Publishing `this` from the constructor is safe here: the registry never leaves this
    // class and an entry can only be reached through its UUID, so nobody else can get hold of
    // a partially-constructed object.
    INSTANCES.put(uuid, this);
  }

  /**
   * Serialization hook that swaps a deserialized copy for the registered instance with the same
   * UUID, registering the copy itself when no such instance exists.
   *
   * @return the previously registered instance if there is one, otherwise this object
   */
  public final Object readResolve() throws ObjectStreamException {
    @Nullable Object registered = INSTANCES.putIfAbsent(uuid, this);
    if (registered != null) {
      return registered;
    }
    return this;
  }
}
package com.sightmachine.streaming.ha;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
/**
 * A test {@link CompletedCheckpoint}. We want to verify that the correct class loader is used when
 * discarding. Spying on a regular {@link CompletedCheckpoint} instance with Mockito doesn't work,
 * because it breaks serializability.
 */
public class TestCompletedCheckpoint extends CompletedCheckpoint {
  /**
   * A state object holds all of the in-memory state for a test checkpoint. This is used to test
   * against backends that do not keep all of the checkpoints in memory at all times. If the test
   * maintains at least one reference to the checkpoint then all deserialized copies of that
   * checkpoint will reference the original state object.
   *
   * <p>Both fields are transient, so a copy that is deserialized after the original has been
   * garbage collected starts out as "not discarded" with a fresh latch.
   */
  private static final class State extends IdentityDeserializable {
    /**
     * Has the checkpoint been discarded? Volatile because the discard may be performed on a
     * different thread from the one calling {@link #isDiscarded()}.
     */
    private transient volatile boolean isDiscarded;

    /**
     * Latch for implementations which discard asynchronously. Not final: it is transient, so it
     * has to be re-created in {@link #readObject(ObjectInputStream)}.
     */
    private transient CountDownLatch discardLatch;

    State() {
      isDiscarded = false;
      discardLatch = new CountDownLatch(1);
    }

    /**
     * Restores the transient latch after deserialization.
     *
     * <p>Java serialization does not run this class's constructor when reading an object back,
     * so without this hook a deserialized copy would carry a null latch and {@link #discard()} /
     * {@link #awaitDiscard()} would throw {@link NullPointerException}.
     */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
      in.defaultReadObject();
      discardLatch = new CountDownLatch(1);
    }

    /** Marks the checkpoint as discarded and releases every thread waiting on the latch. */
    public void discard() {
      isDiscarded = true;
      discardLatch.countDown();
    }

    /** Returns {@code true} once {@link #discard()} has been called on this state object. */
    public boolean isDiscarded() {
      return isDiscarded;
    }

    /**
     * Blocks until {@link #discard()} has been called.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void awaitDiscard() throws InterruptedException {
      discardLatch.await();
    }

    /**
     * Blocks until {@link #discard()} has been called or the timeout elapses.
     *
     * @param timeout maximum time to wait, in milliseconds
     * @return {@code true} if the discard happened, {@code false} if the wait timed out
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public boolean awaitDiscard(long timeout) throws InterruptedException {
      return discardLatch.await(timeout, TimeUnit.MILLISECONDS);
    }
  }

  private final State state = new State();

  /**
   * Delegates to the superclass and records the discard in the shared {@link State} only when
   * the superclass call returns {@code true}.
   */
  @Override
  public boolean discardOnSubsume() throws Exception {
    if (super.discardOnSubsume()) {
      state.discard();
      return true;
    } else {
      return false;
    }
  }

  // etc.
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment