Skip to content

Instantly share code, notes, and snippets.

@tulioballari
Created March 2, 2023 12:28
Show Gist options
  • Save tulioballari/d85094e6b951ee2258e3ae67c774b36c to your computer and use it in GitHub Desktop.
Save tulioballari/d85094e6b951ee2258e3ae67c774b36c to your computer and use it in GitHub Desktop.
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class UniqueBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
private HashSet<E> set = new HashSet<>();
private LinkedList<E> queue = new LinkedList<>();
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
public UniqueBlockingQueue() {
this(Integer.MAX_VALUE);
}
public UniqueBlockingQueue(int capacity) {
this.capacity = capacity;
}
@Override
public int size() {
return count.get();
}
@Override
public int remainingCapacity() {
return capacity - count.get();
}
/**
* Inserts the specified element into this queue, waiting if necessary
* for space to become available.
*
* @param e the element to add
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
@Override
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final var lock = this.lock;
final var count = this.count;
lock.lockInterruptibly();
try {
while (count.get() == capacity || set.contains(e)) {
notFull.await();
}
queue.add(e);
set.add(e);
final var c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
if (c == 0) {
notEmpty.signal();
}
} finally {
lock.unlock();
}
}
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions.
* When using a capacity-restricted queue, this method is generally
* preferable to {@link #add}, which can fail to insert an element only
* by throwing an exception.
*
* @param e the element to add
* @return {@code true} if the element was added to this queue, else
* {@code false}
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null and
* this queue does not permit null elements
* @throws IllegalArgumentException if some property of this element
* prevents it from being added to this queue
*/
@Override
public boolean offer(E e) {
Objects.requireNonNull(e);
final var count = this.count;
if (count.get() == capacity) {
return false;
}
final var lock = this.lock;
lock.lock();
try {
if (count.get() == capacity || set.contains(e)) {
return false;
}
queue.add(e);
set.add(e);
final var c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
if (c == 0) {
notEmpty.signal();
}
} finally {
lock.unlock();
}
return true;
}
/**
* Inserts the specified element into this queue, waiting up to the
* specified wait time if necessary for space to become available.
*
* @param e the element to add
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
Objects.requireNonNull(e);
long nanos = unit.toNanos(timeout);
final var lock = this.lock;
final var count = this.count;
lock.lock();
try {
while (count.get() == capacity || set.contains(e)) {
if (nanos <= 0L) {
return false;
}
nanos = notFull.awaitNanos(nanos);
}
queue.add(e);
set.add(e);
final var c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
if (c == 0) {
notEmpty.signal();
}
} finally {
lock.unlock();
}
return true;
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
@Override
public E take() throws InterruptedException {
final var lock = this.lock;
final var count = this.count;
lock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
var e = queue.poll();
set.remove(e);
final var c = count.getAndDecrement();
if (c > 1) {
notEmpty.signal();
}
if (c == capacity) {
notFull.signal();
}
return e;
} finally {
lock.unlock();
}
}
/**
* Retrieves and removes the head of this queue,
* or returns {@code null} if this queue is empty.
*
* @return the head of this queue, or {@code null} if this queue is empty
*/
@Override
public E poll() {
final var count = this.count;
if (count.get() == 0) {
return null;
}
final var lock = this.lock;
lock.lock();
try {
if (count.get() == 0) {
return null;
}
var e = queue.poll();
set.remove(e);
final var c = count.getAndDecrement();
if (c > 1) {
notEmpty.signal();
}
if (c == capacity) {
notFull.signal();
}
return e;
} finally {
lock.unlock();
}
}
/**
* Retrieves and removes the head of this queue, waiting up to the
* specified wait time if necessary for an element to become available.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final var lock = this.lock;
final var count = this.count;
lock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0L) {
return null;
}
nanos = notEmpty.awaitNanos(nanos);
}
var e = queue.poll();
set.remove(e);
final var c = count.getAndDecrement();
if (c > 1) {
notEmpty.signal();
}
if (c == capacity) {
notFull.signal();
}
return e;
} finally {
lock.unlock();
}
}
/**
* Retrieves, but does not remove, the head of this queue,
* or returns {@code null} if this queue is empty.
*
* @return the head of this queue, or {@code null} if this queue is empty
*/
@Override
public E peek() {
final var count = this.count;
if (count.get() == 0) {
return null;
}
final var lock = this.lock;
lock.lock();
try {
return (count.get() > 0) ? queue.peek() : null;
} finally {
lock.unlock();
}
}
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) {
return false;
}
final var lock = this.lock;
lock.lock();
try {
if (set.contains(o)) {
queue.remove(o);
set.remove(o);
if (count.getAndDecrement() == capacity) {
notFull.signal();
}
return true;
}
return false;
} finally {
lock.unlock();
}
}
/**
* Returns {@code true} if this queue contains the specified element.
* More formally, returns {@code true} if and only if this queue contains
* at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
* @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o == null) {
return false;
}
final var lock = this.lock;
lock.lock();
try {
return set.contains(o);
} finally {
lock.unlock();
}
}
@Override
public Object[] toArray() {
final var lock = this.lock;
lock.lock();
try {
return queue.toArray();
} finally {
lock.unlock();
}
}
@Override
public <T> T[] toArray(T[] a) {
final var lock = this.lock;
lock.lock();
try {
return queue.toArray(a);
} finally {
lock.unlock();
}
}
public String toString() {
return queue.toString();
}
/**
* Atomically removes all of the elements from this queue.
* The queue will be empty after this call returns.
*/
public void clear() {
final var lock = this.lock;
lock.lock();
try {
queue.clear();
set.clear();;
if (count.getAndSet(0) == capacity) {
notFull.signal();
}
} finally {
lock.unlock();
}
}
@Override
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
Objects.requireNonNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
int i = 0;
try {
while (i < n) {
var e = queue.poll();
set.remove(e);
c.add(e);
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
if (count.getAndAdd(-i) == capacity) {
notFull.signal();
}
}
}
} finally {
lock.unlock();
}
}
/**
* Weakly-consistent iterator.
*
* Lazily updated ancestor field provides expected O(1) remove(),
* but still O(n) in the worst case, whenever the saved ancestor
* is concurrently deleted.
*/
@Override
public Iterator<E> iterator() {
var iter = queue.iterator();
var it = new Iterator<E>() {
private E lastElem = null;
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public E next() {
lastElem = iter.next();
return lastElem;
}
@Override
public void remove() {
final var lock = UniqueBlockingQueue.this.lock;
lock.lock();
try {
iter.remove();
if (lastElem != null) {
set.remove(lastElem);
}
lastElem = null;
} finally {
lock.unlock();
}
}
};
return it;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment