Skip to content

Instantly share code, notes, and snippets.

@sunng87
Created April 24, 2016 02:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sunng87/285530b9199f7b560f33cb59ee936644 to your computer and use it in GitHub Desktop.
Save sunng87/285530b9199f7b560f33cb59ee936644 to your computer and use it in GitHub Desktop.
A file-based blocking queue
import com.squareup.tape.FileObjectQueue;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A {@link BlockingQueue} that keeps up to a fixed number of elements in memory and
 * spills everything beyond that into a file-backed {@link FileObjectQueue}, so that
 * insertions are never rejected for lack of capacity.
 *
 * <p>Created by nsun on 16-4-22.
 *
 * <p>Behavioral notes:
 * <ul>
 *   <li>Retrieval methods drain the file-backed part first and only then the in-memory
 *       part, so strict FIFO order across the two stores is not guaranteed.</li>
 *   <li>{@link #take()} and {@link #poll(long, TimeUnit)} only wait on the in-memory
 *       part; a consumer that is already waiting is not woken by an element that gets
 *       spilled to the file.</li>
 *   <li>{@link #remove(Object)}, {@link #contains(Object)} and {@link #clear()} are
 *       unsupported. Inherited methods that are not overridden here (iterators,
 *       {@code drainTo}, {@code toArray}, ...) only see the in-memory part.</li>
 *   <li>{@link #close()} closes the backing queue; it does not delete the backing
 *       file.</li>
 * </ul>
 *
 * @param <E> element type; must be {@link Serializable} so it can be written to the file
 */
public class InfQueue<E extends Serializable> extends LinkedBlockingQueue<E> implements BlockingQueue<E> {

    /** Maximum total size up to which new elements are still routed to memory. */
    private final int memoryQueueSize;

    /** File-backed overflow store; every access is guarded by {@link #backendQueueLock}. */
    private final FileObjectQueue<E> backendQueue;

    /** Guards all access to {@link #backendQueue}. */
    private final ReentrantLock backendQueueLock = new ReentrantLock();

    /**
     * Creates a queue whose in-memory part holds at most {@code inMemorySize} elements and
     * whose overflow is stored in a newly created temporary file.
     *
     * @param inMemorySize capacity of the in-memory part
     * @throws IOException if the temporary file or the file-backed queue cannot be created
     */
    public InfQueue(int inMemorySize) throws IOException {
        super(inMemorySize);
        this.memoryQueueSize = inMemorySize;
        // createTempFile(prefix, suffix) already places the file in java.io.tmpdir; the
        // prefix must be a plain file-name fragment, not a directory path.
        File backendFile = File.createTempFile("infqueue-" + UUID.randomUUID() + "-", ".queue");
        // NOTE(review): createTempFile leaves an existing zero-length file behind; confirm
        // that the Tape version in use accepts such a file instead of failing on its header.
        this.backendQueue = new FileObjectQueue<E>(backendFile, new SerializableConvertor());
    }

    /**
     * @return the number of elements in memory plus the number of elements in the file
     */
    @Override
    public int size() {
        return super.size() + backendSize();
    }

    /**
     * Inserts the element without waiting: into memory while the total size is below the
     * in-memory capacity and memory accepts it, otherwise into the file.
     *
     * @param e element to add; must not be {@code null}
     * @throws NullPointerException if {@code e} is {@code null}
     */
    @Override
    public void put(E e) throws InterruptedException {
        insert(e);
    }

    /**
     * Tries to insert into memory, waiting up to the given timeout for space; if memory is
     * still full afterwards the element is written to the file instead.
     *
     * @return always {@code true}
     * @throws InterruptedException if interrupted while waiting for in-memory space
     * @throws NullPointerException if {@code e} is {@code null}
     */
    @Override
    public boolean offer(E e, long l, TimeUnit timeUnit) throws InterruptedException {
        if (!super.offer(e, l, timeUnit)) {
            addToBackend(e);
        }
        return true;
    }

    /**
     * Inserts the element without waiting, using the same routing as {@link #put(Object)}.
     *
     * @return always {@code true}
     * @throws NullPointerException if {@code e} is {@code null}
     */
    @Override
    public boolean offer(E e) {
        insert(e);
        return true;
    }

    /**
     * Returns and removes the head of the file-backed part if it is non-empty; otherwise
     * waits up to the given timeout for an in-memory element.
     *
     * @return the retrieved element, or {@code null} if none became available in time
     * @throws InterruptedException if interrupted while waiting on the in-memory part
     */
    @Override
    public E poll(long l, TimeUnit timeUnit) throws InterruptedException {
        E spilled = backendHead(true);
        return spilled != null ? spilled : super.poll(l, timeUnit);
    }

    /**
     * Returns and removes the head of the file-backed part if it is non-empty; otherwise
     * the head of the in-memory part.
     *
     * @return the retrieved element, or {@code null} if both parts are empty
     */
    @Override
    public E poll() {
        E spilled = backendHead(true);
        return spilled != null ? spilled : super.poll();
    }

    /**
     * Returns, without removing, the element that {@link #poll()} would return next.
     *
     * @return the next element, or {@code null} if both parts are empty
     */
    @Override
    public E peek() {
        E spilled = backendHead(false);
        return spilled != null ? spilled : super.peek();
    }

    /**
     * Returns and removes the head of the file-backed part if it is non-empty; otherwise
     * blocks until an in-memory element is available.
     *
     * @throws InterruptedException if interrupted while waiting on the in-memory part
     */
    @Override
    public E take() throws InterruptedException {
        E spilled = backendHead(true);
        return spilled != null ? spilled : super.take();
    }

    /** Not supported. */
    @Override
    public boolean remove(Object o) {
        throw new UnsupportedOperationException("Not work for now");
    }

    /** Not supported. */
    @Override
    public boolean contains(Object o) {
        throw new UnsupportedOperationException("Not work for now");
    }

    /** Not supported. */
    @Override
    public void clear() {
        throw new UnsupportedOperationException("Not work for now.");
    }

    /**
     * @return {@code true} if neither the file-backed nor the in-memory part holds elements
     */
    @Override
    public boolean isEmpty() {
        return backendSize() == 0 && super.isEmpty();
    }

    /** Closes the file-backed queue. The backing file itself is not deleted. */
    public void close() {
        backendQueueLock.lock();
        try {
            this.backendQueue.close();
        } finally {
            backendQueueLock.unlock();
        }
    }

    /**
     * Routes a new element to memory or to the file without ever blocking. Using the
     * non-blocking {@code super.offer} avoids the check-then-act race in which memory
     * fills up between the size check and the insertion.
     */
    private void insert(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        if (size() < memoryQueueSize && super.offer(e)) {
            return;
        }
        addToBackend(e);
    }

    /** Appends an element to the file-backed queue under the backend lock. */
    private void addToBackend(E e) {
        backendQueueLock.lock();
        try {
            backendQueue.add(e);
        } finally {
            // Released in finally so a failing write cannot leave the lock held.
            backendQueueLock.unlock();
        }
    }

    /** Reads the size of the file-backed queue under the backend lock. */
    private int backendSize() {
        backendQueueLock.lock();
        try {
            return backendQueue.size();
        } finally {
            backendQueueLock.unlock();
        }
    }

    /**
     * Returns the head of the file-backed queue, or {@code null} when it is empty,
     * optionally removing it; check and removal happen under one lock acquisition.
     */
    private E backendHead(boolean remove) {
        backendQueueLock.lock();
        try {
            if (backendQueue.size() == 0) {
                return null;
            }
            E head = backendQueue.peek();
            if (remove) {
                backendQueue.remove();
            }
            return head;
        } finally {
            backendQueueLock.unlock();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment