Disruptor (5) - 使用场景

handleEventsWith & handleEventsWithWorkerPool

在disruptor框架调用start方法以前,须要将消息的消费者指定给disruptor框架。java

  1. disruptor.handleEventsWith(EventHandler... handlers),将多个EventHandler的实现类传入方法,封装成一个EventHandlerGroup
  2. disruptor.handleEventsWithWorkerPool(WorkHandler... handlers),将多个WorkHandler的实现类传入方法,封装成一个EventHandlerGroup

不一样点

  1. handleEventsWith方法的EventHandlerGroup中的每一个消费者都会对同一条消息m进行消费,各个消费者之间不存在竞争。
  2. handleEventsWithWorkerPool方法返回的EventHandlerGroup,Group的消费者对于同一条消息m不重复消费;若是c0消费了消息m,则c1再也不消费消息m。

对于独立消费的消费者,应当实现EventHandler接口。对于不重复消费的消费者,应当实现WorkHandler接口。
从代码层面而言, 有不一样的具体实现来支持不一样的模式app

  1. ConsumerInfo
  2. EventProcessor

消费场景

此处的测试代码的对接口WorkHandler 进行了改造。
框架

package com.lmax.disruptor.noob;

import java.time.Instant;
import java.time.format.DateTimeFormatter;

public class CompareTest {
	public static int THREAD = 2; // 线程数量
	public static int PER = 1; // 单个线程生产数量
	public static int TOTAL_COUNT = THREAD * PER; // 数据总量
	public static int SIZE = 4; // 最大容量

	public static void main(String[] args) {
		println("线程数:" + THREAD + " 单线程生产量: " + PER + " 容量:" + SIZE + " 数据总量:" + TOTAL_COUNT);
		 DisruptorTest.execute();
	}

	public static void println(String msg) {
		System.out.println(DateTimeFormatter.ISO_INSTANT.format(Instant.now()) + "[" + Thread.currentThread().getName() + "] " + msg);
	}
}

---------
import java.util.concurrent.ThreadFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

public class DisruptorTest {

	public static void execute() {

		Disruptor<DataEvent> disruptor = new Disruptor<DataEvent>(new DataEventFactory(), CompareTest.SIZE,
				new ThreadFactory() {
			       AtomicInteger count = new AtomicInteger(0);

					@Override
					public Thread newThread(Runnable eventProcessor) {
						CompareTest.println("EventProcessor wrapper");// 对事件处理总线的封装
						Thread thread = new Thread(eventProcessor);
						thread.setName("EventProcessor" + count.incrementAndGet());
						return thread;
					}
				});
		/**
		 * 建立EventProcessors<Runnable>.
		 * 子过程Disruptor.checkNotStarted()事件处理handler必须在启动以前绑定.
		 */
		disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1"),new DataEventHandler("dataEventHandler2"));
		 // disruptor.handleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler1"),new DataWorkHandler("dataWorkHandler2"));

		disruptor.start();
		CompareTest.println("disruptor start success!");

		RingBuffer<DataEvent> ringBuffer = disruptor.getRingBuffer();
		DataProducer producer = new DataProducer(ringBuffer);
		DataEventProducerWithTranslator translator = new DataEventProducerWithTranslator(ringBuffer);
		long start = System.currentTimeMillis();

		for (int l = 0; l < CompareTest.THREAD; l++) {
			new Thread(() -> {
				for (int m = 0; m < CompareTest.PER; m++) {
					producer.onData(start);
					// translator.onData(start); 推荐用这种方式作。
				}
			}).start();
		}
	}
}

----------
import java.util.concurrent.atomic.AtomicLong;
import com.lmax.disruptor.EventHandler;

public class DataEventHandler implements EventHandler<DataEvent> {
	public AtomicLong count = new AtomicLong(0);
	public String name = null;

	public DataEventHandler(String name) {
		this.name = name;
	}

	@Override
	public void onEvent(DataEvent event, long sequence, boolean endOfBatch) throws Exception {
		Thread.sleep(name.contentEquals("dataEventHandler1") ? 1000 : 100);
		CompareTest.println("handlerName: " + name + " 处理的sequence:" + sequence
				+ " count:" + count.incrementAndGet() + "  Disruptor 总耗时:"
				+ (System.currentTimeMillis() - event.getStartTime()));
	}

}

----------
import java.util.concurrent.atomic.AtomicLong;
import com.lmax.disruptor.WorkHandler;

public class DataWorkHandler implements WorkHandler<DataEvent> {
	public AtomicLong count = new AtomicLong(0);
	public String name = null;

	public DataWorkHandler(String name) {
		this.name = name;
	}

	@Override
	public void onEvent(DataEvent event, long sequence) throws Exception {
		Thread.sleep(name.contentEquals("dataWorkHandler2") ? 100 :1000);
		CompareTest.println("handlerName: " + name + " 处理的sequence:" + sequence + " count:" + count.incrementAndGet()
				+ "  Disruptor 总耗时:" + (System.currentTimeMillis() - event.getStartTime()));
	}
}
  1. handleEventsWith 同一消息被不一样handler独立消费。 此时handler处理是无序的。
    disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1"), new DataEventHandler("dataEventHandler2"));

  2. 依赖串行.  对于同一消息前handler处理完结,后handler才处理。
    disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1")).then(new DataEventHandler("dataEventHandler2")).then(new DataEventHandler("dataEventHandler3"));

  3. handleEventsWithWorkerPool 不重复消费。 
    disruptor.handleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler1"), new DataWorkHandler("dataWorkHandler2"));
  4. 组合方式ide

    disruptor.handleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler3"),new DataWorkHandler("dataWorkHandler4")).then(new DataEventHandler("dataEventHandler1"),
    				new DataEventHandler("dataEventHandler2"));

     

    disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1"),
    				new DataEventHandler("dataEventHandler2")).thenHandleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler3"),new DataWorkHandler("dataWorkHandler4"));

相关文章
相关标签/搜索