Skip to content

Instantly share code, notes, and snippets.

@pschichtel
Created September 20, 2018 21:49
Show Gist options
  • Save pschichtel/e2ec3093e0c3bf84fa2f8f2a05951f3f to your computer and use it in GitHub Desktop.
Save pschichtel/e2ec3093e0c3bf84fa2f8f2a05951f3f to your computer and use it in GitHub Desktop.
BlockingQueue<byte[]> implementation based on tape2's QueueFile
package tel.schich.carla.collection;
import com.squareup.tape2.QueueFile;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingFileQueue implements BlockingQueue<byte[]> {
private final Lock lock = new ReentrantLock();
private final Condition nonEmpty = lock.newCondition();
private final QueueFile queue;
public BlockingFileQueue(QueueFile queue) {
this.queue = queue;
}
@Override
public boolean add(byte[] bytes) {
try {
queue.add(bytes);
nonEmpty.signal();
return true;
} catch (IOException e) {
return false;
}
}
@Override
public boolean offer(byte[] bytes) {
return add(bytes);
}
@Override
public void put(byte[] bytes) {
add(bytes);
}
@Override
public boolean offer(byte[] bytes, long timeout, TimeUnit unit) {
return add(bytes);
}
@Override
public byte[] take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
nonEmpty.await();
}
byte[] peek = queue.peek();
if (peek == null) {
throw new IllegalStateException("Queue empty!");
}
queue.remove();
return peek;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
@Override
public byte[] poll(long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
nonEmpty.await(timeout, unit);
}
byte[] peek = queue.peek();
if (peek == null) {
throw new NoSuchElementException();
}
queue.remove();
return peek;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
@Override
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
@Override
public boolean remove(Object o) {
lock.lock();
try {
Iterator<byte[]> it = queue.iterator();
while (it.hasNext()) {
if (o.equals(it.next())) {
it.remove();
return true;
}
}
return false;
} finally {
lock.unlock();
}
}
@Override
public boolean contains(Object o) {
lock.lock();
try {
for (byte[] entry : queue) {
if (o.equals(entry)) {
return true;
}
}
return false;
} finally {
lock.unlock();
}
}
@Override
public int drainTo(Collection<? super byte[]> c) {
lock.lock();
try {
int size = queue.size();
Iterator<byte[]> it = queue.iterator();
while (it.hasNext()) {
c.add(it.next());
it.remove();
}
return size;
} finally {
lock.unlock();
}
}
@Override
public int drainTo(Collection<? super byte[]> c, int maxElements) {
if (maxElements == 0) {
return 0;
}
lock.lock();
try {
Iterator<byte[]> it = queue.iterator();
int i = 0;
while (it.hasNext() && i < maxElements) {
c.add(it.next());
it.remove();
i++;
}
return i;
} finally {
lock.unlock();
}
}
@Override
public byte[] remove() {
lock.lock();
try {
if (queue.isEmpty()) {
throw new NoSuchElementException();
}
byte[] peek = queue.peek();
queue.remove();
return peek;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
@Override
public byte[] poll() {
lock.lock();
try {
if (queue.isEmpty()) {
return null;
}
byte[] peek = queue.peek();
queue.remove();
return peek;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
@Override
public byte[] element() {
lock.lock();
try {
if (queue.isEmpty()) {
throw new NoSuchElementException();
}
return queue.peek();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
@Override
public byte[] peek() {
lock.lock();
try {
if (queue.isEmpty()) {
return null;
}
return queue.peek();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
@Override
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
@Override
public boolean isEmpty() {
lock.lock();
try {
return queue.isEmpty();
} finally {
lock.unlock();
}
}
@Override
public Iterator<byte[]> iterator() {
lock.lock();
try {
return queue.iterator();
} finally {
lock.unlock();
}
}
@Override
public Object[] toArray() {
lock.lock();
try {
Object[] out = new Object[queue.size()];
int i = 0;
for (byte[] e : queue) {
out[i++] = e;
}
return out;
} finally {
lock.unlock();
}
}
@Override
public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsAll(Collection<?> c) {
lock.lock();
try {
for (Object e : c) {
if (!contains(e)) {
return false;
}
}
return true;
} finally {
lock.unlock();
}
}
@Override
public boolean addAll(Collection<? extends byte[]> c) {
if (c.isEmpty()) {
return false;
}
lock.lock();
try {
for (byte[] e : c) {
queue.add(e);
}
nonEmpty.signal();
return true;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
@Override
public boolean removeAll(Collection<?> c) {
lock.lock();
boolean changed = false;
try {
for (Object e : c) {
if (remove(e)) {
changed = true;
}
}
return changed;
} finally {
lock.unlock();
}
}
@Override
public boolean retainAll(Collection<?> c) {
lock.lock();
if (c.isEmpty()) {
if (queue.isEmpty()) {
return false;
} else {
clear();
return true;
}
}
try {
boolean changed = false;
Iterator<byte[]> it = queue.iterator();
while (it.hasNext()) {
if (!c.contains(it.next())) {
it.remove();
changed = true;
}
}
return changed;
} finally {
lock.unlock();
}
}
@Override
public void clear() {
lock.lock();
try {
queue.clear();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment