Skip to content

Instantly share code, notes, and snippets.

@yiding-he
Last active November 17, 2020 03:45
Show Gist options
  • Save yiding-he/3e9c348a49168572dddf073f3167460b to your computer and use it in GitHub Desktop.
Save yiding-he/3e9c348a49168572dddf073f3167460b to your computer and use it in GitHub Desktop.
基于 Disruptor 的队列/线程池封装
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
* 基于 Disruptor 的事件处理队列。使用方法:
* <pre>
* DisruptorPool<String> pool = new DisruptorPool<>(2 << 10, 8); // 构建实例
* pool.setEventHandler(s -> {}); // 设置事件处理逻辑
* pool.start(); // 启动队列
* pool.feed(""); // 发送事件对象
* pool.stop(); // 关闭队列,在应用停止时调用
* </pre>
*
* @param <T> 事件类型
*/
public class DisruptorPool<T> {
private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
final private AtomicInteger threadCounter = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "DisruptorPool-" + threadCounter.incrementAndGet());
}
};
private final int concurrency;
private final Disruptor<Event<T>> eventDisruptor;
private RingBufferProducer<Event<T>> eventProducer;
/////////////////////////////////////////////////////////////////
/**
* 构造方法
*
* @param ringBufferSize 缓冲区长度,越大效率越高
* @param concurrency 并发量,超过核数无意义
*/
public DisruptorPool(int ringBufferSize, int concurrency) {
this.concurrency = concurrency;
this.eventDisruptor = new Disruptor<>(
Event::new, ringBufferSize, THREAD_FACTORY, ProducerType.SINGLE, new YieldingWaitStrategy()
);
}
/**
* 设置事件处理逻辑
*
* @param handler 事件处理逻辑
*/
@SuppressWarnings("unchecked")
public void setEventHandler(Consumer<T> handler) {
EventHandler<T>[] handlers = new EventHandler[concurrency];
for (int i = 0; i < concurrency; i++) {
handlers[i] = new EventHandler<>(handler);
}
this.eventDisruptor.handleEventsWithWorkerPool(handlers);
}
/**
* 启动队列
*/
public void start() {
this.eventDisruptor.start();
this.eventProducer = new RingBufferProducer<>(eventDisruptor);
}
/**
* 向队列提交消息
*
* @param item 消息
*/
public void feed(T item) {
this.eventProducer.produce(event -> event.value = item);
}
/**
* 停止队列,会阻塞直到所有消息处理完毕。
*/
public void stop() {
this.eventDisruptor.shutdown();
}
/////////////////////////////////////////////////////////////////
static class EventHandler<T> implements WorkHandler<Event<T>> {
final Consumer<T> delegate;
EventHandler(Consumer<T> delegate) {
this.delegate = delegate;
}
@Override
public void onEvent(Event<T> event) throws Exception {
delegate.accept(event.value);
}
}
static class Event<T> {
public T value;
}
static class RingBufferProducer<T> {
private final RingBuffer<T> ringBuffer;
public RingBufferProducer(RingBuffer<T> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public RingBufferProducer(Disruptor<T> disruptor) {
this(disruptor.getRingBuffer());
}
public void produce(Consumer<T> consumer) {
final long next = ringBuffer.next();
try {
final T t = ringBuffer.get(next);
consumer.accept(t);
} finally {
ringBuffer.publish(next);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment