【From】 http://blog.csdn.net/w_x_z_/article/details/53316618java
pring Ingegration 提供了基于Spring的EIP(Enterprise Integration Patterns,企业集成模式)的实现。Spring Integration 主要解决的问题是不一样系统之间交互的问题,经过异步消息驱动来达到系统交互时系统之间的松耦合。spring
Spring Integration 主要有Message、Channel、Message EndPoint组成。数组
Message多线程
Message是用来在不一样部分之间传递的数据。Message有两部分组成:消息体(payload)与消息头(header)。消息体能够是任何数据类型;消息头表示的元数据就是解释消息体的内容。异步
/** * A generic message representation with headers and body. * * @author Mark Fisher * @author Arjen Poutsma * @since 4.0 * @see org.springframework.messaging.support.MessageBuilder */ public interface Message<T> { /** * Return the message payload. */ T getPayload(); /** * Return message headers for the message (never {@code null} but may be empty). */ MessageHeaders getHeaders(); }
Channelide
在消息系统中,消息发送者发送消息到通道(Channel),消息接受者从通道(Channel)接收消息。post
一、顶级接口ui
(1) MessageChannelthis
MessageChannel 是Spring Integration消息通道的顶级接口:spa
public interface MessageChannel { /** * Constant for sending a message without a prescribed timeout. */ long INDEFINITE_TIMEOUT = -1; /** * Send a {@link Message} to this channel. If the message is sent successfully, * the method returns {@code true}. If the message cannot be sent due to a * non-fatal reason, the method returns {@code false}. The method may also * throw a RuntimeException in case of non-recoverable errors. * <p>This method may block indefinitely, depending on the implementation. * To provide a maximum wait time, use {@link #send(Message, long)}. * @param message the message to send * @return whether or not the message was sent */ boolean send(Message<?> message); /** * Send a message, blocking until either the message is accepted or the * specified timeout period elapses. * @param message the message to send * @param timeout the timeout in milliseconds or {@link #INDEFINITE_TIMEOUT} * @return {@code true} if the message is sent, {@code false} if not * including a timeout of an interrupt of the send */ boolean send(Message<?> message, long timeout); }
MessageChannel 有两大子接口,分别是PollableChannel (可轮询)和SubscribableChannel(可订阅)。咱们全部的消息通道类都是现实这两个接口。
(2) PollableChannel
PollableChannel 具有轮询得到消息的能力。
public interface PollableChannel extends MessageChannel { /** * Receive a message from this channel, blocking indefinitely if necessary. * @return the next available {@link Message} or {@code null} if interrupted */ Message<?> receive(); /** * Receive a message from this channel, blocking until either a message is available * or the specified timeout period elapses. * @param timeout the timeout in milliseconds or {@link MessageChannel#INDEFINITE_TIMEOUT}. * @return the next available {@link Message} or {@code null} if the specified timeout * period elapses or the message reception is interrupted */ Message<?> receive(long timeout); }
(3) SubscribableChannel
SubscribableChannel 发送消息给订阅了MessageHanlder的订阅者
public interface SubscribableChannel extends MessageChannel { /** * Register a message handler. * @return {@code true} if the handler was subscribed or {@code false} if it * was already subscribed. */ boolean subscribe(MessageHandler handler); /** * Un-register a message handler. * @return {@code true} if the handler was un-registered, or {@code false} * if was not registered. */ boolean unsubscribe(MessageHandler handler); }
二、经常使用消息通道
(1)、PublishSubscribeChannel
PublishSubscribeChannel容许广播消息给全部订阅者,配置方式以下:
/** * 容许广播消息给全部订阅者,当前消息通道的id为publishSubscribeChannel * @return */ @Bean public PublishSubscribeChannel publishSubscribeChannel(){ PublishSubscribeChannel channel = new PublishSubscribeChannel(); return channel; }
其中,当前消息通道的id为publishSubscribeChannel。
(2)、QueueChannel
QueueChannel容许消息接收者轮询得到消息,用一个队列(queue)接收消息,队列的容量大小可配置,配置方式以下:
@Bean public QueueChannel queueChannel(){ QueueChannel channel = new QueueChannel(10); return channel; }
其中,QueueChannel构造参数10即为队列的容量。
(3)、PriorityChannel
PriorityChannel可按照优先级将数据存储到队列,它依据于消息的消息头priority属性,配置方式以下:
@Bean public PriorityChannel priorityChannel(){ PriorityChannel channel = new PriorityChannel(10); return channel; }
(4)、RendezvousChannel
RendezvousChannel确保每个接收者都接收到消息后再发送消息,配置方式以下:
@Bean public RendezvousChannel rendezvousChannel(){ RendezvousChannel channel = new RendezvousChannel(); return channel; }
(5) DirectChannel
DirectChannel是Spring Integration默认的消息通道,它容许将消息发送给为一个订阅者,而后阻碍发送直到消息被接收,配置方式以下:
@Bean public DirectChannel directChannel(){ DirectChannel channel = new DirectChannel(); return channel; }
(6)、ExecutorChannel
ExecutorChannel可绑定一个多线程的task executor,配置方式以下:
@Bean public ExecutorChannel executorChannel(){ ExecutorChannel channel = new ExecutorChannel(executor()); return channel; } @Bean public Executor executor(){ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(5); taskExecutor.setMaxPoolSize(10); taskExecutor.setQueueCapacity(25); taskExecutor.initialize(); return taskExecutor; }
三、通道拦截器
Spring Integration给消息通道提供了通道拦截器(ChannelInterceptor),用来拦截发送和接收消息的操做.
ChannelInterceptor接口定义以下,咱们只须要实现这个接口便可:
public interface ChannelInterceptor { Message<?> preSend(Message<?> message, MessageChannel channel); void postSend(Message<?> message, MessageChannel channel, boolean sent); void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex); boolean preReceive(MessageChannel channel); Message<?> postReceive(Message<?> message, MessageChannel channel); void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex); }
经过以下代码给全部的channel增长拦截器
channel.addInterceptor(someInterceptor);
Message EndPoint
消息端点(Message EndPoint)是真正处理消息的(Message)组件,它还能够控制通道的路由。咱们可用的消息端点包含以下:
(1) Channel Adapter
通道适配器(Channel Adapter)是一种链接外部系统或传输协议的端点(EndPoint),能够分为入站(inbound)和出站(outbound)。
通道适配器是单向的,入站通道适配器只支持接收消息,出站通道适配器只支持输出消息。
Spring Integration内置了以下的适配器:
RabbitMQ、Feed、File、FTP/SFTP、Gemfire、HTTP、TCP/UDP、JDBC、JPA、JMS、Mail、MongoDB、Redis、RMI Twitter、XMPP、WebServices(SOAP、REST)、WebSocket
(2) Gateway
消息网关(Gateway)相似于Adapter,可是提供了双向的请求/返回集成方式,也分为入站(inbound)和出站(outbound)。
Spring Integration 对响应的Adapter都提供了Gateway。
(3) Service Activator
Service Activator 可调用Spring的Bean来处理消息,并将处理后的结果输出到指定的消息通道。
(4) Router
路由(Router) 可根据消息体内容(Payload Type Router)、消息头的值(Header Value Router) 以及定义好的接收表(Recipient List Router) 做为条件,来决定消息传递到的通道。
(5) Filter
过滤器(Filter) 相似于路由(Router),不一样的是过滤器不决定消息路由到哪里,而是决定消息是否能够传递给消息通道。
(6) Splitter
拆分器(Splitter)将消息拆分为几个部分单独处理,拆分器处理的返回值是一个集合或者数组。
(7) Aggregator
聚合器(Aggregator)与拆分器相反,它接收一个java.util.List做为参数,将多个消息合并为一个消息。
(8) Enricher
当咱们从外部得到消息后,须要增长额外的消息到已有的消息中,这时就须要使用消息加强器(Enricher)。消息加强器主要有消息体
加强器(Payload Enricher)和消息头加强器(Header Enricher)两种。
(9) Transformer
转换器(Transformer)是对得到的消息进行必定的转换处理(如数据格式转换).
(10) Bridge
使用链接桥(Bridge)能够简单的将两个消息通道链接起来。