第8章Enterprise integration patterns是core Camel的最后一章了,第二章已经介绍了一部分camel在eip中的应用。这一章所有都是讲eip.数据库
看来eip确实是camel的核心,camel确实是基于eip的。缓存
这一章包含了5方面的EIP问题:安全
■ The Aggregator EIP 消息合并
■ The Splitter EIP 消息分拆
■ The Routing Slip EIP 根据消息标签进行路由(也是一种路由)
■ The Dynamic Router EIP 动态路由
■ The Load Balancer EIP 负载均衡负载均衡
消息合并模式,以下图所示,只有当接收到三个标识相同的消息合并完成后再发送出去。1为消息标识,ABC为消息内容。dom
关于消息合并咱们须要关注三个方面:ide
1.Correlation identifierui
消息合并的标识,即如何肯定这几个消息是一组的。如上图标识的1,2.net
2.Completion condition对象
消息合并完成的条件,即多少个消息,多长时间内的消息合并。如上图3个消息合并完成。token
3.Aggregation strategy
消息合并的策略,即消息是如何合并的。这个须要本身实现接口AggregationStrategy。
public void configure() throws Exception {
from("direct:start")
.log("Sending ${body}")
.aggregate(xpath(/order/@customer), new MyAggregationStrategy()) //定义合并标示
.completionSize(2).completionTimeout(5000) //定义合并完成条件消息数量为2,等待时间为5s
.log("Sending out ${body}")
.to("mock:result");
}
消息合并的是须要等待的,默认状况下Camel是将收到的消息缓存在内存中,在生产中为了消息的安全,可能须要将收到的消息持久化。
Camel提供了接口AggregationRepository帮助咱们实现消息持久化,以下。
AggregationRepository myRepo = new
HawtDBAggregationRepository("myrepo", "data/myrepo.dat");
from("file://target/inbox")
.log("Consuming ${file:name}")
.aggregate(constant(true), new MyAggregationStrategy())
.aggregationRepository(myRepo)
.completionSize(3)
.log("Sending out ${body}")
.to("mock:result");
HawtDB是Camel自带的一个嵌入式数据库。
消息分拆模式与前一种模式恰好相反,是将接收到的消息按必定的规则分拆成多个消息发出去,以下图所示。
Camel对消息的分拆也提供了多种方法:
若是消息自己就是一个集合,能够用DSL中的split对消息分拆,示例以下:
public class SplitterABCTest extends CamelTestSupport {
public void testSplitABC() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:split");
mock.expectedBodiesReceived("A", "B", "C");
List<String> body = new ArrayList<String>();
body.add("A");
body.add("B");
body.add("C");
template.sendBody("direct:start", body);
assertMockEndpointsSatisfied();
}
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
from("direct:start")
.split(body())
.log("Split line ${body}")
.to("mock:split");
}
};
}
}
若是消息自己是一个对象咱们也可能经过bean的形式对消息分拆,示例以下:
public void configure() throws Exception {
from("direct:start")
.split().method(CustomerService.class, "splitDepartments")
.to("log:split")
.to("mock:split");
}
最后对大消息的分拆可能采用流的形式如.split(body().tokenize("\n")).streaming()
有些特殊的消息咱们须要根据消息来动态的路由到一个多个结点。这是咱们能够采用routing Slip的模式,示例以下:
public void testRoutingSlip() throws Exception {
getMockEndpoint("mock:a").expectedMessageCount(1);
getMockEndpoint("mock:b").expectedMessageCount(0);
getMockEndpoint("mock:c").expectedMessageCount(1);
template.sendBodyAndHeader("direct:start", "Hello World",
"mySlip", "mock:a,mock:c");
assertMockEndpointsSatisfied();
}
路由定义为:from("direct:start").routingSlip("mySlip", ";");
关于Routing Slip的形式也有多种方式:
各split同样能够采用dsl的形式,bean的形式同时camel仍是提供了注解@RoutingSlip。
动态路由和Routing Slip同样也是不一样的消息路由到不一样的节点,以下。
public class DynamicRouterBean {
public String route(String body,
@Header(Exchange.SLIP_ENDPOINT) String previous) {
return whereToGo(body, previous);
}
private String whereToGo(String body, String previous) {
if (previous == null) {
return "mock://a";
} else if ("mock://a".equals(previous)) {
return "language://simple:Bye ${body}";
} else {
return null;
}
}
基本能够同上,也提供了注解的形式@DynamicRouter。
负载均衡就不用细说了,Camel提供了对负载均衡的支持,示例以下:
from("direct:start")
.loadBalance().roundRobin()
.to("seda:a").to("seda:b")
.end();
关于负载均衡的策略Camel也提供了6种策略:Random,Round robin,Sticky,Topic,Failover,Custom。
更详细的信息请查询手册。
小结:至此Camel 的核心部分已经介绍完了,后面会再写一篇关于Camel的事务控制(第9章),额外介绍下Camel关于路由质量的监控BAM组件的使用,
其余的章节就再也不介绍了。