Storm应用系列之——集成Kafka

本文系原创系列,转载请注明。html

原帖地址:http://blog.csdn.net/xeseojava

 

前言

在前面Storm系列之——基本概念一文中,提到过Storm的Spout应该是源源不断的取数据,不能间断。那么,很显然,消息队列系统、分布式内存系统或内存数据库是做为其数据源的很好的选择。本文就如何集成Kafka进行介绍。git

Kafka的基本介绍:http://blog.csdn.net/xeseo/article/details/18311955github

准备工做

KafkaSpout其实网上已经有人写了,在github上开源了,不用咱们本身造轮子。只是要注意版本问题:数据库

0.7版本的Kafka,对应KafkaSpout可使用Storm-contrib下面的例子apache

源码:https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafkaapp

Maven依赖:https://clojars.org/storm/storm-kafkamaven

0.8版本的Kafka在API上和底层Offset的处理方式上发生了重大变化,因此老的KafkaSpout再也不适用,必须使用新的KafkaAPI分布式

源码:https://github.com/wurstmeister/storm-kafka-0.8-pluside

Maven依赖:https://clojars.org/net.wurstmeister.storm/storm-kafka-0.8-plus

这里由于0.8版本的Kafka必然是未来主流,因此我就不介绍0.7 的了,使用方式基本上是相似的。

PS:

是人写的,就会有bug,况且是别人分享出来的。因此,遇到bug,还请去github上提交一个issue告诉做者修正。

2014/7/29 更新:

wurstmeister/storm-kafka-0.8-plus 如今合并到Apache Storm了,在其external/storm-kakfa目录

Maven依赖直接更新成:

[plain] view plaincopyprint?在CODE上查看代码片派生到个人代码片

 

  1.               <dependency>  

  2.   <groupId>org.apache.storm</groupId>  

  3.   <artifactId>storm-kafka</artifactId>  

  4.   <version>0.9.2-incubating</version>  

  5. </dependency>  

可是storm彷佛没有直接把external的包加载到classpath,因此使用时,还得手动把该jar包从external/storm-kafka/下拷到storm的lib目录。

固然,也能够在maven中加上<scope>compile</scope>,直接把该jar打到你项目一块儿。

使用KafkaSpout

一个KafkaSpout只能去处理一个topic的内容,因此,它要求初始化时提供以下与topic相关信息:

  • Kafka集群中的Broker地址 (IP+Port)

有两种方法指定:

1. 使用静态地址,即直接给定Kafka集群中全部Broker信息

[java] view plaincopyprint?在CODE上查看代码片派生到个人代码片

 

  1. GlobalPartitionInformation info = new GlobalPartitionInformation();  

  2. info.addPartition(0, new Broker("10.1.110.24",9092));  

  3. info.addPartition(0, new Broker("10.1.110.21",9092));  

  4. BrokerHosts brokerHosts = new StaticHosts(info);  

 

2. 从Zookeeper动态读取

[java] view plaincopyprint?在CODE上查看代码片派生到个人代码片

 

  1. BrokerHosts brokerHosts = new ZkHosts("10.1.110.24:2181,10.1.110.22:2181");  

推荐使用这种方法,由于Kafka的Broker可能会动态的增减

  • topic名字

  • 当前spout的惟一标识Id (如下代称$spout_id)

  • zookeeper上用于存储当前处理到哪一个Offset了 (如下代称$zk_root)

  • 当前topic中数据如何解码

了解Kafka的应该知道,Kafka中当前处理到哪的Offset是由客户端本身管理的。因此,后面两个的目的,实际上是在zookeeper上创建一个 $zk_root/$spout_id 的节点,其值是一个map,存放了当前Spout处理的Offset的信息。

在Topology中加入Spout的代码:

[java] view plaincopyprint?在CODE上查看代码片派生到个人代码片

 

  1. String topic = "test";  

  2. String zkRoot = "kafkastorm";  

  3. String spoutId = "myKafka";  

  4.   

  5. SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);  

  6. spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());  

  7.   

  8. TopologyBuilder builder = new TopologyBuilder();  

  9. builder.setSpout("spout", new KafkaSpout(spoutConfig), spoutNum);  


其中TestMessageScheme就是告诉KafkaSpout如何去解码数据,生成Storm内部传递数据

[java] view plaincopyprint?在CODE上查看代码片派生到个人代码片

 

  1. public class TestMessageScheme implements Scheme {  

  2.   

  3.     private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.class);  

  4.       

  5.     @Override  

  6.     public List<Object> deserialize(byte[] bytes) {  

  7.         try {  

  8.             String msg = new String(bytes, "UTF-8");  

  9.             return new Values(msg);  

  10.         } catch (InvalidProtocolBufferException e) {  

  11.             LOGGER.error("Cannot parse the provided message!");  

  12.         }  

  13.           

  14.         //TODO: what happend if returns null?  

  15.         return null;  

  16.     }  

  17.   

  18.     @Override  

  19.     public Fields getOutputFields() {  

  20.         return new Fields("msg");  

  21.     }  

  22.   

  23. }  

这个解码方式是与Producer端生成时塞入数据的编码方式配套的。这里我Producer端塞入的是String的byte,因此这里也还原成String,定义输出为一个名叫"msg"的field。

后面就能够本身添加Bolt处理tuple中该field的数据了。

使用TransactionalTridentKafkaSpout

TransactionalTridentKafkaSpout是为事务性的Trident而用的。用法与KafkaSpout有所不一样。

[java] view plaincopyprint?在CODE上查看代码片派生到个人代码片

 

  1. TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, topic, spoutId);  

  2. kafkaConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());  

  3.   

  4. TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);  

  5.   

  6. TridentTopology topology = new TridentTopology();  

  7. topology.newStream("test_str", kafkaSpout).shuffle().each(new Fields("msg", new PrintFunction());  

看到它并无要求咱们提供zkRoot,由于直接代码里面写死了…… -_-T

地址是 /transactional/<STREAM_NAME>/<Spout_Id>,在上面的例子中,就是  /transactional/test_str/myKafaka

常见问题

1. 本地模式没法保存Offset

KafkaSpout初始化时,会去取spoutConfig.zkServers 和 spoutConfig.zkPort 变量的值,而该值默认是没塞的,因此是空,那么它就会去取当前运行的Storm所配置的zookeeper地址和端口,而本地运行的Storm,是一个临时的zookeeper实例,并不会真正持久化。因此,每次关闭后,数据就没了。

本地模式,要显示的去配置

[java] view plaincopyprint?在CODE上查看代码片派生到个人代码片

 

  1. spoutConfig.zkServers = new ArrayList<String>(){{  

  2.                 add("10.1.110.20");  

  3.                 add("10.1.110.21");  

  4.                 add("10.1.110.24");  

  5.             }};  

  6. spoutConfig.zkPort = 2181;  



2. 用Maven导入时,运行中SLF4J打印MutipleBinding 错误,致使无log输出。

缘由是在这个KafkaSpout的pom.xml里依赖了kafka_2.9.2,而这货带了一个slf4j-simple的SLF4J绑定,修复这个问题

[html] view plaincopyprint?在CODE上查看代码片派生到个人代码片

 

  1. <del><dependency>  

  2.   <groupId>net.wurstmeister.storm</groupId>  

  3.   <artifactId>storm-kafka-0.8-plus</artifactId>  

  4.   <version>0.2.0</version>  

  5.   <exclusion>  

  6.     <groupId>org.slf4j</groupId>  

  7.     <artifactId>slf4j-simple</artifactId>  

  8.   </exclusion>  

  9. </dependency></del>  


3. 若是在topology第一次启动前,往kafka里面写数据,启动Storm后,这部分数据读不出来

缘由是第一次启动topology时,在zookeeper上并未建立出保存Offset信息的节点,因此默认它会取当前partition最新的Offset(Kafka本身维护的单个partition上递增序号)。

理论上,若是找不到保存的Offset信息,应该从-1的Offset读起。

这个问题我给做者提出来了,但做者认为这样能够避免重复处理,我没有想通为什么会有重复处理。但好在做者说会在后续版本加入参数来控制。

刚去看了下,彷佛做者已经在提交 8b764cd fix掉了。有兴趣的能够去试下。我是本身本地改了他的代码。

以上问题已修复并合并。

相关文章
相关标签/搜索