Skip to content

Instantly share code, notes, and snippets.

@pivovarit
Last active May 4, 2022 16:13
Show Gist options
  • Save pivovarit/0cc15abea9ecd75a8c4f742b1f1d4b22 to your computer and use it in GitHub Desktop.
Save pivovarit/0cc15abea9ecd75a8c4f742b1f1d4b22 to your computer and use it in GitHub Desktop.
package com.pivovarit;
import java.util.Arrays;
import java.util.Objects;
/**
* Conveys a piece of data between one producer thread and arbitrarily many
* consumer threads. The producer may at any time call
* {@link #setSerialized(byte[])} to publish an object in serialized form.
* After that any consumer may call {@link #getDeserialized()} any number of
* times and it will retrieve the deserialized object.
*
* <h3>Assumptions on the use case</h3>
* <ol><li>
* The producer thread has many responsibilities, its time spent at any one
* task must be minimized.
* </li><li>
* Deserialization is expensive, therefore the producer must be relieved from
* it.
* </li><li>
* The usage pattern is read-heavy: there are many more invocations of
* {@link #getDeserialized()} than of {@link #setSerialized(byte[])}.
* </li><li>
* Each returned instance will probably be retained on the heap for a long time.
* </li></ol>
*
* <h3>Desired characteristics of the implementation</h3>
* <ol><li>
* Deserialization is lazy: if there is no invocation of {@link #getDeserialized()},
* then no deserialization happens.
* </li><li>
* Once a consumer deserializes the value, this object caches and shares it with
* future consumers.
* </li><li>
* The invocations of {@link #getDeserialized()} will return at most as many
* distinct instances as there were invocations of
* {@link #setSerialized(byte[])}.
* </li><li>
* {@link #setSerialized(byte[])} is wait-free: it always completes in a finite
* number of steps, regardless of any concurrent invocations of
* {@link #getDeserialized()}.
* </li><li>
* {@link #getDeserialized()} is wait-free with respect to
* {@link #setSerialized(byte[])}: the producer thread may do anything, such
* as calling {@link #setSerialized(byte[])} at a very high rate or getting
* indefinitely suspended within an invocation, without affecting the ability
* of {@link #getDeserialized()} to complete in a finite number of steps.
* </li><li>
* {@link #getDeserialized()} is also wait-free against itself (concurrent
* invocations don't interfere with each other), with one allowed exception:
* when it observes a new serialized value, it may choose to block some of
* the other invocations of {@link #getDeserialized()} until it completes.
* More formally, after the following sequence of events has occurred:
* <ol><li>
* the producer completes its last invocation of {@link #setSerialized(byte[])};
* </li><li>
* a consumer starts an invocation of {@link #getDeserialized()};
* </li><li>
* the invocation completes by returning the object deserialized from the
* blob set by that last invocation,
* </li></ol>
* all future invocations of {@link #getDeserialized()} are wait-free. Note
* that an implementation without the above exception is also possible.
* </li><li>
* {@link #getDeserialized()} exhibits (at least) <em>eventually consistent,
* monotonic read</em> behavior: once a consumer has observed an object derived
* from a serialized value S, it will never observe an object derived from a
* serialized value older than S, nor will it observe the initial {@code null}
* value. If at any point the producer invokes {@link #setSerialized(byte[])}
* for the last time and the consumer keeps invoking {@link #getDeserialized()},
* eventually it will return an object deserialized from that final producer's
* invocation.
* </li></ol>
*/
public class ConcurrentDeserializer {
private volatile DeserializedValue value;
/**
* Sets a new serialized value. Exclusively called by a single producer
* thread.
*/
public void setSerialized(byte[] blob) {
Objects.requireNonNull(blob);
value = new DeserializedValue(Arrays.copyOf(blob, blob.length));
}
/**
* Returns the result of deserializing a blob previously set by the
* producer thread. Called by arbitrarily many consumer threads. Initially
* (before the first invocation of {@link #setSerialized(byte[])}) the
* method returns {@code null}.
*/
public Object getDeserialized() {
return value == null ? null : value.get();
}
// Details of deserialization are out of scope for this assignment.
// You may use this mock implementation:
private Object deserialize(byte[] blob) {
if (blob == null) {
return null;
}
return new String(blob);
}
private final class DeserializedValue {
private volatile byte[] payload;
private volatile Object deserialized;
private final Object lock = new Object();
public DeserializedValue(byte[] payload) {
this.payload = payload;
}
public Object get() {
if (deserialized == null) {
synchronized (lock) {
if (deserialized == null) {
deserialized = deserialize(payload);
payload = null;
}
}
}
return deserialized;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment