Created
September 20, 2018 21:49
-
-
Save pschichtel/e2ec3093e0c3bf84fa2f8f2a05951f3f to your computer and use it in GitHub Desktop.
BlockingQueue<byte[]> implementation based on tape2's QueueFile
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package tel.schich.carla.collection; | |
import com.squareup.tape2.QueueFile; | |
import java.io.IOException; | |
import java.util.*; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.locks.Condition; | |
import java.util.concurrent.locks.Lock; | |
import java.util.concurrent.locks.ReentrantLock; | |
public class BlockingFileQueue implements BlockingQueue<byte[]> { | |
private final Lock lock = new ReentrantLock(); | |
private final Condition nonEmpty = lock.newCondition(); | |
private final QueueFile queue; | |
public BlockingFileQueue(QueueFile queue) { | |
this.queue = queue; | |
} | |
@Override | |
public boolean add(byte[] bytes) { | |
try { | |
queue.add(bytes); | |
nonEmpty.signal(); | |
return true; | |
} catch (IOException e) { | |
return false; | |
} | |
} | |
@Override | |
public boolean offer(byte[] bytes) { | |
return add(bytes); | |
} | |
@Override | |
public void put(byte[] bytes) { | |
add(bytes); | |
} | |
@Override | |
public boolean offer(byte[] bytes, long timeout, TimeUnit unit) { | |
return add(bytes); | |
} | |
@Override | |
public byte[] take() throws InterruptedException { | |
lock.lock(); | |
try { | |
while (queue.isEmpty()) { | |
nonEmpty.await(); | |
} | |
byte[] peek = queue.peek(); | |
if (peek == null) { | |
throw new IllegalStateException("Queue empty!"); | |
} | |
queue.remove(); | |
return peek; | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public byte[] poll(long timeout, TimeUnit unit) throws InterruptedException { | |
lock.lock(); | |
try { | |
while (queue.isEmpty()) { | |
nonEmpty.await(timeout, unit); | |
} | |
byte[] peek = queue.peek(); | |
if (peek == null) { | |
throw new NoSuchElementException(); | |
} | |
queue.remove(); | |
return peek; | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public int remainingCapacity() { | |
return Integer.MAX_VALUE; | |
} | |
@Override | |
public boolean remove(Object o) { | |
lock.lock(); | |
try { | |
Iterator<byte[]> it = queue.iterator(); | |
while (it.hasNext()) { | |
if (o.equals(it.next())) { | |
it.remove(); | |
return true; | |
} | |
} | |
return false; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public boolean contains(Object o) { | |
lock.lock(); | |
try { | |
for (byte[] entry : queue) { | |
if (o.equals(entry)) { | |
return true; | |
} | |
} | |
return false; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public int drainTo(Collection<? super byte[]> c) { | |
lock.lock(); | |
try { | |
int size = queue.size(); | |
Iterator<byte[]> it = queue.iterator(); | |
while (it.hasNext()) { | |
c.add(it.next()); | |
it.remove(); | |
} | |
return size; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public int drainTo(Collection<? super byte[]> c, int maxElements) { | |
if (maxElements == 0) { | |
return 0; | |
} | |
lock.lock(); | |
try { | |
Iterator<byte[]> it = queue.iterator(); | |
int i = 0; | |
while (it.hasNext() && i < maxElements) { | |
c.add(it.next()); | |
it.remove(); | |
i++; | |
} | |
return i; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public byte[] remove() { | |
lock.lock(); | |
try { | |
if (queue.isEmpty()) { | |
throw new NoSuchElementException(); | |
} | |
byte[] peek = queue.peek(); | |
queue.remove(); | |
return peek; | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public byte[] poll() { | |
lock.lock(); | |
try { | |
if (queue.isEmpty()) { | |
return null; | |
} | |
byte[] peek = queue.peek(); | |
queue.remove(); | |
return peek; | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public byte[] element() { | |
lock.lock(); | |
try { | |
if (queue.isEmpty()) { | |
throw new NoSuchElementException(); | |
} | |
return queue.peek(); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public byte[] peek() { | |
lock.lock(); | |
try { | |
if (queue.isEmpty()) { | |
return null; | |
} | |
return queue.peek(); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public int size() { | |
lock.lock(); | |
try { | |
return queue.size(); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public boolean isEmpty() { | |
lock.lock(); | |
try { | |
return queue.isEmpty(); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public Iterator<byte[]> iterator() { | |
lock.lock(); | |
try { | |
return queue.iterator(); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public Object[] toArray() { | |
lock.lock(); | |
try { | |
Object[] out = new Object[queue.size()]; | |
int i = 0; | |
for (byte[] e : queue) { | |
out[i++] = e; | |
} | |
return out; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public <T> T[] toArray(T[] a) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public boolean containsAll(Collection<?> c) { | |
lock.lock(); | |
try { | |
for (Object e : c) { | |
if (!contains(e)) { | |
return false; | |
} | |
} | |
return true; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public boolean addAll(Collection<? extends byte[]> c) { | |
if (c.isEmpty()) { | |
return false; | |
} | |
lock.lock(); | |
try { | |
for (byte[] e : c) { | |
queue.add(e); | |
} | |
nonEmpty.signal(); | |
return true; | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public boolean removeAll(Collection<?> c) { | |
lock.lock(); | |
boolean changed = false; | |
try { | |
for (Object e : c) { | |
if (remove(e)) { | |
changed = true; | |
} | |
} | |
return changed; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public boolean retainAll(Collection<?> c) { | |
lock.lock(); | |
if (c.isEmpty()) { | |
if (queue.isEmpty()) { | |
return false; | |
} else { | |
clear(); | |
return true; | |
} | |
} | |
try { | |
boolean changed = false; | |
Iterator<byte[]> it = queue.iterator(); | |
while (it.hasNext()) { | |
if (!c.contains(it.next())) { | |
it.remove(); | |
changed = true; | |
} | |
} | |
return changed; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public void clear() { | |
lock.lock(); | |
try { | |
queue.clear(); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment