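This quick start guide gives detailed instructions for setting up the RocketMQ messaging system on a local machine and for producing and consuming basic messages.
The source code of the 4.4.0 release can be downloaded here. A binary release can also be downloaded here.
Now run the following commands to unpack the 4.4.0 source release and build the binary artifact.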
> unzip rocketmq-all-4.4.0-source-release.zip
> cd rocketmq-all-4.4.0/
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/apache-rocketmq
The command mvn -Prelease-all -DskipTests clean install -U builds the downloaded source package and creates a target directory under distribution; the built package is located there.
charse@charse-thinkpad:/media/charse/文档/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ nohup sh bin/mqnamesrv &
[1] 5273
charse@charse-thinkpad:/media/charse/文档/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ nohup: ignoring input and appending output to 'nohup.out'
tail -f ~/logs/rocketmqlogs/namesrv.log
2019-03-31 19:24:10 INFO main - tls.client.keyPath = null
2019-03-31 19:24:10 INFO main - tls.client.keyPassword = null
2019-03-31 19:24:10 INFO main - tls.client.certPath = null
2019-03-31 19:24:10 INFO main - tls.client.authServer = false
2019-03-31 19:24:10 INFO main - tls.client.trustCertPath = null
2019-03-31 19:24:11 INFO main - Using OpenSSL provider
2019-03-31 19:24:11 INFO main - SSLContext created for server
2019-03-31 19:24:12 INFO NettyEventExecutor - NettyEventExecutor service started
2019-03-31 19:24:12 INFO FileWatchService - FileWatchService service started
2019-03-31 19:24:12 INFO main - The Name Server boot success. serializeType=JSON
2019-03-31 19:25:11 INFO NSScheduledThread1 - --------------------------------------------------------
2019-03-31 19:25:11 INFO NSScheduledThread1 - configTable SIZE: 0
The line The Name Server boot success shows that the Name Server started successfully and that its serialization type is JSON.
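As a rough illustration of this check, the sketch below scans log lines for the same marker text. The class and method names are hypothetical, and it only inspects the lines passed in; reading ~/logs/rocketmqlogs/namesrv.log from disk is left out.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: looks for the Name Server startup marker in log lines. */
final class NamesrvLogCheck {
    // Marker text copied from the namesrv.log excerpt in this guide.
    static final String BOOT_MARKER = "The Name Server boot success";

    /** Returns true if any of the given lines contains the boot-success marker. */
    static boolean bootSucceeded(List<String> logLines) {
        for (String line : logLines) {
            if (line.contains(BOOT_MARKER)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "2019-03-31 19:24:12 INFO FileWatchService - FileWatchService service started",
            "2019-03-31 19:24:12 INFO main - The Name Server boot success. serializeType=JSON");
        System.out.println("started: " + bootSucceeded(sample));
    }
}
```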
charse@charse-thinkpad:/media/charse/文档/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ nohup sh bin/mqbroker -n localhost:9876 &
[1] 5784
nohup: ignoring input and appending output to 'nohup.out'
charse@charse-thinkpad:/media/charse/文档/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ tail -f ~/logs/rocketmqlogs/broker.log
2019-03-31 19:41:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2019-03-31 19:41:22 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 19:41:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 19:42:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 19:42:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2019-03-31 19:42:22 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 19:42:52 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK
The log shows that the broker has registered with the name server successfully.
You can also use the jps command to check whether the services are running.
charse@charse-thinkpad:~$ jps
12128 Main
12549 Jps
5279 NamesrvStartup
5791 BrokerStartup
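As shown, both the NameServer and the Broker are running, so we can move on to simulating a producer and a consumer.
When a message is produced programmatically to a topic (TopicTest) that does not exist yet, the broker creates the topic (TopicTest) automatically with a default configuration. The producer address shown in the log is my local LAN address (192.168.3.16).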
2019-03-31 20:37:43 WARN SendMessageThread_1 - the topic TopicTest not exist, producer: /192.168.3.16:47538
2019-03-31 20:37:43 INFO SendMessageThread_1 - Create new topic by default topic:[TBW102] config:[TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]] producer:[192.168.3.16:47538]
2019-03-31 20:37:43 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 20:37:43 INFO HeartbeatThread_1 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863842]
2019-03-31 20:37:43 INFO HeartbeatThread_1 - new producer connected, group: producer1 channel: ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863842]
2019-03-31 20:37:43 INFO ClientManageThread_1 - unregister a producer[producer1] from groupChannelTable ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863927]
2019-03-31 20:37:43 INFO ClientManageThread_1 - unregister a producer group[producer1] from groupChannelTable
2019-03-31 20:37:43 INFO ClientManageThread_2 - unregister a producer[CLIENT_INNER_PRODUCER] from groupChannelTable ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863933]
2019-03-31 20:37:43 INFO ClientManageThread_2 - unregister a producer group[CLIENT_INNER_PRODUCER] from groupChannelTable
2019-03-31 20:37:52 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 20:38:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 20:38:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes
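When a producer is created, the broker first registers a CLIENT_INNER_PRODUCER producer in groupChannelTable and then registers the client's own producer, producer1, in groupChannelTable; the log entries show some details about this producer. When the producer shuts down, the client's producer1 is unregistered first, and then CLIENT_INNER_PRODUCER is removed from groupChannelTable. The broker also re-registers with the name server repeatedly (about every 30 seconds in this log, and again right after the topic is created).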
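When a client creates a consumer, the broker log below shows that a subscription group is created automatically together with its configuration. After the new consumer connects, the topics are added to the corresponding group: the topic you subscribed to (TopicTest) and a retry topic named %RETRY% followed by the consumer group name, each with its own subscription. A new producer, CLIENT_INNER_PRODUCER, is registered as well.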
2019-03-31 20:38:22 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK
2019-03-31 20:38:35 INFO HeartbeatThread_2 - auto create a subscription group, SubscriptionGroupConfig [groupName=consumer1, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - create new topic TopicConfig [topicName=%RETRY%consumer1, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
2019-03-31 20:38:35 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 20:38:35 INFO HeartbeatThread_2 - new consumer connected, group: consumer1 CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x1c14007d, L:/192.168.3.16:10911 - R:/192.168.3.16:48236], clientId=192.168.3.16@14311, language=JAVA, version=293, lastUpdateTimestamp=1554035915211]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - registerConsumer info changed ConsumerData [groupName=consumer1, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG]]] 192.168.3.16:48236
2019-03-31 20:38:35 INFO HeartbeatThread_2 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x1c14007d, L:/192.168.3.16:10911 - R:/192.168.3.16:48236], clientId=192.168.3.16@14311, language=JAVA, version=293, lastUpdateTimestamp=1554035915412]
2019-03-31 20:38:35 INFO HeartbeatThread_3 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035915833, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_4 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035915843, expressionType=TAG]
2019-03-31 20:38:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 20:39:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 20:39:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes
2019-03-31 20:39:22 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 20:39:52 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK
2019-03-31 20:40:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
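The retry topic name in the log above follows a simple pattern, which can be sketched as below. RetryTopicNames and its methods are made up for illustration and are not part of the RocketMQ client API; only the %RETRY% prefix is taken from the log.

```java
/** Hypothetical helper mirroring the retry-topic naming seen in the broker log. */
final class RetryTopicNames {
    // Prefix as it appears in the log entry "topicName=%RETRY%consumer1".
    static final String RETRY_PREFIX = "%RETRY%";

    /** Builds the retry topic name for a consumer group. */
    static String retryTopicFor(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    /** Tells whether a topic name looks like a retry topic. */
    static boolean isRetryTopic(String topic) {
        return topic.startsWith(RETRY_PREFIX);
    }

    public static void main(String[] args) {
        System.out.println(retryTopicFor("consumer1"));
        System.out.println(isRetryTopic("TopicTest"));
    }
}
```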
Before any topic has been created by a client, the name server log shows that RocketMQ registers a number of topics by default.
2019-03-31 19:39:28 INFO NettyServerCodecThread_1 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:48210
2019-03-31 19:39:28 INFO NettyServerCodecThread_1 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:48210]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, charse-thinkpad QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, BenchmarkTest QueueData [brokerName=charse-thinkpad, readQueueNums=1024, writeQueueNums=1024, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, OFFSET_MOVED_EVENT QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, TBW102 QueueData [brokerName=charse-thinkpad, readQueueNums=8, writeQueueNums=8, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, SELF_TEST_TOPIC QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, DefaultCluster QueueData [brokerName=charse-thinkpad, readQueueNums=16, writeQueueNums=16, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new broker registered, 192.168.3.16:10911 HAServer: 192.168.3.16:10912
2019-03-31 19:40:12 INFO RemotingExecutorThread_4 - new topic registered, RMQ_SYS_TRANS_HALF_TOPIC QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
When a client creates a topic (TopicTest), the name server log shows that the topic has been registered.
2019-03-31 20:37:42 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:49208
2019-03-31 20:37:42 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:49208]
2019-03-31 20:37:43 INFO RemotingExecutorThread_4 - new topic registered, TopicTest QueueData [brokerName=charse-thinkpad, readQueueNums=4, writeQueueNums=4, perm=6, topicSynFlag=0]
2019-03-31 20:37:43 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelInactive, the channel[127.0.0.1:49208]
2019-03-31 20:37:43 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelUnregistered, the channel[127.0.0.1:49208]
2019-03-31 20:38:34 INFO NettyServerCodecThread_3 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:49224
2019-03-31 20:38:34 INFO NettyServerCodecThread_3 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:49224]
2019-03-31 20:38:35 INFO RemotingExecutorThread_4 - new topic registered, %RETRY%consumer1 QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
When the consumer shuts down, the client's consumer consumer1 is unregistered from consumerGroupInfo, and then CLIENT_INNER_PRODUCER is unregistered from groupChannelTable.
2019-03-31 21:50:08 INFO HeartbeatThread_3 - new consumer connected, group: consumer1 CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208700]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206960, expressionType=TAG]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - registerConsumer info changed ConsumerData [groupName=consumer1, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206960, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG]]] 192.168.3.16:49810
2019-03-31 21:50:08 INFO HeartbeatThread_3 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208700]
2019-03-31 21:50:08 INFO HeartbeatThread_4 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040208756, expressionType=TAG]
2019-03-31 21:50:08 INFO ClientManageThread_3 - unregister a consumer[consumer1] from consumerGroupInfo ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208763]
2019-03-31 21:50:08 INFO ClientManageThread_3 - unregister consumer ok, no any connection, and remove consumer group, consumer1
2019-03-31 21:50:08 INFO ClientManageThread_4 - unregister a producer[CLIENT_INNER_PRODUCER] from groupChannelTable ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208801]
2019-03-31 21:50:08 INFO ClientManageThread_4 - unregister a producer group[CLIENT_INNER_PRODUCER] from groupChannelTable
2019-03-31 21:50:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 21:50:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes
2019-03-31 21:50:22 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 21:50:28 WARN PullMessageThread_2 - the consumer's group info not exist, group: consumer1
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - processRequestWrapper response to /192.168.3.16:49810 failed
java.nio.channels.ClosedChannelException: null
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - RemotingCommand [code=11, language=JAVA, version=293, opaque=24, flag(B)=0, remark=null, extFields={queueId=0, maxMsgNums=32, sysFlag=2, suspendTimeoutMillis=15000, commitOffset=0, topic=%RETRY%consumer1, queueOffset=0, expressionType=TAG, subVersion=1554040208756, consumerGroup=consumer1}, serializeTypeCurrentRPC=JSON]
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - RemotingCommand [code=24, language=JAVA, version=293, opaque=24, flag(B)=1, remark=the consumer's group info not exist
See http://rocketmq.apache.org/docs/faq/ for further details., extFields=null, serializeTypeCurrentRPC=JSON]
2019-03-31 21:50:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 21:51:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
Using Maven:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
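Reliable synchronous transmission is widely used in scenarios such as important notification messages, SMS notifications, and SMS marketing systems.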
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is no longer in use.
producer.shutdown();
}
}
Asynchronous transmission is generally used in response-time-sensitive business scenarios.
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
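int messageCount = 100;
//Count down once per callback so that shutdown waits for in-flight sends.
final java.util.concurrent.CountDownLatch countDownLatch = new java.util.concurrent.CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
//Wait up to 5 seconds for the callbacks before shutting down.
countDownLatch.await(5, java.util.concurrent.TimeUnit.SECONDS);
//Shut down once the producer instance is no longer in use.
producer.shutdown();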
}
}
One-way transmission is used for cases that need only moderate reliability, such as log collection.
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);
}
//Shut down once the producer instance is no longer in use.
producer.shutdown();
}
}
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe to one or more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
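More RocketMQ usage examples can be found here.
RocketMQ provides ordered messages in FIFO order. The following example shows how to send and receive globally and partially ordered messages.
Send message example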
public class OrderedProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
MQProducer producer = new DefaultMQProducer("example_group_name");
//Launch the instance.
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
//server shutdown
producer.shutdown();
}
}
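To see what the selector in OrderedProducer does, here is a small sketch that reproduces only its arithmetic (orderId modulo the number of queues), without a broker. OrderIdRouting is a hypothetical helper, and the queue count of 4 used in main is an assumption taken from the writeQueueNums=4 that the broker log showed for TopicTest.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: replays the queue-selection arithmetic of OrderedProducer. */
final class OrderIdRouting {
    /** Same rule as the selector: orderId modulo the number of queues. */
    static int queueIndexFor(int orderId, int queueCount) {
        return orderId % queueCount;
    }

    /** Groups message indexes 0..messageCount-1 by the queue they would be sent to. */
    static Map<Integer, List<Integer>> route(int messageCount, int queueCount) {
        Map<Integer, List<Integer>> byQueue = new TreeMap<>();
        for (int i = 0; i < messageCount; i++) {
            int orderId = i % 10; // same orderId rule as in OrderedProducer
            int queue = queueIndexFor(orderId, queueCount);
            byQueue.computeIfAbsent(queue, k -> new ArrayList<>()).add(i);
        }
        return byQueue;
    }

    public static void main(String[] args) {
        // Assumed queue count: 4.
        for (Map.Entry<Integer, List<Integer>> e : route(20, 4).entrySet()) {
            System.out.println("queue " + e.getKey() + " <- messages " + e.getValue());
        }
    }
}
```

Messages that share an orderId always map to the same queue, which is what allows a consumer to see them in the order they were sent.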
Subscription message example
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
Broadcasting sends a message to all subscribers of a topic. If you want every subscriber to receive the messages of a topic, broadcasting is a good choice.
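The difference from the default clustering model (messageModel=CLUSTERING in the earlier broker log) can be pictured with a toy simulation. This is only a sketch: DeliveryModels is a hypothetical class, and the round-robin assignment used for clustering is an assumption made for illustration, not RocketMQ's actual queue allocation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the two message models; not RocketMQ code. */
final class DeliveryModels {
    /** Broadcasting: every consumer receives every message. */
    static Map<String, List<Integer>> broadcast(List<String> consumers, int messageCount) {
        Map<String, List<Integer>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            List<Integer> all = new ArrayList<>();
            for (int m = 0; m < messageCount; m++) {
                all.add(m);
            }
            received.put(consumer, all);
        }
        return received;
    }

    /** Clustering (simplified): each message goes to exactly one consumer of the group. */
    static Map<String, List<Integer>> cluster(List<String> consumers, int messageCount) {
        Map<String, List<Integer>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int m = 0; m < messageCount; m++) {
            // Round-robin is an illustrative assumption.
            received.get(consumers.get(m % consumers.size())).add(m);
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("c1", "c2");
        System.out.println("broadcasting: " + broadcast(consumers, 3));
        System.out.println("clustering:   " + cluster(consumers, 3));
    }
}
```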
Producer example
public class BroadcastProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 100; i++){
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
Consumer example
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//set to broadcast mode
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}
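Scheduled messages differ from normal messages in that they are not delivered until a specified delay has elapsed.
1. Start a consumer to wait for incoming subscribed messages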
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// Instantiate message consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
// Subscribe topics
consumer.subscribe("TestTopic", "*");
// Register message listener
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Launch consumer
consumer.start();
}
}
2. Send scheduled messages
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
}
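messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Level 0 means no delay, level 1 is the first delay level (1s), level 2 is the second (5s), and so on; level 3, used above, is 10s. You should see that each message is consumed about 10 seconds after it was stored.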
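The mapping from level number to delay can be sketched by parsing the table above. DelayLevels is a hypothetical helper written for this guide, not a RocketMQ class; it assumes each entry is a whole number followed by s, m, or h, as in the default table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: turns the messageDelayLevel table into level -> milliseconds. */
final class DelayLevels {
    // Default table as quoted in this guide.
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Parses the table; the first entry is level 1. */
    static Map<Integer, Long> parse(String table) {
        Map<Integer, Long> levels = new LinkedHashMap<>();
        String[] parts = table.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long value = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default: throw new IllegalArgumentException("Unknown unit in: " + part);
            }
            levels.put(i + 1, value * unitMillis);
        }
        return levels;
    }

    public static void main(String[] args) {
        // Level 3 is the one passed to setDelayTimeLevel in ScheduledMessageProducer.
        System.out.println("level 3 = " + parse(DEFAULT_LEVELS).get(3) + " ms");
    }
}
```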
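Why batch?
Sending messages in batches improves the delivery performance of small messages.
Constraints
Messages in the same batch must have the same topic and the same waitStoreMsgOK, and scheduled messages are not supported. In addition, the total size of the messages in one batch should not exceed 1MB.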
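These constraints can be expressed as a small pre-send check. The sketch below is only an illustration: Msg is a hypothetical stand-in rather than the RocketMQ Message class, the size counts message bodies only, and the limit reuses the 1000 * 1000 value from the ListSplitter example in this guide.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical pre-send check for the batch constraints; not RocketMQ code. */
final class BatchCheck {
    /** Stand-in for a message, holding only the fields the constraints mention. */
    static final class Msg {
        final String topic;
        final byte[] body;
        final boolean waitStoreMsgOK;
        final int delayLevel; // 0 means not a scheduled message

        Msg(String topic, byte[] body, boolean waitStoreMsgOK, int delayLevel) {
            this.topic = topic;
            this.body = body;
            this.waitStoreMsgOK = waitStoreMsgOK;
            this.delayLevel = delayLevel;
        }
    }

    static final int SIZE_LIMIT = 1000 * 1000;

    /** Returns null if the batch is acceptable, otherwise the first violation found. */
    static String firstViolation(List<Msg> batch) {
        if (batch.isEmpty()) {
            return "batch is empty";
        }
        Msg first = batch.get(0);
        long totalBodyBytes = 0;
        for (Msg m : batch) {
            if (!m.topic.equals(first.topic)) return "mixed topics";
            if (m.waitStoreMsgOK != first.waitStoreMsgOK) return "mixed waitStoreMsgOK";
            if (m.delayLevel != 0) return "scheduled message in batch";
            totalBodyBytes += m.body.length;
        }
        return totalBodyBytes > SIZE_LIMIT ? "batch exceeds size limit" : null;
    }

    public static void main(String[] args) {
        List<Msg> batch = Arrays.asList(
            new Msg("BatchTest", "Hello world 0".getBytes(), true, 0),
            new Msg("BatchTest", "Hello world 1".getBytes(), true, 0));
        System.out.println("violation: " + firstViolation(batch));
    }
}
```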
How to use batch
If you send no more than 1MB of messages at a time, using batch is straightforward.
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
//handle the error
}
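Splitting a large batch into lists
The complexity only grows when you send a large batch and cannot be sure whether it exceeds the 1MB size limit. In that case, you need to split the messages into smaller lists: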
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1000 * 1000;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > SIZE_LIMIT) {
//it is unexpected that single message exceeds the SIZE_LIMIT
//here just let it go, otherwise it will block the splitting process
if (nextIndex - currIndex == 0) {
//if the next sublist has no element, add this one and then break, otherwise just break
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
//then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//handle the error
}
}
In most cases, a tag is a simple and useful design for selecting the messages you want. For example:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
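In the example above, the consumer receives messages that carry TAGA, TAGB, or TAGC. However, a message can have only one tag, which may not be enough for complex scenarios. In that case, you can use an SQL expression to filter messages.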
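The tag subscription can be pictured as a set-membership test. The sketch below is a simplified model, not the RocketMQ implementation: TagExpressionSketch is a hypothetical class, and it assumes the expression is either * (all tags, as in consumer.subscribe("TopicTest", "*")) or a list of tags separated by ||.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified model of matching one message tag against a tag expression. */
final class TagExpressionSketch {
    /** Splits "TAGA || TAGB" into the set {TAGA, TAGB}. */
    static Set<String> parse(String expression) {
        Set<String> tags = new HashSet<>();
        for (String part : expression.split("\\|\\|")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                tags.add(trimmed);
            }
        }
        return tags;
    }

    /** A message has a single tag; it matches if the expression is * or lists that tag. */
    static boolean matches(String expression, String messageTag) {
        if ("*".equals(expression.trim())) {
            return true;
        }
        return messageTag != null && parse(expression).contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("TAGA || TAGB || TAGC", "TAGB"));
        System.out.println(matches("TAGA || TAGB || TAGC", "TAGD"));
    }
}
```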
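Principle
The SQL feature performs calculations on the properties that were set when the message was sent. Within the grammar defined by RocketMQ, you can implement some interesting logic.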
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
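The two cases in the diagram can be replayed in plain Java. This sketch hand-writes the expression a > 5 AND b = 'abc' as a predicate over a property map; it is not how the broker evaluates SQL92, and PropertyFilterSketch is a hypothetical name. Property values are kept as strings here because putUserProperty takes string values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Hand-written stand-in for the filter expression shown in the diagram. */
final class PropertyFilterSketch {
    /** Equivalent of: a > 5 AND b = 'abc'. Missing or non-numeric values do not match. */
    static final Predicate<Map<String, String>> A_GT_5_AND_B_ABC = props -> {
        String a = props.get("a");
        String b = props.get("b");
        if (a == null || b == null) {
            return false;
        }
        try {
            return Integer.parseInt(a) > 5 && b.equals("abc");
        } catch (NumberFormatException e) {
            return false;
        }
    };

    /** Builds the property map of one message from the diagram. */
    static Map<String, String> message(String a, String b, String c) {
        Map<String, String> props = new HashMap<>();
        props.put("a", a);
        props.put("b", b);
        props.put("c", c);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(A_GT_5_AND_B_ABC.test(message("10", "abc", "true")) ? "Gotten" : "Missed");
        System.out.println(A_GT_5_AND_B_ABC.test(message("1", "abc", "true")) ? "Gotten" : "Missed");
    }
}
```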
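Grammar
RocketMQ only defines some basic grammar to support this feature. You can also extend it easily.
Constant types:
Usage constraints
Only push consumers can select messages by SQL92. The interface is: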
public void subscribe(final String topic, final MessageSelector messageSelector)
Producer example
When sending, you can put properties into a message through the putUserProperty method.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
Consumer example
Use MessageSelector.bySql to select the messages to consume.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// only subscribe to messages that have property a, with a >= 0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
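OpenMessaging establishes industry guidelines and messaging and streaming specifications to provide a common framework for the finance, e-commerce, IoT, and big data domains. Its design principles are cloud orientation, simplicity, flexibility, and language independence in heterogeneous environments. Conforming to these specifications makes it possible to develop heterogeneous messaging applications across all major platforms and operating systems.
RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha. The following examples demonstrate how to access RocketMQ through OpenMessaging.
OMSProducer
The following example shows how to send messages to RocketMQ in synchronous, asynchronous, and one-way modes.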
public class OMSProducer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final Producer producer = messagingAccessPoint.createProducer();
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
producer.startup();
System.out.printf("Producer startup OK%n");
{
Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
SendResult sendResult = producer.send(message);
System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
}
{
final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
result.addListener(new PromiseListener<SendResult>() {
@Override
public void operationCompleted(Promise<SendResult> promise) {
System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
}
@Override
public void operationFailed(Promise<SendResult> promise) {
System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
}
});
}
{
producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
System.out.printf("Send oneway message OK%n");
}
producer.shutdown();
messagingAccessPoint.shutdown();
}
}
OMSPullConsumer
Use the OMS PullConsumer to pull messages from a specified queue.
public class OMSPullConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        consumer.startup();
        System.out.printf("Consumer startup OK%n");
        Message message = consumer.poll();
        if (message != null) {
            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
        }
        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}
OMSPushConsumer
Attach an OMS PushConsumer to the specified queue and consume messages through a MessageListener.
public class OMSPushConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
        final PushConsumer consumer = messagingAccessPoint.
            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));
        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
            @Override
            public void onMessage(final Message message, final ReceivedMessageContext context) {
                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
                context.ack();
            }
        });
    }
}
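A transactional message can be thought of as a two-phase commit message implementation that ensures eventual consistency in a distributed system. It guarantees that the execution of the local transaction and the sending of the message are performed atomically.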
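To make the two-phase idea concrete, here is a minimal in-memory sketch in plain Java. It does not use the RocketMQ client: `HalfMessageSketch`, `sendHalf`, `resolve`, and the `State` enum are hypothetical names invented for this illustration, and a real broker's storage and check-back scheduling are considerably more involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a broker handling transactional ("half") messages.
// All names are hypothetical; this is not the RocketMQ API.
final class HalfMessageSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Half messages are stored but hidden from consumers.
    private final Map<String, String> halfMessages = new LinkedHashMap<>();
    // Messages that consumers are allowed to see.
    private final List<String> visible = new ArrayList<>();

    // Phase 1: store the message without delivering it.
    void sendHalf(String txId, String body) {
        halfMessages.put(txId, body);
    }

    // Phase 2: apply the outcome of the producer's local transaction.
    void resolve(String txId, State state) {
        switch (state) {
            case COMMIT: {
                String body = halfMessages.remove(txId);
                if (body != null) {
                    visible.add(body);
                }
                break;
            }
            case ROLLBACK:
                halfMessages.remove(txId);
                break;
            default:
                // UNKNOWN: keep the half message so it can be checked again later.
                break;
        }
    }

    List<String> visibleMessages() {
        return visible;
    }

    int pendingCount() {
        return halfMessages.size();
    }

    public static void main(String[] args) {
        HalfMessageSketch broker = new HalfMessageSketch();
        broker.sendHalf("tx-1", "order created");
        broker.sendHalf("tx-2", "order cancelled");
        broker.sendHalf("tx-3", "order pending");
        broker.resolve("tx-1", State.COMMIT);
        broker.resolve("tx-2", State.ROLLBACK);
        broker.resolve("tx-3", State.UNKNOWN);
        System.out.println("visible=" + broker.visibleMessages()
            + " pending=" + broker.pendingCount());
    }
}
```

In this sketch only the committed message becomes visible to consumers; the rolled-back one is dropped, and the unresolved one stays pending until a later check decides its fate.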
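Transactional status
A transactional message has three states:
(1) LocalTransactionState.COMMIT_MESSAGE: the transaction is committed, and consumers are allowed to consume the message.
(2) LocalTransactionState.ROLLBACK_MESSAGE: the transaction is rolled back, and the message is deleted and cannot be consumed.
(3) LocalTransactionState.UNKNOW: an intermediate state in which MQ needs to check back with the producer to determine the status.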
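Creating the transactional producer
Use the TransactionMQProducer class to create the producer client and specify a unique producerGroup. You can set a custom thread pool to handle check requests. After the local transaction has executed, reply to MQ according to its result; the reply statuses are the transactional statuses described in the section above.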
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        // Custom thread pool that handles the broker's transaction check requests.
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        // Keep the process alive so the producer can answer the broker's check requests.
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
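Implementing the TransactionListener interface
The "executeLocalTransaction" method executes the local transaction once the half message has been sent successfully. It returns one of the three transactional statuses mentioned in the previous section. The "checkLocalTransaction" method checks the local transaction status in response to a check request from MQ. It also returns one of the three transactional statuses mentioned in the previous section.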
import ...
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
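The listener above assigns each transaction a status from a running counter modulo 3, so the eventual outcome of each message is predictable. The following plain-Java sketch replays that mapping without RocketMQ. `StatusReplay`, `Outcome`, `outcomeFor`, and `tally` are hypothetical names used only for illustration, and the sketch assumes messages are sent one at a time so that message i receives counter value i.

```java
import java.util.EnumMap;
import java.util.Map;

// Replays the status assignment of the sample listener without RocketMQ.
// StatusReplay and Outcome are hypothetical names for illustration only.
final class StatusReplay {
    enum Outcome { UNKNOW, COMMIT_MESSAGE, ROLLBACK_MESSAGE }

    // Mirrors the switch in checkLocalTransaction:
    // 0 -> unknown, 1 -> commit, 2 -> rollback.
    static Outcome outcomeFor(int transactionIndex) {
        switch (transactionIndex % 3) {
            case 1:
                return Outcome.COMMIT_MESSAGE;
            case 2:
                return Outcome.ROLLBACK_MESSAGE;
            default:
                return Outcome.UNKNOW;
        }
    }

    // Counts how many of the first messageCount transactions end in each outcome.
    static Map<Outcome, Integer> tally(int messageCount) {
        Map<Outcome, Integer> counts = new EnumMap<>(Outcome.class);
        for (int i = 0; i < messageCount; i++) {
            counts.merge(outcomeFor(i), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The producer example sends 10 messages.
        System.out.println(tally(10));
    }
}
```

Under that assumption, of the 10 messages sent by the producer example, 3 are committed on check, 3 are rolled back, and 4 keep returning the unknown state.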