RocketMQ(6)---发送普通消息(三种方式)

发送普通消息(三种方式)

RocketMQ 发送普通消息有三种实现方式:可靠同步发送可靠异步发送单向(Oneway)发送html

注意顺序消息只支持可靠同步发送java

GitHub地址: https://github.com/yudiandemingzi/SpringBootBloggit

1、概念

一、可靠同步发送

原理:同步发送是指消息发送方发出数据后,会在收到接收方发回响应以后才发下一个数据包的通信方式。github

应用场景:此种方式应用场景很是普遍,例如重要通知邮件、报名短信通知、营销短信系统等。服务器

二、可靠异步发送

原理:异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通信方式。 消息队列 RocketMQ 的异步发送,须要用户实现异步发送回调接口(SendCallback)。app

应用场景:异步发送通常用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如批量发货等操做。异步

三、单向(Oneway)发送

原理:单向(Oneway)发送特色为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。 此方式发送消息的过程耗时很是短,通常在微秒级别。async

应用场景:适用于某些耗时很是短,但对可靠性要求并不高的场景,例如日志收集。ide

四、三种对比

下表归纳了三者的特色和主要区别。函数

发送方式 发送 TPS 发送结果反馈 可靠性
同步发送 不丢失
异步发送 不丢失
单向发送 最快 可能丢失


2、代码示例

一、三种方式代码示例

@Slf4j
@RestController
public class Controller {
    /**
     * 生产者组
     */
    private static String PRODUCE_RGROUP = "test_producer";
    /**
     * 建立生产者对象
     */
    private static DefaultMQProducer producer = null;

    static {
        producer = new DefaultMQProducer(PRODUCE_RGROUP);
        //不开启vip通道 开通口端口会减2
        producer.setVipChannelEnabled(false);
        //绑定name server
        producer.setNamesrvAddr("47.99.03.25:9876");
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

    }

    @GetMapping("/message")
    public  void  message() throws Exception {
        //一、同步
        sync();
        //二、异步
        async();
        //三、单项发送
        oneWay();
    }
    /**
     * 一、同步发送消息
     */
    private  void sync() throws Exception {
        //建立消息
        Message message = new Message("topic_family", ("  同步发送  ").getBytes());
        //同步发送消息
        SendResult sendResult = producer.send(message);
        log.info("Product-同步发送-Product信息={}", sendResult);
    }
    /**
     * 二、异步发送消息
     */
    private  void async() throws Exception {
        //建立消息
        Message message = new Message("topic_family", ("  异步发送  ").getBytes());
        //异步发送消息
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("Product-异步发送-输出信息={}", sendResult);
            }
            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
                //补偿机制,根据业务状况进行使用,看是否进行重试
            }
        });
    }
    /**
     * 三、单项发送消息
     */
    private  void oneWay() throws Exception {
        //建立消息
        Message message = new Message("topic_family", (" 单项发送 ").getBytes());
        //同步发送消息
        producer.sendOneway(message);
    }
}

二、测试结果

这里消费者代码就不贴出来了。

经过这个很明显能够看出三种方式都被 Consumer 消费了。只不过对于 Product 同步和异步发送是有返回信息的,单项发送是没有返回信息的。


参考

一、RocketMQ 阿里云官网文档



只要本身变优秀了,其余的事情才会跟着好起来(上将3)
相关文章
相关标签/搜索