-
-
Save bremac/1b3365bc0257dfbd33bcd0b7a7627c00 to your computer and use it in GitHub Desktop.
How to write high-availability tests that do not require the HA backend to keep checkpoints in memory
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.sightmachine.streaming.ha; | |
import com.google.common.collect.MapMaker; | |
import java.io.ObjectStreamException; | |
import java.io.Serializable; | |
import java.util.UUID; | |
import java.util.concurrent.ConcurrentMap; | |
import org.checkerframework.checker.nullness.qual.NonNull; | |
import org.checkerframework.checker.nullness.qual.Nullable; | |
import org.checkerframework.framework.qual.DefaultQualifier; | |
/** | |
* Attempts to deserialize subclasses into the same instance that was serialized. | |
* | |
* In-memory objects that inherit from {@code IdentityDeserializable} will deserialize to existing | |
* instances of those objects as long as the original instances are still referenced. On the | |
* other hand, if the object has been garbage collected then a new instance will be constructed | |
* instead. | |
* | |
* This is helpful for testing the behavior of objects that are serialized and deserialized, such | |
* as checkpoints and operators in Flink jobs. Normally these objects would deserialize as new | |
* instances, which makes it hard to store data in-memory. However if the object inherits from | |
* IdentityDeserializable then objects will deserialize to the same instance as long as the test | |
* holds at least one reference to the object in question. | |
*/ | |
@DefaultQualifier(NonNull.class) | |
public class IdentityDeserializable implements Serializable { | |
// Live instances of all objects that inherit from IdentityDeserializable. | |
private static final ConcurrentMap<UUID, IdentityDeserializable> INSTANCES = | |
new MapMaker().weakValues().makeMap(); | |
private final UUID uuid; // Universally unique identifier for the object. | |
public IdentityDeserializable() { | |
uuid = UUID.randomUUID(); | |
// This is safe since no other objects can access the instance without its UID -- the | |
// instance list doesn't escape this class, so there's no way for someone to access a | |
// partially-constructed object. | |
INSTANCES.put(uuid, this); | |
} | |
public final Object readResolve() throws ObjectStreamException { | |
@Nullable Object oldInstance = INSTANCES.putIfAbsent(uuid, this); | |
return oldInstance == null ? this : oldInstance; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.sightmachine.streaming.ha; | |
import java.util.Map; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.TimeUnit; | |
import org.apache.flink.api.common.JobID; | |
import org.apache.flink.runtime.checkpoint.CheckpointProperties; | |
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; | |
import org.apache.flink.runtime.checkpoint.OperatorState; | |
import org.apache.flink.runtime.jobgraph.JobStatus; | |
import org.apache.flink.runtime.jobgraph.OperatorID; | |
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; | |
/** | |
* A test {@link CompletedCheckpoint}. We want to verify that the correct class loader is used when | |
* discarding. Spying on a regular {@link CompletedCheckpoint} instance with Mockito doesn't work, | |
* because it it breaks serializability. | |
*/ | |
public class TestCompletedCheckpoint extends CompletedCheckpoint { | |
/** | |
* A state object holds all of the in-memory state for a test checkpoint. This is used to test | |
* against backends that do not keep all of the checkpoints in memory at all times. If the test | |
* maintains at least one reference to the checkpoint then all deserialized copies of that | |
* checkpoint will reference the original state object. | |
*/ | |
private static final class State extends IdentityDeserializable { | |
/** Has the checkpoint been discarded? */ | |
private transient boolean isDiscarded; | |
/** Latch for implementations which discard asynchronously */ | |
private final transient CountDownLatch discardLatch; | |
State() { | |
isDiscarded = false; | |
discardLatch = new CountDownLatch(1); | |
} | |
public void discard() { | |
isDiscarded = true; | |
discardLatch.countDown(); | |
} | |
public boolean isDiscarded() { | |
return isDiscarded; | |
} | |
public void awaitDiscard() throws InterruptedException { | |
discardLatch.await(); | |
} | |
public boolean awaitDiscard(long timeout) throws InterruptedException { | |
return discardLatch.await(timeout, TimeUnit.MILLISECONDS); | |
} | |
} | |
private final State state = new State(); | |
@Override | |
public boolean discardOnSubsume() throws Exception { | |
if (super.discardOnSubsume()) { | |
state.discard(); | |
return true; | |
} else { | |
return false; | |
} | |
} | |
// etc. | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment