Last active
November 17, 2020 03:45
-
-
Save yiding-he/3e9c348a49168572dddf073f3167460b to your computer and use it in GitHub Desktop.
基于 Disruptor 的队列/线程池封装
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.lmax.disruptor.RingBuffer; | |
import com.lmax.disruptor.WorkHandler; | |
import com.lmax.disruptor.YieldingWaitStrategy; | |
import com.lmax.disruptor.dsl.Disruptor; | |
import com.lmax.disruptor.dsl.ProducerType; | |
import java.util.concurrent.ThreadFactory; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.function.Consumer; | |
/** | |
* 基于 Disruptor 的事件处理队列。使用方法: | |
* <pre> | |
* DisruptorPool<String> pool = new DisruptorPool<>(2 << 10, 8); // 构建实例 | |
* pool.setEventHandler(s -> {}); // 设置事件处理逻辑 | |
* pool.start(); // 启动队列 | |
* pool.feed(""); // 发送事件对象 | |
* pool.stop(); // 关闭队列,在应用停止时调用 | |
* </pre> | |
* | |
* @param <T> 事件类型 | |
*/ | |
public class DisruptorPool<T> { | |
private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() { | |
final private AtomicInteger threadCounter = new AtomicInteger(); | |
@Override | |
public Thread newThread(Runnable r) { | |
return new Thread(r, "DisruptorPool-" + threadCounter.incrementAndGet()); | |
} | |
}; | |
private final int concurrency; | |
private final Disruptor<Event<T>> eventDisruptor; | |
private RingBufferProducer<Event<T>> eventProducer; | |
///////////////////////////////////////////////////////////////// | |
/** | |
* 构造方法 | |
* | |
* @param ringBufferSize 缓冲区长度,越大效率越高 | |
* @param concurrency 并发量,超过核数无意义 | |
*/ | |
public DisruptorPool(int ringBufferSize, int concurrency) { | |
this.concurrency = concurrency; | |
this.eventDisruptor = new Disruptor<>( | |
Event::new, ringBufferSize, THREAD_FACTORY, ProducerType.SINGLE, new YieldingWaitStrategy() | |
); | |
} | |
/** | |
* 设置事件处理逻辑 | |
* | |
* @param handler 事件处理逻辑 | |
*/ | |
@SuppressWarnings("unchecked") | |
public void setEventHandler(Consumer<T> handler) { | |
EventHandler<T>[] handlers = new EventHandler[concurrency]; | |
for (int i = 0; i < concurrency; i++) { | |
handlers[i] = new EventHandler<>(handler); | |
} | |
this.eventDisruptor.handleEventsWithWorkerPool(handlers); | |
} | |
/** | |
* 启动队列 | |
*/ | |
public void start() { | |
this.eventDisruptor.start(); | |
this.eventProducer = new RingBufferProducer<>(eventDisruptor); | |
} | |
/** | |
* 向队列提交消息 | |
* | |
* @param item 消息 | |
*/ | |
public void feed(T item) { | |
this.eventProducer.produce(event -> event.value = item); | |
} | |
/** | |
* 停止队列,会阻塞直到所有消息处理完毕。 | |
*/ | |
public void stop() { | |
this.eventDisruptor.shutdown(); | |
} | |
///////////////////////////////////////////////////////////////// | |
static class EventHandler<T> implements WorkHandler<Event<T>> { | |
final Consumer<T> delegate; | |
EventHandler(Consumer<T> delegate) { | |
this.delegate = delegate; | |
} | |
@Override | |
public void onEvent(Event<T> event) throws Exception { | |
delegate.accept(event.value); | |
} | |
} | |
static class Event<T> { | |
public T value; | |
} | |
static class RingBufferProducer<T> { | |
private final RingBuffer<T> ringBuffer; | |
public RingBufferProducer(RingBuffer<T> ringBuffer) { | |
this.ringBuffer = ringBuffer; | |
} | |
public RingBufferProducer(Disruptor<T> disruptor) { | |
this(disruptor.getRingBuffer()); | |
} | |
public void produce(Consumer<T> consumer) { | |
final long next = ringBuffer.next(); | |
try { | |
final T t = ringBuffer.get(next); | |
consumer.accept(t); | |
} finally { | |
ringBuffer.publish(next); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment