storm中的storm-kafka组件提供了storm与kafka交互的所需的全部功能,请参考其官方文档:https://github.com/apache/storm/tree/master/external/storm-kafka#brokerhostshtml
当storm从kafka中读取某个topic的消息时,须要知道这个topic有多少个分区,以及这些分区放在哪一个kafka节点(broker)上,ZkHosts就是用于这个功能。
建立zkHosts有2种形式:java
public ZkHosts(String brokerZkStr, String brokerZkPath) public ZkHosts(String brokerZkStr)
(1)默认状况下,zk信息被放到/brokers中,此时可使用第2种方式: node
new ZkHosts(“192.168.172.117:2181,192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181”);
(2)若zk信息被放置在/kafka/brokers中(咱们的集群就是这种情形),则可使用: git
new ZkHosts(“192.168.172.117:2181,192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181”,“/kafka”) 或者直接: new ZkHosts("192.168.172.117:2181,192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181/kafka”)
默认状况下,每60秒去读取一次kafka的分区信息,能够经过修改host.refreshFreqSecs来设置。github
(3)除了使用ZkHosts来读取分析信息外,storm-kafka还提供了一种静态指定的方法(不推荐此方法),如:apache
Broker brokerForPartition0 = new Broker("localhost");//localhost:9092 Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string. GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation(); partitionInfo.addPartition(0, brokerForPartition0);//mapping form partition 0 to brokerForPartition0 partitionInfo.addPartition(1, brokerForPartition1);//mapping form partition 1 to brokerForPartition1 partitionInfo.addPartition(2, brokerForPartition2);//mapping form partition 2 to brokerForPartition2 StaticHosts hosts = new StaticHosts(partitionInfo);
由此能够看出,ZkHosts完成的功能就是指定了从哪一个kafka节点读取某个topic的哪一个分区。编程
(1)有2种方式建立KafkaConfig json
public KafkaConfig(BrokerHosts hosts, String topic) public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
BrokerHosts就是上面建立的实例,topic就是要订阅的topic名称,clientId用于指定存放当前topic consumer的offset的位置,这个id 应该是惟一的,不然多个拓扑会引发冲突。
事实上,trident的offset并不保存在这个位置,见下面介绍。
真正使用时,有2种扩展,分别用于通常的storm以及trident。
(2)core storm api
Spoutconfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer’s offset. The id should uniquely identify your spout.
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
public SpoutConfig(BrokerHosts hosts, String topic, String id);
In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:bash
// setting for how often to save the current kafka offset to ZooKeeper public long stateUpdateIntervalMs = 2000; // Exponential back-off retry settings. These are used when retrying messages after a bolt // calls OutputCollector.fail(). // Note: be sure to set backtype.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent // resubmitting the message while still retrying. public long retryInitialDelayMs = 0; public double retryDelayMultiplier = 1.0; public long retryDelayMaxMs = 60 * 1000;
KafkaSpout 只接受 SpoutConfig做为参数
(3)TridentKafkaConfig,TridentKafkaEmitter只接受TridentKafkaConfig使用参数
trident消费kafka的offset位置是在创建拓扑中指定,如:
topology.newStream(test, kafkaSpout).
则offset的位置为:
/transactional/test/coordinator/currtx
(4)KafkaConfig的一些默认参数
public int fetchSizeBytes = 1024 * 1024; public int socketTimeoutMs = 10000; public int fetchMaxWait = 10000; public int bufferSizeBytes = 1024 * 1024; public MultiScheme scheme = new RawMultiScheme(); public boolean forceFromStart = false; public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); public long maxOffsetBehind = Long.MAX_VALUE; public boolean useStartOffsetTimeIfOffsetOutOfRange = true; public int metricsTimeBucketSizeInSecs = 60;
能够经过如下方式修改:
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
MultiScheme用于指定如何处理从kafka中读取到的字节,同时它用于控制输出字段名称。
public Iterable<List<Object>> deserialize(byte[] ser); public Fields getOutputFields();
默认状况下,RawMultiScheme读取一个字段并返回一个字节,而发射的字段名称为bytes。
能够经过SchemeAsMultiScheme和 KeyValueSchemeAsMultiScheme改变这种默认行为:
kafkaConfig.scheme =new SchemeAsMultiScheme(new StringScheme());
上面的语句指定了将字节转化为字符。
同时创建拓扑时:
topology.newStream(“test",kafkaSpout).each(new Fields("str"),new FilterFunction(),new Fields("word”))….
会指定发射的字段名称为str。
(1)core storm
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
(2)trident
OpaqueTridentKafkaSpoutkafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
(1)core-storm
builder.setSpout("kafka-reader",new KafkaSpout(spoutConf),12);
kafka-reader指定了spout的名称,12指定了并行度。
(2)trident
topology.newStream(“test", kafkaSpout). each(new Fields("str"), new FilterFunction(),new Fields("word”))….
test指定了放置offset的位置,也就是txid的位置。str指定了spout发射字段的名称。
完整示例:
Core Spout
BrokerHosts hosts = new ZkHosts(zkConnString); SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString()); spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Trident Spout
TridentTopology topology = new TridentTopology(); BrokerHosts zk = new ZkHosts("localhost"); TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic"); spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
KafkaConfig有一个配置项为KafkaConfig.startOffsetTime,它用于指定拓扑从哪一个位置上开始处理消息,可取的值有3个:
(1)kafka.api.OffsetRequest.EarliestTime(): 从最先的消息开始
(2)kafka.api.OffsetRequest.LatestTime(): 从最新的消息开始,即从队列队伍最末端开始。
(3)根据时间点:
kafkaConfig.startOffsetTime = new SimpleDateFormat("yyyy.MM.dd-HH:mm:ss").parse(startOffsetTime).getTime();
能够参阅 How do I accurately get offsets of messages for a certain timestamp using OffsetRequest? 的实现原理。
How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?
Kafka allows querying offsets of messages by time and it does so at segment granularity.The timestamp parameter is the unix timestamp and querying the offset by timestamp returns the latest possible offset of the message that is appended no later than the given timestamp. There are 2 special values of the timestamp - latest and earliest. For any other value of the unix timestamp, Kafka will get the starting offset of the log segment that is created no later than the given timestamp. Due to this, and since the offset request is served only at segment granularity, the offset fetch request returns less accurate results for larger segment sizes.
For more accurate results, you may configure the log segment size based on time (log.roll.ms) instead of size (log.segment.bytes). However care should be taken since doing so might increase the number of file handlers due to frequent log segment rolling.
SpoutConfigspoutConf = new SpoutConfig(brokerHosts,topic, zkRoot,id);
后2个参数不能变化
(2)对于trident而言,就是
topology.newStream(“test", kafkaSpout).
public booleanforceFromStart =false; public long startOffsetTime= kafka.api.OffsetRequest.EarliestTime();
若是将forceFromStart(旧版本是ignoreZkOffsets)设置为true,则每次拓扑从新启动时,都会从开头读取消息。
若是为false,则:
第一次启动,从开头读取,以后的重启均是从offset中读取。
通常使用时,将数值设置为以上2个便可。
若是想把结果写回kafka,并保证事务性,可使用 storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and storm.kafka.trident.TridentKafkaUpdater.
如下是官方说明。
Writing to Kafka as part of your topology
You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you are using trident you can use storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and storm.kafka.trident.TridentKafkaUpdater.
You need to provide implementation of following 2 interfacesTupleToKafkaMapper and TridentTupleToKafkaMapper
These interfaces have 2 methods defined:
K getKeyFromTuple(Tuple/TridentTuple tuple); V getMessageFromTuple(Tuple/TridentTuple tuple);
as the name suggests these methods are called to map a tuple to kafka key and kafka message. If you just want one field as key and one field as value then you can use the provided FieldNameBasedTupleToKafkaMapper.Java implementation. In the KafkaBolt, the implementation always looks for a field with field name “key” and “message” if you use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility reasons. Alternatively you could also specify a different key and message field by using the non default constructor. In the TridentKafkaState you must specify what is the field name for key and message as there is no default constructor. These should be specified while constructing and instance of FieldNameBasedTupleToKafkaMapper.
KafkaTopicSelector and trident KafkaTopicSelector
This interface has only one method
publicinterface KafkaTopicSelector { StringgetTopics(Tuple/TridentTuple tuple); }
The implementation of this interface should return topic to which the tuple’s key/message mapping needs to be published You can return a null and the message will be ignored. If you have one static topic name then you can use DefaultTopicSelector.java and set the name of the topic in the constructor.
Specifying kafka producer properties
You can provide all the produce properties , seehttp://kafka.apache.org/documentation.html#producerconfigs section “Important configuration properties for the producer”, in your storm topology config by setting the properties map with key kafka.broker.properties.
附带2个官方的示例
For the bolt :
TopologyBuilder builder = new TopologyBuilder(); Fields fields = new Fields("key", "message"); FixedBatchSpout spout = new FixedBatchSpout(fields, 4, new Values("storm", "1"), new Values("trident", "1"), new Values("needs", "1"), new Values("javadoc", "1") ); spout.setCycle(true); builder.setSpout("spout", spout, 5); KafkaBolt bolt = new KafkaBolt() .withTopicSelector(new DefaultTopicSelector("test")) .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()); builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout"); Config conf = new Config(); //set producer properties. Properties props = new Properties(); props.put("metadata.broker.list", "localhost:9092"); props.put("request.required.acks", "1"); props.put("serializer.class", "kafka.serializer.StringEncoder"); conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props); StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
For Trident:
Fields fields = new Fields("word", "count"); FixedBatchSpout spout = new FixedBatchSpout(fields, 4, new Values("storm", "1"), new Values("trident", "1"), new Values("needs", "1"), new Values("javadoc", "1") ); spout.setCycle(true); TridentTopology topology = new TridentTopology(); Stream stream = topology.newStream("spout1", spout); TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory() .withKafkaTopicSelector(new DefaultTopicSelector("test")) .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count")); stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields()); Config conf = new Config(); //set producer properties. Properties props = new Properties(); props.put("metadata.broker.list", "localhost:9092"); props.put("request.required.acks", "1"); props.put("serializer.class", "kafka.serializer.StringEncoder"); conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props); StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
基本场景:订阅kafka的某个topic,而后在读取的消息前加上自定义的字符串,而后写回到kafka另一个topic。
从Kafka读取数据的Spout使用storm.kafka.KafkaSpout
,向Kafka写数据的Bolt使用storm.kafka.bolt.KafkaBolt
。中间进行进行数据处理的Bolt定义为TopicMsgBolt
。闲言少叙,奉上代码:
public class TopicMsgTopology { public static void main(String[] args) throws Exception { // 配置Zookeeper地址 BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2281,zk3:2381"); // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字 SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "msgTopic1", "/topology/root", "topicMsgTopology"); // 配置KafkaBolt中的kafka.broker.properties Config conf = new Config(); Properties props = new Properties(); // 配置Kafka broker地址 props.put("metadata.broker.list", "dev2_55.wfj-search:9092"); // serializer.class为消息的序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); conf.put("kafka.broker.properties", props); // 配置KafkaBolt生成的topic conf.put("topic", "msgTopic2"); spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("msgKafkaSpout", new KafkaSpout(spoutConfig)); builder.setBolt("msgSentenceBolt", new TopicMsgBolt()).shuffleGrouping("msgKafkaSpout"); builder.setBolt("msgKafkaBolt", new KafkaBolt<String, Integer>()).shuffleGrouping("msgSentenceBolt"); if (args.length == 0) { String topologyName = "kafkaTopicTopology"; LocalCluster cluster = new LocalCluster(); cluster.submitTopology(topologyName, conf, builder.createTopology()); Utils.sleep(100000); cluster.killTopology(topologyName); cluster.shutdown(); } else { conf.setNumWorkers(1); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } } }
storm.kafka.ZkHosts
构造方法的参数是zookeeper标准配置地址的形式(ZooKeeper环境搭建能够查看ZooKeeper安装部署),zk一、zk二、zk3在本地配置了host,由于服务器使用的伪分布式模式,所以几个端口号不是默认的2181。
storm.kafka.SpoutConfig
构造方法第一个参数为上述的storm.kafka.ZkHosts
对象,第二个为待订阅的topic名称,第三个参数zkRoot为写读取topic时的偏移量offset数据的节点(zk node),第四个参数为该节点上的次级节点名(有个地方说这个是spout的id)。
backtype.storm.Config
对象是配置storm的topology(拓扑)所须要的基础配置。
backtype.storm.spout.SchemeAsMultiScheme
的构造方法输入的参数是订阅kafka数据的处理参数,这里的MessageScheme
是自定义的,代码以下:
public class MessageScheme implements Scheme { private static final Logger logger = LoggerFactory.getLogger(MessageScheme.class); @Override public List<Object> deserialize(byte[] ser) { try { String msg = new String(ser, "UTF-8"); logger.info("get one message is {}", msg); return new Values(msg); } catch (UnsupportedEncodingException ignored) { return null; } } @Override public Fields getOutputFields() { return new Fields("msg"); } }
MessageScheme
类中getOutputFields方法是KafkaSpout向后发送tuple(storm传输数据的最小结构)的名字,须要与接收数据的Bolt中统一(在这个例子中能够不统一,由于后面直接取第0条数据,可是在wordCount的那个例子中就须要统一了)。
TopicMsgBolt
类是从storm.kafka.KafkaSpout
接收数据的Bolt,对接收到的数据进行处理,而后向后传输给storm.kafka.bolt.KafkaBolt
。代码以下:
public class TopicMsgBolt extends BaseBasicBolt { private static final Logger logger = LoggerFactory.getLogger(TopicMsgBolt.class); @Override public void execute(Tuple input, BasicOutputCollector collector) { String word = (String) input.getValue(0); String out = "Message got is '" + word + "'!"; logger.info("out={}", out); collector.emit(new Values(out)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("message")); } }
此处须要特别注意的是,要使用backtype.storm.topology.base.BaseBasicBolt对象做为父类,不然不会在zk记录偏移量offset数据。
须要编写的代码已完成,接下来就是在搭建好的storm、kafka中进行测试:
# 建立topic ./bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2281,zk3:2381 --replication-factor 1 --partitions 1 --topic msgTopic1 ./bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2281,zk3:2381 --replication-factor 1 --partitions 1 --topic msgTopic2
接下来须要分别对msgTopic一、msgTopic2启动producer(生产者)与consumer(消费者):
# 对msgTopic1启动producer,用于发送数据 ./bin/kafka-console-producer.sh --broker-list dev2_55.wfj-search:9092 --topic msgTopic1 # 对msgTopic2启动consumer,用于查看发送数据的处理结果 ./bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2281,zk3:2381 --topic msgTopic2 --from-beginning
而后将打好的jar包上传到storm的nimbus(可使用远程上传或先上传jar包到nimbus节点所在服务器,而后本地执行):
# ./bin/storm jar topology TopicMsgTopology.jar cn.howardliu.demo.storm.kafka.topicMsg.TopicMsgTopology TopicMsgTopology
待对应的worker启动好以后,就能够在msgTopic1的producer对应终端输入数据,而后在msgTopic2的consumer对应终端查看输出结果了。
有几点须要注意的:
- 必须先建立msgTopic一、msgTopic2两个topic;
- 定义的bolt必须使用BaseBasicBolt做为父类,不可以使用BaseRichBolt,不然没法记录偏移量;
- zookeeper最好使用至少三个节点的分布式模式或伪分布式模式,不然会出现一些异常状况;
- 在整个storm下,spout、bolt的id必须惟一,不然会出现异常。
TopicMsgBolt
类做为storm.kafka.bolt.KafkaBolt
前的最后一个Bolt,须要将输出数据名称定义为message,不然KafkaBolt没法接收数据。
简单的输入输出作完了,来点复杂点儿的场景:从某个topic定于消息,而后根据空格分词,统计单词数量,而后将当前输入的单词数量推送到另外一个topic。
首先规划须要用到的类:
backtype.storm.spout.Scheme
子类;SplitSentenceBolt
;WordCountBolt
;ReportBolt
;WordCountTopology
;SentenceBolt
。backtype.storm.spout.Scheme
子类可使用上面已经定义过的MessageScheme
,此处再也不赘述。
SplitSentenceBolt
是对输入数据进行分割,简单的使用String类的split方法,而后将每一个单词命名为“word”,向后传输,代码以下:
public class SplitSentenceBolt extends BaseBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); } @Override public void execute(Tuple input, BasicOutputCollector collector) { String sentence = input.getStringByField("msg"); String[] words = sentence.split(" "); Arrays.asList(words).forEach(word -> collector.emit(new Values(word))); } }
SentenceBolt
是从KafkaSpout接收数据,而后直接输出。在拓扑图上就是从输入分叉,一个进入SplitSentenceBolt
,一个进入SentenceBolt
。这种结构能够应用在Lambda架构中,代码以下:
public class SentenceBolt extends BaseBasicBolt { private static final Logger logger = LoggerFactory.getLogger(SentenceBolt.class); @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { String msg = tuple.getStringByField("msg"); logger.info("get one message is {}", msg); basicOutputCollector.emit(new Values(msg)); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("sentence")); } }
WordCountBolt
是对接收到的单词进行汇总统一,而后将单词“word”及其对应数量“count”向后传输,代码以下:
public class WordCountBolt extends BaseBasicBolt { private Map<String, Long> counts = null; @Override public void prepare(Map stormConf, TopologyContext context) { this.counts = new ConcurrentHashMap<>(); super.prepare(stormConf, context); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word", "count")); } @Override public void execute(Tuple input, BasicOutputCollector collector) { String word = input.getStringByField("word"); Long count = this.counts.get(word); if (count == null) { count = 0L; } count++; this.counts.put(word, count); collector.emit(new Values(word, count)); } }
ReportBolt是对接收到的单词及数量进行整理,拼成json格式,而后继续向后传输,代码以下:
public class ReportBolt extends BaseBasicBolt { @Override public void execute(Tuple input, BasicOutputCollector collector) { String word = input.getStringByField("word"); Long count = input.getLongByField("count"); String reportMessage = "{'word': '" + word + "', 'count': '" + count + "'}"; collector.emit(new Values(reportMessage)); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("message")); } }
最后是定义topology(拓扑)WordCountTopology
,代码以下:
public class WordCountTopology { private static final String KAFKA_SPOUT_ID = "kafkaSpout"; private static final String SENTENCE_BOLT_ID = "sentenceBolt"; private static final String SPLIT_BOLT_ID = "sentenceSplitBolt"; private static final String WORD_COUNT_BOLT_ID = "sentenceWordCountBolt"; private static final String REPORT_BOLT_ID = "reportBolt"; private static final String KAFKA_BOLT_ID = "kafkabolt"; private static final String CONSUME_TOPIC = "sentenceTopic"; private static final String PRODUCT_TOPIC = "wordCountTopic"; private static final String ZK_ROOT = "/topology/root"; private static final String ZK_ID = "wordCount"; private static final String DEFAULT_TOPOLOGY_NAME = "sentenceWordCountKafka"; public static void main(String[] args) throws Exception { // 配置Zookeeper地址 BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2281,zk3:2381"); // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字 SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, CONSUME_TOPIC, ZK_ROOT, ZK_ID); spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(KAFKA_SPOUT_ID, new KafkaSpout(spoutConfig)); builder.setBolt(SENTENCE_BOLT_ID, new SentenceBolt()).shuffleGrouping(KAFKA_SPOUT_ID); builder.setBolt(SPLIT_BOLT_ID, new SplitSentenceBolt()).shuffleGrouping(KAFKA_SPOUT_ID); builder.setBolt(WORD_COUNT_BOLT_ID, new WordCountBolt()).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word")); builder.setBolt(REPORT_BOLT_ID, new ReportBolt()).shuffleGrouping(WORD_COUNT_BOLT_ID); builder.setBolt(KAFKA_BOLT_ID, new KafkaBolt<String, Long>()).shuffleGrouping(REPORT_BOLT_ID); Config config = new Config(); Map<String, String> map = new HashMap<>(); map.put("metadata.broker.list", "dev2_55.wfj-search:9092");// 配置Kafka broker地址 map.put("serializer.class", "kafka.serializer.StringEncoder");// serializer.class为消息的序列化类 config.put("kafka.broker.properties", map);// 配置KafkaBolt中的kafka.broker.properties config.put("topic", PRODUCT_TOPIC);// 配置KafkaBolt生成的topic if (args.length == 0) { LocalCluster cluster = new LocalCluster(); cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, builder.createTopology()); Utils.sleep(100000); cluster.killTopology(DEFAULT_TOPOLOGY_NAME); cluster.shutdown(); } else { config.setNumWorkers(1); StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } } }
除了上面提过应该注意的地方,此处还须要注意,storm.kafka.SpoutConfig
定义的zkRoot与id应该与第一个例子中不一样(至少保证id不一样,不然两个topology将使用一个节点记录偏移量)。
[1] storm-kafka编程指南
http://blog.csdn.net/lujinhong2/article/details/47132287
[2] kafka集群编程指南
http://blog.csdn.net/lujinhong2/article/details/47146693
[3] 关于kafka中的timestamp与offset的对应关系
http://blog.csdn.net/lujinhong2/article/details/49661309
[4] storm笔记:Storm+Kafka简单应用