disruptor 单生产者多消费者

demo1 单生产者多消费者建立。dom

maven 依赖maven

<!-- https://mvnrepository.com/artifact/com.lmax/disruptor -->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.2</version>
        </dependency>

 

1 对象 - Messageide

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Message2 {
    private String id;
    private String name;
    private double price;
}

2 在主函数中建立 disruptor函数

Disruptor<Message2> disruptor = new Disruptor<>(
                new EventFactory<Message2>() {
                    @Override
                    public Message2 newInstance() {
                        return new Message2();
                    }
                },
                1 << 10,
                Executors.defaultThreadFactory(),
                ProducerType.SINGLE,
                new BusySpinWaitStrategy()
        );

 

3 disruptor 绑定消费者ui

// disruptor 绑定消费者
disruptor.handleEventsWith(new MessageHandler1());


//建立消费者
@Slf4j
public class MessageHandler1 implements EventHandler<Message2> {
    @Override
    public void onEvent(Message2 event, long sequence, boolean endOfBatch) throws Exception {
        event.setId(UUID.randomUUID().toString());
        log.info("【handler1,set id】 id: {}, name: {}, price: {}", event.getId(), event.getName(), event.getPrice());
    }
}

 

4 启动 disruptorthis

RingBuffer<Message2> ringBuffer = disruptor.start();

 

5 disruptor 绑定生产者spa

//绑定生产者
CountDownLatch latch = new CountDownLatch(1);
ExecutorService es = Executors.newFixedThreadPool(4);
es.submit(new MessagePublish2(disruptor, latch));

// 生产者类
public class MessagePublish2 implements Runnable {
    private Disruptor<Message2> disruptor;
    private CountDownLatch latch;

    public MessagePublish2(Disruptor<Message2> disruptor, CountDownLatch latch) {
        this.disruptor = disruptor;
        this.latch = latch;
    }

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            disruptor.publishEvent(new MessageEventTranslator());
        }
        latch.countDown();
    }
}

 

6 阻塞等待 & 关闭服务code

        // 阻塞等待
        latch.await();

        // 关闭服务
        es.shutdown();
        disruptor.shutdown();
相关文章
相关标签/搜索