Skip to content

Instantly share code, notes, and snippets.

@anandabhishek73
Created August 18, 2021 07:51
Show Gist options
  • Save anandabhishek73/49db2f044eaa44accd19cb602e77644b to your computer and use it in GitHub Desktop.
Save anandabhishek73/49db2f044eaa44accd19cb602e77644b to your computer and use it in GitHub Desktop.
Java Multi Producer Multi Consumer, Thread Safe, Blocking Delayed Queue
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@Slf4j
public class DelayQueue<E> implements BlockingQueue<E> {
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
final Condition newInsertion = writeLock.newCondition();
final Condition newRemoval = writeLock.newCondition();
TreeMap<LocalDateTime, MessageEvent> taskQueue = new TreeMap<>();
private int messageCounter = 0;
/**
* Returns the number of elements in this collection, inclusive of future elements. If this collection
* contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
* <tt>Integer.MAX_VALUE</tt>.
*
* @return the number of elements in this collection
*/
@Override
public int size() {
return taskQueue.size();
}
/**
* Returns <tt>true</tt> if this collection contains no elements.
*
* @return <tt>true</tt> if this collection contains no elements
*/
@Override
public boolean isEmpty() {
return taskQueue.isEmpty();
}
/**
* Returns <tt>true</tt> if this collection contains the specified element.
* More formally, returns <tt>true</tt> if and only if this collection
* contains at least one element <tt>e</tt> such that
* <tt>(o==null&nbsp;?&nbsp;e==null&nbsp;:&nbsp;o.equals(e))</tt>.
*
* @param o element whose presence in this collection is to be tested
* @return <tt>true</tt> if this collection contains the specified
* element
* @throws ClassCastException if the type of the specified element
* is incompatible with this collection
* (<a href="#optional-restrictions">optional</a>)
* @throws NullPointerException if the specified element is null and this
* collection does not permit null elements
* (<a href="#optional-restrictions">optional</a>)
*/
@Override
public boolean contains(Object o) {
return taskQueue.containsValue(o);
}
/**
* Removes all available elements from this queue and adds them
* to the given collection. This operation may be more
* efficient than repeatedly polling this queue. A failure
* encountered while attempting to add elements to
* collection {@code c} may result in elements being in neither,
* either or both collections when the associated exception is
* thrown. Attempts to drain a queue to itself result in
* {@code IllegalArgumentException}. Further, the behavior of
* this operation is undefined if the specified collection is
* modified while the operation is in progress.
* <p>
* Note that it will drain all the future elements too,
* without considering the delay.
*
* @param c the collection to transfer elements into
* @return the number of elements transferred
* @throws UnsupportedOperationException if addition of elements
* is not supported by the specified collection
* @throws ClassCastException if the class of an element of this queue
* prevents it from being added to the specified collection
* @throws NullPointerException if the specified collection is null
* @throws IllegalArgumentException if the specified collection is this
* queue, or some property of an element of this queue prevents
* it from being added to the specified collection
*/
@Override
public int drainTo(Collection<? super E> c) {
writeLock.lock();
try {
return drainTo(c, taskQueue.size());
} finally {
writeLock.unlock();
}
}
/**
* Removes at most the given number of available elements from
* this queue and adds them to the given collection. A failure
* encountered while attempting to add elements to
* collection {@code c} may result in elements being in neither,
* either or both collections when the associated exception is
* thrown. Attempts to drain a queue to itself result in
* {@code IllegalArgumentException}. Further, the behavior of
* this operation is undefined if the specified collection is
* modified while the operation is in progress.
* <p>
* Note that it will drain all the future elements too,
* without considering the delay.
*
* @param c the collection to transfer elements into
* @param maxElements the maximum number of elements to transfer
* @return the number of elements transferred
* @throws UnsupportedOperationException if addition of elements
* is not supported by the specified collection
* @throws ClassCastException if the class of an element of this queue
* prevents it from being added to the specified collection
* @throws NullPointerException if the specified collection is null
* @throws IllegalArgumentException if the specified collection is this
* queue, or some property of an element of this queue prevents
* it from being added to the specified collection
*/
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
int count = 0;
writeLock.lock();
try {
for (MessageEvent event : taskQueue.values()) {
if ((count >= maxElements)) {
break;
} else {
count++;
}
c.add(event.getMessage());
}
} finally {
newRemoval.signalAll();
writeLock.unlock();
}
return count;
}
/**
* Returns an iterator over the AVAILABLE elements in this collection. The
* order in which the elements are returned guaranteed to be sorted in ascending
* order of available time.
* <p>
* NOTE: Only available(execution time < current time) elements at the time of iterator
* creation will be iterated by this iterator.
*
* @return an <tt>Iterator</tt> over the available elements in this queue
*/
@Override
public Iterator<E> iterator() {
readLock.lock();
try {
Iterator<MessageEvent> internalIterator = taskQueue.headMap(LocalDateTime.now()).values().iterator();
return new Iterator<E>() {
@Override
public boolean hasNext() {
return internalIterator.hasNext();
}
@Override
public E next() {
return internalIterator.next().getMessage();
}
};
} finally {
readLock.unlock();
}
}
/**
* Returns an array containing all of the elements in this collection.
* Guaranteed to be in order its elements' availability time. Same elements
* are returned by its iterator, and return the elements in
* the same order.
*
* <p>The returned array will be "safe" in that no references to it are
* maintained by this collection. (In other words, this method must
* allocate a new array even if this collection is backed by an array).
* The caller is thus free to modify the returned array.
*
* <p>This method acts as bridge between array-based and collection-based
* APIs.
*
* @return an array containing all of the elements in this collection
*/
@Override
public Object[] toArray() {
ArrayList<E> list = new ArrayList<>(size());
for (E element :
this) {
list.add(element);
}
return list.toArray();
}
/**
* Returns an array containing all of the elements in this collection;
* the runtime type of the returned array is that of the specified array.
* If the collection fits in the specified array, it is returned therein.
* Otherwise, a new array is allocated with the runtime type of the
* specified array and the size of this collection.
*
* <p>If this collection fits in the specified array with room to spare
* (i.e., the array has more elements than this collection), the element
* in the array immediately following the end of the collection is set to
* <tt>null</tt>. (This is useful in determining the length of this
* collection <i>only</i> if the caller knows that this collection does
* not contain any <tt>null</tt> elements.)
*
* <p>If this collection makes any guarantees as to what order its elements
* are returned by its iterator, this method must return the elements in
* the same order.
*
* <p>Like the {@link #toArray()} method, this method acts as bridge between
* array-based and collection-based APIs. Further, this method allows
* precise control over the runtime type of the output array, and may,
* under certain circumstances, be used to save allocation costs.
*
* <p>Suppose <tt>x</tt> is a collection known to contain only strings.
* The following code can be used to dump the collection into a newly
* allocated array of <tt>String</tt>:
*
* <pre>
* String[] y = x.toArray(new String[0]);</pre>
* <p>
* Note that <tt>toArray(new Object[0])</tt> is identical in function to
* <tt>toArray()</tt>.
*
* @param a the array into which the elements of this collection are to be
* stored, if it is big enough; otherwise, a new array of the same
* runtime type is allocated for this purpose.
* @return an array containing all of the elements in this collection
* @throws ArrayStoreException if the runtime type of the specified array
* is not a supertype of the runtime type of every element in
* this collection
* @throws NullPointerException if the specified array is null
*/
@SuppressWarnings("unchecked")
@Override
public <T> T[] toArray(T[] a) {
return (T[]) toArray();
}
/**
* Inserts the specified element into this queue if it is possible to do so
* immediately without violating capacity restrictions, returning
* {@code true} upon success and throwing an {@code IllegalStateException}
* if no space is currently available.
*
* @param e the element to add
* @return {@code true} (as specified by {@link Collection#add})
* @throws IllegalStateException if the element cannot be added at this
* time due to capacity restrictions
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null and
* this queue does not permit null elements
* @throws IllegalArgumentException if some property of this element
* prevents it from being added to this queue
*/
@Override
public boolean add(E e) {
return add(e, Duration.ZERO);
}
@SuppressWarnings("unchecked")
public boolean add(E e, Duration delay) {
MessageEvent event = new MessageEvent(e, delay, LocalDateTime.now());
log.trace("Created new event to insert : {}", event);
writeLock.lock();
log.trace("Acquired writeLock in add()");
try {
while (taskQueue.containsKey(event.getAvailabilityTime())) {
log.debug("Event ({}) cannot be inserted in queue", event);
event.setDelay(event.getDelay().plusNanos(10));
}
taskQueue.put(event.getAvailabilityTime(), event);
newInsertion.signal();
return true;
} finally {
log.trace("Releasing writeLock in add()");
writeLock.unlock();
}
}
/**
* Not efficient.
* <p>
* Removes a single instance of the specified element from this
* collection, if it is present (optional operation). More formally,
* removes an element <tt>e</tt> such that
* <tt>(o==null&nbsp;?&nbsp;e==null&nbsp;:&nbsp;o.equals(e))</tt>, if
* this collection contains one or more such elements. Returns
* <tt>true</tt> if this collection contained the specified element (or
* equivalently, if this collection changed as a result of the call).
*
* @param o element to be removed from this collection, if present
* @return <tt>true</tt> if an element was removed as a result of this call
* @throws ClassCastException if the type of the specified element
* is incompatible with this collection
* (<a href="#optional-restrictions">optional</a>)
* @throws NullPointerException if the specified element is null and this
* collection does not permit null elements
* (<a href="#optional-restrictions">optional</a>)
* @throws UnsupportedOperationException if the <tt>remove</tt> operation
* is not supported by this collection
*/
@Override
public boolean remove(Object o) {
writeLock.lock();
try {
for (MessageEvent event : taskQueue.values()) {
if (event.getMessage().equals(o)) {
newRemoval.notify();
return taskQueue.remove(event.getAvailabilityTime(), event);
}
}
return false;
} finally {
writeLock.unlock();
}
}
/**
* Returns <tt>true</tt> if this collection contains all of the elements
* in the specified collection.
*
* @param c collection to be checked for containment in this collection
* @return <tt>true</tt> if this collection contains all of the elements
* in the specified collection
* @throws ClassCastException if the types of one or more elements
* in the specified collection are incompatible with this
* collection
* (<a href="#optional-restrictions">optional</a>)
* @throws NullPointerException if the specified collection contains one
* or more null elements and this collection does not permit null
* elements
* (<a href="#optional-restrictions">optional</a>),
* or if the specified collection is null.
* @see #contains(Object)
*/
@Override
public boolean containsAll(Collection<?> c) {
return taskQueue.values().containsAll(c);
}
/**
* Adds all of the elements in the specified collection to this collection
* (optional operation). The behavior of this operation is undefined if
* the specified collection is modified while the operation is in progress.
* (This implies that the behavior of this call is undefined if the
* specified collection is this collection, and this collection is
* nonempty.)
*
* @param c collection containing elements to be added to this collection
* @return <tt>true</tt> if this collection changed as a result of the call
* @throws UnsupportedOperationException if the <tt>addAll</tt> operation
* is not supported by this collection
* @throws ClassCastException if the class of an element of the specified
* collection prevents it from being added to this collection
* @throws NullPointerException if the specified collection contains a
* null element and this collection does not permit null elements,
* or if the specified collection is null
* @throws IllegalArgumentException if some property of an element of the
* specified collection prevents it from being added to this
* collection
* @throws IllegalStateException if not all the elements can be added at
* this time due to insertion restrictions
* @see #add(Object)
*/
@Override
public boolean addAll(Collection<? extends E> c) {
for (E element : c) {
if (!add(element)) {
return false;
}
}
return true;
}
/**
* Very inefficient.
* <p>
* Removes all of this collection's elements that are also contained in the
* specified collection (optional operation). After this call returns,
* this collection will contain no elements in common with the specified
* collection.
*
* @param c collection containing elements to be removed from this collection
* @return <tt>true</tt> if this collection changed as a result of the
* call
* @throws UnsupportedOperationException if the <tt>removeAll</tt> method
* is not supported by this collection
* @throws ClassCastException if the types of one or more elements
* in this collection are incompatible with the specified
* collection
* (<a href="#optional-restrictions">optional</a>)
* @throws NullPointerException if this collection contains one or more
* null elements and the specified collection does not support
* null elements
* (<a href="#optional-restrictions">optional</a>),
* or if the specified collection is null
* @see #remove(Object)
* @see #contains(Object)
*/
@Override
public boolean removeAll(Collection<?> c) {
for (Object element : c) {
if (!remove(element)) {
return false;
}
}
return true;
}
/**
* Retains only the elements in this collection that are contained in the
* specified collection (optional operation). In other words, removes from
* this collection all of its elements that are not contained in the
* specified collection.
*
* @param c collection containing elements to be retained in this collection
* @return <tt>true</tt> if this collection changed as a result of the call
* @throws UnsupportedOperationException if the <tt>retainAll</tt> operation
* is not supported by this collection
* @throws ClassCastException if the types of one or more elements
* in this collection are incompatible with the specified
* collection
* (<a href="#optional-restrictions">optional</a>)
* @throws NullPointerException if this collection contains one or more
* null elements and the specified collection does not permit null
* elements
* (<a href="#optional-restrictions">optional</a>),
* or if the specified collection is null
* @see #remove(Object)
* @see #contains(Object)
*/
@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException("Not supported");
}
/**
* Removes all of the elements from this collection (optional operation).
* The collection will be empty after this method returns.
*
* @throws UnsupportedOperationException if the <tt>clear</tt> operation
* is not supported by this collection
*/
@Override
public void clear() {
writeLock.lock();
try {
taskQueue.clear();
} finally {
newRemoval.signalAll();
writeLock.unlock();
}
}
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions.
* When using a capacity-restricted queue, this method is generally
* preferable to {@link #add}, which can fail to insert an element only
* by throwing an exception.
*
* @param e the element to add
* @return {@code true} if the element was added to this queue, else
* {@code false}
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null and
* this queue does not permit null elements
* @throws IllegalArgumentException if some property of this element
* prevents it from being added to this queue
*/
@Override
public boolean offer(E e) {
return add(e);
}
/**
* Inserts the specified element into this queue, waiting if necessary
* for space to become available.
*
* @param e the element to add
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
@Override
public void put(E e) throws InterruptedException {
add(e);
}
/**
* Inserts the specified element into this queue, waiting up to the
* specified wait time if necessary for space to become available.
*
* @param e the element to add
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
return offer(e);
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
@Override
public E take() throws InterruptedException {
return getNext(Long.MAX_VALUE, TimeUnit.SECONDS);
}
/**
* Retrieves and removes the head of this queue, waiting up to the
* specified wait time if necessary for an element to become available.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return getNext(timeout, unit);
}
/**
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
* blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic
* limit.
*
* <p>Note that you <em>cannot</em> always tell if an attempt to insert
* an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*
* @return the remaining capacity
*/
@Override
public int remainingCapacity() {
return Integer.MAX_VALUE - size();
}
/**
* Retrieves and removes the head of this queue. This method differs
* from {@link #poll poll} only in that it throws an exception if this
* queue is empty.
*
* @return the head of this queue
* @throws NoSuchElementException if this queue is empty
*/
@Override
public E remove() {
return poll();
}
/**
* Retrieves and removes the head of this queue,
* or returns {@code null} if this queue is empty.
*
* @return the head of this queue, or {@code null} if this queue is empty
*/
@Override
public E poll() {
try {
return getNext();
} catch (InterruptedException e) {
log.warn("Interrupted while removing from head of queue", e);
return null;
}
}
/**
* Retrieves, but does not remove, the head of this queue. This method
* differs from {@link #peek peek} only in that it throws an exception
* if this queue is empty.
*
* @return the head of this queue
* @throws NoSuchElementException if this queue is empty
*/
@Override
public E element() {
return Optional.ofNullable(peek()).orElseThrow(NoSuchElementException::new);
}
/**
* Retrieves, but does not remove, the head of this queue,
* or returns {@code null} if this queue is empty.
*
* @return the head of this queue, or {@code null} if this queue is empty
*/
@Override
public E peek() {
readLock.lock();
try {
return Optional.ofNullable(taskQueue.firstEntry())
.filter(entry -> entry.getKey().isBefore(LocalDateTime.now()))
.map(entry -> entry.getValue().getMessage())
.orElse(null);
} finally {
readLock.unlock();
}
}
private boolean isAvailable() {
if (taskQueue.isEmpty()) return false;
return Optional.ofNullable(taskQueue.firstEntry())
.map(entry -> entry.getKey().isBefore(LocalDateTime.now()))
.orElse(false);
}
public E getNext() throws InterruptedException {
return getNext(0, TimeUnit.MILLISECONDS);
}
/**
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
public E getNext(long timeout, TimeUnit unit) throws InterruptedException {
LocalDateTime startTime = LocalDateTime.now();
Duration totalWaitTime = Duration.ofMillis(unit.toMillis(timeout));
LocalDateTime expiryTime = startTime.plus(totalWaitTime);
while (LocalDateTime.now().isBefore(expiryTime)) {
writeLock.lock();
log.trace("Locked writeLock in getNext()");
try {
if (!isAvailable()) {
if (taskQueue.isEmpty()) {
log.debug("Nothing to consume. Waiting for next insertion.");
newInsertion.await(timeout, unit);
} else {
Duration nextAvailableAfter = Duration.between(LocalDateTime.now(), taskQueue.firstKey());
Duration maxWaitDuration = Duration.between(LocalDateTime.now(), expiryTime);
Duration finalDuration = nextAvailableAfter.compareTo(maxWaitDuration) <= 0 ? nextAvailableAfter : maxWaitDuration;
finalDuration = finalDuration.isNegative() ? Duration.ZERO : finalDuration;
log.debug("Sleeping for ({}) till next availability.", finalDuration);
newInsertion.await(finalDuration.toMillis(), TimeUnit.MILLISECONDS);
}
}
if (!isAvailable()) {
continue;
}
MessageEvent event = taskQueue.pollFirstEntry().getValue();
log.debug("Next Element is : {}", event);
newRemoval.signal();
return event.getMessage();
} finally {
log.trace("Unlocking writeLock in getNext()");
writeLock.unlock();
}
}
log.info("expiryTime of duration ({}) elapsed", totalWaitTime);
return null;
}
@Data
public class MessageEvent implements Comparable<MessageEvent> {
E message;
Duration delay;
LocalDateTime creationTime;
Integer serialNumber;
public MessageEvent(E message, Duration delay, LocalDateTime creationTime) {
this.message = message;
this.delay = delay;
this.creationTime = creationTime;
this.serialNumber = messageCounter++;
}
public LocalDateTime getAvailabilityTime() {
return creationTime.plus(delay);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o instanceof DelayQueue.MessageEvent) {
MessageEvent messageEvent = (MessageEvent) o;
return this.message.equals(messageEvent.message);
} else {
return this.message.equals(o);
}
}
@Override
public int hashCode() {
return Objects.hash(message);
}
@Override
public String toString() {
return "MessageEvent{" +
"serialNumber=" + serialNumber +
", delay=" + delay +
", availabilityTime=" + getAvailabilityTime() +
'}';
}
@Override
public int compareTo(DelayQueue<E>.MessageEvent o) {
return this.getAvailabilityTime().compareTo(o.getAvailabilityTime());
}
}
}
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
@Slf4j
class DelayQueueTest {
DelayQueue<String> queue;
@BeforeEach
void setup() {
queue = new DelayQueue<>();
}
@Test
void add() {
assertTrue(queue.add("First Message", Duration.of(3, ChronoUnit.SECONDS)));
assertEquals(1, queue.size());
assertTrue(queue.add("Second Message", Duration.of(1, ChronoUnit.SECONDS)));
assertEquals(2, queue.size());
}
@Test
void getNext() throws InterruptedException {
queue.add("First Message", Duration.of(3, ChronoUnit.SECONDS));
queue.add("Second Message", Duration.of(3, ChronoUnit.SECONDS));
queue.add("Third Message", Duration.of(1, ChronoUnit.SECONDS));
log.info("#1 message : {}", queue.getNext());
log.info("#2 message : {}", queue.getNext());
log.info("#3 message : {}", queue.getNext());
log.info("#4 message : {}", queue.getNext());
}
@Test
void singleProducerConsumer() throws InterruptedException {
int messageCount = 10;
Thread producer = new Thread(() -> {
Random random = new Random(3246576432143L);
for (int i = messageCount; i > 0; i--) {
queue.add("Message : " + i, Duration.of(i * 100, ChronoUnit.MILLIS));
}
});
producer.start();
Random random = new Random(134514325443L);
for (int i = 0; i < messageCount; i++) {
// String message = queue.getNext(random.nextInt(10), TimeUnit.SECONDS);
String message = queue.getNext(10, TimeUnit.SECONDS);
assertNotNull(message);
log.info("#{}th Message is :{}", i, message);
}
producer.join();
}
@Test
void multiConsumerProducer() {
final ScheduledExecutorService producerService = Executors.newScheduledThreadPool(4);
final ExecutorService consumerService = Executors.newFixedThreadPool(4);
Random random = new Random(134514325443L);
DelayQueue<LocalDateTime> queue = new DelayQueue<>();
int count = 1000;
List<LocalDateTime> eventsAdded = new LinkedList<>();
for (int i = 0; i < count; i++) {
final int order = i;
long scheduleTime = (long) random.nextInt(1000);
Duration delay = Duration.ofMillis((long) random.nextInt(1000) + 1000);
producerService.schedule((Runnable) () -> {
log.info("Adding ({})th event with delay {}", order, delay);
// synchronized (queue) {
LocalDateTime expectedRunTime = LocalDateTime.now().plus(delay);
queue.add(expectedRunTime, delay);
eventsAdded.add(expectedRunTime);
// }
}, scheduleTime, TimeUnit.MILLISECONDS);
log.info("Adding ({})th schedule with delay {}", i, scheduleTime);
}
producerService.shutdown();
List<LocalDateTime> eventsReceived = new LinkedList<>();
List<CompletableFuture<LocalDateTime>> eventsFuture = new LinkedList<>();
for (int i = 0; i < count; i++) {
final int order = i;
eventsFuture.add(CompletableFuture.supplyAsync(() -> {
try {
// synchronized (queue) {
LocalDateTime retrievedEvent = queue.getNext(30, TimeUnit.SECONDS);
eventsReceived.add(retrievedEvent);
log.info("Receiving ({})th event with expected execution time {}", order, retrievedEvent);
return retrievedEvent;
// }
} catch (InterruptedException e) {
log.error("", e);
return null;
}
}, consumerService));
}
CompletableFuture.allOf(eventsFuture.toArray(new CompletableFuture[eventsFuture.size()])).join();
eventsAdded.sort(LocalDateTime::compareTo);
log.info("Insertion order list of size {} is \n{}", eventsAdded.size(), eventsAdded);
// log.info("Insertion list sorted is \n{}", eventsAdded);
log.info("Execution order list of size {} is \n{}", eventsReceived.size(), eventsReceived);
List<LocalDateTime> roundedOffList = eventsReceived.stream()
.map(localDateTime -> {
Duration roundOff = Duration.ofMillis((localDateTime.getLong(ChronoField.MILLI_OF_DAY) / 100L) * 100L);
return localDateTime.truncatedTo(ChronoUnit.SECONDS).plus(roundOff);
})
.collect(Collectors.toList());
assertThat(roundedOffList).isSorted();
log.info("Insertion rounded list of size {} is \n{}", roundedOffList.size(), roundedOffList);
assertThat(eventsReceived).containsExactlyInAnyOrderElementsOf(eventsAdded);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment