假设您正在对流进行计数聚合,并但愿将运行计数存储在数据库中。如今假设您在数据库中存储了一个表示计数的值,而且每次处理新元组时都会增长计数。php
发生故障时,将重发送元组。这会在执行状态更新(或任何带有反作用的事物)时出现问题 - 您不知道之前是否曾基于此元组成功更新状态。也许你之前从未处理过元组,在这种状况下你应该增长计数。也许你已经处理了元组并成功递增了计数,可是元组在另外一个步骤中处理失败。在这种状况下,您不该增长计数。或许您以前看过元组但在更新数据库时出错。在这种状况下,您应该更新数据库。css
只需将计数存储在数据库中,您就不知道以前是否已经处理过这个元组。所以,您须要更多信息才能作出正确的决定。Trident提供如下语义,足以实现一次性处理语义:html
使用这些原语,您的State实现能够检测以前是否已经处理了一批元组,并采起适当的操做以一致的方式更新状态。您采起的操做取决于输入splot提供的确切语义,即每批中的内容。在容错方面有三种可能的splot:“非事务性”,“事务性”和“不透明事务性”。一样,在容错方面有三种可能的状态:“非事务性”,“事务性”和“不透明事务性”。让咱们来看看每一个splot类型,看看每种喷口能够达到什么样的容错能力。java
请记住,Trident将元组做为小批量处理,每一个批次都被赋予惟一的事务ID。spout的属性根据它们能够提供的关于每批中的含量的保证而变化。事务性spout具备如下属性:git
为何不老是使用事务性spout? 它们简单易懂。您可能不使用它的一个缘由是由于它们不必定很是容错。例如,TransactionalTridentKafkaSpout的工做方式是txid的批处理将包含来自主题的全部Kafka分区的元组。一旦批次被发出,那么在未来从新发出批次的任什么时候候都必须发出彻底相同的元组集合以知足事务性喷口的语义。如今假设从TransactionalTridentKafkaSpout发出批处理,批处理没法处理,同时其中一个Kafka节点发生故障。您如今没法重播与以前相同的批次(由于节点已关闭且主题的某些分区不可用),github
这就是存在“不透明事务”spout的缘由 - 它们对丢失源节点具备容错能力,同时仍容许您实现一次性处理语义。数据库
(一方面注意 - 一旦Kafka支持复制,就有可能拥有对节点故障具备容错能力的事务性spout,但该功能尚不存在。)apache
假设您的拓扑计算字数,而且您但愿将字数存储在键/值数据库中。键将是单词,值将包含计数。您已经看到只存储计数,由于该值不足以知道您以前是否处理过一批元组。相反,您能够作的是将事务id与数据库中的count一块儿存储为原子值。而后,在更新计数时,您只需将数据库中的事务ID与当前批次的事务ID进行比较。若是它们是相同的,则跳过更新 - 因为强大的排序,您肯定数据库中的值包含当前批次。若是它们不一样,则增长计数。这个逻辑有效,由于txid的批处理永远不会改变,bash
假设您正在处理由如下一批元组组成的txid 3:并发
["man"]
["man"]
["dog"]
复制代码
假设数据库当前包含如下键/值对:
man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
复制代码
与“man”关联的txid为txid 1.因为当前txid为3,所以您肯定该批次中未表示此批元组。所以,您能够继续将计数增长2并更新txid。另外一方面,“dog”的txid与当前的txid相同。所以,您肯定已知当前批次的增量已在数据库中表示为“dog”键。因此你能够跳过更新。完成更新后,数据库以下所示:
man => [count=5, txid=3]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
复制代码
模糊事务型spout不能保证txid的一批元组保持不变。不透明的事务性spout具备如下属性:
非事务型 spout不对每批中的物品提供任何保证。所以它可能最多只进行一次处理,在这种状况下,在批次失败后不会重试元组。或者它可能具备至少一次处理,其中元组能够经过多个批次成功处理。对于这种spout,没有办法实现彻底一次的语义。
模糊事务型 state 具备最好的容错性特征,不过这是以在数据库中存储更多的内容为代价的(一个 txid 和两个 value)。事务型 state 要求的存储空间相对较小,可是它的缺点是只对事务型 spout 有效。相对的,非事务型要求的存储空间最少,可是它也不能提供任何的刚好一次的消息执行语义。
你选择 state 与 spout 的时候必须在容错性与存储空间占用之间权衡。能够根据你的应用的需求来肯定哪一种组合最适合你。
简单输出数据
public class TridentTopology1 {
/**
* 接受一组输入字段并发出零个或多个元组做为输出 (相似storm bolt数据流处理组件)
* @author qxw
* @data 2018年9月19日下午6:17:14
*/
public static class MyFunction extends BaseFunction {
private static final long serialVersionUID = 1L;
public void execute(TridentTuple tuple, TridentCollector collector) {
System.out.println("a: "+tuple.getIntegerByField("a"));
System.out.println("b: "+tuple.getIntegerByField("b"));
System.out.println("c: "+tuple.getIntegerByField("c"));
System.out.println("d: "+tuple.getIntegerByField("d"));
}
}
@SuppressWarnings("unchecked")
public static void main(String[] args) {
//固定批处理数据源(相似storm原生的spout) 声明2个输入的字段
FixedBatchSpout spout =new FixedBatchSpout(new Fields("a","b","c","d"),4,//设置批处理大小
new Values(1,4,7,10),
new Values(2,3,5,7),
new Values(6,9,7,2),
new Values(9,1,6,8) //设置数据内容
);
//是否循环发送
spout.setCycle(false);
//建立topology
TridentTopology topology =new TridentTopology();
//指定数据源
Stream input=topology.newStream("spout", spout);
//要实现storm原生spolt--bolt的模式在Trident中用each实现
input.each(new Fields("a","b","c","d"),
new MyFunction(),//执行函数 相似bolt
new Fields() //为空不向下发送
);
Config conf = new Config();
conf.setNumWorkers(1);
conf.setMaxSpoutPending(20);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TridentTopology1", conf, topology.build());
}
}
复制代码
经过要继承BaseFilter,重写isKeep方法
public class TridentTopology2 {
/**
* 能够海量数据进行过滤,须要继承BaseFilter,重写isKeep方法
* @author qxw
* @data 2018年9月21日上午10:57:00
*/
public static class MyFilter extends BaseFilter {
private static final long serialVersionUID = 1L;
public boolean isKeep(TridentTuple tuple) {
//可以被2对第1个和第2个值进行相加.而后除2,为0则发射,不为零则不发射射
return tuple.getInteger(1) % 2 == 0;
}
}
/**
* 相似原生storm bolt数据流处理组件
* @author qxw
* @data 2018年9月21日下午3:31:12
*/
public static class MyFunction extends BaseFunction{
private static final long serialVersionUID = 1L;
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
//获取tuple输入内容
Integer a = tuple.getIntegerByField("a");
Integer b = tuple.getIntegerByField("b");
Integer c = tuple.getIntegerByField("c");
Integer d = tuple.getIntegerByField("d");
System.out.println("a: "+ a + ", b: " + b + ", c: " + c + ", d: " + d);
}
}
@SuppressWarnings("unchecked")
public static void main(String[] args) {
//固定批处理数据源(相似storm原生的spout) 声明a,b,c,d四个字段
FixedBatchSpout spout =new FixedBatchSpout(new Fields("a","b","c","d"),4,//设置批处理大小
new Values(1,4,7,10),
new Values(2,3,5,7),
new Values(6,9,7,2),
new Values(9,1,6,8) //设置数据内容
);
//是否循环发送
spout.setCycle(false);
//建立topology
TridentTopology topology =new TridentTopology();
//指定数据源
Stream input=topology.newStream("spout", spout);
//要实现storm原生spolt--bolt的模式在Trident中用each实现 (随机分组)
input.shuffle().each(new Fields("a","b","c","d"),new MyFilter()).each(new Fields("a","b","c","d"), new MyFunction(),new Fields());
//本地模式
Config conf = new Config();
conf.setNumWorkers(1);
conf.setMaxSpoutPending(20);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TridentTopology2", conf, topology.build());
//集群模式
// StormSubmitter.submitTopology("TridentTopology1", conf, buildTopology());
}
复制代码
public class TridentWordCount {
public static class MyFunction extends BaseFunction {
private static final long serialVersionUID = 1L;
public void execute(TridentTuple tuple, TridentCollector collector) {
String word=tuple.getStringByField("word");
Long count=tuple.getLongByField("count");
System.out.println(word+" : "+count);
}
}
@SuppressWarnings("unchecked")
public static void main(String[] args) {
/* 建立spout */
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 4,
new Values("java php asd java"),
new Values("php css js html"),
new Values("js php java java"),
new Values("a a b c d"));
//是否循环发送
spout.setCycle(false);
/* 建立topology */
TridentTopology topology = new TridentTopology();
/* 建立Stream spout1, 分词、统计 */
topology.newStream("spout", spout)
//先切割
.each(new Fields("sentence"), new Split(), new Fields("word"))
//分组
.groupBy(new Fields("word"))
//聚合统计
.aggregate(new Count(), new Fields("count"))
//输出函数
.each(new Fields("word","count"), new MyFunction(),new Fields())
//设置并行度
.parallelismHint(1);
Config conf = new Config();
conf.setNumWorkers(1);
conf.setMaxSpoutPending(20);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TridentWordCount", conf, topology.build());
}
}
复制代码
public class TridentDrpc {
private static class MyFunction extends BaseFunction{
public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
String sentence = tridentTuple.getString(0);
for (String word : sentence.split(" ")) {
tridentCollector.emit(new Values(word));
}
}
}
public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
TridentTopology topology=new TridentTopology();
Config conf = new Config();
conf.setMaxSpoutPending(20);
//本地模式
if (args.length==0){
LocalCluster cluster = new LocalCluster();
LocalDRPC drpc = new LocalDRPC();
Stream input=topology.newDRPCStream("data",drpc);
input.each(new Fields("args"),new MyFunction(),new Fields("result")).project(new Fields("result"));
cluster.submitTopology("wordCount", conf, topology.build());
//调用
System.err.println("DRPC RESULT: " + drpc.execute("data", "cat the dog jumped"));
drpc.shutdown();
cluster.shutdown();
}else{
//集群模式
conf.setNumWorkers(2);
StormSubmitter.submitTopology(args[0], conf, topology.build());
}
}
}
复制代码