事情的原由源于在使用微信公众号服务的时候,做为一个第三方的服务商,腾讯会将各类业务消息推送到第三方开发者的服务器上,而以前的方案是消息直接进到服务上,当使用到一些业务,好比发券等操做时,腾讯服务器会向开发者发送大量的消息,因为消息服务的处理能力有限,尤为是高峰的时候,消息请求会直接压到服务上,致使服务线程繁忙,这时候会报大量服务超时,触发微信的服务报警,服务不可用,或者服务超时,这时公众号内的消息服务将没法继续为用户提供服务。鉴于此问题,咱们从新梳理并构建了基于Spring Cloud Stream的消息驱动的微服务spring
咱们采用 Spring Cloud Finchley.SR2,Spring Boot 2.0.6.RELEASE 版原本开发,系统的初步设计思路,是利用json
消息队列rabbitmq来解耦服务,来减缓消息直接到服务上的压力,咱们没有直接对接mq来使用,而是采用了Spring Cloud Stream, 简单的来讲,Spring Cloud Stream是构建消息驱动的微服务应用程序的框架,将消息整合的处理操做进行了进一步的抽象操做, 实现了更加简化的消息处理, 可使用不一样的代理中间件,它抽象了事件驱动的一些概念,对于消息中间件的进一步封装,能够作到代码层面对中间件的无感知,甚至于动态的切换中间件,切换topic。使得微服务开发的高度解耦,服务能够关注更多本身的业务流程。缓存
Spring Cloud Stream的一些基本概念能够自行搜索,这里不作过多描述,下面只是讲述一下具体方案和配置方法及遇到的问题。服务器
基本结构是一个很是简单的消息订阅发布模式微信
message-center 做为接受微信消息处理中心,为消息生产者,多线程
message-for-all 做为消息队列消息处理服务,为消息消费者,并发
message-center 从微信的服务器接受到消息后,采用异步多线程的方式,处理部分业务逻辑,好比多客服,好比第三方的全网发布检测等,须要在5秒内返回给微信服务器消息的一些及时性消息,同时根据消息类型讲消息分类,并发送给消息队列中间件,消息生产者message-center经过SpringCloudStream来做为和消息队列中间件的粘合剂,将消息传递给消息队列中间件,这里能够随意切换消息中间件而不用考虑代码的变动,咱们这里默认采用的rabbitmq做为消息队列中间件服务,app
具体配置方法,新建一个接口,这个接口中能够定义多个管道用来发送消息,能够实现向不一样的exchange发送消息框架
public interface SendOutputChannel {异步
// 这里能够定义不一样的通道
String MSG_SENDER = "msgSender"; // 通道名
@Output(SendOutputChannel.MSG_SENDER)
MessageChannel msgSender();
}
启动的类中要加入@EnableBinding
@SpringBootApplication
@EnableBinding(SendOutputChannel.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
application.yml文件中配置
#---- mq的基本配置信息
spring:
rabbitmq:
host:
port:
username:
password:
virtual-host:
cloud:
stream:
bindings:
msgSender: #生产者绑定,这个是消息通道的名称
destination: message-exchange # 这里对应的是rabbitmq中的exchange
content-type: application/json #定义消息类型
rabbit:
bindings:
msgSender:
producer:
delivery-mode: non-persistent #消息不持久化
发送消息的类中要注入发送消息的接口,当接到微信发送的消息后,通过业务逻辑处理后,在须要向mq发消息的地方,调用发送消息的方法,经过Spring Cloud Stream来实现消息发送。
@Autowired
private SendOutputChannel sendOutputChannel;
public void sendMessage(Message<?> message) {
if (!sendOutputChannel.msgSender().send(message, TimeUnit.SECONDS.toMillis(4))) {
log.error("生产者消息发送失败:" + message.toString());
}
}
这就完成了消息发送者的基本开发
这样服务启动之后,rabbitmq的exchange中将会出现一个 message-exchange的 交换机
消息接受者 message-for-all
一样须要定义一个消息接收的管道接口,这个接口中能够定义多个管道用来接受消息,能够接受对应不一样的exchange接受到的消息
public interface ReceiveInputChannel {
// 这里能够定义不一样的通道
String MSG_RECEIVER = "msgReceiver"; // 通道名
@Input(ReceiveInputChannel.MSG_RECEIVER)
SubscribableChannel msgReceiver();
}
application.yml中配置
#---- mq的基本配置信息
spring:
rabbitmq:
host:
port:
username:
password:
virtual-host:
cloud:
stream:
bindings:
msgReceiver: #消费者绑定 这个是接受消息通道的名称
group: for-all #持久化, 也就是指定队列名称,等同于rabbitmq中的 queue, 同一个服务不一样的实例,使用相同的队列名称,处理消息
destination: message-exchange #和生产者的消息交换机要相同
content-type: application/json
consumer:
max-attempts: 3 # The number of attempts to process the message (including the first) in the event of processing failures,Default: 3
concurrency: 1 # The concurrency setting of the consumer. Default: 1.
rabbit:
bindings:
msgReceiver:
consumer:
max-concurrency: 10 # maxumum concurrency of this consumer (threads)
prefetch: 50 # number of prefetched messages pre consumer thread
requeue-rejected: false # true to requeue rejected messages, false to discard (or route to DLQ)
republish-to-dlq: true # republish failures to the DLQ with diagnostic headers
# durable-subscription: false #队列是否要持久化
其余一些配置都是消息每次有几个线程处理,每一个线程处理多少数量的消息,失败后从新尝试处理几回等,一些配置方案,
max-concurrency:是并发消费者数量,能够并发处理消息。
若是对队列的消费顺序要求特备苛刻,不但愿并发消费,则max-concurrency须要设置为1,exclusive: true #惟一性 仅建立者可使用的私有队列,断开后自动删除, 固然必须是在autoDelete和exclusive都为false的时候。队列是能够被持久化, Exclusive参数,默认为false, 若是设置为true,concurrency就必须设置为1,即只能单个消费者消费队列里的消息,适用于必须严格执行消息队列的消费顺序(先进先出)
启动类里要使用@EnableBinding绑定消息接收管道
@SpringBootApplication
@EnableBinding(ReceiveInputChannel.class)
public class Application {
@StreamListener(ReceiveInputChannel.MSG_RECEIVER)
public void handle(Message<String> message) throws Exception {
// 在这里再去整合消息处理的业务逻辑
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
这样服务启动之后,将会出现message-exchange.for-all的一个消息队列,根据消费者服务启动数量的不一样,也将会出现对应的消费者
至此整个消息处理的基本结构就描述完成了,固然实际的开发过程当中,还要考虑消息的异步处理,多线程去处理等,这里就不详尽描述了,须要根据本身的业务须要来实现相应的开发,
有几点注意的状况:
@StreamListener(ReceiveInputChannel.MSG_RECEIVER) 这个方法只能放到Application 服务启动的类中,放到别的地方会报错:Disp org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,可能和类的加载顺序有关系。这样在启动类中接受消息,而后能够再经过业务拆分,将消息转到其余的类中实现各自业务逻辑开发。
<!-- 添加Spring Cloud Stream与RabbitMQ消息中间件的依赖。 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- 添加Spring Cloud Stream与Kafaka消息中间件的依赖。 -->
<!-- <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency> -->
根据不一样的消息中间件,选择不一样的依赖。
当接收消息过多的时候,能够增长消息生产者实例来加大消息的接受能力,当消费者处理大量阻塞消息时,处理能力降低,能够经过增长负载的消费者服务实例数量来加大消费能力,这个须要经过实际状况找到平衡点,消息队列做为缓存,下降了因为消息直接压到服务器上而致使的服务崩溃问题的风险。