只有发送消息设置了tags,消费方在订阅消息时,才能够利用tags在broker作消息过滤java
message.setTags("TagA");
● 少数生产者使用异步发送方式(3~5个就够了)
● 经过setInstanceName方法,给每一个生产者设置一个实例名shell
sendStatus
类里定义
● SEND_ OK : 消息发送成功
● FLUSH_ DISK_ _TIMEOUT: 消息发送成功, 可是服务器刷盘超时,消息已经进入
服务器队列,只有此时服务器宕机,消息才会丢失
● FLUSH_ SLAVE_ TIMEOUT: 消息发送成功,可是服务器同步到Slave时超时,
消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
● SLAVE_ NOT_ AVAILABLE: 消息发送成功, 可是此时slave不可用, 消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失编程
● 若是状态是FLUSH_ DISK_ TIMEOUT或FLUSH SLAVE_ _TIMEOUT,而且Broker正好关闭
此时,能够丢弃这条消息,或者重发。但建议最好重发,由消费端去重segmentfault
● Producer向Broker发送请求会等待响应,但若是达到最大等待时间,未获得响应,则客户端将抛出RemotingTimeoutException
● 默认等待时间是3秒,若是使用send(msg, timeout),则能够本身设定超时时间,
但超时时间不能设置过小,应为Borker须要一些时间来刷新磁盘或与从属设备同步
● 若是该值超过syncFlushTimeout,则该值可能影响不大,由于Broker可能会在超时以前返回FLUSH_ SLAVE_ TIMEOUT或FLUSH_ SLAVE_ TIMEOUT的响应bash
Producer的send方法自己支持内部重试:
● 至多重试3次
● 若是发送失败,则轮转到下一-个Broker
● 这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s
因此,若是自己向broker发送消息产生超时异常,就不会再作重试服务器
以上策略仍然不能保证消息必定发送成功,为保证消息必定成功,建议将消息存储到db,由后台线程定时重试,保证消息必定到达Broker
每一个消息在业务层面的惟一标识码,要设置到keys字段,方便未来定位消息丢失问题dom
服务器会为每一个消息建立索引(哈希索引),应用能够经过topic, key来查询这条消息内容,以及消息被谁消费异步
因为是哈希索引,请务必保证key尽量惟一,这样能够避免潜在的哈希冲突maven
String orderld =“1250689524981"; message.setKeys(orderld);
console客户端使GUI分布式
--server.port=8081 --rocketmq.config. namesrvAddr=192.168.1.17:9876
不一样的消费群体能够独立地消费一样的主题,而且每一个消费者都有本身的消费偏移量(offsets) 。
确保同一组中的每一个消费者订阅相同的主题
消费者将锁定每一个MessageQueue,以确保每一个消息被一个按顺序使用。
这将致使性能损失
若是关心消息的顺序时,它就颇有用了。不建议抛出异常,能够返回
ConsumeOrderlyStatus. SUSPEND_ CURRENT_ QUEUE_ A_ MOMENT代替
对于MessageListenerConcurrently,能够返回RECONSUME_ LATER告诉消费者,当前不能消费它而且但愿之后从新消费。而后能够继续使用其余消息
对于MessageListenerOrderly, 若是关心顺序,就不能跳过消息,能够返回SUSPEND_ CURRENT_ QUEUE_ A_ MOMENT来告诉消费者等待片刻。
不建议阻塞Listener,由于它会阻塞线程池,最终可能会中止消费程序
DefaultMQPushConsumer
消费者使用一个ThreadPoolExecutor来处理内部的消费,所以能够经过设
置更改它
● 当创建一个新的Consumer Group时,须要决定是否须要消费Broker中已经
存在的历史消息。
● CONSUME_ FROM LAST_ OFFSET将忽略历史消息,并消费此后生成的任何
内容。
● CONSUME_ FROM_ FIRST_ OFFSET将消耗Broker中存在的全部消息。还可使用CONSUME_ FROM_ TIMESTAMP 来消费在指定的时间戳以后生成的消息。
RocketMQ没法避免消息重复,若是业务对重复消费很是敏感,务必在业务层面作去重:
● 经过记录消息惟一键进行去重
● 使用业务层面的状态机制去重
在Apache RocketMQ中,NameServer用于协调分布式系统的每一个组件,主要经过管理主题路由信息
来实现协调。
管理由两部分组成:
所以,在启动brokers和clients以前,咱们须要告诉他们如何经过给他们提
供的一个名称服务器地址列表来访问名称服务器。
在Apache RocketMQ中,能够用四种方式完成。
namesrvAddr=name-server-ip1:port;name-server-ip2:port
DefaultMQProducer producer = new DefaultMQProducer(" please_ rename_ unique_ group name"); producer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(" please_ rename_ unique_ _group_ name"); consumer.setNamesrvAddr(" name-server1-ip:port;name-server2-ip:port");
sh mqadmin command-name -n name-server-ip1:port;name-server-ip2:port -X OTHER-OPTION
sh mqadmin -n localhost:9876 clusterList
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(" please_ rename_ _unique_ group_ _name"); defaultMQAdminExt.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");
NameServer的地址列表也能够经过java参数rocketmq.namesrv.addr
在启动以前指定
能够设置NAMESRV_ ADDR环境变量。若是设置了,Broker和clients将检 查并使用其值
若是没有使用前面提到的方法指定NameServer地址列表,Apache RocketMQ将每2分钟发送一次HTTP请求,以获取和更新NameServer地址列表,初始延迟10秒。
默认状况下,访问的HTTP地址是:
http://jmenv.tbsite.net:8080/rocketmq/nsaddr
经过Java参数rocketmq.namesrv.domain,能够修改jmenv.tbsite.net
经过Java参数rocketmq.namesrv.domain.subgroup,能够修改nsaddr
编程方式> Java参数>环境变量> HTTP方式
推荐使用JDK 1.8版本,使用服务器编译器和8g堆。
设置相同的Xms和Xmx值,以防止JVM动态调整堆大小以得到更好的性能。
简单的JVM配置以下所示:
-server -Xms8g -Xmx8g -Xmn4g
若是不关心Broker的启动时间,能够预先触摸Java堆,以确保在JVM初始化期间分配页是更好的选择。
-XX:+AlwaysPreTouch
-XX: UseBiasedL ocking
-XX:+UseG1GC -XX:G1HeapRegionSize= 16m -XX:G lReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30
这些GC选项看起来有点激进,但事实证实它在生产环境中具备良好的性能。
-XX:MaxGCPauseMillis不要设置过小的值,不然JVM将使用一个小的新生代,这将致使很是频繁的新生代GC。
-XX:+UseGCLogFileRotation -Xx:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m
-Xloggc:/dev/shm/mq_ gc. _%p.log
https://www.kernel.org/doc/Do...
本文由博客一文多发平台 OpenWrite 发布!