Created
March 2, 2023 12:28
-
-
Save tulioballari/d85094e6b951ee2258e3ae67c774b36c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.AbstractQueue; | |
import java.util.Collection; | |
import java.util.HashSet; | |
import java.util.Iterator; | |
import java.util.LinkedList; | |
import java.util.Objects; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.locks.Condition; | |
import java.util.concurrent.locks.ReentrantLock; | |
public class UniqueBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> { | |
private final ReentrantLock lock = new ReentrantLock(); | |
private final Condition notEmpty = lock.newCondition(); | |
private final Condition notFull = lock.newCondition(); | |
private HashSet<E> set = new HashSet<>(); | |
private LinkedList<E> queue = new LinkedList<>(); | |
private final int capacity; | |
private final AtomicInteger count = new AtomicInteger(); | |
public UniqueBlockingQueue() { | |
this(Integer.MAX_VALUE); | |
} | |
public UniqueBlockingQueue(int capacity) { | |
this.capacity = capacity; | |
} | |
@Override | |
public int size() { | |
return count.get(); | |
} | |
@Override | |
public int remainingCapacity() { | |
return capacity - count.get(); | |
} | |
/** | |
* Inserts the specified element into this queue, waiting if necessary | |
* for space to become available. | |
* | |
* @param e the element to add | |
* @throws InterruptedException if interrupted while waiting | |
* @throws ClassCastException if the class of the specified element | |
* prevents it from being added to this queue | |
* @throws NullPointerException if the specified element is null | |
* @throws IllegalArgumentException if some property of the specified | |
* element prevents it from being added to this queue | |
*/ | |
@Override | |
public void put(E e) throws InterruptedException { | |
Objects.requireNonNull(e); | |
final var lock = this.lock; | |
final var count = this.count; | |
lock.lockInterruptibly(); | |
try { | |
while (count.get() == capacity || set.contains(e)) { | |
notFull.await(); | |
} | |
queue.add(e); | |
set.add(e); | |
final var c = count.getAndIncrement(); | |
if (c + 1 < capacity) { | |
notFull.signal(); | |
} | |
if (c == 0) { | |
notEmpty.signal(); | |
} | |
} finally { | |
lock.unlock(); | |
} | |
} | |
/** | |
* Inserts the specified element into this queue if it is possible to do | |
* so immediately without violating capacity restrictions. | |
* When using a capacity-restricted queue, this method is generally | |
* preferable to {@link #add}, which can fail to insert an element only | |
* by throwing an exception. | |
* | |
* @param e the element to add | |
* @return {@code true} if the element was added to this queue, else | |
* {@code false} | |
* @throws ClassCastException if the class of the specified element | |
* prevents it from being added to this queue | |
* @throws NullPointerException if the specified element is null and | |
* this queue does not permit null elements | |
* @throws IllegalArgumentException if some property of this element | |
* prevents it from being added to this queue | |
*/ | |
@Override | |
public boolean offer(E e) { | |
Objects.requireNonNull(e); | |
final var count = this.count; | |
if (count.get() == capacity) { | |
return false; | |
} | |
final var lock = this.lock; | |
lock.lock(); | |
try { | |
if (count.get() == capacity || set.contains(e)) { | |
return false; | |
} | |
queue.add(e); | |
set.add(e); | |
final var c = count.getAndIncrement(); | |
if (c + 1 < capacity) { | |
notFull.signal(); | |
} | |
if (c == 0) { | |
notEmpty.signal(); | |
} | |
} finally { | |
lock.unlock(); | |
} | |
return true; | |
} | |
/** | |
* Inserts the specified element into this queue, waiting up to the | |
* specified wait time if necessary for space to become available. | |
* | |
* @param e the element to add | |
* @param timeout how long to wait before giving up, in units of | |
* {@code unit} | |
* @param unit a {@code TimeUnit} determining how to interpret the | |
* {@code timeout} parameter | |
* @return {@code true} if successful, or {@code false} if | |
* the specified waiting time elapses before space is available | |
* @throws InterruptedException if interrupted while waiting | |
* @throws ClassCastException if the class of the specified element | |
* prevents it from being added to this queue | |
* @throws NullPointerException if the specified element is null | |
* @throws IllegalArgumentException if some property of the specified | |
* element prevents it from being added to this queue | |
*/ | |
@Override | |
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { | |
Objects.requireNonNull(e); | |
long nanos = unit.toNanos(timeout); | |
final var lock = this.lock; | |
final var count = this.count; | |
lock.lock(); | |
try { | |
while (count.get() == capacity || set.contains(e)) { | |
if (nanos <= 0L) { | |
return false; | |
} | |
nanos = notFull.awaitNanos(nanos); | |
} | |
queue.add(e); | |
set.add(e); | |
final var c = count.getAndIncrement(); | |
if (c + 1 < capacity) { | |
notFull.signal(); | |
} | |
if (c == 0) { | |
notEmpty.signal(); | |
} | |
} finally { | |
lock.unlock(); | |
} | |
return true; | |
} | |
/** | |
* Retrieves and removes the head of this queue, waiting if necessary | |
* until an element becomes available. | |
* | |
* @return the head of this queue | |
* @throws InterruptedException if interrupted while waiting | |
*/ | |
@Override | |
public E take() throws InterruptedException { | |
final var lock = this.lock; | |
final var count = this.count; | |
lock.lockInterruptibly(); | |
try { | |
while (count.get() == 0) { | |
notEmpty.await(); | |
} | |
var e = queue.poll(); | |
set.remove(e); | |
final var c = count.getAndDecrement(); | |
if (c > 1) { | |
notEmpty.signal(); | |
} | |
if (c == capacity) { | |
notFull.signal(); | |
} | |
return e; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
/** | |
* Retrieves and removes the head of this queue, | |
* or returns {@code null} if this queue is empty. | |
* | |
* @return the head of this queue, or {@code null} if this queue is empty | |
*/ | |
@Override | |
public E poll() { | |
final var count = this.count; | |
if (count.get() == 0) { | |
return null; | |
} | |
final var lock = this.lock; | |
lock.lock(); | |
try { | |
if (count.get() == 0) { | |
return null; | |
} | |
var e = queue.poll(); | |
set.remove(e); | |
final var c = count.getAndDecrement(); | |
if (c > 1) { | |
notEmpty.signal(); | |
} | |
if (c == capacity) { | |
notFull.signal(); | |
} | |
return e; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
/** | |
* Retrieves and removes the head of this queue, waiting up to the | |
* specified wait time if necessary for an element to become available. | |
* | |
* @param timeout how long to wait before giving up, in units of | |
* {@code unit} | |
* @param unit a {@code TimeUnit} determining how to interpret the | |
* {@code timeout} parameter | |
* @return the head of this queue, or {@code null} if the | |
* specified waiting time elapses before an element is available | |
* @throws InterruptedException if interrupted while waiting | |
*/ | |
@Override | |
public E poll(long timeout, TimeUnit unit) throws InterruptedException { | |
long nanos = unit.toNanos(timeout); | |
final var lock = this.lock; | |
final var count = this.count; | |
lock.lockInterruptibly(); | |
try { | |
while (count.get() == 0) { | |
if (nanos <= 0L) { | |
return null; | |
} | |
nanos = notEmpty.awaitNanos(nanos); | |
} | |
var e = queue.poll(); | |
set.remove(e); | |
final var c = count.getAndDecrement(); | |
if (c > 1) { | |
notEmpty.signal(); | |
} | |
if (c == capacity) { | |
notFull.signal(); | |
} | |
return e; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
/** | |
* Retrieves, but does not remove, the head of this queue, | |
* or returns {@code null} if this queue is empty. | |
* | |
* @return the head of this queue, or {@code null} if this queue is empty | |
*/ | |
@Override | |
public E peek() { | |
final var count = this.count; | |
if (count.get() == 0) { | |
return null; | |
} | |
final var lock = this.lock; | |
lock.lock(); | |
try { | |
return (count.get() > 0) ? queue.peek() : null; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
/** | |
* Removes a single instance of the specified element from this queue, | |
* if it is present. More formally, removes an element {@code e} such | |
* that {@code o.equals(e)}, if this queue contains one or more such | |
* elements. | |
* Returns {@code true} if this queue contained the specified element | |
* (or equivalently, if this queue changed as a result of the call). | |
* | |
* @param o element to be removed from this queue, if present | |
* @return {@code true} if this queue changed as a result of the call | |
*/ | |
public boolean remove(Object o) { | |
if (o == null) { | |
return false; | |
} | |
final var lock = this.lock; | |
lock.lock(); | |
try { | |
if (set.contains(o)) { | |
queue.remove(o); | |
set.remove(o); | |
if (count.getAndDecrement() == capacity) { | |
notFull.signal(); | |
} | |
return true; | |
} | |
return false; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
/** | |
* Returns {@code true} if this queue contains the specified element. | |
* More formally, returns {@code true} if and only if this queue contains | |
* at least one element {@code e} such that {@code o.equals(e)}. | |
* | |
* @param o object to be checked for containment in this queue | |
* @return {@code true} if this queue contains the specified element | |
*/ | |
public boolean contains(Object o) { | |
if (o == null) { | |
return false; | |
} | |
final var lock = this.lock; | |
lock.lock(); | |
try { | |
return set.contains(o); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public Object[] toArray() { | |
final var lock = this.lock; | |
lock.lock(); | |
try { | |
return queue.toArray(); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public <T> T[] toArray(T[] a) { | |
final var lock = this.lock; | |
lock.lock(); | |
try { | |
return queue.toArray(a); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
public String toString() { | |
return queue.toString(); | |
} | |
/** | |
* Atomically removes all of the elements from this queue. | |
* The queue will be empty after this call returns. | |
*/ | |
public void clear() { | |
final var lock = this.lock; | |
lock.lock(); | |
try { | |
queue.clear(); | |
set.clear();; | |
if (count.getAndSet(0) == capacity) { | |
notFull.signal(); | |
} | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public int drainTo(Collection<? super E> c) { | |
return drainTo(c, Integer.MAX_VALUE); | |
} | |
@Override | |
public int drainTo(Collection<? super E> c, int maxElements) { | |
Objects.requireNonNull(c); | |
if (c == this) | |
throw new IllegalArgumentException(); | |
if (maxElements <= 0) | |
return 0; | |
final ReentrantLock lock = this.lock; | |
lock.lock(); | |
try { | |
int n = Math.min(maxElements, count.get()); | |
// count.get provides visibility to first n Nodes | |
int i = 0; | |
try { | |
while (i < n) { | |
var e = queue.poll(); | |
set.remove(e); | |
c.add(e); | |
++i; | |
} | |
return n; | |
} finally { | |
// Restore invariants even if c.add() threw | |
if (i > 0) { | |
if (count.getAndAdd(-i) == capacity) { | |
notFull.signal(); | |
} | |
} | |
} | |
} finally { | |
lock.unlock(); | |
} | |
} | |
/** | |
* Weakly-consistent iterator. | |
* | |
* Lazily updated ancestor field provides expected O(1) remove(), | |
* but still O(n) in the worst case, whenever the saved ancestor | |
* is concurrently deleted. | |
*/ | |
@Override | |
public Iterator<E> iterator() { | |
var iter = queue.iterator(); | |
var it = new Iterator<E>() { | |
private E lastElem = null; | |
@Override | |
public boolean hasNext() { | |
return iter.hasNext(); | |
} | |
@Override | |
public E next() { | |
lastElem = iter.next(); | |
return lastElem; | |
} | |
@Override | |
public void remove() { | |
final var lock = UniqueBlockingQueue.this.lock; | |
lock.lock(); | |
try { | |
iter.remove(); | |
if (lastElem != null) { | |
set.remove(lastElem); | |
} | |
lastElem = null; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
}; | |
return it; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment