概念: java
分布式消息队列web
‘分布式消息队列’包含两个概念ajax
一是‘消息队列’,二是‘分布式’spring
那么就先看下消息队列的概念,和为何须要分布式apache
消息队列的定义vim
“消息”指进程间传送的数据缓存
“队列”是在消息的传输过程当中保存消息的容器安全
消息被发送到队列中,消息队列充当中间人,将消息从源发送给目标服务器
当系统中出现“生产“和“消费“的速度或稳定性等因素不一致时,就须要消息队列,做为抽象层,弥合双方的差别websocket
例如
(1)服务员点菜快,厨师作菜慢,服务员只须要下单给厨师,而后就能够继续去服务顾客,不须要等待厨师把菜作完
点菜单就至关于消息,放单子的位置就至关于队列
(2)业务系统须要发短信,但短信发送模块速度跟不上,业务系统就能够把发送短信的相关信息封装为一个消息,放入队列,短信发送模块从队列中获取消息进行处理
消息队列的好处
(1)提升系统响应速度
使用了消息队列,生产者一方,把消息往队列里一扔,就能够立马返回响应用户了,无需等待处理结果
(2)保证消息的传递
若是发送消息时接收者不可用,消息队列会保留消息,直到成功地传递它
(3)解耦
只要消息格式不变,即便接收者的接口、位置、或者配置改变,也不会给发送者带来任何改变
消息发送者无需知道消息接收者是谁,使得系统设计更清晰
为何须要分布式消息队列
(1)多系统协做须要分布式
例如消息队列中的数据须要在多个系统间共享,因此须要提供分布式通讯机制、协同机制
(2)可靠
消息会被持久化到分布式存储中,这样避免了单台机器存储的消息因为机器问题致使消息的丢失
(3)可扩展
分布式消息队列,会随着访问量的增长而方便的增长处理服务器
如今电商网站某个抢购活动,并发怎么办?消息队列
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。有玩RabbitMq的哥们,多多交流!
用户请求会post到后台一些信息如(用户信息,商品信息)到消息队列中。
消息队列经过Windows服务去处理解析过来的信息,单线程处理(多线程可能会出现问题,你懂得)!
成功的话,插入到本次活动的成功记录表里面;失败的话,插入到有意购买表(方便业务人员销售)!
怎样通知用户?
1,ajax异步去请求(隔两分钟去请求一次成功记录,若是不请求库的话咱们会用Redis缓存)
2,长链接的方式(websocket,signalr之类的)
在整套的微服务架构中, 消息队列是不可或缺的部分, 它可以起到线程内同步或者异步调用没法达到的做用,优缺点分别是:
优势:
缺点:
多出一个环节,须要保证消息队列的可用性。
目前经常使用的消息队列大概有三种类型,RabbitMQ等AMQP系列, Kafka, Redis等kev value系列,它们的使用场景分别是:
1.RabbitMQ: 相对重量级高并发的状况,好比数据的异步处理 任务的串行执行等.
2.Kafka: 基于Pull的模式来处理,具体很高的吞吐量,通常用来进行 日志的存储和收集.
3.Redis: 轻量级高并发,实时性要求高的状况,好比缓存,秒杀,及时的数据分析(ELK日志分析框架,使用的就是Redis).
在SpingBoot中对这个三种都有支持
Redis
请参照 其余做者的 文章 http://www.jianshu.com/p/a2ab17707eff
RabbitMQ
i. 配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
ii.实现方式 RabbitMQDemoConfigration.java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/** * RabbitMQDemo * * @author: sunjie * @date: 16/01/10 */
@Configuration
@SuppressWarnings("SpringJavaAutowiringInspection")
public class RabbitMQDemoConfigration {
@Bean
public CachingConnectionFactory myConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setHost("192.168.1.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/myHost");
return connectionFactory;
}
// 生成CachingConnectionFactory 也可使用下面的方式,在application.properties
// 中定义好属性便可
// @Autowired
// ConnectionFactory connectionFactory;
@Bean
public DirectExchange myExchange() {
return new DirectExchange("myExchangeDemo", true, false);
}
@Bean
public Queue myQueue() {
return new Queue("myQueueDemo", true);
}
@Bean
public Binding myExchangeBinding(@Qualifier("myExchange") DirectExchange directExchange, @Qualifier("myQueue") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("routeDemo");
}
@Bean
public RabbitTemplate myExchangeTemlate() {
RabbitTemplate r = new RabbitTemplate(myConnectionFactory());
r.setExchange("myExchangeDemo");
r.setRoutingKey("routeDemo");
return r;
}
/** * 发送消息,工业使用须要本身作个性化实现 */
@Bean
public void sendMessage(RabbitTemplate myExchangeTemlate) {
String string = "Hello RabbitmQ";
myExchangeTemlate.convertAndSend(string);
}
/** * 接受消息,工业使用时须要在监听类中实现process逻辑 */
@RabbitListener(queues = "myQueueDemo")
public void process(Message message) {
System.out.println("__________" + message.getBody().toString() + "__________");
try {
this.wait(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Kafka 使用
i. pom.xml 配置
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
</dependency>
ii. spring-integration-kafka.xml 也能够研究经过bean方式来实现
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:channel id="inputToKafka">
<int:queue/>
</int:channel>
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
channel="inputToKafka">
<int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="50000"/>
<int-kafka:producer-context id="kafkaProducerContext">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="192.168.2.2:9092"
key-class-type="java.lang.String"
value-class-type="java.lang.String"
topic="1_service"
value-encoder="kafkaEncoder"
key-encoder="kafkaEncoder"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
<bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.common.StringEncoder"/>
</beans>
iii. 客户端实现代码:
@Autowired
MessageChannel inputToKafka;
String value = "Hello Kafka";
Message<String> mess = MessageBuilder.withPayload(value)
.setHeader(KafkaHeaders.TOPIC, appSetting.getStrValue("topic")).build();
inputToKafka.send(mess);
消息中间件有不少,实际使用中每每是根据架构师的技术栈相关,作到了解使用场景和基本原理,在项目中提高本身细节能力,作到我的和公司共同成长,才是好的方式.