记录一下阿里云消息服务与Spring的整合,以及ProducerId与ConsumerId的管理,其余的消息服务也是相似(RocketMQ、Kafka),阿里云消息服务性能仍是很可观的,虽然收费,单也推荐使用。html
消息服务的概念就不想多说了,须要的能够去看官方文档,参考文档。spring
首先建立topic,以下图填好信息就OK了。express
建立成功以后是这样api
上面那些步骤信息填完整以后topic、ProducerId、ConsumerId都建立好了就可使用消息队列了服务器
Producer的整合tcp
<bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown"> <property name="properties"> <!--生产者配置信息--> <props> <!-- 生成者ID,须要提早在阿里云建立 --> <prop key="ProducerId">PID-SIT-TransitHub-NotifyUnbind</prop> <!--请替换为本身的帐户信息--> <!-- AccessKey、SecretKey由阿里云分配 --> <prop key="AccessKey">LTAIqfzogBNFeohh11</prop> <prop key="SecretKey">zoahuhZKscEk5Q8Qtr</prop> <!-- 根据本身服务器选择不一样的tcp接入url,此处选择公网 --> <prop key="ONSAddr">http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet</prop> </props> </property> </bean>
Consumer的整合性能
<!-- 建立Listener将消费者处于阻塞状态,只要有本身topic订阅的消息发布消息立刻就会订阅到--> <bean id="tsmDeleteAidMsgListener" class="com.snowball.hub.msg.DataMessageListener" /> Listener配置 <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown"> <property name="properties"> <props> <prop key="ConsumerId">CID-SIT-OPS-NotifyUnbind</prop> <prop key="AccessKey">${access_key}</prop> <prop key="SecretKey">${secret_key}</prop> <!--将消费者线程数固定为50个,该线程不会和主业务线程耦合--> <prop key="ConsumeThreadNums">50</prop> </props> </property> <property name="subscriptionTable"> <map> <entry value-ref="tsmDeleteAidMsgListener"> <key> <bean class="com.aliyun.openservices.ons.api.bean.Subscription"> <!-- 此处填将以前建立的topic --> <property name="topic" value="snb-test-topic4" /> <property name="expression" value="*" /> <!-- expression即Tag,能够设置成具体的Tag,如 taga||tagb||tagc,也可设置成*。 *仅表明订阅全部Tag,不支持通配 --> </bean> </key> </entry> 更多的订阅添加entry节点便可 </map> </property> </bean>
Consumer的整合和Producer基本一致,不一样的是须要建立一个Listener,做用已经在注释中说明。学习
上面只是整合了普通消息,阿里云MQ消息分很四种,每一种的整合API都不同,具体整合细节能够参考文章开始出的参考文档。阿里云
public class ProducerTest { //若是和spring整合了,那就直接注入就行了,本次使用传统的发布方式 //@Autowired //private Producer producer; //topic的管理最好作成可配置,能够对应不一样的环境管理不一样的topic,本次仍是使用传统的//方式发布 //@Value("#{configProperties['send_unbind_topic']}") //private String send_unbind_topic; public static void main(String[] args) { Properties properties = new Properties(); // 您在MQ控制台建立的Producer ID properties.put(PropertyKeyConst.ProducerId, "XXX"); // 鉴权用AccessKey,在阿里云服务器管理控制台建立 properties.put(PropertyKeyConst.AccessKey,"XXX"); // 鉴权用SecretKey,在阿里云服务器管理控制台建立 properties.put(PropertyKeyConst.SecretKey, "XXX"); // 设置 TCP 接入域名(此处以公共云的公网接入为例) properties.put(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet"); Producer producer = ONSFactory.createProducer(properties); // 在发送消息前,必须调用start方法来启动Producer,只需调用一次便可 producer.start(); //循环发送消息 while(true){ Message msg = new Message( // // 在控制台建立的Topic,即该消息所属的Topic名称 "TopicTestMQ", // Message Tag, // 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤 "TagA", // Message Body // 任何二进制形式的数据, MQ不作任何干预, // 须要Producer与Consumer协商好一致的序列化和反序列化方式 "Hello MQ".getBytes()); // 设置表明消息的业务关键属性,请尽量全局惟一,以方便您在没法正常收到消息状况下,可经过MQ控制台查询消息并补发 // 注意:不设置也不会影响消息正常收发 msg.setKey("ORDERID_100"); // 发送消息,只要不抛异常就是成功 // 打印Message ID,以便用于消息发送状态查询 SendResult sendResult = producer.send(msg); System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId()); } // 在应用退出前,能够销毁Producer对象 // 注意:若是不销毁也没有问题 producer.shutdown(); } }
消息发布成功能够看到sendResult是这样的信息url
{"messageId":"0200010546D011E87BD078ACF4180003","topic":"TPC-SIT-COM-TransitHub-NotifyUnbind"}
根据messageId能够定位这条消息的轨迹,能够很清晰的定位消息的消费轨迹。
public class ConsumerTest { public static void main(String[] args) { Properties properties = new Properties(); // 您在MQ控制台建立的Consumer ID properties.put(PropertyKeyConst.ConsumerId, "XXX"); // 鉴权用AccessKey,在阿里云服务器管理控制台建立 properties.put(PropertyKeyConst.AccessKey, "XXX"); // 鉴权用SecretKey,在阿里云服务器管理控制台建立 properties.put(PropertyKeyConst.SecretKey, "XXX"); // 设置 TCP 接入域名(此处以公共云公网环境接入为例) properties.put(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet"); Consumer consumer = ONSFactory.createConsumer(properties); //这个Listener若是以前已经在spring容器中注册过直接使用就行了,这里就不演示了 consumer.subscribe("TopicTestMQ", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage; } }); consumer.start(); System.out.println("Consumer Started"); } }
消息的发布与订阅就这么多,要使用消息服务总结起来就四步。
消息的产品不少,阿里云的消息服务是目前互联网公司使用占比很大的,本次只是很简单介绍消息服务的使用,具体实现细节笔者也在学习中。