Skip to content

Instantly share code, notes, and snippets.

@lzxz1234
Last active August 29, 2015 14:23
Show Gist options
  • Save lzxz1234/2512b13b07dde0e69540 to your computer and use it in GitHub Desktop.
Save lzxz1234/2512b13b07dde0e69540 to your computer and use it in GitHub Desktop.
Exec
package com.bj58.ecat.emc.diywebsite.message.utils;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import com.bj58.ecat.redis.utils.StringUtils;
/**
 * A small fixed-size pool of single-threaded workers that routes each task to
 * a worker chosen by the hash of a caller-supplied key.
 *
 * <p>All tasks submitted with the same {@code hashKey} land on the same worker
 * thread and are taken from its FIFO queue one at a time, so they run in the
 * order they were submitted. Tasks with different keys may run concurrently.
 *
 * <p>The worker threads are started when this class is initialized and loop
 * forever; there is no shutdown method.
 */
public class Exec {

    /** Number of worker threads (and therefore of independent task queues). */
    private static final int SIZE = 10;

    private static final Worker[] threads = new Worker[SIZE];

    /** Receives exceptions thrown by submitted tasks. */
    private static final ErrorListener listener;

    /**
     * Gets one permit every time the number of outstanding tasks drops to
     * zero; {@link #waitFinish()} consumes one permit.
     */
    private static final Semaphore hasFinish = new Semaphore(0);

    static {
        listener = new DefaultErrorListener();
        for (int i = 0; i < SIZE; i++) {
            threads[i] = new Worker(i);
            threads[i].start();
        }
    }

    /**
     * Submits a task. Tasks submitted with an equal {@code hashKey} are
     * executed strictly in submission order.
     *
     * @param hashKey routing key; an empty key (as decided by
     *                {@code StringUtils.isEmpty}) is replaced by a shared
     *                default key
     * @param run     the task to execute; must not be {@code null}
     * @throws NullPointerException if {@code run} is {@code null}
     */
    public static void submit(String hashKey, Runnable run) {
        if (StringUtils.isEmpty(hashKey)) {
            hashKey = "DEFAULT_KEY";
        }
        // Reduce modulo SIZE *before* taking the absolute value:
        // Math.abs(Integer.MIN_VALUE) is still negative and would produce a
        // negative array index. For every other hash value this picks the same
        // worker as Math.abs(hash) % SIZE.
        int index = Math.abs(hashKey.hashCode() % SIZE);
        threads[index].submit(run);
    }

    /**
     * Blocks until a "all outstanding tasks finished" signal is available and
     * consumes it.
     *
     * <p>A signal is produced each time the outstanding-task count reaches
     * zero, and unconsumed signals accumulate, so this call may return
     * immediately because of an earlier drain. If no task has ever completed,
     * it blocks until one does.
     *
     * <p>Interruption does not abort the wait; the caller's interrupt status
     * is preserved and is set again when this method returns.
     */
    public static void waitFinish() {
        hasFinish.acquireUninterruptibly();
    }

    /**
     * Listener notified when a task fails.
     */
    public static interface ErrorListener {
        public void onEvent(Exception e);
    }

    /**
     * Default failure listener: only writes the exception to the log.
     */
    private static final class DefaultErrorListener implements ErrorListener {

        private static final Logger log = Logger.getLogger(DefaultErrorListener.class);

        @Override
        public void onEvent(Exception e) {
            log.error("自定义线程池错误", e);
        }
    }

    /**
     * One worker thread with its own unbounded FIFO task queue.
     */
    private static final class Worker extends Thread {

        /** Tasks submitted to any worker that have not finished yet. */
        private static final AtomicInteger totalTask = new AtomicInteger();

        private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();

        public Worker(int id) {
            this.setName("Exec-Worker-" + id);
        }

        @Override
        public void run() {
            while (true) {
                Runnable target = take();
                try {
                    if (target != null) {
                        target.run();
                    }
                } catch (Exception e) {
                    listener.onEvent(e);
                } finally {
                    // Always account for the task, even when it (or the
                    // listener) fails with something this loop does not
                    // handle, so the outstanding count cannot drift.
                    if (totalTask.decrementAndGet() == 0) {
                        hasFinish.release();
                    }
                }
            }
        }

        /**
         * Queues a task for this worker.
         *
         * @param run the task; must not be {@code null}
         * @throws NullPointerException if {@code run} is {@code null}
         */
        public void submit(Runnable run) {
            // Reject null up front (the queue would refuse it anyway) so a
            // rejected task never touches the counter.
            if (run == null) {
                throw new NullPointerException("run");
            }
            // Count the task BEFORE it becomes visible to the worker;
            // otherwise the worker could finish it and decrement first,
            // driving the counter negative and losing the finish signal.
            totalTask.incrementAndGet();
            this.queue.add(run);
        }

        /**
         * Takes the next task, retrying if the blocking wait fails (for
         * example because the thread was interrupted).
         */
        private Runnable take() {
            while (true) {
                try {
                    return queue.take();
                } catch (Exception e) {
                    continue;
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment