SIF Reading logs

Spring Integration Framework 从系统文件中读取配置,持久化数据到数据库,发送信息到外部客户端,发布邮件,FTP平常的快照,执行其它的例行任务。java

咱们的应用程序要跟文件系统,数据库系统,邮件系统,FTP服务等打交道。 咱们还须要考虑部署各类不一样的适配器来知足咱们应用程序跟外部其它应用集成的须要。web

共享文件系统:两个以上应用程序共享一个通用的文件系统,一个可能写入,其它的读取。 这时须要将发送者和接收者解耦。可是它短板在于性能,稳定性,以及对文件系统的依赖。正则表达式

单数据库:应用程序共享单一数据库。形成网络延迟,链接锁定问题算法

消息系统:要求发送和接收者解耦,发送消息的程序将一些数据封装发送给一个消息中间体就不须要在关心它了。 一个消费消息的消费者不管什么时候均可以开始它的工做流。好处在于消息能够在这个过程当中被强化,转换,路由,过滤,而后才送达终端处理。spring

消息模式:消息,信道,转换器数据库

pipes and filters 模式 mkonda$ cat just-spring-titles.txt | grep "Just Spring" | wc -l 这里有三个终端处理 cat,grep,wc cat 命令显示文件内容,这里的显示不是显示到屏幕上,而是经过管道发送给grep命令,grep命令会获取文件内容并从中查找 Just Spring字符串,而后将结果经过管道传递给wc,由它简单的在屏幕上显示结果数量。 这就是一个管道和过滤模型,其中 | 表示管道。编程

若是咱们了解JMS或者分布式技术,那就应该据说过企业级消息。 咱们的应用程序在跟其它应用程序经过网络进行交互能够被看做企业应用程序。 咱们须要用有一个应用程序服务器来承载这些应用程序,并暴露服务接口给其它应用程序调用。跨域

Enterprise Integration Pattern:EIP 传统的编程模型: 好比咱们设计一个从外部系统获取交易的系统(好比咱们从一个文件中),须要按照以下步骤处理交易: 须要基于交易的类型(新建,撤销,修正等)对交易进行分类 而后将交易按照类别进行处理和存储 当一个交易被存储后,必须有一个审计工具被通知。 咱们能够将上面的设计编写到一个组件中。数组

//Pseudo code缓存

public class TradesLoader { private List trades = null; .... public void start(){ trades = loadTrades(tradesFile); (for Trade t: trades){ processTrade(t); persistTrades(t); auditNotify(t); } } public void processTrade(Trade trade){ if (t.getStatus().equalsIgnoreCase("new")) { processNewTrade(); } else if (t.getStatus().equalsIgnoreCase("amend")) { processAmmendTrade(); } else { processOtherTrade(); } } public void loadTrades(File fromFile){ .. } public void persistTrades(Trade trade){ .. } public void auditNotify(Trade trade){ .. } public static void main(String[] args) { new TradesLoader().start(); } } 这是一个顺序模型,组件跟整个业务流程高度的耦合。若是咱们想添加另一个工做流,好比咱们将对全部大级别交易发送通知 或者建立一个新任务来收集交易模型,咱们就须要从新构建这个流程,并写添加更多的if-else语句。 这样咱们就看出上面这个TradesLoader组件干的太多了,而不仅仅是夹在交易。 因此咱们须要简化这种设计,让TradesLoader组件感到加载完Trades为止结束。 在上面但进程处理中,TradesLoader首先是获取Trade而后存储到一个事先定义好的内部序列中。 相关的操做好比TradeProcessor,TadePersistor,TradeNotifier都将基于这个序列来完成整个工做流程。 独立的消息模型: 上面的组件应用能够被重构成TradesLoader在从文件中加载了Trades后,发布Trades到一个容器数据结构,在消息模型中,这个容器 被称为destination 或者是 channel,而其它的处理组件都从这里获取Trade。而destination或者channel扮演了一个导管的角色。 Spring Integration 框架是基于Spring的核心组件开发出来的,知足此类模型的编程的优秀框架,它须要咱们提供专门的消息容器程序。 基本概念:消息,信道和端点,Messages,Channels和Endpoints 消息是数据的容器,信道是包含这些消息的地址,端点是一些链接到这些信道消费或者发布消息的组件。 消息是在两个应用程序之间携带信息的对象,它造成于一段,在另外一端被解析处理,消息的生产者或者发布者发布消息到一个信道, 消息的预订者或者消费者链接该信道而后获取消息,从消息中读取其携带的数据,复原会相应的领域对象,进行相关业务处理。 解析一个消息: 消息由两部分组成:payload和header payload至关于信的内容,是对它感兴趣的一方须要处理的内容。 header是头信息,至关于信封。 public interface Message { T getPayLoad(); MessageHeaders getHeaders(); } public final class MessageHeaders implements Map, Serializable { ... } 框架提供了一个Message接口的通用实现GenericMessage 咱们可使用工具类MessageBuilder来生成: // Create payload object Account a = new Account(); // creating a map with header properties Map accountProperties = new HashMap(); // Set our header properties accountProperties.put("ACCOUNT_EXPIRY","NEVER"); // Use MessageBuilder class to create a Message // and set header properties Message m = MessageBuilder.withPayload(a) .setHeader("ACCOUNT_EXPIRY", "NEVER") .build() 消息信道: 消息数据的容器,被称为信道,表示消息被发送的位置。 框架中咱们定义了MessageChannel接口来描述它。 Spring Integration提供了声明信道的模型,咱们不须要经过java类来定义它。 // declaratively creating a channel 同时提供了一些具体的实现,QueueChannel,PriorityChannel和RendezvousChannel等。 虽然它们各有不一样可是底层的设计原则都是同样的,表现为一个端点地址。 端点Endpoints: 它们从一个输入信道消费消息,或者从一个输出信道发布消息。也多是只消费消息,或者只发送消息。 框架提供了不少即插即用的端点实现:Transformers,Splitters,Filters,Routers等。 还提供了一些端点适配器来链接像JMS,FTP,JDBC,Twitter等。 好比咱们有一个Service Activator endpoint 它是一个通用的端点,用于当一个消息抵达输入信道时,就在一个bean上调用一个方法。 //bean to be invoked 这里service-activator 端点会在消息抵达position-channel时会获取消息并调用bean上的processNewPosition方法, 而完成这些所须要的代码,框架已经帮忙实现了。 好比一个交易者须要一个web应用程序生成一个交易,发送到一个JMS目的地,在那里这些交易会被另一个组件预订,它们会 链接JMS目的地并获取交易并处理它们。 咱们可使用一个适配器inbound-channel-adapter来从一个输入JMS Queue获取消息。 tcp://localhost:61616 // bean that would be invoked public class NewTradeProcessor { public void processNewTrade(Trade t){ // Process your trade here. System.out.println("Message received:"+m.getPayload().toString()); } } 测试类: public class NewTradeProcessorTest { private ApplicationContext ctx = null; private MessageChannel channel = null; // Constructor which instantiates the endpoints public NewTradeProcessorTest() { ctx = new ClassPathXmlApplicationContext("basics-example-beans.xml"); channel = ctx.getBean("trades-channel", MessageChannel.class); } private Trade createNewTrade() { Trade t = new Trade(); t.setId("1234"); ... return t; } private void sendTrade() { Trade trade = createNewTrade(); Message tradeMsg = MessageBuilder.withPayload(trade).build(); channel.send(tradeMsg, 10000); System.out.println("Trade Message published."); } public static void main(String[] args) { NewTradeProcessorTest test = new NewTradeProcessorTest(); test.sendTrade(); } } 经过信道咱们能够解耦发送者和接收者,在咱们的管道和过滤模型中,Channel是管道。 咱们能够经过信道和终端组合出复杂的集成方案。 消息信道: MessageChannel接口定义两个主要发方法: boolean send(Message message); boolean send(Message message, long timeout) 第一个方法执行时必须等到消息发送成功才会返回控制权,当一个消息没有被发送时,第二个方法会接管,抛出异常。 这里的timeout变量能够是0,正值或者负值。若是为负值,则线程会被阻塞直到消息发布成功。 若是是0,发送方法不管成功与否都会马上返回控制权,若是正值,则会在发送失败时等待相应的时间抛出错误和异常。 它没有定义任何的接收消息的方法,接收消息大量依赖接收语法Point-to-Point(p2p)或者publish/Subscribe(Pub/Sub). 在P2P模式下,只有一个消息接收者接收消息,即便有多个接收者能链接该信道,可是只能有一个接收,并且可使随机选择。 在Pub/Sub模式下,消息是发送给全部预订了该消息的消费者,就是说消息被拷贝后分发给全部预订者。 这里有个概念叫作message buffering,消息缓存,消息被根据配置按照序列保存到内存或者持久存储区。 客户端选择信道依赖于分发模式(P2P,Pub/Sub)和缓冲或非缓冲情景。 这里有两个接口来处理接收端: PollableChannel 和 SubscribleChannel 它们两个都扩展子MessageChannel,故自动继承了Send方法。 点到点模式: 消费者仅须要使用任意的PollableChannel接口实现便可: public interface PollableChannel extends MessageChannel { // This call blocks the thread until it receives a message Message receive(); // This call will wait for a specified timeout before // throwing a failure if message is not available Message receive(long timeout); } 框架提供了该接口的三个实现:QueueChannel,PriorityChannel和RendezvousChannel QueueChannel具有消息缓冲功能,PriorityChannel和RendezvousChannel是QueueChannel更出色的实现版本。 它们扩展了P2P和缓冲特性。 Spring会在应用程序启动时建立它们。 public class QueueChannelTest{ private ApplicationContext ctx=null; private MessageChannel qChannel=null; public QueueChannelTest(){ ctx=new ClassPathXmlApplicationContext("channels-beans.xml"); qChannel = ctx.getBean("q-channel",MessageChannel.class); } public void receive(){ // This method receives a message, however it blocks // indefinitely until it finds a message //Message m =((QueueChannel)qChannel).receive(); // This method receives a message, however it exists // within the 10 seconds even if doesn't find a message Message m = ((QueueChannel) qChannel).receive(10000); System.out.println("Payload: " + m.getPayload()); } } channels-beans.xml ------------------------------------------------------------------------------------------------------------------- Pub/Sub模型: 使用SubscribableChannel,每一个消息都要被广播给全部注册的订阅者。 public interface SubscribableChannel extends MessageChannel{ //to subscribe a MessageHandler for handling the message boolean subscribe(MessageHandler handler); //unsubscribe boolean unsubscribe(MessageHandler handler); } public interface MessageHandler{ //this method is invoked when a fresh message appears on the channel void handleMessage(Message message) throws MessagingException; } public class ChannelTest{ private ApplicationContext ctx=null; private MessageChannel pubSubChannel=null; public ChannelTest(){ ctx=new ClassPathXmlApplicationContext("channels-beans.xml"); pubSubChannel = ctx.getBean("pubsub-channel",MessageChannel.class); } public void subscribe(){ ((PublishSubscribeChannel)pubSubChannel).subscribe(new TradeMessageHandler()); } class TradeMessageHandler implements MessageHandler{ public void handleMessage(Message message) throws MessagingException{ System.out.println("Handling Message:"+ message); } } } -------------------------------------------------------------------------------------------------------------- Queue Channel: 该信道展示出点对点特性,只有一个消费者能够接收到消息,可是能够建立多个消费者。 该信道同时还支持缓冲消息,由于它使用一个序列结构将消息保存到内存中。 定义了一个100个元素的序列信道。若是省略则会建立一个无限容量的信道,capacity 的会被设置为Integer.MAX_VALUE。 在没有客户消费消息时,消息序列可能会被塞满,消息发送者也会被阻塞,直到序列有空间可用或者超时发生。 QueueChannel实现的事First In First Out(FIFO)顺序。数据会被保存到java.util.concurrent.LinkedBlockingQueue. QueueChannel 还提供了一个purge方法来净化序列,用MessageSelector来预约义条件 public List> purge(MessageSelector selector){} 若是给purge方法传入null参数,将会清空整个序列。 ------------------------------------------------------------------------------------------------------------- Priority Channel: 属于QueueChannel的一个子类,只是添加了一个消息优先级设置。若是咱们须要发送高优先级消息,那么使用PriorityChannel是不错的选择。 咱们使用MessageHeader的PRIORITY属性设置优先级。 public void publishPriorityTrade(Trade t){ Message tradeMsg = MessageBuilder.withPayload(t). setHeader(MessageHeades.PRIORITY,10).build(); priorityChannel.send(tradeMsg,10000); System.out.println("The Message is published successfully"); } 若是咱们须要进一步的控制优先级,咱们须要经过实现Comparator> 来提供比较实现。 public class AccountComparator implements Comparator>{ @Override public int compare(Message msg1,Message msg2){ Account a1 = (Account)msg1.getPayload(); Account a2 = (Account)msg2.getPayload(); Integer i1 = a1.getAccountType(); Integer i2 = a1.getAccountType(); return i1.compareTo(i2); } } 咱们须要让框架直到咱们定义的比较器, ------------------------------------------------------------------------------------------------------------- Rendezvous Channel: 也是QueueChannel的一个子类,展现的是点对点特性。它实现的是一个零容量的序列。 在底层它使用SynchronousQueue数据结构,这就意味着任什么时候间点都只能有一个消息存在于信道中。 当消息生产者发送一个消息给它时,它会被锁定直到消息被消费者消费掉。 一样的,消费者也会被锁定等待消息出如今信道中。 当咱们但愿接收一个请求回复时,RendezvousChannel是个理想的信道。 客户端推送一个在消息头中添加要求回复的头信息的请求, public void sendTradeToRendezvous(Trade t) { Message tradeMsg = MessageBuilder.withPayload(t). etHeader(MessageHeaders.REPLY_CHANNEL, "replyChannel").build(); rendezvousChannel.send(tradeMsg, 10000); System.out.println(t.getStatus() + " Trade published to a Rendezvous channel"); } ----------------------------------------------------------------------------------------------- PublishSubscribe Channel: 若是咱们须要将消息发送给多个消费者,则须要使用SubscribeChannel, 这里没有定义receive 方法。 由于信息接收时有MessageHandler来处理的。 public class PubSubTest{ MessageHandler handler = new TradeMessageHandler(); private ApplicationContext ctx = null; private PublishSubscribeChannel pubSubChannel = null; ... // subscribe to the channel public void subscribe() { boolean handlerAdded = pubSubChannel.subscribe(handler); System.out.println("Handler added?" + handlerAdded); } // Unsubscribe using the same channel and handler references. public void unsubscribe() { boolean handlerRemoved = pubSubChannel.unsubscribe(handler); System.out.println("Handler removed?" + handlerRemoved); } //Handler to handle the messages class TradeMessageHandler implements MessageHandler { public void handleMessage(Message message) throws MessagingException { System.out.println("Handling Message:" + message); } } } 当消息出如今信道时,它会调用注册的handler并传入消息,进行处理。 --------------------------------------------------------------------------------------------------------- Direct Channel: 混合了P2P和Pub/Sub特点,它实现了SubscribableChannel接口,因此咱们须要一个MessageHandler的具体实现来订阅它。 消息能够被订阅它的处理器处理,可是只有一个订阅者会获取消息,呈现了P2P特色。 即便你注册了多个订阅者,信道也只会交付给它们中的一个。 该框架用于循环广播策略从众多预订者中选择一个接收消息。 消息的生产者和消费者都运行在同一个线程中,对于跨域多个资源事务的企业级应用来讲很是有用。 若是多个处理器订阅一个信道,有两个问题,要选择哪一个处理器处理消息和若是选择了一个处理器不能处理消息问题。 一个是load-balancer另外一个是failover 属性。 load-balancer 标记选择一个合适的加载策略来选择处理器。 failover属性是布尔标记,若是设置为true,若是被选中的处理抛出异常,它会让后续的处理器处理消息,默认值为true。 由于DirectChannel将处理订阅者的任务委托给MessageDispatcher, load-balancer默认设置为round-robin,若是要忽略加载平衡策略,则直接能够将load-balancer的值设置为 none ------------------------------------------------------------------------------------------------------------ Executor Channel: 实现SubscribableChannel接口,相似DirectChannel,除了是由java.uti.concurrent.Executor 实例来派发消息。 在DirectChannel实现里,消息发送线程从头至尾彻底掌控,而在ExecutorChannel中发送线程发送完就结束了。 消息消费是由派发器独立的线程处理,派发器经过消费者调用消息处理的执行器。 // define the executor 默认设置: ------------------------------------------------------------------------------------------------------------- Null Channel: 是PollableChannel,主要用于测试目的。其发送方法老是返回true,指定的操做都是成功的。而接收方法老是获取一个空的消息。 底层代码没有建立任何序列,可是send操做返回true,receive返回null。 // This is the framework class implementation public class NullChannel implements PollableChannel { // send will always return true public boolean send(Message message) { if (logger.isDebugEnabled()) { logger.debug("message sent to null channel: " + message); } return true; } // receive will return null always public Message receive() { if (logger.isDebugEnabled()) { logger.debug("receive called on null channel"); } return null; } ... } 总之,消息信道是分隔生产者和消费者的主要组件。 =============================================================================== Endpoints 端点 消息终端是从消息框架中分离业务逻辑的组件。它们对于隐藏消息细节很是重要。 它们负责链接应用程序组件到消息信道来发送和接收消息。 Spring Integration提供的终端有Service Activator,Channel Adapter,Message Bridge,Gateway,Transformer,Filter,Router等。 首先咱们须要在配置文件中引入相应的命名空间定义: xmlns:int="http://www.springframework.org/schema/integration" xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd" Service Activator: 是一个通用的端点,它在一个消息抵达某个信道时调用某个bean上定义的方法执行。 若是这个方法有返回值,该返回值将会被发送到一个输出信道,前提是已经配置了输出信道。 使用service-activator 元素配置activator,并设置input-channel和ref的bean。 经过上面的定义,任何到达positions-channel的消息都会被传递给NewTradeActivator 并经过由method属性指定的方法来处理它。 若是目标bean只有一个方法,那么这里method属性就不必设置。框架会自动将该惟一方法解析为服务方法并调用它。 NewTradeActivator类做为服务的入口 public class NewTradeActivator { Position position = .. public void processNewPosition(Position t) { System.out.println("Method invoked to process the new Position"+t); // process the position.. // ... } } 该方法返回非空值时,返回值会被包裹在一个Message中并发送给一个output-channel. 若是你想在处理完后发送一个回复给其它信道,咱们就给方法定义返回值。 // Return value will be wrapped in a Message // and sent to an output channel public Position processNewPosition(Position t) { System.out.println("Method invoked to process the new Position"+t); // process the position.. // ... return position; } 咱们能够没有定义output-channel,只要咱们的处理方法带有返回值,框架将会使用消息头中名为replyChannel的属性来发送回复。 若是在消息头中没有发现replyChannel属性,则会抛出异常。 服务方法能够有Message或者一个Java对象做为参数,若是使用一个Java对象做为参数,消息中的负载就会被取出来传递给message。 由于传入消息是一个java对象,该模式将不会绑定咱们的Spring API,因此是个更好的选项。 上例中一个Position对象被包裹到一个Message中传递给信道。 ---------------------------------------------------------------------------------------------------------- Message Bridge: 消息桥是一个简单的端点,它链接不一样的消息模型或者适配器。 经常使用的消息桥有绑定一个P2P模型信道到一个Pub/Sub模型。 在P2P模型中,一个PollableChannel被端点使用,反之,whereas,一个PublishSubscribableChannel用于Pub/Sub模型。 在配置文件中使用bridge元素定义它, 上例中,bridge会获取来自输入信道的消息发布到输出信道中。 输入信道是PublishSubscribeChannel,而输出信道是QueueChannel 要完成上面的设计,须要一个service activator挂到输出信道上。 在消息经过桥端点到达输出信道时,PositionReceiver 会被调用。 --------------------------------------------------------------------------------------------------------- Message Enricher: 一个消息加强组件能够给一个输入的消息添加额外的信息并发送给下一个消费者。 好比一个交易由一些编码信息,好比安全ID,或者客户帐户代码组成。 为了保持可信度咱们可能在不一样的阶段为其附加不一样的信息数据。 框架提供了两种方式来加强消息:Header Enricher和Payload Enricher Header Enricher: 咱们还能够设置一些预约义的属性,好比priority,reply-channel,error-channel等。 框架还支持经过容许header-enricher的头属性来引用一个bean来使用payload设置头属性。 这里的ID是在TradeEnricher的帮助下从payload中获取数据。 public class TradeEnricher { public String enrichHeader(Message m) { Trade t = (Trade)m.getPayload(); return t.getId()+"SRC"; } } Payload Enricher: 使用enricher标签订义添加或者加强到payload的部分的内容。 一样的enricher须要input-channel中的消息,而后将它传递给request-channel并等待回复。 应该有一些其余组件监听request-channel 来加强消息。 在加强了消息的payload后,该组件会发布回复给回复信道。回复信道是经过header属性声明在消息内部的。 一旦加强器enricher得到了回复,它会经过表达式将加强数据设置给属性。 上面的配置中,Price被传入in-channel,Price消息没有任何数据,enricher将它传递给enricher-req-channel并等待回复。 获取消息并加强其信息后返回Price,返回值被发布到reply-channel. enricher一旦接收到来自reply-channel的消息就继续处理, 添加额外的属性,好比price和instrument到消息并发送它们到output-channel。 public void publishPrice(){ //Create a Price object with no values Price p = new Price(); //note the reply-channel as header property Message msg = MessageBuilder.withPayload(p) .setHeader(MessageHeaders.REPLY_CHANNEL, "reply-channel") .build(); channel.send(msg,10000); System.out.println("Price Message published"); } public class PriceEnricher{ public Price enrichHeader(Message m){ Price p = (Price)m.getPayload(); p.setInstrument("IBM"); p.setPrice(111.11); return p; } } Enricher 组件符合Gateway模式。 ------------------------------------------------------------------------------ Gateway:网关 有两种类型的网关,同步网关和异步网关 同步网关中,消息调用会被阻塞直到消息处理完成,而异步网关,消息的调用时非阻塞的。 Synchronous Gateway: 写一个网关的第一步就是定义一个接口来描述跟消息系统的交互方法。 好比咱们定义ITradeGateway接口包含一个单一的processTrade方法 public interface ITradeGateway { public Trade processTrade(Trade t); } 配置网关: 上面配置在应用程序上下文加载时,会用默认请求和回复信道建立一个网关端点,网关有一个service-interface属性 该属性执行咱们的ITradeGateway接口。框架的GatewayProxyFactoryBean 为服务接口建立一个代理(因此你不用为其提供任何实现) 代理可以使用提供的信道来处理客户端输入和输出的请求。 因此若是客户端调用一个processTrade方法,也是由代理来完成。 它发布一个带有Trade对象的消息到trades-in-channel,代理会阻塞调用直到从trades-out-channel收到回复为止。 该回复会被传回客户端。会有另一个组件来获取来trades-in-channel的消息根据业务需求进行处理。 客户端代码: public GatewayEndpointTest() { ... public GatewayEndpointTest() { ctx = new ClassPathXmlApplicationContext("endpoints-gateway-beans.xml"); // obtain our service interface tradeGateway = ctx.getBean("tradeGateway",ITradeGateway.class); } public void publishTrade(Trade t) { // call the method to publish the trade! Trade it = tradeGateway.processTrade(t); System.out.println("Trade Message published (Reply)."+it.getStatus()); } public static void main(String[] args) { GatewayEndpointTest test = new GatewayEndpointTest(); Trade t = new Trade(); test.publishTrade(t); } } 咱们从应用程序上下文中获取一个tradeGateway bean并调用processTrade方法,彻底不依赖于消息框架。 为了完成这个实例,咱们能够配置一个Service Activator来从trades-in-channel(跟代理发布消息是同一个信道)中获取消息,并传递回复给trades-out-channel(跟代理监听回复的信道相同) public class TradeProcessor { public Trade receiveTrade(Trade t) { System.out.println("Received the Trade via Gateway:"+t); t.setStatus("PROCESSED"); return t; } } Asynchronous Gateway: 要获取异步效果,那么服务接口的返回值须要改变,如今咱们让它返回一个Future对象。 import java.util.concurrent.Future; public interface ITradeGatewayAsync { public Future processTrade(Trade t); } public void publishTrade(Trade t) { Future f = tradeGateway.processTrade(t); try { Trade ft = f.get(); } catch (Exception e) { .. } } ------------------------------------------------------------------------------- Delayer:延迟器 用于在发送者和接收者之间引入延迟。 全部的进入in-channel的消息将会被延迟5秒后传送给out-channel,若是default-delay设置为0或者负数时,会马上转发消息。 咱们还能够经过消息的header字段来定义每一个消息的延迟周期,为此咱们须要使用delay-header-name来让框架知道。 上面配置指定全部具备MSG_DELAY属性的消息都将按照本身设置的头字段值进行延迟,没有该属性设置的消息按照系统配置的默认值延迟。 ---------------------------------------------------------------------------------- Spring 表达式: Spring集成支持Spring表达式(SpEL)定义。 咱们能够用表达式来在消息头和负载中求取属性值。 这里headers 属性引用MessageHeaders,因此咱们可使用headers.property_name语法来查属性值。 相似的,消息的payload能够做为payload属性来用,因此咱们可使用点号查询payload对象属性值。 另外端点好比Transformer,Filter,Service Activator以及Splitter都支持SpEL。 ---------------------------------------------------------------------------------------- 脚本支持: 可使用框架扩展来支持脚本语言,咱们可使用框架支持的语言编写脚本,而后被端点调用。 事实上咱们可使用实现了JSR-223的任何语言。Groovy,Python/Jython,Ruby/JRuby,JavaScript等。 下面例子是端点从in-channel中获取消息而后传递给position-transformer.groovy脚本: 在该脚本的执行上下文中,脚本能够经过headers和payload变量访问消息的MessageHeaders和Payload。 咱们还能够直接将脚本以CDATA元素的形态直接嵌入到配置文件中。 ------------------------------------------------------------------------------------------ Consumers:消费者 咱们的信道有两种一种是pollable可轮询的,一种是subscribable可订阅的,一样的咱们的终端消费者也可分为 Polling Consumer和Event-Driven客户。 轮询消费者基于轮询配置为消息轮询信道,它是由客户程序驱动的。 事件驱动消费者预订了一个可预订信道,当消息到达时能够被异步通知。 Polling Consumers: 其特色是定时的为消息进行轮询,框架提供了PollingConsumer类来完成这项工做。 在实例化它是须要给构造函数传入一个可轮询信道和一个消息处理器。 消息处理器是一个处理发布到该信道的消息的接口定义。 private MessageHandler positionsHandler = null; private QueueChannel positionsChannel = null; ... // Instantiating a PollingConsumer PollingConsumer consumer = new PollingConsumer(positionsChannel, positionsHandler); Message m = channel.receive();//or other receive methods System.out.println("Payload: " + m.getPayload()); 消息处理器: public class PositionsHandler implements MessageHandler { public void handleMessage(Message message) throws MessagingException { System.out.println("Handling a message: "+ message.getPayload().toString()); } } public class PositionsPollingConsumer { private PollingConsumer consumer = null; private PositionsHandler positionsHandler = null; public PositionsPollingConsumer(ApplicationContext ctx, QueueChannel positionsChannel) { //instance of handler positionsHandler = new PositionsHandler(); // now create the framework's consumer consumer = new PollingConsumer(positionsChannel, positionsHandler); //You must set the context, or else an error will be thrown consumer.setBeanFactory(ctx); } public void startConsumer() { consumer.start(); } } 调用: PositionsPollingConsumer ppc = new PositionsPollingConsumer(ctx, positionsChannel); ppc.startConsumer(); 使用触发器轮询: 上面例子的调用不是轮询方式的,咱们须要它按时方式轮询,则须要使用框架的Triggers来完成。 框架为咱们提供了两种类型的触发器:PeriodicTrigger 和 CronTrigger PeriodicTrigger是按照固定的时间间隔轮询 CronTrigger则是基于Unix的cron表达式进行轮询,在任务计划须要复杂需求时,该方式更加灵活。 一旦选中了触发器,咱们就须要实例化它并将它安装到consumer,咱们还能够设置它的initialDelay和fixedRate来进一步控制轮询。 PeriodicTrigger periodicTrigger = new PeriodicTrigger(2000); // let the polling kick in after half a second periodicTrigger.setInitialDelay(500); // fixed rate polling? periodicTrigger.setFixedRate(false); 其中initialDelay设置轮询只有在超过期间周期开始轮询 fixedRate是一个布尔变量,指定轮询是否须要在规定的时间间隔以内,若是设置为true,则若是当前的消息处理超过了轮询周期, 则轮询会处理下一个消息。 CronTrigger可让consumer作更多的轮询,好比咱们须要在工做日的午夜叫醒工做,使用corn表达式能够这样复杂的情型。 Cron表达式表示为由空格分开的字段,有六个字段,每一个字段表示时间不一样的内容。声明一个表达式表现咱们的时间需求 // start polling all weekdays at exactly one minute past midnight String cronExpression="* 01 00 * * MON-FRI"; cronTrigger = new CronTrigger(cronExpression); ------------------------------------------------------------------------------------------------------- Event-Driven Consumers: 订阅消息的消费者被归类为Event-Driven Consumers,框架定义该类型消费者为EventDrivenConsumer 它的基本特征是等待某人分发到达信道的消息,SubscribableChannel信道支持这类消费。 private EventDrivenConsumer consumer = null; private PositionsHandler positionsHandler = null; private ApplicationContext ctx = null; public PositionsEventDrivenConsumer(ApplicationContext ctx,PublishSubscribeChannel positionsChannel) { positionsHandler = new PositionsHandler(); // instantiate the event driven consumer consumer = new EventDrivenConsumer(positionsChannel, positionsHandler); consumer.setBeanFactory(ctx); } public void startConsumer() { // EventDrivenConsumer exposes start method consumer.start(); } ===================================================================================================== Transformer:转换器 并非全部的程序都能理解它们消费的数据,好比一个生产者使用Java对象做为它的负载制造一条消息,而消息消费者对非Java对象 数据感兴趣,好比XML或者名值对。为了帮助消息生产者和消费者进行交流,咱们就须要为他们定义转换器。 框架提供了转换器组件,好比Object-to-String,Object-to-Map等。 框架内建的转换器: 好比对象到字符串,map,或者JSON格式,框架直接提供。 String转换器: 这里须要注意须要做为payload的POJO对象重写toString()方法,以免出现意外结果。 Map Transformer: Object --〉Map: Map--〉Object: 序列化和反序列化转换器: JMS的消息在发送时必须序列化并在接收时反序列化,Payload序列化,即将POJO转换为字节数组,反序列化是将字节数组转换为POJO对象。 JSON 转换器: XML转换器:使用了Spring的Object-to-XML(OXM)框架。 org.springframework.oxm.Marshaller 和 org.springframework.oxm.Unmarshaller 咱们须要XML命名空间来访问XML转换器, 声明了空间后,咱们就可使用marshalling-transformer元素来读取一个输入信道的消息。 格式化成XML格式而后传递回给输出信道。 这里设置的可选参数result-type决定者结果类型。 框架有两个内建的结果类型:javax.xml.transfor.dom.DOMResult 和 org.springframework.xml.transform.StringResult DOMResult是默认结果类型。 若是咱们但愿定义本身的结果类型: 这里TradeResultFactory 有一个方法createResult实现,它继承自ResultFactory public class TradeResultFactory implements ResultFactory { public Result createResult(Object payload) { System.out.println("Creating result ->"+payload); //create your own implementation of Result return new TradeResult(); } } XPath 转换器: 使用XPath表达式解码XML payload,要求输入信道传入XML负载的消息。 咱们能够经过以下方式建立XML格式payload消息发布到trades-in-channel信道: private String createNewTradeXml() { return ""; } 自定义转换器: public class TradeMapTransformer { public Map transform(Trade t) { Map tradeNameValuesMap = new HashMap(); tradeNameValuesMap.put("TRADE_ID", t.getId()); tradeNameValuesMap.put("TRADE_ACCOUNT", t.getAccount()); ... return tradeNameValuesMap; } } 接下来就是让框架知道咱们定义的转换类: 这里须要为transformer元素声明的内容有: 一个输入信道,一个输出信道,还有就是转换器实现类。 String转换器: POJS-to-String: public class PojoToStringTransformer { private final String tradeString = "TRADE_ID=%s, TRADE_ACCOUNT=%s, TRADE_SECURITY=%s, TRADE_DIRECTION=%s, TRADE_STATUS=%s" ; public String transform ( Trade t ) { return String.format( tradeString, t.getId(), t.getAccount(), t.getSecurity(), t.getDirection(), t.getStatus() ) ; } } 配置转换器: ------------------------------------------------------------------------------------------------------ 使用标签: 咱们可使用框架的@Transformer 声明标签来引入转换器bean。 component-scan容许容器在transformers包里扫描声明标记了的bean。 此时AnnotatedTradeMapTransformer 类会被实例化: @Component public class AnnotatedTradeMapTransformer { @Transformer public Map transform(Trade t) { Map tradeNameValuesMap = new HashMap(); .... return tradeNameValuesMap; } } @Transformer 标记的方法会被调用。 ========================================================================= 工做流组件: 消息应用有时须要一些额外的组件,好比路由,聚合aggregation,排序sequencing等。 一个应用程序可能有特定的条件来路由信息到多个信道或者分解信息而后聚合他们作更深刻的处理。 Spring Integration框架提供了Filters,Routes,Aggregators,和 Splitters等能够直接使用的组件。 Filters: 消费者有不一样的消息消费需求,Spring Integration框架使用Filters和配置的条件来决定哪一个应用程序应该接收消息。 public class NewTradeFilter { public boolean isNewTrade(Message message) { Trade t = (Trade)message.getPayload(); return (t.getStatus().equalsIgnoreCase("new")); } } 使用框架的MessageSelector: public class CancelTradeFilter implements MessageSelector{ public boolean accept(Message message) { Trade t = (Trade)message.getPayload(); return (t.getStatus().equalsIgnoreCase("cancel")); } } 非公用的Filter咱们能够采用内嵌声明: 使用标签: @Component public class AnnotatedNewTradeFilter { @Filter public boolean isTradeCancelled(Message message) { Trade t = (Trade)message.getPayload(); return (t.getStatus().equalsIgnoreCase("cancel")); } } ---------------------------------------------------------------------- 丢弃消息: 框架容许在消息不符合过滤条件时,过滤器抛出异常或者转发另外的信道。 为了能抛出异常,咱们添加throw-exception-on-rejection属性到过滤器元素。 或者我在过滤器中装配一个信道来接收这些被丢弃的消息,咱们使用discard-channel属性来设置丢弃信道 ---------------------------------------------------------------------------- 路由器: 工做流有一个需求就是根据特定条件将消息发送给一个或者多个信道。 一个路由器组件能够用于分配消息到多个目的地。 路由器会从一个信道获取消息并基于payload或者headers内容从新投递到相关的信道。 filter和router不一样,Filter基于简单的布尔测试决定消息是否被发送,Router基于内容来决定如何转发消息。 在Filter里,消息只有两个方向能够去,继续向前传递或者被丢弃,使用Filter时,消息一个消息可能会也可能不会出如今输出信道中。 而使用Router,则单个消息能够被发送给一个或者多个信道。 框架提供了一些内建的路由器:基于消息负载内容的PayloadTypeRouter,和基于消息header值的HeaderValueRouter。 PayloadTypeRouter:基于payload的类型决定将消息路由到哪一个信道。 路由组件会附加到一个输入信道,将获取消息负载类型,并据此分配它们到特定类型需求的其它信道。 假设咱们有一个信道从外部应用流入Account和Trade消息,咱们但愿分离他们到两个不一样的信道,Accounts进入accounts-channel ,Trades进入trades-channel 为此咱们能够在输入信道all-in-channel中装配一个payload-type-router组件,而后使用其mapping属性来设置期待的类型和其相应的 分配信道。 自定义路由器: 咱们定义路由时须要获取消息并解析消息的负载或者头信息,而后根据结果返回相应的信道名称。 public class BigTradeRouter { public String bigTrade(Message message){ Trade t = message.getPayload(); // check if the trade is a big one and if it is // send it to a separate channel to handle them if(t.getQuantity() > 1000000) return "big-trades-channel"; // else send a normal channel return "normal-trades-channel"; } } 在配置文件中配置自定义的路由器: -------------------------------------------------------------------------------------- Recipient List Router 接收表路由器 分配给某信道的消息定义在一个接收列表中: 好比下面的 一个Trade消息被分配到三个子流信道:persistor-channel 来保存全部输入的Trade消息 trades-channel用于处理Trade, audit-channel则用于审计目的。 ---------------------------------------------------------------------------------------- Unqualified Messages:不合格消息处理 对于一些没法经过特定路由逻辑的消息,框架既能够抛出异常也能够把它们推到一个默认的信道里。 default-output-channel ... resolution-required 属性用在路由器上跟默认的输出信道关联。它会基于信道的ID来解析任意消息信道。 若是resolution-required设置为true,可是default-output-channel没有,则会抛出异常。 使用路由器标签:@Router @Component public class AnnotatedBigTradeRouter { @Router public String bigTrade(Message message) { Trade t = message.getPayload(); if (t.getQuantity() > 10000) return "big-trades-channel"; return "trades-stdout"; } } ---------------------------------------------------------------------------------------------- Splitters: 一般用于切分消息到小块,用于更小的自定客户逻辑。 好比将一个大的消息负载切分红小块来并行处理。 框架提供了splitter元素来定义它。 咱们能够经过自定义POJO实现自定义逻辑或者扩展框架提供的AbstractMessageSplitter抽象类实现其splitMessage()方法 来定义切割器。 自定义实现,通常须要咱们定义一个简单的POJO,而后实现切分算法。 好比一个正常的Trade和一些加密数据被传入,需求是获取加密数据到另外的对象EncryptedTrade 新建立的EncryptedTrade和原来的Trade都将发送给输出信道。 咱们就可使用切分器来处理。 Trade和EncryptedTrade都继承自ITrade,其中ITrade只是一个没有任何内容定义的标记接口。 public class Trade implements ITrade{ private String encryptedMsg = null; ... public String getEncryptedMsg() { return encryptedMsg; } public void setEncryptedMsg(String encryptedMsg) { this.encryptedMsg = encryptedMsg; } ... } public class EncryptedTrade implements ITrade{ private String encryptedMsg = null; public EncryptedTrade(String encryptedMsg) { this.encryptedMsg = encryptedMsg; } public String getEncryptedMsg() { ... } public void setEncryptedMsg(String encryptedMsg) { ... } } 其切分过程是从Trade对象中获取encryptedMessage用于构造EncryptedMessage对象。 咱们定义CustomEncryptedTradeSplitter封装上面的逻辑实现: public class CustomEncryptedTradeSplitter{ public List splitMyMessageToTrades(Message message) { List trades = new ArrayList(); TradeImpl t = (TradeImpl)message.getPayload(); //Create a new object from the payload EncryptedTrade et = new EncryptedTrade(t.getEncryptedMsg()); trades.add(t); trades.add(et); System.out.println("Splitting message done, list: "+trades); return trades; } } ------------------------------------------------------------------------------- 使用AbstractMessageSplitter抽象类定义: public class EncryptedTradeSplitter extends AbstractMessageSplitter{ @Override protected Object splitMessage(Message message) { .... return trades; } } 标签 @Splitter 声明让框架知道咱们定义了一个用于切分器的bean。 而后咱们必须为方法提供@Splitter标签: @Component public class AnnonatedEncryptedTradeSplitter{ @Splitter public List splitMyMessageToTrades(Message message) { .. } } 该方法返回一个对象集合,每一个对象都会被包裹在Message中做为payload 每一个子消息都会被印上相同的集合ID:CORRELATION_ID 在SEQUENCE_SIZE参数设置母消息要被切分红子消息的个数。 在每一个子消息上印上单独的SEQUENCE_NUMBER Received a message:[Payload=Trade [...][Headers={sequenceNumber=1, correlationId=18c9eee1-4795-4378-b70e-d236027d0c30, ..., sequenceSize=2}] Received a message:[Payload=EncryptedTrade[...]][Headers={sequenceNumber=2, correlationId=18c9eee1-4795-4378-b70e-d236027d0c30, ..., sequenceSize=2}] 其中correlationId相同说明这两个子消息相互关联,表示CORRELATION_ID。 sequenceSize 说明由几个子消息组成,由SEQUENCE_SIZE 头属性指定的,。 sequenceNumber 表示每一个子消息的序列码。 以上三个属性将在消息聚合时发挥重要做用。 ---------------------------------------------------------------------------------- Aggregation 消息聚合 装配多个消息来建立一个父消息,与Splitter是一个相反的过程。 在装配开始以前要求全部的参与装配的子消息必须都到达。装配它们要基于相关性和其发布策略。 public class TradeAggregator { public ITrade aggregateTrade(List childTrades) { ... } } ------------------------------------------------------------------------------------------------- 策略: 聚合不会单独工做,这种成对算法叫作策略。 这对聚合行为很是重要。 一个父消息被切分红大量的子消息,接下来聚合器须要等待这些子消息所有到达,而后才能聚合它们再次成为一个父消息。 聚合器会遵循某种算法来开始和结束它的工做。这种算法是以correlation和release策略的算法提供给聚合器的。 聚合器使用这些策略来追踪流入的子消息并对它们进行聚合处理。 Correlation Strategy:关联策略 定义了用于分组消息的键。默认分组是基于CORRELATION_ID ,全部具备相同CORRELATION_ID的消息会被放到同一个篮子里等待聚合。 框架提供了HeaderAttributeCorrelationStrategy能够直接使用,也能够自定义本身的策略。 定义本身的策略能够经过实现CorrelationStrategy接口或者本身建立本身的POJO。 public class MyCorrelationStrategy implements CorrelationStrategy { public Object getCorrelationKey(Message message) { // implement your own correlation key here // return .. } } 若是是本身定义的POJO则须要在配置时指定方法名:而且要求方法输入Message参数,返回一个Object对象。 Release Strategy:发布策略 该策略指定集合后的信息在那个点上被发送或者发布。 除了设置了release-on-expire标记状况外,它会等待信号发送。 默认的策略是 SequenceSizeReleaseStrategy,它实现了ReleaseStrategy接口。 它会检查经过SEQUENCE_SIZE分组的消息,好比SEQUENCE_SIZE是10,只有当接收全部10个消息而且序列码在1~10返回内时, 策略会触发一个信号给聚合器。 跟CorrelationStrategy 相似,咱们能够经过实现ReleaseStrategy或者建立本身的POJO类来实现。 public class MyReleaseStrategy implements ReleaseStrategy { public boolean canRelease(MessageGroup group) { // implement your strategy here return false; } } 若是自定义POJO则须要给实现方法输入一个java.util.List对象,返回一个布尔型返回值。 使用release-strategy 和 release-strategy-method 配置: -------------------------------------------------------------------------------------------- 消息存储: 聚合器会暂存消息直到相关联的消息都收到。即便有一子消息没收到,聚合都不能发布(除非存储过时) 这就要求聚合器有一个地方存储消息。 框架提供了相应的选项来该聚合器提供消息存储。 有两种存储选项:内存或者外部数据库 内存存储是默认的,它会经过java.util.Map收集消息存储到内存中。 框架提供了message-store 属性来引用相应的消息存储。 若是是默认的内存存储,则能够忽略。 下列配置是一个数据库存储: ------------------------------------------------------------------------------------------ Resequencer 排序: 消息系统一个重要的特征就是消息的顺序。 尽管排序会伤害性能,可是某些状况下仍是须要强调排序的,好比在灾难恢复时,要求消息按照原来的顺序回复。 Resequencer组件可以对接收的消息进行排序。 它会做用于SEQUENCE_NUMBER头字段来追踪顺序。 若是咱们将其 release-partial-sequences 标记设置为true,它会在已收到消息就发布,而不会等待全部的分组消息成员都到达。 ================================================================================================= Adapter 适配器: 框架提供了不少能够直接使用的适配器,它们分为进项适配器和出项适配器。 Inbound adapters获取文件或者数据库结果集,Outbound Adapter则是从信道中获取消息而后转换它们成为文件传输到一个文件系统中, 或者 转换成数据库记录保存的数据库中。 这一切的基础就是文件适配器,其它适配器工做方式与之相似。 File Adapters: 用于从不一样的文件系统中获取或者拷贝文件,而后转换成框架的Message,发布到一个信道,反之。 框架支持使用文件空间的声明模型,它还提供了一些类来读取和写入文件操做,可是咱们推荐使用命名空间。 使用Namespace file 提供了不少直接使用的元素定义。 在使用以前咱们须要首先引入该Namespace ... 框架为file提供了两个适配器用于读取和写入文件。 inbound-channeladapter 元素用于读取文件并将它们做为File 负责的消息发布到一个信道。 outbound-channel-adapter 元素用于从信道中获取消息的负载的File,将其写入到文件系统。 上面的配置设置了适配器以一秒为频次读取指定文件内容,发布到files-out-channel信道。 这里简单设置了输出适配器,从信道files-out-channel中获取消息而后在控制台打印他们。 File 适配器有一些参数能够设置: 组织重复读取文件的设置: prevent-duplicates 该标记设置只做用于每次会话,若是读取重启则设置失效。 过滤:使用FileListFilter接口的实现 来作。框架提供了一个AcceptOnceFileListFilter,它在当前会话中只接收一个文件一次。 咱们能够经过实现FileListFilter来自定义更多过滤: public class PositionsFilter implements FileListFilter { public List filterFiles(Position[] files) { List filteredList = new ArrayList(); // implement your filtering logic here return filteredList; } } 为了防止重复读取文件,咱们可使用filename-pattern 和 filename-regex属性来阻止: ... 这样适配器只获取扩展名为pos的文件,咱们还能够经过正则表达式加以规范。 文件锁定: 咱们可使用框架提供了FileLocker接口实现来锁定文件防止其它进程访问。 咱们能够添加本身的自定义锁: ------------------------------------------------------------------------------------ 独立文件读取器: 框架提供FileReadingMessageSource 类,它实现了框架的MessageSource接口,定义了receive()方法。 这是全部须要轮询消息的基础接口,返回值是Message对象,该对象包含java.io.File做为负载。 public class StandaloneFileAdapterTest { // set the directory from where the files need to be picked up File directory = new File("/Users/mkonda/dev/ws"); public void startStandaloneAdatper() { FileReadingMessageSource src = new FileReadingMessageSource(); src.setDirectory(directory); Message msg = src.receive(); System.out.println("Received:"+msg); } public static void main(String[] args) { StandaloneFileAdapterTest test = new StandaloneFileAdapterTest(); test.startAdatper(); } } // declaring the framework's class as a bean private void startAdapterUsingDeclaredBeanRef() { ctx = new ClassPathXmlApplicationContext("adapters-file-beans.xml"); fileReader = ctx.getBean("fileReader", FileReadingMessageSource.class); // now you got the instance, poll for msgs Message msg = fileReader.receive(); System.out.println("Message received from the bean:" + msg); } ----------------------------------------------------------------------------------------------- Outbound Adapter:出项适配器 filewriter 适配器就是从信道中获取消息而后把它们写入文件系统。 在file命名空间下使用outbound-channel-adapter 元素来指定出项适配器。 这里定义的适配器从positions-file-channel 信道获取消息,而后写入到 directory属性指定的目录中。 综合入项和出项适配器: ----------------------------------------------------------------------------------------- 独立的文件适配器: 使用standalone 类更加直观,使用一个要写入文件的位置目录实例化 FileWritingMessageHandler // set the directory File directory = new File("/Users/mkonda/dev/ws/tmp"); .. private void startStandaloneWriter() { // fetch the channel for incoming feed outChannel = ctx.getBean("files-channel",PublishSubscribeChannel.class); handler = new FileWritingMessageHandler(directory); // subscribe to the incoming feed outChannel.subscribe(handler); } ------------------------------------------------------------------------------------------ FTP 适配器: 咱们使用File Transfer Protocol来进行远程文件获取和本地文件上载。 咱们的框架提供了inbound和outbound信道适配器。 输入信道适配器链接到一个FTP Server来获取远程文件并将它们做为消息Message的负载。 输出信道适配器链接信道,消费消息,并把消息的负载写入到远程服务器目录。 这两个适配器均可以使用ftp命名空间下的inbound-channel-adapter 和 outbound-channeladapter 配置,在配置它们时一个前提条件是链接配置。 Session Factory:会话工厂 适配器应该要知道它所要链接的服务器的详细细节,包括用户名和密码。 框架的DefaultFtpSessionConnectionFactory类提供了这些内容。 咱们须要在配置文件中声明它,并设置相应的属性值。 而后将该bean引用给session-factory属性。 ... 有了会话工厂,咱们就能够定义FTP适配器了。 Inbound FTP 适配器 使用会话工厂提供的链接远程文件系统,并轮询文件。若是发现文件,它会获取文件并以其做为消息的负载建立消息Message 而后发送给指定的信道进行进一步处理。 ... 组件使用Session 工厂链接远程服务器,从remote-directory指定的文件目录获取文件,并把它们包裹成消息发布到指定信道。 咱们能够经过 filename-pattern, filename-regex等设置获取文件的规则。 这里的local-directory指定的目录是组件在开始轮询远程文件以前会先检查本地这个目录。 一旦全部的本地文件都被发布了,才会开始对远程文件进行轮询和传输。 咱们能够定义本身的文件过滤类,使用filter属性指定到配置文件中。 Outbound FTP 适配器: 用于建立一个经过FTP发布消息到远程文件系统的终端。 ----------------------------------------------------------------------------------------------- Caching Session:缓存会话 框架在双向信道适配器器上建立一个FTP 会话池来优化网络访问。 咱们能够经过属性cache-session设置为false,来关闭它。 ----------------------------------------------------------------------------------------------- JMS 适配器: 框架提供了输入和输出适配器来跨外部消息系统接收和发送消息。 输入适配器能够从一个JMS目标地(topic 或者queue)获取消息,而后发布他们到本地信道。 输出适配器能够将本地信道负载转换成JMS消息发布到JMS目的地(topic或者queue). 相关的适配器元素定义在jms命名空间下: Inbound 适配器:接收消息 从消息系统接收消息可能很复杂,取决因而由消费客户端驱动仍是消息提供者驱动。 客户端会基于某种规则轮询消息,服务器也会在消息到达时直接发送给客户端(也就是消息驱动或者事件驱动)。 同步消费者: inbound-channel-adapter负责从JMS服务器获取消息,经过对端点的配置来链接JMS Server,获取消息,发布消息到一个本地的信道。 这是一种拉消费,在底层,它使用JmsTemplate的receive方法来拉消息。咱们也能够提供一个JmsTemplate实例或者同时提供 connectionFactory和destination。 tcp://localhost:61616 链接工厂封装链接外部JMS提供者的详细信息链接信息,简单的定义为一个bean,并使用适配器的connection-factory属性 装配到适配器。若是你把该bean的名字定义成connectionFactory,则不须要指定它,适配器会自动找该名字的bean注入。 咱们使用ActiveMQ做为提供者,brokerURL指向本地的ActiveMQ服务器。 这里须要注意的是JMS Destination对象,它实质上是JMS技术里的一个Queue,适配器链接本地ActiveMQ服务,检查POSITIONS_QUEUE 获取发现的消息,发布它到本地应用程序信道positions-channel. Message-Driven Consumers:消息驱动消费 该情形是服务端基于订阅状况来驱动的消费类型。message-driven-channel-adapter 元素定义它,消费者须要一个 Spring MessageListener容器或者一个connectionFactory和destination的组合。 这里须要将Spring的对象转换为JMS Message,或者将Message转换回Spring对象, extract-payload属性用于转换消息的Payload 关于消息的Payload转换: 咱们须要使用转换器从JMS Message中取出payload,而后放到本地message中。框架为咱们提供了SimpleMessageConerter 它会将内容转换为咱们须要的消息payload, 若是JMS是一个TextMessage,会转换为String,若是是ByteMessage,它会转换为bytes。 须要注意的是只有属性extract-payload设置为true时,转换器才会启动,默认为true。 咱们还能够经过message-converter属性指定自定义的转换器。 ----------------------------------------------------------------------------------------------------- 发布消息:输出适配器 它的任务是从信道获取消息并发布到JMS Queue或者Topic中。 底层咱们使用JmsSendingMessageHandler,相反的当 extract-payload设置为true时,适配器会转换信道的payload到JMS Message 内容。 ============================================================================================================== JDBC 适配器: 一样分为进项适配器和出向适配器,进向适配器从数据库中获取数据并将结果集做为Message负载发布到本地信道。 出向适配器读取信道的消息数据保存到数据库中。 进向JDBC适配器: 负责读取数据集而后转换成消息,jdbc命名空间下inbound-channel-adapter用于建立这类端点。 适配器提供一个SQL查询和一个目的信道,同时还定义一个Datasource实例,用它来提供相关数据库的链接设置。 上面的设置使用query查询ACCOUNTS表状态为NEW的数据而后转换为Message发布到resultset-channel信道。 这里会将整个查询结果集List做为一个消息的payload,记录的类型依赖于咱们行映射策略。 有时候咱们不但愿轮询结果包含重复的结果,框架提供了update语句追加到每次查询上。 咱们每次拉数据时,咱们会使用select查询中的特定设置更新记录,来避免获取以更新的数据。 好比咱们只但愿获取新建立的Account记录,因此咱们能够更新名为POLLED的一列。 -------------------------------------------------------------------------------------------------- 出向JDBC适配器: 用于在数据库中执行SQL查询,而这查询语句是有从输入信道的消息里获取的内容构建的。 因此,出向适配器监听消息信道,获取消息,抽取消息中相关的值,构造查询语句并在数据库上执行该语句。 好比每一个出如今信道trade-persistence-channel的Trade消息应该被保存。 正常状况下,咱们会写一个消息消费终端来获取每一个消息而后用持久化机制来说消息保存到数据库。 然而,Spring Integration替咱们干了这些。 咱们只须要配置一个出向适配器: 这里比较有趣的是query参数的设置,咱们可使用payload键标记来规范参数。 每个输入的消息都会有一个Map类型的payload负载,从这里面咱们能够查询ID,ACCOUNT等 同时咱们还可使用headers这个Map值。 qyery="insert into TRADE t(ID,ACCOUNT,INSTRUMENT,EXPIRY) values(:payload[TRADE_ID], :payload[TRADE_ACCOUNT], :payload[TRADE_INSTRUMENT], :headers[EXPIRY])"> Map消息的建立代码: public Message> createTradeMessage(){ Map tradeMap = new HashMap(); tradeMap.put("ID", "1929303d"); tradeMap.put("ACCOUNT", "ACC12345"); //.. // Create a Msg using MessageBuilder Message> tradeMsg = MessageBuilder.withPayload(tradeMap).build(); return tradeMsg; } 如此,只要消息一到达persistence-channel,它就会被适配器获取并自动在数据库上执行构造的SQL。

相关文章
相关标签/搜索