[转] Spring Integration 系统集成

【From】 http://blog.csdn.net/w_x_z_/article/details/53316618java

 

pring Ingegration 提供了基于Spring的EIP(Enterprise Integration Patterns,企业集成模式)的实现。Spring Integration 主要解决的问题是不一样系统之间交互的问题,经过异步消息驱动来达到系统交互时系统之间的松耦合。spring

Spring Integration 主要有Message、Channel、Message EndPoint组成。数组

 

Message多线程

Message是用来在不一样部分之间传递的数据。Message有两部分组成:消息体(payload)与消息头(header)。消息体能够是任何数据类型;消息头表示的元数据就是解释消息体的内容。异步

/**
 * A generic message representation with headers and body.
 *
 * @author Mark Fisher
 * @author Arjen Poutsma
 * @since 4.0
 * @see org.springframework.messaging.support.MessageBuilder
 */
public interface Message<T> {

    /**
     * Return the message payload.
     */
    T getPayload();

    /**
     * Return message headers for the message (never {@code null} but may be empty).
     */
    MessageHeaders getHeaders();

}

 

Channelide

在消息系统中,消息发送者发送消息到通道(Channel),消息接受者从通道(Channel)接收消息。post

 

一、顶级接口ui

(1) MessageChannelthis

MessageChannel 是Spring Integration消息通道的顶级接口:spa

public interface MessageChannel {

    /**
     * Constant for sending a message without a prescribed timeout.
     */
    long INDEFINITE_TIMEOUT = -1;


    /**
     * Send a {@link Message} to this channel. If the message is sent successfully,
     * the method returns {@code true}. If the message cannot be sent due to a
     * non-fatal reason, the method returns {@code false}. The method may also
     * throw a RuntimeException in case of non-recoverable errors.
     * <p>This method may block indefinitely, depending on the implementation.
     * To provide a maximum wait time, use {@link #send(Message, long)}.
     * @param message the message to send
     * @return whether or not the message was sent
     */
    boolean send(Message<?> message);

    /**
     * Send a message, blocking until either the message is accepted or the
     * specified timeout period elapses.
     * @param message the message to send
     * @param timeout the timeout in milliseconds or {@link #INDEFINITE_TIMEOUT}
     * @return {@code true} if the message is sent, {@code false} if not
     * including a timeout of an interrupt of the send
     */
    boolean send(Message<?> message, long timeout);

}

 

MessageChannel 有两大子接口,分别是PollableChannel (可轮询)和SubscribableChannel(可订阅)。咱们全部的消息通道类都是现实这两个接口。

 

(2) PollableChannel

PollableChannel 具有轮询得到消息的能力。

public interface PollableChannel extends MessageChannel {

    /**
     * Receive a message from this channel, blocking indefinitely if necessary.
     * @return the next available {@link Message} or {@code null} if interrupted
     */
    Message<?> receive();

    /**
     * Receive a message from this channel, blocking until either a message is available
     * or the specified timeout period elapses.
     * @param timeout the timeout in milliseconds or {@link MessageChannel#INDEFINITE_TIMEOUT}.
     * @return the next available {@link Message} or {@code null} if the specified timeout
     * period elapses or the message reception is interrupted
     */
    Message<?> receive(long timeout);

}

 

(3) SubscribableChannel

SubscribableChannel 发送消息给订阅了MessageHanlder的订阅者

public interface SubscribableChannel extends MessageChannel {

    /**
     * Register a message handler.
     * @return {@code true} if the handler was subscribed or {@code false} if it
     * was already subscribed.
     */
    boolean subscribe(MessageHandler handler);

    /**
     * Un-register a message handler.
     * @return {@code true} if the handler was un-registered, or {@code false}
     * if was not registered.
     */
    boolean unsubscribe(MessageHandler handler);

}

 

 二、经常使用消息通道

(1)、PublishSubscribeChannel

PublishSubscribeChannel容许广播消息给全部订阅者,配置方式以下:

    /**
     * 容许广播消息给全部订阅者,当前消息通道的id为publishSubscribeChannel
     * @return
     */
    @Bean
    public PublishSubscribeChannel publishSubscribeChannel(){
        PublishSubscribeChannel channel = new PublishSubscribeChannel();
        return channel;
    }

 其中,当前消息通道的id为publishSubscribeChannel。

 

(2)、QueueChannel

QueueChannel容许消息接收者轮询得到消息,用一个队列(queue)接收消息,队列的容量大小可配置,配置方式以下:

    @Bean
    public QueueChannel queueChannel(){
        QueueChannel channel = new QueueChannel(10);
        return channel;
    }

 其中,QueueChannel构造参数10即为队列的容量。

 

(3)、PriorityChannel

PriorityChannel可按照优先级将数据存储到队列,它依据于消息的消息头priority属性,配置方式以下:

    @Bean
    public PriorityChannel priorityChannel(){
        PriorityChannel channel = new PriorityChannel(10);
        return channel;
    }

 

(4)、RendezvousChannel

RendezvousChannel确保每个接收者都接收到消息后再发送消息,配置方式以下:

    @Bean
    public RendezvousChannel rendezvousChannel(){
        RendezvousChannel channel = new RendezvousChannel();
        return channel;
    }

 

(5) DirectChannel

DirectChannel是Spring Integration默认的消息通道,它容许将消息发送给为一个订阅者,而后阻碍发送直到消息被接收,配置方式以下:

    @Bean
    public DirectChannel directChannel(){
        DirectChannel channel = new DirectChannel();
        return channel;
    }

 

(6)、ExecutorChannel

ExecutorChannel可绑定一个多线程的task executor,配置方式以下:

    @Bean
    public ExecutorChannel executorChannel(){
        ExecutorChannel channel = new ExecutorChannel(executor());
        return channel;
    }

    @Bean
    public Executor executor(){
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(25);
        taskExecutor.initialize();
        return taskExecutor;
    }

 

三、通道拦截器

Spring Integration给消息通道提供了通道拦截器(ChannelInterceptor),用来拦截发送和接收消息的操做.

ChannelInterceptor接口定义以下,咱们只须要实现这个接口便可:

 public interface ChannelInterceptor {

        Message<?> preSend(Message<?> message, MessageChannel channel);

        void postSend(Message<?> message, MessageChannel channel, boolean sent);

        void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);

        boolean preReceive(MessageChannel channel);

        Message<?> postReceive(Message<?> message, MessageChannel channel);

        void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);

    }

 经过以下代码给全部的channel增长拦截器

channel.addInterceptor(someInterceptor);

 

Message EndPoint

消息端点(Message EndPoint)是真正处理消息的(Message)组件,它还能够控制通道的路由。咱们可用的消息端点包含以下:

 

(1) Channel Adapter

通道适配器(Channel Adapter)是一种链接外部系统或传输协议的端点(EndPoint),能够分为入站(inbound)和出站(outbound)。 
通道适配器是单向的,入站通道适配器只支持接收消息,出站通道适配器只支持输出消息。

Spring Integration内置了以下的适配器:

RabbitMQ、Feed、File、FTP/SFTP、Gemfire、HTTP、TCP/UDP、JDBC、JPA、JMS、Mail、MongoDB、Redis、RMI
Twitter、XMPP、WebServices(SOAP、REST)、WebSocket

 

(2) Gateway

消息网关(Gateway)相似于Adapter,可是提供了双向的请求/返回集成方式,也分为入站(inbound)和出站(outbound)。 
Spring Integration 对响应的Adapter都提供了Gateway。

 

(3) Service Activator

Service Activator 可调用Spring的Bean来处理消息,并将处理后的结果输出到指定的消息通道。

 

(4) Router

路由(Router) 可根据消息体内容(Payload Type Router)、消息头的值(Header Value Router) 以及定义好的接收表(Recipient List Router) 做为条件,来决定消息传递到的通道。

 

(5) Filter

过滤器(Filter) 相似于路由(Router),不一样的是过滤器不决定消息路由到哪里,而是决定消息是否能够传递给消息通道。

 

(6) Splitter

拆分器(Splitter)将消息拆分为几个部分单独处理,拆分器处理的返回值是一个集合或者数组。

 

(7) Aggregator

聚合器(Aggregator)与拆分器相反,它接收一个java.util.List做为参数,将多个消息合并为一个消息。

 

(8) Enricher

当咱们从外部得到消息后,须要增长额外的消息到已有的消息中,这时就须要使用消息加强器(Enricher)。消息加强器主要有消息体 
加强器(Payload Enricher)和消息头加强器(Header Enricher)两种。

 

(9) Transformer

转换器(Transformer)是对得到的消息进行必定的转换处理(如数据格式转换).

 

(10) Bridge

使用链接桥(Bridge)能够简单的将两个消息通道链接起来。

相关文章
相关标签/搜索