MQ:
消息总线(Message Queue),是一种跨进程的通讯机制,用于上下游传递消息。在互联网架构中,MQ是一种很是常见的上下游“逻辑解耦+物理解耦”的消息通讯服务。使用MQ以后,消息发送上游只须要依赖MQ,逻辑上和物理上都不用依赖其余服务。
MQ的不足
(1)系统更加复杂,多了一个MQ组件
(2)消息传递路径更长,延时会增长
(3)消息可能会被重复消费
(4)上游没法知道下游的执行结果(所以,调用方实时依赖执行结果的业务场景,请使用调用,而不是MQ)docker
使用场景
(1)上游不关注执行结果
(2)上游关注结果,但执行时间比较长。举个例子,微信支付,跨公网调用微信的接口,执行时间会比较长,但调用方又很是关注执行结果,此时通常怎么玩呢?
通常采用“回调网关+MQ”方案来解耦:微信
a、调用方直接跨公网调用微信接口 b、微信返回调用成功,此时并不表明返回成功 c、微信执行完成后,回调统一网关 d、网关将返回结果通知MQ e、请求方收到结果通知
支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
支持拉(pull)和推(push)两种消息模式
单一队列百万消息的堆积能力
支持多种消息协议,如 JMS、MQTT 等
分布式高可用的部署架构,知足至少一次消息传递语义
提供 docker 镜像用于隔离测试和云集群部署
提供配置、指标和监控等功能丰富的 Dashboard架构
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { //声明并初始化一个producer //须要一个producer group名字做为构造方法的参数,这里为producer1 DefaultMQProducer producer = new DefaultMQProducer("producer1"); //设置NameServer地址,此处应改成实际NameServer地址,多个地址之间用;分隔 //NameServer的地址必须有,可是也能够经过环境变量的方式设置,不必定非得写死在代码里 producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876"); //调用start()方法启动一个producer实例 producer.start(); //发送10条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值 for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body ); //调用producer的send()方法发送消息 //这里调用的是同步的方式,因此会有返回结果 SendResult sendResult = producer.send(msg); //打印返回结果,能够看到消息发送的状态以及一些相关信息 System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } //发送完消息以后,调用shutdown()方法关闭producer producer.shutdown(); } }
在开发过程当中,若是想测试生产者是否发出了mq,能够编写一个消费者进行测试分布式
@Test public void testMqConsumer() throws Exception { String rocketmqAddress="10.113.41.2:9876;10.113.41.4:9876"; int threadNum = 5; String topics = "WechatUnionCoreTemplateNotifyTopic"; String instanceName = "TemplateComsumer"; String groupName = "wechatUnionTemplateNotifyConsumer"; DefaultMQPushConsumer consumer = null; consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(rocketmqAddress);//MQ地址 consumer.setClientCallbackExecutorThreads(threadNum);//消费现场数量 consumer.setInstanceName(instanceName);//实例名称 consumer.subscribe(topics, "*"); //注册监听 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (int i = 0; i < msgs.size(); i++) { MessageExt msgExt = msgs.get(i); String msgId = msgExt.getMsgId(); Integer flag = msgExt.getFlag(); TemplateNotifyItem templateNotifyItem = ProtoBufSerialize.fromProto(msgExt.getBody(), TemplateNotifyItem.class); logger.info("receive new Msg: " + " msgId=" + msgId + " flag=" + flag + " templateNotifyItem=" + templateNotifyItem); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); logger.info("监听执行中"); Thread.sleep(1000000); }
参考:
http://blog.csdn.net/manzhizh...
https://www.jianshu.com/p/824...
架构师之路ide