说实话,最近仍是比较忙的,手上素材却是一大把,可是大多只是初步整理了。可是博客这种东西仍是要写的,果真后面仍是要放低一下排版要求(扩展性的一些东西也少提一些)。java
消息队列这个东西,其实网上的资料仍是不少的。我就简单说一些本身的认识与源代码哈。spring
我是很喜欢了解技术演进的,由于演进的过程展示了前辈们的智慧。框架
最先的程序串行执行就不说了。异步
程序调用中的方法调用,每每调用方与被调用方都存在与同一内存空间(从Java角度说,都是在同一JVM中),因此方法调用的逻辑不会太复杂。简单来讲,就是调用方(Java中其实就是目标对象)将被调用方压入Java虚拟机栈,从而执行(详见JVM)。或者等我何时,把我有关JVM的笔记贴出来(嘿嘿)。分布式
后来呢,就是出现了对非本地JVM方法调用的需求(举个例子,我须要调用第三方的方法,若是每次都要双方都写一个专门的处理服务(在当时,也许接口更为准确),比较麻烦),那么就有了RPC与RMI的一个须要。那么在Java中就出现了一个stub的技术,定义好后,相关方法就像调用本地同样(详见《Head First Java》相关章节)。固然了,这个时候已经有了中间件的概念了,因此也就有了CORBA等框架。谈到中间件,感兴趣的,能够去查询一下当时主流的中间件分类(如RPC,RMI,MOM,TPM,ORB)。ide
那么到了如今呢,分布式系统的通讯能够按照同步与异步分为两大支柱。之因此这么理解,是由于分布式系统每每同步通讯与异步通讯都是须要的。简单提一下,同步通讯业务逻辑相对简单,实现快速,能够实时得到回应,但耦合度较高。异步通讯耦合度低,并能够进行消息堆积,消峰,但没法实时获取回应,业务逻辑复杂,从而提升系统复杂度(尤为当一条业务线与多层异步逻辑)等。以后有机会,我会举例细述。函数
固然了,在本篇中,只简单谈一下异步通讯的主流实现-消息队列。学习
选择方面,我就很少说了,目前只用过RabbitMq,RocketMq,Kafka。网上有关消息队列选择的文章不少,很细致,我就不赘述了。fetch
这里贴出来的都是实际生产代码(若是内部版本也算的话,嘿嘿),因此若是有一些不是很熟悉的类,请查看import,是不是项目自身的类。或者也能够直接询问我。this
这里的初步实现,是根据RabbitMq的原生方法进行编写(详细参考:《RabbitMQ实战指南》第一章的两个代码清单及第二章的相关解释)。
package com.renewable.gateway.rabbitmq.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import com.renewable.gateway.pojo.Terminal; import com.renewable.gateway.util.JsonUtil; import com.renewable.gateway.util.PropertiesUtil; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.concurrent.TimeoutException; import static com.renewable.gateway.common.constant.RabbitmqConstant.*; /** * @Description: * @Author: jarry */ @Component("TerminalProducer") public class TerminalProducer { private static final String IP_ADDRESS = PropertiesUtil.getProperty(RABBITMQ_HOST); private static final int PORT = Integer.parseInt(PropertiesUtil.getProperty(RABBITMQ_PORT)); private static final String USER_NAME = PropertiesUtil.getProperty(RABBITMQ_USER_NAME); private static final String USER_PASSWORD = PropertiesUtil.getProperty(RABBITMQ_USER_PASSWORD); private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE = "exchange-terminal-config-terminal2centcontrol"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE = "queue-terminal-config-terminal2centcontrol"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE = "topic"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY = "terminal.config.terminal2centcontrol"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY = "terminal.config.terminal2centcontrol"; public static void sendTerminalConfig(Terminal terminal) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername(USER_NAME); factory.setPassword(USER_PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE, true, false, null); channel.queueDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, true, false, false, null); channel.queueBind(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY); String terminalStr = JsonUtil.obj2String(terminal); channel.basicPublish(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY, MessageProperties.PERSISTENT_TEXT_PLAIN, terminalStr.getBytes()); channel.close(); connection.close(); } }
package com.renewable.gateway.rabbitmq.consumer; import com.rabbitmq.client.*; import com.renewable.gateway.common.GuavaCache; import com.renewable.gateway.common.ServerResponse; import com.renewable.gateway.pojo.Terminal; import com.renewable.gateway.service.ITerminalService; import com.renewable.gateway.util.JsonUtil; import com.renewable.gateway.util.PropertiesUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.IOException; import java.util.concurrent.TimeoutException; import static com.renewable.gateway.common.constant.CacheConstant.TERMINAL_MAC; import static com.renewable.gateway.common.constant.RabbitmqConstant.*; /** * @Description: * @Author: jarry */ @Component public class TerminalConsumer { @Autowired private ITerminalService iTerminalService; private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_EXCHANGE = "exchange-terminal-config-centcontrol2terminal"; private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_QUEUE = "queue-terminal-config-centcontrol2terminal"; private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_ROUTINETYPE = "topic"; private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_BINDINGKEY = "terminal.config.centcontrol2terminal"; @PostConstruct public void messageOnTerminal() throws IOException, TimeoutException, InterruptedException { Address[] addresses = new Address[]{ new Address(PropertiesUtil.getProperty(RABBITMQ_HOST)) }; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(PropertiesUtil.getProperty(RABBITMQ_USER_NAME)); factory.setPassword(PropertiesUtil.getProperty(RABBITMQ_USER_PASSWORD)); Connection connection = factory.newConnection(addresses); final Channel channel = connection.createChannel(); channel.basicQos(64); // 设置客户端最多接收未ack的消息个数,避免客户端被冲垮(经常使用于限流) Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 1.接收数据,并反序列化出对象 Terminal receiveTerminalConfig = JsonUtil.string2Obj(new String(body), Terminal.class); // 2.验证是不是该终端的消息的消息 // 避免ACK其余终端的消息 if (receiveTerminalConfig.getMac() == GuavaCache.getKey(TERMINAL_MAC)) { // 业务代码 ServerResponse response = iTerminalService.receiveTerminalFromRabbitmq(receiveTerminalConfig); if (response.isSuccess()) { channel.basicAck(envelope.getDeliveryTag(), false); } } } }; channel.basicConsume(TERMINAL_CONFIG_CENTCONTROL2TERMINAL_QUEUE, consumer); // 等回调函数执行完毕后,关闭资源 // 想了想仍是不关闭资源,保持一个监听的状态,从而确保配置的实时更新 // TimeUnit.SECONDS.sleep(5); // channel.close(); // connection.close(); } }
这是早期写的一个demo代码,是直接参照源码的。若是是学习RabbitMq的话,仍是建议手写一下这种比较原始的程序,了解其中每一个方法的做用,从而理解RabbitMq的思路。若是条件容许的话,还能够查看一下RabbitMq的底层通讯协议-AMQP(若是不方便下载,也能够私聊我)。
固然,此处能够经过@Value直接导入相关配置(乃至到了SpringCloud后,能够经过@Refreshscope等实现配置自动更新)。
package com.renewable.terminal.rabbitmq.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import com.renewable.terminal.pojo.Terminal; import com.renewable.terminal.util.JsonUtil; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @Description: * @Author: jarry */ @Component("TerminalProducer") public class TerminalProducer { private static String rabbitmqHost = "47.92.249.250"; private static String rabbitmqUser = "admin"; private static String rabbitmqPassword = "123456"; private static String rabbitmqPort = "5672"; private static final String IP_ADDRESS = rabbitmqHost; private static final int PORT = Integer.parseInt(rabbitmqPort); private static final String USER_NAME = rabbitmqUser; private static final String USER_PASSWORD = rabbitmqPassword; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE = "exchange-terminal-config-terminal2centcontrol"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE = "queue-terminal-config-terminal2centcontrol"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE = "topic"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY = "terminal.config.terminal2centcontrol"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY = "terminal.config.terminal2centcontrol"; public static void sendTerminalConfig(Terminal terminal) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername(USER_NAME); factory.setPassword(USER_PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE, true, false, null); channel.queueDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, true, false, false, null); channel.queueBind(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY); String terminalStr = JsonUtil.obj2String(terminal); channel.basicPublish(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY, MessageProperties.PERSISTENT_TEXT_PLAIN, terminalStr.getBytes()); channel.close(); connection.close(); } }
package com.renewable.terminal.rabbitmq.consumer; import com.rabbitmq.client.*; import com.renewable.terminal.Init.SerialSensorInit; import com.renewable.terminal.Init.TerminalInit; import com.renewable.terminal.common.GuavaCache; import com.renewable.terminal.common.ServerResponse; import com.renewable.terminal.pojo.Terminal; import com.renewable.terminal.service.ITerminalService; import com.renewable.terminal.util.JsonUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException; import static com.renewable.terminal.common.constant.CacheConstant.TERMINAL_ID; import static com.renewable.terminal.common.constant.CacheConstant.TERMINAL_MAC; /** * @Description: * @Author: jarry */ @Component @Slf4j public class TerminalConsumer { @Autowired private ITerminalService iTerminalService; @Autowired private SerialSensorInit serialSensorInit; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE = "exchange-terminal-config-centcontrol2terminal"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE = "queue-terminal-config-centcontrol2terminal"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE = "topic"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY = "terminal.config.centcontrol2terminal"; //TODO_FINISHED 2019.05.16 完成终端机TerminalConfig的接收与判断(ID是否为长随机数,是否须要从新分配) @RabbitListener(bindings = @QueueBinding( value = @Queue(value = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, declare = "true"), exchange = @Exchange(value = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, declare = "true", type = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE), key = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY )) @RabbitHandler public void messageOnTerminal(@Payload String terminalStr, @Headers Map<String, Object> headers, Channel channel) throws IOException { Terminal terminal = JsonUtil.string2Obj(terminalStr, Terminal.class); if (terminal == null){ log.info("consume the null terminal config !"); Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); } if (!GuavaCache.getKey(TERMINAL_MAC).equals(terminal.getMac())){ log.info("refuse target terminal with mac({}) configure to this terminal with mac({}).",terminal.getMac(), GuavaCache.getKey(TERMINAL_MAC)); return; } // 2.业务逻辑 ServerResponse response = iTerminalService.receiveTerminalFromRabbitmq(terminal); log.info("start serialSensorInit"); serialSensorInit.init(); // 3.确认 if (response.isSuccess()) { Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); } } }
# rabbitmq 消费端配置 spring: rabbitmq: listener: simple: concurrency: 5 max-concurrency: 10 acknowledge-mode: manual # 限流 prefetch: 1 host: "localhost" port: 5672 username: "admin" password: "123456" virtual-host: "/" connection-timeout: 15000
这里不得不赞一下Spring,它经过提供RabbitMq地封装API-ampq,极大地简化了消息队列的代码。其实上述方法就是经过ampq的注解与yml配置来迅速实现RabbitMq的使用。
固然,这里还有不少的提高空间。好比说,经过@Bean注解(创建目标配置)与公用方法提取,能够有效提升代码复用性。
这段代码并非线上的代码,而是慕课网学习时留下的代码。主要实际生产中并无使用SpringStream,但这确实是认识事件驱动模型的要给很好途径。
```java
package com.imooc.order.message; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * @Description: * @Author: jarry */ public interface StreamClient { String INPUT = "myMessage"; String INPUT2 = "myMessageACK"; @Input(StreamClient.INPUT) SubscribableChannel input(); @Output(StreamClient.INPUT) MessageChannel output(); @Input(StreamClient.INPUT2) SubscribableChannel input2(); @Output(StreamClient.INPUT2) MessageChannel output2(); }
```
package com.imooc.order; import org.junit.Assert; import org.junit.Test; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * @Description: * @Author: jarry */ @Component public class MqSenderTest extends OrderApplicationTests{ @Autowired private AmqpTemplate amqpTemplate; @Test public void send(){ amqpTemplate.convertAndSend("myQueue", "now: " + new Date()); Assert.assertNotNull(new Date()); } }
package com.imooc.order.message; import com.imooc.order.dto.OrderDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component; /** * @Description: * @Author: jarry */ @Component @EnableBinding(StreamClient.class) @Slf4j public class StreamReceiver { // @StreamListener(StreamClient.INPUT) // public void process(Object message){ // log.info("StreamReceiver: {}", message); // } @StreamListener(StreamClient.INPUT) // 增长如下注解,能够在INPUT消息消费后,返回一个消息。说白了就是RabbitMq对消息消费后的确认回调函数(貌似叫这个,意思就这样,以后细查) @SendTo(StreamClient.INPUT2) public String process(OrderDTO message){ log.info("StreamReceiver: {}", message); return "received."; } @StreamListener(StreamClient.INPUT2) public void process2(String message){ log.info("StreamReceiver2: {}", message); } }
在学习技术的过程当中,一方面不断地感觉到本身对技术了解的不足,另外一方面则是发现更重要的是系统设计中技术选型的权衡。