在v4版本的MyDisruptor实现多线程生产者后。按照计划,v5版本的MyDisruptor需要支持更便于用户使用的DSL风格的API。
由于该文属于系列博客的一部分,需要先对之前的博客内容有所了解才能更好地理解本篇博客
通过前面4个版本的迭代,MyDisruptor已经实现了disruptor的大多数功能。但对程序可读性有要求的读者可能会注意到,之前给出的demo示例代码中对于构建多个消费者之间的依赖关系时细节有点多。
构建一个有上游消费者依赖的EventProcessor消费者一般来说需要通过以下几步完成:
目前的版本中,每创建一个消费者都需要写一遍上述的模板代码。对于理解Disruptor原理的人来说还勉强能接受,但还是很繁琐且容易在细节上犯错,更遑论对disruptor底层不大了解的普通用户。
基于上述原因,disruptor提供了更加简单易用的DSL风格API,使得对disruptor底层各组件间交互不甚了解的用户也能很方便的使用disruptor,去构建不同消费者组间的依赖关系。
DSL即Domain Specific Language,领域特定语言。DSL是针对特定领域抽象出的一个特定语言,通过进一层的抽象来代替大量繁琐的通用代码段,如sql、shell等都是常见的dsl。
而DSL风格的API最大的特点就是接口的定义贴合业务场景,因此易于理解和使用。
首先要介绍的就是Disruptor类,disruptor类主要用于创建一个符合用户需求的RingBuffer,并提供一组易用的api以屏蔽底层组件交互的细节。
MyDisruptor类的构造函数有五个参数,分别是:
以上都是需要用户自定义或者指定的核心参数,构建好的disruptor的同时,也生成了RingBuffer和指定类型的生产者序列器。
/** * disruptor dsl(仿Disruptor.Disruptor) * */public class MyDisruptor<T> { private final MyRingBuffer<T> ringBuffer; private final Executor executor; private final MyConsumerRepository<T> consumerRepository = new MyConsumerRepository<>(); private final AtomicBoolean started = new AtomicBoolean(false); public MyDisruptor( final MyEventFactory<T> eventProducer, final int ringBufferSize, final Executor executor, final ProducerType producerType, final MyWaitStrategy myWaitStrategy) { this.ringBuffer = MyRingBuffer.create(producerType,eventProducer,ringBufferSize,myWaitStrategy); this.executor = executor; } /** * 获得当亲Disruptor的ringBuffer * */ public MyRingBuffer<T> getRingBuffer() { return ringBuffer; } // 注意:省略了大量无关代码}
创建好Disruptor后,便可以按照需求编排各种消费者的依赖逻辑了。创建消费者时除了用户自定义的消费量逻辑接口(EventHandler/WorkHandler),还有两个关键要素需要指定,一是指定是单线程生产者还是多线程,二是指定当前消费者的上游消费者序列集合(或者没有)。
两两组合四种情况,为此Disruptor类一共提供了四个方法用于创建消费者:
这四个方法的返回值都是EventHandlerGroup对象,其中提供了关键的then/thenHandleEventsWithWorkerPool方法用来链式的编排多个消费者组。
实际上disruptor中的EventHandlerGroup还提供了等更多的dsl风格的方法(如and),限于篇幅MyDisruptor中只实现了最关键的几个方法。
/** * DSL事件处理器组(仿Disruptor.EventHandlerGroup) * */public class MyEventHandlerGroup<T> { private final MyDisruptor<T> disruptor; private final MyConsumerRepository<T> myConsumerRepository; private final MySequence[] sequences; public MyEventHandlerGroup(MyDisruptor<T> disruptor, MyConsumerRepository<T> myConsumerRepository, MySequence[] sequences) { this.disruptor = disruptor; this.myConsumerRepository = myConsumerRepository; this.sequences = sequences; } @SafeVarargs public final MyEventHandlerGroup<T> then(final MyEventHandler<T>... myEventHandlers) { return handleEventsWith(myEventHandlers); } @SafeVarargs public final MyEventHandlerGroup<T> handleEventsWith(final MyEventHandler<T>... handlers) { return disruptor.createEventProcessors(sequences, handlers); } @SafeVarargs public final MyEventHandlerGroup<T> thenHandleEventsWithWorkerPool(final MyWorkHandler<T>... handlers) { return handleEventsWithWorkerPool(handlers); } @SafeVarargs public final MyEventHandlerGroup<T> handleEventsWithWorkerPool(final MyWorkHandler<T>... handlers) { return disruptor.createWorkerPool(sequences, handlers); }}
/** * disruptor dsl(仿Disruptor.Disruptor) * */public class MyDisruptor<T> { private final MyRingBuffer<T> ringBuffer; private final Executor executor; private final MyConsumerRepository<T> consumerRepository = new MyConsumerRepository<>(); private final AtomicBoolean started = new AtomicBoolean(false); public MyDisruptor( final MyEventFactory<T> eventProducer, final int ringBufferSize, final Executor executor, final ProducerType producerType, final MyWaitStrategy myWaitStrategy) { this.ringBuffer = MyRingBuffer.create(producerType,eventProducer,ringBufferSize,myWaitStrategy); this.executor = executor; } /** * 注册单线程消费者 (无上游依赖消费者,仅依赖生产者序列) * */ @SafeVarargs public final MyEventHandlerGroup<T> handleEventsWith(final MyEventHandler<T>... myEventHandlers){ return createEventProcessors(new MySequence[0], myEventHandlers); } /** * 注册单线程消费者 (有上游依赖消费者,仅依赖生产者序列) * @param barrierSequences 依赖的序列屏障 * @param myEventHandlers 用户自定义的事件消费者集合 * */ public MyEventHandlerGroup<T> createEventProcessors( final MySequence[] barrierSequences, final MyEventHandler<T>[] myEventHandlers) { final MySequence[] processorSequences = new MySequence[myEventHandlers.length]; final MySequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); int i=0; for(MyEventHandler<T> myEventConsumer : myEventHandlers){ final MyBatchEventProcessor<T> batchEventProcessor = new MyBatchEventProcessor<>(ringBuffer, myEventConsumer, barrier); processorSequences[i] = batchEventProcessor.getCurrentConsumeSequence(); i++; // consumer对象都维护起来,便于后续start时启动 consumerRepository.add(batchEventProcessor); } // 更新当前生产者注册的消费者序列 updateGatingSequencesForNextInChain(barrierSequences,processorSequences); return new MyEventHandlerGroup<>(this,this.consumerRepository,processorSequences); } /** * 注册多线程消费者 (无上游依赖消费者,仅依赖生产者序列) * */ @SafeVarargs public final MyEventHandlerGroup<T> handleEventsWithWorkerPool(final MyWorkHandler<T>... myWorkHandlers) { return createWorkerPool(new MySequence[0], myWorkHandlers); } /** * 注册多线程消费者 (有上游依赖消费者,仅依赖生产者序列) * @param barrierSequences 依赖的序列屏障 * @param myWorkHandlers 用户自定义的事件消费者集合 * */ public MyEventHandlerGroup<T> createWorkerPool( final MySequence[] barrierSequences, final MyWorkHandler<T>[] myWorkHandlers) { final MySequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences); final MyWorkerPool<T> workerPool = new MyWorkerPool<>(ringBuffer, sequenceBarrier, myWorkHandlers); // consumer都保存起来,便于start启动 consumerRepository.add(workerPool); final MySequence[] workerSequences = workerPool.getCurrentWorkerSequences(); updateGatingSequencesForNextInChain(barrierSequences, workerSequences); return new MyEventHandlerGroup<>(this, consumerRepository,workerSequences); } private void updateGatingSequencesForNextInChain(final MySequence[] barrierSequences, final MySequence[] processorSequences) { if (processorSequences.length != 0) { // 这是一个优化操作: // 由于新的消费者通过ringBuffer.newBarrier(barrierSequences),已经是依赖于之前ringBuffer中已有的消费者序列 // 消费者即EventProcessor内部已经设置好了老的barrierSequences为依赖,因此可以将ringBuffer中已有的消费者序列去掉 // 只需要保存,依赖当前消费者链条最末端的序列即可(也就是最慢的序列),这样生产者可以更快的遍历注册的消费者序列 for(MySequence sequenceV4 : barrierSequences){ ringBuffer.removeConsumerSequence(sequenceV4); } for(MySequence sequenceV4 : processorSequences){ // 新设置的就是当前消费者链条最末端的序列 ringBuffer.addConsumerSequence(sequenceV4); } } } /** * 启动所有已注册的消费者 * */ public void start(){ // cas设置启动标识,避免重复启动 if (!started.compareAndSet(false, true)) { throw new IllegalStateException("Disruptor只能启动一次"); } // 遍历所有的消费者,挨个start启动 this.consumerRepository.getConsumerInfos().forEach( item->item.start(this.executor) ); } /** * 获得当亲Disruptor的ringBuffer * */ public MyRingBuffer<T> getRingBuffer() { return ringBuffer; }}
下面通过一个简单但不失一般性的示例,来展示一下DSL风格API到底简化了多少复杂度。
public class MyRingBufferV5DemoOrginal { /** * 消费者依赖关系图(简单起见都是单线程消费者): * A -> BC -> D * -> E -> F * */ public static void main(String[] args) { // 环形队列容量为16(2的4次方) int ringBufferSize = 16; // 创建环形队列 MyRingBuffer<OrderEventModel> myRingBuffer = MyRingBuffer.createSingleProducer( new OrderEventProducer(), ringBufferSize, new MyBlockingWaitStrategy()); // 获得ringBuffer的序列屏障(最上游的序列屏障内只维护生产者的序列) MySequenceBarrier mySequenceBarrier = myRingBuffer.newBarrier(); // ================================== 基于生产者序列屏障,创建消费者A MyBatchEventProcessor<OrderEventModel> eventProcessorA = new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerA"), mySequenceBarrier); MySequence consumeSequenceA = eventProcessorA.getCurrentConsumeSequence(); // RingBuffer监听消费者A的序列 myRingBuffer.addGatingConsumerSequenceList(consumeSequenceA); // ================================== 通过消费者A的序列号创建序列屏障(构成消费的顺序依赖),创建消费者B MySequenceBarrier mySequenceBarrierB = myRingBuffer.newBarrier(consumeSequenceA); MyBatchEventProcessor<OrderEventModel> eventProcessorB = new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerB"), mySequenceBarrierB); MySequence consumeSequenceB = eventProcessorB.getCurrentConsumeSequence(); // RingBuffer监听消费者B的序列 myRingBuffer.addGatingConsumerSequenceList(consumeSequenceB); // ================================== 通过消费者A的序列号创建序列屏障(构成消费的顺序依赖),创建消费者C MySequenceBarrier mySequenceBarrierC = myRingBuffer.newBarrier(consumeSequenceA); MyBatchEventProcessor<OrderEventModel> eventProcessorC = new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerC"), mySequenceBarrierC); MySequence consumeSequenceC = eventProcessorC.getCurrentConsumeSequence(); // RingBuffer监听消费者C的序列 myRingBuffer.addGatingConsumerSequenceList(consumeSequenceC); // ================================== 消费者D依赖上游的消费者B,C,通过消费者B、C的序列号创建序列屏障(构成消费的顺序依赖) MySequenceBarrier mySequenceBarrierD = myRingBuffer.newBarrier(consumeSequenceB,consumeSequenceC); // 基于序列屏障,创建消费者D MyBatchEventProcessor<OrderEventModel> eventProcessorD = new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerD"), mySequenceBarrierD); MySequence consumeSequenceD = eventProcessorD.getCurrentConsumeSequence(); // RingBuffer监听消费者D的序列 myRingBuffer.addGatingConsumerSequenceList(consumeSequenceD); // ================================== 通过消费者A的序列号创建序列屏障(构成消费的顺序依赖),创建消费者E MySequenceBarrier mySequenceBarrierE = myRingBuffer.newBarrier(consumeSequenceA); MyBatchEventProcessor<OrderEventModel> eventProcessorE = new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerE"), mySequenceBarrierE); MySequence consumeSequenceE = eventProcessorE.getCurrentConsumeSequence(); // RingBuffer监听消费者E的序列 myRingBuffer.addGatingConsumerSequenceList(consumeSequenceE); // ================================== 通过消费者E的序列号创建序列屏障(构成消费的顺序依赖),创建消费者F MySequenceBarrier mySequenceBarrierF = myRingBuffer.newBarrier(consumeSequenceE); MyBatchEventProcessor<OrderEventModel> eventProcessorF = new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerF"), mySequenceBarrierF); MySequence consumeSequenceF = eventProcessorF.getCurrentConsumeSequence(); // RingBuffer监听消费者F的序列 myRingBuffer.addGatingConsumerSequenceList(consumeSequenceF); Executor executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); // 启动消费者线程A executor.execute(eventProcessorA); // 启动消费者线程B executor.execute(eventProcessorB); // 启动消费者线程C executor.execute(eventProcessorC); // 启动消费者线程D executor.execute(eventProcessorD); // 启动消费者线程E executor.execute(eventProcessorE); // 启动消费者线程F executor.execute(eventProcessorF); // 生产者发布100个事件 for(int i=0; i<100; i++) { long nextIndex = myRingBuffer.next(); OrderEventModel orderEvent = myRingBuffer.get(nextIndex); orderEvent.setMessage(""+i); orderEvent.setPrice(i * 10); System.out.println("生产者发布事件:" + orderEvent); myRingBuffer.publish(nextIndex); } }}
public class MyRingBufferV5DemoUseDSL { /** * 消费者依赖关系图(简单起见都是单线程消费者): * A -> BC -> D * -> E -> F * */ public static void main(String[] args) { // 环形队列容量为16(2的4次方) int ringBufferSize = 16; MyDisruptor<OrderEventModel> myDisruptor = new MyDisruptor<>( new OrderEventProducer(), ringBufferSize, new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()), ProducerType.SINGLE, new MyBlockingWaitStrategy() ); MyEventHandlerGroup<OrderEventModel> hasAHandlerGroup = myDisruptor.handleEventsWith(new OrderEventHandlerDemo("consumerA")); hasAHandlerGroup.then(new OrderEventHandlerDemo("consumerB"),new OrderEventHandlerDemo("consumerC")) .then(new OrderEventHandlerDemo("consumerD")); hasAHandlerGroup.then(new OrderEventHandlerDemo("consumerE")) .then(new OrderEventHandlerDemo("consumerF")); // 启动disruptor中注册的所有消费者 myDisruptor.start(); MyRingBuffer<OrderEventModel> myRingBuffer = myDisruptor.getRingBuffer(); // 生产者发布100个事件 for(int i=0; i<100; i++) { long nextIndex = myRingBuffer.next(); OrderEventModel orderEvent = myRingBuffer.get(nextIndex); orderEvent.setMessage(""+i); orderEvent.setPrice(i * 10); System.out.println("生产者发布事件:" + orderEvent); myRingBuffer.publish(nextIndex); } }}
disruptor无论在整体设计还是最终代码实现上都有很多值得反复琢磨和学习的细节,希望能帮助到对disruptor感兴趣的小伙伴。
本篇博客的完整代码在我的github上:https://github.com/1399852153/MyDisruptor 分支:feature/lab5