Skip to content

Instantly share code, notes, and snippets.

@yanfenglee
Last active December 8, 2019 02:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yanfenglee/74f955d9edc36667d5ddc448330fe740 to your computer and use it in GitHub Desktop.
Save yanfenglee/74f955d9edc36667d5ddc448330fe740 to your computer and use it in GitHub Desktop.
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Runs tasks taken from a bounded blocking queue on a background worker.
 *
 * <p>Producers hand tasks over with {@link #add(Object)}, which blocks while the queue is full.
 * A single consumer loop (this object's {@link #run()}) takes tasks in FIFO order and passes each
 * one to {@link #runTask(Object)}. Only one consumer loop is started, whatever value is given for
 * {@code numThread}; that value only sizes the underlying thread pool.
 *
 * <p>A task that throws does not stop the loop: the failure is reported to the worker thread's
 * uncaught-exception handler and the next task is processed. Call {@link #shutdown()} to stop the
 * worker; until then its non-daemon thread keeps the JVM alive.
 *
 * @param <T> type of the queued tasks
 */
class QueueCachedTaskRunner<T> implements Runnable {
    private final BlockingQueue<T> queue;
    private final ExecutorService executor;
    private final AtomicLong taskCount = new AtomicLong();

    /**
     * Creates the runner and starts its consumer loop.
     *
     * @param queueSize capacity of the task queue; producers block once it is full
     * @param numThread size of the backing fixed thread pool
     * @throws IllegalArgumentException if either argument is not positive
     */
    QueueCachedTaskRunner(int queueSize, int numThread) {
        this.queue = new LinkedBlockingQueue<>(queueSize);
        this.executor = Executors.newFixedThreadPool(numThread);
        // Started last, so every field of this class is assigned before the worker can run.
        // The worker only calls runTask() once a task has been added.
        this.executor.execute(this);
    }

    /**
     * Returns how many tasks have been taken from the queue so far. The counter is incremented
     * before {@link #runTask(Object)} is called, so inside {@code runTask} it already includes
     * the task being processed.
     *
     * @return number of tasks taken from the queue
     */
    public final long getCount() {
        return taskCount.get();
    }

    /**
     * Consumer loop: takes tasks until {@link #shutdown()} is called.
     *
     * <p>Failures are handled per task instead of letting them escape and re-submitting this
     * runnable: an escaped exception is only visible the first time (later ones would be captured
     * by a discarded {@code Future}), and re-submitting fails once the pool is shut down.
     */
    @Override
    public final void run() {
        final Thread worker = Thread.currentThread();
        while (!executor.isShutdown()) {
            final T task;
            try {
                task = queue.take();
            } catch (InterruptedException e) {
                // shutdown() interrupts the worker; the loop condition then ends the loop.
                // Any other interrupt is ignored so the queue keeps being drained.
                continue;
            }
            taskCount.getAndIncrement();
            try {
                runTask(task);
            } catch (RuntimeException | Error failure) {
                // Keep the worker alive after any abnormal task exit, but never hide the failure.
                reportFailure(worker, failure);
            }
        }
    }

    /**
     * Queues a task, waiting for space if the queue is full.
     *
     * @param task the task to queue; must not be {@code null}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws IllegalStateException if {@link #shutdown()} has already been called (best-effort
     *     check; a producer that is already blocked is not woken up by a shutdown)
     * @throws NullPointerException if {@code task} is {@code null}
     */
    public final void add(T task) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new IllegalStateException("runner has been shut down");
        }
        queue.put(task);
    }

    /**
     * Stops the consumer loop and releases the worker thread. A task that is currently running is
     * interrupted but allowed to return; tasks still waiting in the queue are not processed.
     * Calling this method more than once has no further effect.
     */
    public void shutdown() {
        executor.shutdownNow();
    }

    /**
     * Processes one task. The default implementation does nothing; subclasses override it.
     * It is called on the worker thread, one task at a time.
     *
     * @param task the task taken from the queue
     */
    public void runTask(T task) {
    }

    /** Hands a task failure to the worker thread's uncaught-exception handler. */
    private static void reportFailure(Thread worker, Throwable failure) {
        worker.getUncaughtExceptionHandler().uncaughtException(worker, failure);
    }
}
/**
 * Demo runner for string messages: prints every message together with its running index and
 * deliberately fails on every 1000th message to exercise the base class's failure handling.
 */
class MongoLogTaskRunner extends QueueCachedTaskRunner<String> {
    private static final int QUEUE_CAPACITY = 1024;
    private static final int WORKER_THREADS = 1;
    private static final long FAILURE_INTERVAL = 1000;

    /** Creates a runner with a 1024-slot queue and a single-thread pool. */
    public MongoLogTaskRunner() {
        super(QUEUE_CAPACITY, WORKER_THREADS);
    }

    /**
     * Prints the message with its index, then throws when the index is a multiple of 1000.
     *
     * @param task the message to print
     * @throws RuntimeException on every 1000th message (intentional test failure)
     */
    @Override
    public void runTask(String task) {
        final long index = getCount();
        System.out.println(" receive msg index: " + index + " msg: " + task);
        if (index % FAILURE_INTERVAL != 0) {
            return;
        }
        throw new RuntimeException("test exception");
    }
}
/**
 * Manual stress driver: starts 40 producer threads that each push 100000 messages into a shared
 * {@link MongoLogTaskRunner}.
 */
public class QueuedTaskTest {
    private static final int PRODUCER_COUNT = 40;
    private static final int MESSAGES_PER_PRODUCER = 100000;

    /**
     * Creates the runner and launches all producer threads; it does not wait for them.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final MongoLogTaskRunner runner = new MongoLogTaskRunner();
        for (int producer = 0; producer < PRODUCER_COUNT; producer++) {
            new Thread(producerFor(runner, producer)).start();
        }
    }

    /**
     * Builds the body of one producer thread. The producer stops early, restoring its interrupt
     * flag, if it is interrupted while waiting for queue space.
     */
    private static Runnable producerFor(final MongoLogTaskRunner runner, final int producerIndex) {
        return () -> {
            int sent = 0;
            while (sent < MESSAGES_PER_PRODUCER) {
                try {
                    runner.add("hello, 世界,log to mongo now : " + producerIndex + ":" + sent);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                sent++;
            }
        };
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment