1、原理介绍html
本文内容参考:https://github.com/apache/storm/tree/master/external/storm-kafka#brokerhostsjava
(一)使用storm-kafka的关键步骤git
一、建立ZkHostsgithub
当storm从kafka中读取某个topic的消息时,须要知道这个topic有多少个分区,以及这些分区放在哪一个kafka节点(broker)上,docker
ZkHosts就是用于这个功能。apache
关于kafka信息在zk中的内容请参考:http://blog.csdn.net/jinhong_lu/article/details/46653087api
建立zkHosts有2种形式:app
public ZkHosts(String brokerZkStr, String brokerZkPath) 框架
public ZkHosts(String brokerZkStr)less
默认状况下,zk信息被放到/brokers中,此时可使用第2种方式:
new ZkHosts("123.58.172.117:2181,123.58.172.98:2181,123.58.172.111:2181,123.58.172.114:2181,123.58.172.116:2181”)
若zk信息被放置在/kafka/brokers中,则可使用:
publicZkHosts("123.58.172.117:2181,123.58.172.98:2181,123.58.172.111:2181,123.58.172.114:2181,123.58.172.116:2181",“/kafka")
或者直接:
new ZkHosts("123.58.172.117:2181,123.58.172.98:2181,123.58.172.111:2181,123.58.172.114:2181,123.58.172.116:2181/kafka”)
默认状况下,每60秒去读取一次kafka的分区信息,能够经过修改host.refreshFreqSecs来设置。
除了使用ZkHosts来读取分析信息外,storm-kafka还提供了一种静态指定的方法,如:
由此能够看出,ZkHosts完成的功能就是指定了从哪一个kafka节点读取某个topic的哪一个分区。
二、建立KafkaConfig
(1)有2种方式建立KafkaConfig
public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
BrokerHosts就是上面建立的实例,topic就是要订阅的topic名称,clientId用于指定存放当前topic consumer的offset的位置,这个id 应该是惟一的,不然多个拓扑会引发冲突。
事实上,trident的offset并不保存在这个位置,见下面介绍。
真正使用时,有2种扩展,分别用于通常的storm以及trident。
(2)core storm
Spoutconfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer's offset. The id should uniquely identify your spout.
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
public SpoutConfig(BrokerHosts hosts, String topic, String id);
In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:
KafkaSpout 只接受 SpoutConfig做为参数
(3)TridentKafkaConfig,TridentKafkaEmitter只接受TridentKafkaConfig使用参数
trident消费kafka的offset位置是在创建拓扑中指定,如:
topology.newStream(test, kafkaSpout).
则offset的位置为:
/transactional/test/coordinator/currtx
(4)KafkaConfig的一些默认参数
能够经过如下方式修改:
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
三、设置MultiScheme
MultiScheme用于指定若是处理从kafka中读取到的字节,同时它用于控制输出字段名称。
public Iterable<List<Object>> deserialize(byte[] ser);
public Fields getOutputFields();
默认状况下,RawMultiScheme读取一个字段并返回一个字节,而发射的字段名称为bytes。
能够经过SchemeAsMultiScheme和 KeyValueSchemeAsMultiScheme改变这种默认行为:
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
上面的语句指定了将字节转化为字符。
同时创建拓扑时:
topology.newStream(“test", kafkaSpout).
each(new Fields("str"), new FilterFunction(),new Fields("word”))….
会指定发射的字段名称为str。
四、建立Spout
(1)core storm
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
(2)trident
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
五、创建拓扑:
(1)core-storm
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 12);
kafka-reader指定了spout的名称,12指定了并行度。
(2)trident
topology.newStream(“test", kafkaSpout).
each(new Fields("str"), new FilterFunction(),new Fields("word”))….
test指定了放置offset的位置,也就是txid的位置。
str指定了spout发射字段的名称。
完整示例:
Core Spout
Trident Spout
(二)当拓扑出错时,如何从上一次的kafka位置继续处理消息
一、KafkaConfig有一个配置项为KafkaConfig.startOffsetTime,它用于指定拓扑从哪一个位置上开始处理消息,可取的值有3个:
(1)kafka.api.OffsetRequest.EarliestTime(): 从最先的消息开始
(2)kafka.api.OffsetRequest.LatestTime(): 从最新的消息开始,即从队列队伍最末端开始。
(3)根据时间点: 能够参阅 How do I accurately get offsets of messages for a certain timestamp using OffsetRequest? 的实现原理。
How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?
Kafka allows querying offsets of messages by time and it does so at segment granularity. The timestamp parameter is the unix timestamp and querying the offset by timestamp returns the latest possible offset of the message that is appended no later than the given timestamp. There are 2 special values of the timestamp - latest and earliest. For any other value of the unix timestamp, Kafka will get the starting offset of the log segment that is created no later than the given timestamp. Due to this, and since the offset request is served only at segment granularity, the offset fetch request returns less accurate results for larger segment sizes.
For more accurate results, you may configure the log segment size based on time (log.roll.ms) instead of size (log.segment.bytes). However care should be taken since doing so might increase the number of file handlers due to frequent log segment rolling.
二、因为运行拓扑时,指定了offset在zk中保存的位置,当出现错误时,能够找出offset
当从新部署拓扑时,必须保证offset的保存位置不变,它才能正确的读取到offset。
(1)对于core storm,就是
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
后2个参数不能变化
(2)对于trident而言,就是
topology.newStream(“test", kafkaSpout).
第1个参数不能变化。
三、也就是说只要拓扑运行过一次KafkaConfig.startOffsetTime,以后从新部署时都可从offset中开始。
再看看这2个参数
public boolean forceFromStart = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
若是将forceFromStart(旧版本是ignoreZkOffsets)设置为true,则每次拓扑从新启动时,都会从开头读取消息。
若是为false,则:
第一次启动,从开头读取,以后的重启均是从offset中读取。
通常使用时,将数值设置为以上2个便可。
(三)结果写回kafka
若是想把结果写回kafka,并保证事务性,可使用 storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and storm.kafka.trident.TridentKafkaUpdater.
Writing to Kafka as part of your topology
You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you are using trident you can use storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and storm.kafka.trident.TridentKafkaUpdater.
You need to provide implementation of following 2 interfaces
TupleToKafkaMapper and TridentTupleToKafkaMapper
These interfaces have 2 methods defined:
K getKeyFromTuple(Tuple/TridentTuple tuple);
V getMessageFromTuple(Tuple/TridentTuple tuple);
as the name suggests these methods are called to map a tuple to kafka key and kafka message. If you just want one field as key and one field as value then you can use the provided FieldNameBasedTupleToKafkaMapper.java implementation. In the KafkaBolt, the implementation always looks for a field with field name "key" and "message" if you use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility reasons. Alternatively you could also specify a different key and message field by using the non default constructor. In the TridentKafkaState you must specify what is the field name for key and message as there is no default constructor. These should be specified while constructing and instance of FieldNameBasedTupleToKafkaMapper.
KafkaTopicSelector and trident KafkaTopicSelector
This interface has only one method
public interface KafkaTopicSelector {
String getTopics(Tuple/TridentTuple tuple);
}
The implementation of this interface should return topic to which the tuple's key/message mapping needs to be published You can return a null and the message will be ignored. If you have one static topic name then you can use DefaultTopicSelector.java and set the name of the topic in the constructor.
Specifying kafka producer properties
You can provide all the produce properties , see http://kafka.apache.org/documentation.html#producerconfigs section "Important configuration properties for the producer", in your storm topology config by setting the properties map with key kafka.broker.properties.
附带2个官方的示例
For the bolt :
For Trident:
2、示例介绍
(一)简介
一、本项目完整代码请见https://github.com/jinhong-lu/stormkafkademo/tree/master/src/main/java/org/jinhong/demo/storm_kafka/trident。
二、本项目主要完成如下功能:
(1)从kafka中读取一个topic的消息,而后根据空格拆分单词,最后统计数据后写入一个HazelCastState(一个分布式的内存存储框架)。
(2)经过DRPC从上述的HazelCastState中读取结果,并将结果输出。
三、代码可分为3部分:
(1)单词拆分
(2)定义拓扑行为
(3)state定义
如下分为三部分分别介绍。
(二)单词拆分
原理很简单,就是经过空格将单词进行拆分。
这里的wordsplit是一个function,它继承自BaseFunction,最后,它将拆分出来的单词逐个emit出去。
(三)定义拓扑行为
一、定义kafka的相关配置
(1)建立一个topo。
(2)首先定义一个输入流,其中第一个参数定义了zk中放置这个topo元信息的信息,通常是/transactional/kafka
(3)对每一个输入的消息进行拆分:首先它的输入是字段名称为str的消息,而后通过WordSplit这个Function处理,最后,以字段名称word发送出去
(4)将结果根据word字段的值进行分组,就是说word值相同的放在一块儿。
(5)将分组的结果分别count一下,而后以字段名称aggregates_words写入HazelCastStateFactory定义的state中,关于state请见下一部分的介绍。
三、从分布式内存中读取结果并进行输出
(1)第三行定义了使用drpc须要处理的内容
(2)查询分布式内存中的内容,查询字段为word,而后以字段名count发送出去。
(3)将不须要统计的过滤掉。
(4)将结果进行聚合。
四、主函数
三个参数的含义为:
/* args[0]:kafkazk,如:192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181,192.168.172.117:2181/kafka
* args[1]:topo名称
* args[2]:niubus节点,如,192.168.172.98
*/
当参数数据大于1时,将拓扑提交到集群中,不然提交到本地。提交拓扑到集群的比较直观,下面郑重介绍一下drpc的查询。
(1)首先定义一个本地的drpc对象,以及一个本地storm集群。
(2)而后将拓扑群提交到本地集群。
(3)最后,使用drpuc不停的循环查询统计结果并输出。
注意上面的拓扑定义了2个流,第一个流用于接收kafka消息,而后拆分统计后写入内存,第二个流则接受drpc的输入,将drpc的输入拆分后,再统计须要查询的每一个单词的统计结果。如在本例中,须要显示单词the的数量。
在本例中,drpc和kafka没有本质的区别,它们都是一个用于向storm发送消息的集群,只是输入数据的方式有些不一样,kafka经过spout输入,drpc则直接经过execute()进行输入。
运行方式:
方式一:直接在eclipse右键运行,参数只填一个,如123.58.172.98:2181,123.58.172.111:2181,123.58.172.114:2181,123.58.172.116:2181,123.58.172.117:2181/kafka。只要保证kafka集群中有对应的topic,则会获得如下输出:
Word count: [[2]]
Word count: [[5]]
Word count: [[10]]
Word count: [[17]]
Word count: [[28]]
固然,统计结果根据输入kafka的内容而不一样。
(四)state定义
在定义拓扑的时候,最终的wordcount结果写在了HazelCastState中:
persistentAggregate(new HazelCastStateFactory(),new Count(), newFields("aggregates_words"))
下面咱们分析一下如何使用state来保存topo的处理结果,或者是中间处理结果。
注意,使用state除了能够保存最终的结果输出,以保证事务型、透明事务型之外,还常常用于保存中间结果。好比blueprint第3章的一个例子中,用于统计疾病的发生数量,若是超过预警值,则向外发信息。若是统计结果成功,但向外发送信息失败,则spout会重发数据,致使统计结果有误,所以,此时能够经过state将结果保存下来。
一、Factory类
内容很简单,就是返回一个state,它也是三个state相关的类中惟一对外的接口。
二、Handler类
使用单例模式返回一个map。
三、State类
真正处理业务逻辑的类。主要的方法有mutiPut和mutiGet,用于将结果放入state与取出state。