Created
August 18, 2021 07:51
-
-
Save anandabhishek73/49db2f044eaa44accd19cb602e77644b to your computer and use it in GitHub Desktop.
Java Multi Producer Multi Consumer, Thread Safe, Blocking Delayed Queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lombok.Data; | |
import lombok.extern.slf4j.Slf4j; | |
import java.time.Duration; | |
import java.time.LocalDateTime; | |
import java.util.*; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.locks.Condition; | |
import java.util.concurrent.locks.ReentrantReadWriteLock; | |
@Slf4j | |
public class DelayQueue<E> implements BlockingQueue<E> { | |
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); | |
final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); | |
final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); | |
final Condition newInsertion = writeLock.newCondition(); | |
final Condition newRemoval = writeLock.newCondition(); | |
TreeMap<LocalDateTime, MessageEvent> taskQueue = new TreeMap<>(); | |
private int messageCounter = 0; | |
/** | |
* Returns the number of elements in this collection, inclusive of future elements. If this collection | |
* contains more than <tt>Integer.MAX_VALUE</tt> elements, returns | |
* <tt>Integer.MAX_VALUE</tt>. | |
* | |
* @return the number of elements in this collection | |
*/ | |
@Override | |
public int size() { | |
return taskQueue.size(); | |
} | |
/** | |
* Returns <tt>true</tt> if this collection contains no elements. | |
* | |
* @return <tt>true</tt> if this collection contains no elements | |
*/ | |
@Override | |
public boolean isEmpty() { | |
return taskQueue.isEmpty(); | |
} | |
/** | |
* Returns <tt>true</tt> if this collection contains the specified element. | |
* More formally, returns <tt>true</tt> if and only if this collection | |
* contains at least one element <tt>e</tt> such that | |
* <tt>(o==null ? e==null : o.equals(e))</tt>. | |
* | |
* @param o element whose presence in this collection is to be tested | |
* @return <tt>true</tt> if this collection contains the specified | |
* element | |
* @throws ClassCastException if the type of the specified element | |
* is incompatible with this collection | |
* (<a href="#optional-restrictions">optional</a>) | |
* @throws NullPointerException if the specified element is null and this | |
* collection does not permit null elements | |
* (<a href="#optional-restrictions">optional</a>) | |
*/ | |
@Override | |
public boolean contains(Object o) { | |
return taskQueue.containsValue(o); | |
} | |
/** | |
* Removes all available elements from this queue and adds them | |
* to the given collection. This operation may be more | |
* efficient than repeatedly polling this queue. A failure | |
* encountered while attempting to add elements to | |
* collection {@code c} may result in elements being in neither, | |
* either or both collections when the associated exception is | |
* thrown. Attempts to drain a queue to itself result in | |
* {@code IllegalArgumentException}. Further, the behavior of | |
* this operation is undefined if the specified collection is | |
* modified while the operation is in progress. | |
* <p> | |
* Note that it will drain all the future elements too, | |
* without considering the delay. | |
* | |
* @param c the collection to transfer elements into | |
* @return the number of elements transferred | |
* @throws UnsupportedOperationException if addition of elements | |
* is not supported by the specified collection | |
* @throws ClassCastException if the class of an element of this queue | |
* prevents it from being added to the specified collection | |
* @throws NullPointerException if the specified collection is null | |
* @throws IllegalArgumentException if the specified collection is this | |
* queue, or some property of an element of this queue prevents | |
* it from being added to the specified collection | |
*/ | |
@Override | |
public int drainTo(Collection<? super E> c) { | |
writeLock.lock(); | |
try { | |
return drainTo(c, taskQueue.size()); | |
} finally { | |
writeLock.unlock(); | |
} | |
} | |
/** | |
* Removes at most the given number of available elements from | |
* this queue and adds them to the given collection. A failure | |
* encountered while attempting to add elements to | |
* collection {@code c} may result in elements being in neither, | |
* either or both collections when the associated exception is | |
* thrown. Attempts to drain a queue to itself result in | |
* {@code IllegalArgumentException}. Further, the behavior of | |
* this operation is undefined if the specified collection is | |
* modified while the operation is in progress. | |
* <p> | |
* Note that it will drain all the future elements too, | |
* without considering the delay. | |
* | |
* @param c the collection to transfer elements into | |
* @param maxElements the maximum number of elements to transfer | |
* @return the number of elements transferred | |
* @throws UnsupportedOperationException if addition of elements | |
* is not supported by the specified collection | |
* @throws ClassCastException if the class of an element of this queue | |
* prevents it from being added to the specified collection | |
* @throws NullPointerException if the specified collection is null | |
* @throws IllegalArgumentException if the specified collection is this | |
* queue, or some property of an element of this queue prevents | |
* it from being added to the specified collection | |
*/ | |
@Override | |
public int drainTo(Collection<? super E> c, int maxElements) { | |
int count = 0; | |
writeLock.lock(); | |
try { | |
for (MessageEvent event : taskQueue.values()) { | |
if ((count >= maxElements)) { | |
break; | |
} else { | |
count++; | |
} | |
c.add(event.getMessage()); | |
} | |
} finally { | |
newRemoval.signalAll(); | |
writeLock.unlock(); | |
} | |
return count; | |
} | |
/** | |
* Returns an iterator over the AVAILABLE elements in this collection. The | |
* order in which the elements are returned guaranteed to be sorted in ascending | |
* order of available time. | |
* <p> | |
* NOTE: Only available(execution time < current time) elements at the time of iterator | |
* creation will be iterated by this iterator. | |
* | |
* @return an <tt>Iterator</tt> over the available elements in this queue | |
*/ | |
@Override | |
public Iterator<E> iterator() { | |
readLock.lock(); | |
try { | |
Iterator<MessageEvent> internalIterator = taskQueue.headMap(LocalDateTime.now()).values().iterator(); | |
return new Iterator<E>() { | |
@Override | |
public boolean hasNext() { | |
return internalIterator.hasNext(); | |
} | |
@Override | |
public E next() { | |
return internalIterator.next().getMessage(); | |
} | |
}; | |
} finally { | |
readLock.unlock(); | |
} | |
} | |
/** | |
* Returns an array containing all of the elements in this collection. | |
* Guaranteed to be in order its elements' availability time. Same elements | |
* are returned by its iterator, and return the elements in | |
* the same order. | |
* | |
* <p>The returned array will be "safe" in that no references to it are | |
* maintained by this collection. (In other words, this method must | |
* allocate a new array even if this collection is backed by an array). | |
* The caller is thus free to modify the returned array. | |
* | |
* <p>This method acts as bridge between array-based and collection-based | |
* APIs. | |
* | |
* @return an array containing all of the elements in this collection | |
*/ | |
@Override | |
public Object[] toArray() { | |
ArrayList<E> list = new ArrayList<>(size()); | |
for (E element : | |
this) { | |
list.add(element); | |
} | |
return list.toArray(); | |
} | |
/** | |
* Returns an array containing all of the elements in this collection; | |
* the runtime type of the returned array is that of the specified array. | |
* If the collection fits in the specified array, it is returned therein. | |
* Otherwise, a new array is allocated with the runtime type of the | |
* specified array and the size of this collection. | |
* | |
* <p>If this collection fits in the specified array with room to spare | |
* (i.e., the array has more elements than this collection), the element | |
* in the array immediately following the end of the collection is set to | |
* <tt>null</tt>. (This is useful in determining the length of this | |
* collection <i>only</i> if the caller knows that this collection does | |
* not contain any <tt>null</tt> elements.) | |
* | |
* <p>If this collection makes any guarantees as to what order its elements | |
* are returned by its iterator, this method must return the elements in | |
* the same order. | |
* | |
* <p>Like the {@link #toArray()} method, this method acts as bridge between | |
* array-based and collection-based APIs. Further, this method allows | |
* precise control over the runtime type of the output array, and may, | |
* under certain circumstances, be used to save allocation costs. | |
* | |
* <p>Suppose <tt>x</tt> is a collection known to contain only strings. | |
* The following code can be used to dump the collection into a newly | |
* allocated array of <tt>String</tt>: | |
* | |
* <pre> | |
* String[] y = x.toArray(new String[0]);</pre> | |
* <p> | |
* Note that <tt>toArray(new Object[0])</tt> is identical in function to | |
* <tt>toArray()</tt>. | |
* | |
* @param a the array into which the elements of this collection are to be | |
* stored, if it is big enough; otherwise, a new array of the same | |
* runtime type is allocated for this purpose. | |
* @return an array containing all of the elements in this collection | |
* @throws ArrayStoreException if the runtime type of the specified array | |
* is not a supertype of the runtime type of every element in | |
* this collection | |
* @throws NullPointerException if the specified array is null | |
*/ | |
@SuppressWarnings("unchecked") | |
@Override | |
public <T> T[] toArray(T[] a) { | |
return (T[]) toArray(); | |
} | |
/** | |
* Inserts the specified element into this queue if it is possible to do so | |
* immediately without violating capacity restrictions, returning | |
* {@code true} upon success and throwing an {@code IllegalStateException} | |
* if no space is currently available. | |
* | |
* @param e the element to add | |
* @return {@code true} (as specified by {@link Collection#add}) | |
* @throws IllegalStateException if the element cannot be added at this | |
* time due to capacity restrictions | |
* @throws ClassCastException if the class of the specified element | |
* prevents it from being added to this queue | |
* @throws NullPointerException if the specified element is null and | |
* this queue does not permit null elements | |
* @throws IllegalArgumentException if some property of this element | |
* prevents it from being added to this queue | |
*/ | |
@Override | |
public boolean add(E e) { | |
return add(e, Duration.ZERO); | |
} | |
@SuppressWarnings("unchecked") | |
public boolean add(E e, Duration delay) { | |
MessageEvent event = new MessageEvent(e, delay, LocalDateTime.now()); | |
log.trace("Created new event to insert : {}", event); | |
writeLock.lock(); | |
log.trace("Acquired writeLock in add()"); | |
try { | |
while (taskQueue.containsKey(event.getAvailabilityTime())) { | |
log.debug("Event ({}) cannot be inserted in queue", event); | |
event.setDelay(event.getDelay().plusNanos(10)); | |
} | |
taskQueue.put(event.getAvailabilityTime(), event); | |
newInsertion.signal(); | |
return true; | |
} finally { | |
log.trace("Releasing writeLock in add()"); | |
writeLock.unlock(); | |
} | |
} | |
/** | |
* Not efficient. | |
* <p> | |
* Removes a single instance of the specified element from this | |
* collection, if it is present (optional operation). More formally, | |
* removes an element <tt>e</tt> such that | |
* <tt>(o==null ? e==null : o.equals(e))</tt>, if | |
* this collection contains one or more such elements. Returns | |
* <tt>true</tt> if this collection contained the specified element (or | |
* equivalently, if this collection changed as a result of the call). | |
* | |
* @param o element to be removed from this collection, if present | |
* @return <tt>true</tt> if an element was removed as a result of this call | |
* @throws ClassCastException if the type of the specified element | |
* is incompatible with this collection | |
* (<a href="#optional-restrictions">optional</a>) | |
* @throws NullPointerException if the specified element is null and this | |
* collection does not permit null elements | |
* (<a href="#optional-restrictions">optional</a>) | |
* @throws UnsupportedOperationException if the <tt>remove</tt> operation | |
* is not supported by this collection | |
*/ | |
@Override | |
public boolean remove(Object o) { | |
writeLock.lock(); | |
try { | |
for (MessageEvent event : taskQueue.values()) { | |
if (event.getMessage().equals(o)) { | |
newRemoval.notify(); | |
return taskQueue.remove(event.getAvailabilityTime(), event); | |
} | |
} | |
return false; | |
} finally { | |
writeLock.unlock(); | |
} | |
} | |
/** | |
* Returns <tt>true</tt> if this collection contains all of the elements | |
* in the specified collection. | |
* | |
* @param c collection to be checked for containment in this collection | |
* @return <tt>true</tt> if this collection contains all of the elements | |
* in the specified collection | |
* @throws ClassCastException if the types of one or more elements | |
* in the specified collection are incompatible with this | |
* collection | |
* (<a href="#optional-restrictions">optional</a>) | |
* @throws NullPointerException if the specified collection contains one | |
* or more null elements and this collection does not permit null | |
* elements | |
* (<a href="#optional-restrictions">optional</a>), | |
* or if the specified collection is null. | |
* @see #contains(Object) | |
*/ | |
@Override | |
public boolean containsAll(Collection<?> c) { | |
return taskQueue.values().containsAll(c); | |
} | |
/** | |
* Adds all of the elements in the specified collection to this collection | |
* (optional operation). The behavior of this operation is undefined if | |
* the specified collection is modified while the operation is in progress. | |
* (This implies that the behavior of this call is undefined if the | |
* specified collection is this collection, and this collection is | |
* nonempty.) | |
* | |
* @param c collection containing elements to be added to this collection | |
* @return <tt>true</tt> if this collection changed as a result of the call | |
* @throws UnsupportedOperationException if the <tt>addAll</tt> operation | |
* is not supported by this collection | |
* @throws ClassCastException if the class of an element of the specified | |
* collection prevents it from being added to this collection | |
* @throws NullPointerException if the specified collection contains a | |
* null element and this collection does not permit null elements, | |
* or if the specified collection is null | |
* @throws IllegalArgumentException if some property of an element of the | |
* specified collection prevents it from being added to this | |
* collection | |
* @throws IllegalStateException if not all the elements can be added at | |
* this time due to insertion restrictions | |
* @see #add(Object) | |
*/ | |
@Override | |
public boolean addAll(Collection<? extends E> c) { | |
for (E element : c) { | |
if (!add(element)) { | |
return false; | |
} | |
} | |
return true; | |
} | |
/** | |
* Very inefficient. | |
* <p> | |
* Removes all of this collection's elements that are also contained in the | |
* specified collection (optional operation). After this call returns, | |
* this collection will contain no elements in common with the specified | |
* collection. | |
* | |
* @param c collection containing elements to be removed from this collection | |
* @return <tt>true</tt> if this collection changed as a result of the | |
* call | |
* @throws UnsupportedOperationException if the <tt>removeAll</tt> method | |
* is not supported by this collection | |
* @throws ClassCastException if the types of one or more elements | |
* in this collection are incompatible with the specified | |
* collection | |
* (<a href="#optional-restrictions">optional</a>) | |
* @throws NullPointerException if this collection contains one or more | |
* null elements and the specified collection does not support | |
* null elements | |
* (<a href="#optional-restrictions">optional</a>), | |
* or if the specified collection is null | |
* @see #remove(Object) | |
* @see #contains(Object) | |
*/ | |
@Override | |
public boolean removeAll(Collection<?> c) { | |
for (Object element : c) { | |
if (!remove(element)) { | |
return false; | |
} | |
} | |
return true; | |
} | |
/** | |
* Retains only the elements in this collection that are contained in the | |
* specified collection (optional operation). In other words, removes from | |
* this collection all of its elements that are not contained in the | |
* specified collection. | |
* | |
* @param c collection containing elements to be retained in this collection | |
* @return <tt>true</tt> if this collection changed as a result of the call | |
* @throws UnsupportedOperationException if the <tt>retainAll</tt> operation | |
* is not supported by this collection | |
* @throws ClassCastException if the types of one or more elements | |
* in this collection are incompatible with the specified | |
* collection | |
* (<a href="#optional-restrictions">optional</a>) | |
* @throws NullPointerException if this collection contains one or more | |
* null elements and the specified collection does not permit null | |
* elements | |
* (<a href="#optional-restrictions">optional</a>), | |
* or if the specified collection is null | |
* @see #remove(Object) | |
* @see #contains(Object) | |
*/ | |
@Override | |
public boolean retainAll(Collection<?> c) { | |
throw new UnsupportedOperationException("Not supported"); | |
} | |
/** | |
* Removes all of the elements from this collection (optional operation). | |
* The collection will be empty after this method returns. | |
* | |
* @throws UnsupportedOperationException if the <tt>clear</tt> operation | |
* is not supported by this collection | |
*/ | |
@Override | |
public void clear() { | |
writeLock.lock(); | |
try { | |
taskQueue.clear(); | |
} finally { | |
newRemoval.signalAll(); | |
writeLock.unlock(); | |
} | |
} | |
/** | |
* Inserts the specified element into this queue if it is possible to do | |
* so immediately without violating capacity restrictions. | |
* When using a capacity-restricted queue, this method is generally | |
* preferable to {@link #add}, which can fail to insert an element only | |
* by throwing an exception. | |
* | |
* @param e the element to add | |
* @return {@code true} if the element was added to this queue, else | |
* {@code false} | |
* @throws ClassCastException if the class of the specified element | |
* prevents it from being added to this queue | |
* @throws NullPointerException if the specified element is null and | |
* this queue does not permit null elements | |
* @throws IllegalArgumentException if some property of this element | |
* prevents it from being added to this queue | |
*/ | |
@Override | |
public boolean offer(E e) { | |
return add(e); | |
} | |
/** | |
* Inserts the specified element into this queue, waiting if necessary | |
* for space to become available. | |
* | |
* @param e the element to add | |
* @throws InterruptedException if interrupted while waiting | |
* @throws ClassCastException if the class of the specified element | |
* prevents it from being added to this queue | |
* @throws NullPointerException if the specified element is null | |
* @throws IllegalArgumentException if some property of the specified | |
* element prevents it from being added to this queue | |
*/ | |
@Override | |
public void put(E e) throws InterruptedException { | |
add(e); | |
} | |
/** | |
* Inserts the specified element into this queue, waiting up to the | |
* specified wait time if necessary for space to become available. | |
* | |
* @param e the element to add | |
* @param timeout how long to wait before giving up, in units of | |
* {@code unit} | |
* @param unit a {@code TimeUnit} determining how to interpret the | |
* {@code timeout} parameter | |
* @return {@code true} if successful, or {@code false} if | |
* the specified waiting time elapses before space is available | |
* @throws InterruptedException if interrupted while waiting | |
* @throws ClassCastException if the class of the specified element | |
* prevents it from being added to this queue | |
* @throws NullPointerException if the specified element is null | |
* @throws IllegalArgumentException if some property of the specified | |
* element prevents it from being added to this queue | |
*/ | |
@Override | |
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { | |
return offer(e); | |
} | |
/** | |
* Retrieves and removes the head of this queue, waiting if necessary | |
* until an element becomes available. | |
* | |
* @return the head of this queue | |
* @throws InterruptedException if interrupted while waiting | |
*/ | |
@Override | |
public E take() throws InterruptedException { | |
return getNext(Long.MAX_VALUE, TimeUnit.SECONDS); | |
} | |
/** | |
* Retrieves and removes the head of this queue, waiting up to the | |
* specified wait time if necessary for an element to become available. | |
* | |
* @param timeout how long to wait before giving up, in units of | |
* {@code unit} | |
* @param unit a {@code TimeUnit} determining how to interpret the | |
* {@code timeout} parameter | |
* @return the head of this queue, or {@code null} if the | |
* specified waiting time elapses before an element is available | |
* @throws InterruptedException if interrupted while waiting | |
*/ | |
@Override | |
public E poll(long timeout, TimeUnit unit) throws InterruptedException { | |
return getNext(timeout, unit); | |
} | |
/** | |
* Returns the number of additional elements that this queue can ideally | |
* (in the absence of memory or resource constraints) accept without | |
* blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic | |
* limit. | |
* | |
* <p>Note that you <em>cannot</em> always tell if an attempt to insert | |
* an element will succeed by inspecting {@code remainingCapacity} | |
* because it may be the case that another thread is about to | |
* insert or remove an element. | |
* | |
* @return the remaining capacity | |
*/ | |
@Override | |
public int remainingCapacity() { | |
return Integer.MAX_VALUE - size(); | |
} | |
/** | |
* Retrieves and removes the head of this queue. This method differs | |
* from {@link #poll poll} only in that it throws an exception if this | |
* queue is empty. | |
* | |
* @return the head of this queue | |
* @throws NoSuchElementException if this queue is empty | |
*/ | |
@Override | |
public E remove() { | |
return poll(); | |
} | |
/** | |
* Retrieves and removes the head of this queue, | |
* or returns {@code null} if this queue is empty. | |
* | |
* @return the head of this queue, or {@code null} if this queue is empty | |
*/ | |
@Override | |
public E poll() { | |
try { | |
return getNext(); | |
} catch (InterruptedException e) { | |
log.warn("Interrupted while removing from head of queue", e); | |
return null; | |
} | |
} | |
/** | |
* Retrieves, but does not remove, the head of this queue. This method | |
* differs from {@link #peek peek} only in that it throws an exception | |
* if this queue is empty. | |
* | |
* @return the head of this queue | |
* @throws NoSuchElementException if this queue is empty | |
*/ | |
@Override | |
public E element() { | |
return Optional.ofNullable(peek()).orElseThrow(NoSuchElementException::new); | |
} | |
/** | |
* Retrieves, but does not remove, the head of this queue, | |
* or returns {@code null} if this queue is empty. | |
* | |
* @return the head of this queue, or {@code null} if this queue is empty | |
*/ | |
@Override | |
public E peek() { | |
readLock.lock(); | |
try { | |
return Optional.ofNullable(taskQueue.firstEntry()) | |
.filter(entry -> entry.getKey().isBefore(LocalDateTime.now())) | |
.map(entry -> entry.getValue().getMessage()) | |
.orElse(null); | |
} finally { | |
readLock.unlock(); | |
} | |
} | |
private boolean isAvailable() { | |
if (taskQueue.isEmpty()) return false; | |
return Optional.ofNullable(taskQueue.firstEntry()) | |
.map(entry -> entry.getKey().isBefore(LocalDateTime.now())) | |
.orElse(false); | |
} | |
public E getNext() throws InterruptedException { | |
return getNext(0, TimeUnit.MILLISECONDS); | |
} | |
/** | |
* @param timeout how long to wait before giving up, in units of | |
* {@code unit} | |
* @param unit a {@code TimeUnit} determining how to interpret the | |
* {@code timeout} parameter | |
* @return the head of this queue, or {@code null} if the | |
* specified waiting time elapses before an element is available | |
* @throws InterruptedException if interrupted while waiting | |
*/ | |
public E getNext(long timeout, TimeUnit unit) throws InterruptedException { | |
LocalDateTime startTime = LocalDateTime.now(); | |
Duration totalWaitTime = Duration.ofMillis(unit.toMillis(timeout)); | |
LocalDateTime expiryTime = startTime.plus(totalWaitTime); | |
while (LocalDateTime.now().isBefore(expiryTime)) { | |
writeLock.lock(); | |
log.trace("Locked writeLock in getNext()"); | |
try { | |
if (!isAvailable()) { | |
if (taskQueue.isEmpty()) { | |
log.debug("Nothing to consume. Waiting for next insertion."); | |
newInsertion.await(timeout, unit); | |
} else { | |
Duration nextAvailableAfter = Duration.between(LocalDateTime.now(), taskQueue.firstKey()); | |
Duration maxWaitDuration = Duration.between(LocalDateTime.now(), expiryTime); | |
Duration finalDuration = nextAvailableAfter.compareTo(maxWaitDuration) <= 0 ? nextAvailableAfter : maxWaitDuration; | |
finalDuration = finalDuration.isNegative() ? Duration.ZERO : finalDuration; | |
log.debug("Sleeping for ({}) till next availability.", finalDuration); | |
newInsertion.await(finalDuration.toMillis(), TimeUnit.MILLISECONDS); | |
} | |
} | |
if (!isAvailable()) { | |
continue; | |
} | |
MessageEvent event = taskQueue.pollFirstEntry().getValue(); | |
log.debug("Next Element is : {}", event); | |
newRemoval.signal(); | |
return event.getMessage(); | |
} finally { | |
log.trace("Unlocking writeLock in getNext()"); | |
writeLock.unlock(); | |
} | |
} | |
log.info("expiryTime of duration ({}) elapsed", totalWaitTime); | |
return null; | |
} | |
@Data | |
public class MessageEvent implements Comparable<MessageEvent> { | |
E message; | |
Duration delay; | |
LocalDateTime creationTime; | |
Integer serialNumber; | |
public MessageEvent(E message, Duration delay, LocalDateTime creationTime) { | |
this.message = message; | |
this.delay = delay; | |
this.creationTime = creationTime; | |
this.serialNumber = messageCounter++; | |
} | |
public LocalDateTime getAvailabilityTime() { | |
return creationTime.plus(delay); | |
} | |
@Override | |
public boolean equals(Object o) { | |
if (this == o) return true; | |
if (o instanceof DelayQueue.MessageEvent) { | |
MessageEvent messageEvent = (MessageEvent) o; | |
return this.message.equals(messageEvent.message); | |
} else { | |
return this.message.equals(o); | |
} | |
} | |
@Override | |
public int hashCode() { | |
return Objects.hash(message); | |
} | |
@Override | |
public String toString() { | |
return "MessageEvent{" + | |
"serialNumber=" + serialNumber + | |
", delay=" + delay + | |
", availabilityTime=" + getAvailabilityTime() + | |
'}'; | |
} | |
@Override | |
public int compareTo(DelayQueue<E>.MessageEvent o) { | |
return this.getAvailabilityTime().compareTo(o.getAvailabilityTime()); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lombok.extern.slf4j.Slf4j; | |
import org.junit.jupiter.api.BeforeEach; | |
import org.junit.jupiter.api.Test; | |
import java.time.Duration; | |
import java.time.LocalDateTime; | |
import java.time.temporal.ChronoField; | |
import java.time.temporal.ChronoUnit; | |
import java.util.LinkedList; | |
import java.util.List; | |
import java.util.Random; | |
import java.util.concurrent.*; | |
import java.util.stream.Collectors; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.junit.jupiter.api.Assertions.*; | |
@Slf4j | |
class DelayQueueTest { | |
DelayQueue<String> queue; | |
@BeforeEach | |
void setup() { | |
queue = new DelayQueue<>(); | |
} | |
@Test | |
void add() { | |
assertTrue(queue.add("First Message", Duration.of(3, ChronoUnit.SECONDS))); | |
assertEquals(1, queue.size()); | |
assertTrue(queue.add("Second Message", Duration.of(1, ChronoUnit.SECONDS))); | |
assertEquals(2, queue.size()); | |
} | |
@Test | |
void getNext() throws InterruptedException { | |
queue.add("First Message", Duration.of(3, ChronoUnit.SECONDS)); | |
queue.add("Second Message", Duration.of(3, ChronoUnit.SECONDS)); | |
queue.add("Third Message", Duration.of(1, ChronoUnit.SECONDS)); | |
log.info("#1 message : {}", queue.getNext()); | |
log.info("#2 message : {}", queue.getNext()); | |
log.info("#3 message : {}", queue.getNext()); | |
log.info("#4 message : {}", queue.getNext()); | |
} | |
@Test | |
void singleProducerConsumer() throws InterruptedException { | |
int messageCount = 10; | |
Thread producer = new Thread(() -> { | |
Random random = new Random(3246576432143L); | |
for (int i = messageCount; i > 0; i--) { | |
queue.add("Message : " + i, Duration.of(i * 100, ChronoUnit.MILLIS)); | |
} | |
}); | |
producer.start(); | |
Random random = new Random(134514325443L); | |
for (int i = 0; i < messageCount; i++) { | |
// String message = queue.getNext(random.nextInt(10), TimeUnit.SECONDS); | |
String message = queue.getNext(10, TimeUnit.SECONDS); | |
assertNotNull(message); | |
log.info("#{}th Message is :{}", i, message); | |
} | |
producer.join(); | |
} | |
@Test | |
void multiConsumerProducer() { | |
final ScheduledExecutorService producerService = Executors.newScheduledThreadPool(4); | |
final ExecutorService consumerService = Executors.newFixedThreadPool(4); | |
Random random = new Random(134514325443L); | |
DelayQueue<LocalDateTime> queue = new DelayQueue<>(); | |
int count = 1000; | |
List<LocalDateTime> eventsAdded = new LinkedList<>(); | |
for (int i = 0; i < count; i++) { | |
final int order = i; | |
long scheduleTime = (long) random.nextInt(1000); | |
Duration delay = Duration.ofMillis((long) random.nextInt(1000) + 1000); | |
producerService.schedule((Runnable) () -> { | |
log.info("Adding ({})th event with delay {}", order, delay); | |
// synchronized (queue) { | |
LocalDateTime expectedRunTime = LocalDateTime.now().plus(delay); | |
queue.add(expectedRunTime, delay); | |
eventsAdded.add(expectedRunTime); | |
// } | |
}, scheduleTime, TimeUnit.MILLISECONDS); | |
log.info("Adding ({})th schedule with delay {}", i, scheduleTime); | |
} | |
producerService.shutdown(); | |
List<LocalDateTime> eventsReceived = new LinkedList<>(); | |
List<CompletableFuture<LocalDateTime>> eventsFuture = new LinkedList<>(); | |
for (int i = 0; i < count; i++) { | |
final int order = i; | |
eventsFuture.add(CompletableFuture.supplyAsync(() -> { | |
try { | |
// synchronized (queue) { | |
LocalDateTime retrievedEvent = queue.getNext(30, TimeUnit.SECONDS); | |
eventsReceived.add(retrievedEvent); | |
log.info("Receiving ({})th event with expected execution time {}", order, retrievedEvent); | |
return retrievedEvent; | |
// } | |
} catch (InterruptedException e) { | |
log.error("", e); | |
return null; | |
} | |
}, consumerService)); | |
} | |
CompletableFuture.allOf(eventsFuture.toArray(new CompletableFuture[eventsFuture.size()])).join(); | |
eventsAdded.sort(LocalDateTime::compareTo); | |
log.info("Insertion order list of size {} is \n{}", eventsAdded.size(), eventsAdded); | |
// log.info("Insertion list sorted is \n{}", eventsAdded); | |
log.info("Execution order list of size {} is \n{}", eventsReceived.size(), eventsReceived); | |
List<LocalDateTime> roundedOffList = eventsReceived.stream() | |
.map(localDateTime -> { | |
Duration roundOff = Duration.ofMillis((localDateTime.getLong(ChronoField.MILLI_OF_DAY) / 100L) * 100L); | |
return localDateTime.truncatedTo(ChronoUnit.SECONDS).plus(roundOff); | |
}) | |
.collect(Collectors.toList()); | |
assertThat(roundedOffList).isSorted(); | |
log.info("Insertion rounded list of size {} is \n{}", roundedOffList.size(), roundedOffList); | |
assertThat(eventsReceived).containsExactlyInAnyOrderElementsOf(eventsAdded); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment