reactor-rabbitmq小试牛刀

本文主要研究一下如何使用reactor-rabbitmqreact

maven

<dependency>
            <groupId>io.projectreactor.rabbitmq</groupId>
            <artifactId>reactor-rabbitmq</artifactId>
            <version>1.0.0.M2</version>
        </dependency>

rabbitmq

实例

@Test
    public void testProducer() throws InterruptedException {
        int count = 100;
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.useNio();
        connectionFactory.setUsername("myuser");
        connectionFactory.setPassword("mypass");
        SenderOptions senderOptions =  new SenderOptions()
                .connectionFactory(connectionFactory)
                .connectionSupplier(cf -> cf.newConnection(
                        new Address[] {new Address("192.168.99.100",5672), new Address("192.168.99.100",5673), new Address("192.168.99.100",5674)},
                        "reactive-sender"))
                .resourceCreationScheduler(Schedulers.elastic());
        Sender sender = ReactorRabbitMq.createSender(senderOptions);
        Flux<OutboundMessageResult> confirmations = sender.sendWithPublishConfirms(Flux.range(1, count)
                .map(i -> new OutboundMessage("", QUEUE, ("Message_" + i).getBytes())));

        CountDownLatch latch = new CountDownLatch(count);

        sender.declareQueue(QueueSpecification.queue(QUEUE))
                .thenMany(confirmations)
                .doOnError(e -> LOGGER.error("Send failed", e))
                .subscribe(r -> {
                    if (r.isAck()) {
                        LOGGER.info("Message {} sent successfully", new String(r.getOutboundMessage().getBody()));
                        latch.countDown();
                    }
                });

        latch.await(10, TimeUnit.SECONDS);
        sender.close();
    }

    @Test
    public void testConsumer() throws InterruptedException {
        int count = 100;
        CountDownLatch latch = new CountDownLatch(count);

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.useNio();
        connectionFactory.setUsername("myuser");
        connectionFactory.setPassword("mypass");
        SenderOptions senderOptions =  new SenderOptions()
                .connectionFactory(connectionFactory)
                .connectionSupplier(cf -> cf.newConnection(
                        new Address[] {new Address("192.168.99.100",5672), new Address("192.168.99.100",5673), new Address("192.168.99.100",5674)},
                        "reactive-sender"))
                .resourceCreationScheduler(Schedulers.elastic());

        Sender sender = ReactorRabbitMq.createSender(senderOptions);
        Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE));

        ReceiverOptions receiverOptions = new ReceiverOptions()
                .connectionFactory(connectionFactory)
                .connectionSupplier(cf -> cf.newConnection(
                        new Address[] {new Address("192.168.99.100",5672), new Address("192.168.99.100",5673), new Address("192.168.99.100",5674)},
                        "reactive-receiver"))
                .connectionSubscriptionScheduler(Schedulers.elastic());
        Receiver receiver = ReactorRabbitMq.createReceiver(receiverOptions);
        Flux<Delivery> messages = receiver.consumeAutoAck(QUEUE);
        Disposable disposable = queueDeclaration.thenMany(messages).subscribe(m -> {
            LOGGER.info("Received message {}", new String(m.getBody()));
            latch.countDown();
        });

        latch.await(10, TimeUnit.SECONDS);

        disposable.dispose();
        sender.close();
        receiver.close();
    }
  • 因为设置了帐号密码,于是须要在ConnectionFactory那里指定帐号密码
  • 另外因为使用了rabbitmq集群,于是经过connectionSupplier指定要链接的多个rabbitmq地址
  • 这里无论是producer仍是consumer,都经过queueDeclaration进行操做

小结

reactor-rabbitmq对rabbitmq的api进行封装,改造为reactive streams模式,提供了Non-blocking Back-pressure以及End-to-end Reactive Pipeline特性。git

doc

相关文章
相关标签/搜索