I had always used the Apache storm-kafka toolkit to consume Kafka, so I simply configured the host and port of the ZooKeeper cluster and of the Kafka cluster and submitted the topology straight to the HDP Storm environment. Sure enough, it failed immediately: the exception showed that the host and port of the machine holding each Kafka leader could not be obtained. Reading the Apache storm-kafka source shows that this information is read from ZooKeeper under /brokers/ids/0.
```java
// Builds and maintains the partition-to-host/port mapping for every topic.
// Kafka manages the cluster by writing topic, partition and replica information
// into ZooKeeper and watching it for updates and deletes, so the complete
// cluster state can be read back out of ZooKeeper.
public List<GlobalPartitionInformation> getBrokerInfo() throws SocketTimeoutException {
    List<String> topics = getTopics(); // all topics, read from /brokers/topics
    List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
    for (String topic : topics) {
        GlobalPartitionInformation globalPartitionInformation =
                new GlobalPartitionInformation(topic, this._isWildcardTopic);
        try {
            // how many partitions the topic has, read from
            // /brokers/topics/distributedTopic/partitions/
            int numPartitionsForTopic = getNumPartitions(topic);
            String brokerInfoPath = brokerPath(); // broker path, by default /brokers/ids
            // find each partition leader's host and port, in preparation for
            // creating the consumers later
            for (int partition = 0; partition < numPartitionsForTopic; partition++) {
                // id of the partition leader as stored in ZooKeeper;
                // by default 0, 1, 2..., but on HDP 1001, 1002, 1003...
                int leader = getLeaderFor(topic, partition);
                String path = brokerInfoPath + "/" + leader; // e.g. /brokers/ids/1001
                try {
                    // read the data of the /brokers/ids/1001 znode
                    byte[] brokerData = _curator.getData().forPath(path);
                    Broker hp = getBrokerHost(brokerData); // extract host and port
                    // record the partition-to-broker mapping, i.e. which
                    // host and port the partition lives on
                    globalPartitionInformation.addPartition(partition, hp);
                } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
                    LOG.error("Node {} does not exist ", path);
                }
            }
        } catch (SocketTimeoutException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
        partitions.add(globalPartitionInformation);
    }
    return partitions;
}
```
```java
/**
 * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0
 * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
 *
 * @param contents
 * @return
 */
private Broker getBrokerHost(byte[] contents) {
    try {
        Map<Object, Object> value =
                (Map<Object, Object>) JSONValue.parseWithException(new String(contents, "UTF-8"));
        String host = (String) value.get("host");
        Integer port = ((Long) value.get("port")).intValue();
        return new Broker(host, port);
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
```
But inspecting the HDP ZooKeeper, I found that /brokers/ids/1001 contains host:null and port:-1.
So the code that maps a partition to its host and port does nothing more than fetch the JSON value of the corresponding znode and pick out host and port. Under HDP the /brokers/ids/1001 znode really does hold host:null and port:-1; I don't know why HDP's Kafka blanks them out, but I noticed the same information is still available in the endpoints field, which plainly yields host: woker11-cs.zuhu2.com, port: 6667.
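A minimal sketch of the kind of change I made to getBrokerHost, assuming the znode carries the standard broker-registration endpoints array (e.g. ["PLAINTEXT://woker11-cs.zuhu2.com:6667"]); the fallback logic and the URI parsing here are my own illustration, not the HDP code:

```java
// Requires java.net.URI, java.util.List, java.util.Map and
// org.json.simple.JSONValue (all already used by DynamicBrokersReader).
private Broker getBrokerHost(byte[] contents) {
    try {
        Map<Object, Object> value =
                (Map<Object, Object>) JSONValue.parseWithException(new String(contents, "UTF-8"));
        String host = (String) value.get("host");
        Long port = (Long) value.get("port");
        if (host == null || port == null || port.intValue() == -1) {
            // HDP blanks out host/port, so fall back to the first entry of the
            // "endpoints" array, e.g. "PLAINTEXT://woker11-cs.zuhu2.com:6667"
            List<Object> endpoints = (List<Object>) value.get("endpoints");
            URI uri = URI.create((String) endpoints.get(0));
            return new Broker(uri.getHost(), uri.getPort());
        }
        return new Broker(host, port.intValue());
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
```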
After the change I repackaged with Maven. Testing showed the host and port were indeed obtained now, but consumption still failed.
Googling turned up a security-authentication API for storm-kafka: only HDP's fork (second development) of storm-kafka has it, not Apache's.
```java
public SimpleConsumer register(Broker host, String topic, int partition) {
    if (!this._connections.containsKey(host)) {
        // one SimpleConsumer is created and cached per broker; note the extra
        // securityProtocol argument, which Apache storm-kafka does not pass
        this._connections.put(host, new DynamicPartitionConnections.ConnectionInfo(
                new SimpleConsumer(host.host, host.port, this._config.socketTimeoutMs,
                        this._config.bufferSizeBytes, this._config.clientId,
                        this._config.securityProtocol)));
    }
    DynamicPartitionConnections.ConnectionInfo info =
            (DynamicPartitionConnections.ConnectionInfo) this._connections.get(host);
    info.partitions.add(this.getHashKey(topic, partition));
    return info.consumer;
}
```
register() is called to obtain the SimpleConsumer. The HDP version takes one more variable than Apache storm-kafka: securityProtocol, which defaults to "PLAINTEXT".
The SimpleConsumer in HDP's kafka_2.10 has authentication enabled by default, so even if you construct it with only host, port, soTimeout, bufferSize and clientId, it still defaults securityProtocol to "PLAINTEXT".
The SimpleConsumer in Apache's kafka_2.10 has no such parameter at all.
The root of the matter is that the kafka_2.10 API Storm uses when it constructs new SimpleConsumer(...) was also forked: the HDP build provides an authentication API, whereas Apache kafka_2.10 provides none, as the sketch below summarizes.
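For reference, the two constructor shapes side by side; these signatures are paraphrased from the behavior described above and from the register() snippet, so treat exact parameter names and types as approximate:

```java
// Apache kafka_2.10 -- no notion of a security protocol:
//   SimpleConsumer(String host, int port, int soTimeout,
//                  int bufferSize, String clientId)
//
// HDP kafka_2.10 -- extra security argument; the five-argument form still
// exists, but internally defaults the protocol to PLAINTEXT:
//   SimpleConsumer(String host, int port, int soTimeout,
//                  int bufferSize, String clientId,
//                  SecurityProtocol securityProtocol)
```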
In the end it was clear that patching Apache storm-kafka myself would not work: kafka_2.10 and kafka-clients have both been forked as well.
Network restrictions meant Maven could not download these packages, so I fetched them from the Hortonworks repository at http://repo.hortonworks.com/content/repositories/releases/ and installed them into the local Maven repository by hand:
```bash
mvn install:install-file -Dfile=F:\sougoDownload\storm-kafka-1.0.1.2.5.3.0-37.jar -DgroupId=org.apache.storm -DartifactId=storm-kafka -Dversion=1.0.1.2.5.3.0-37 -Dpackaging=jar
```
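The same install-file step has to be repeated for the forked kafka_2.10 and kafka-clients jars mentioned above. The file names and the <hdp-version> placeholder below are illustrative; substitute whichever HDP build you actually downloaded:

```bash
mvn install:install-file -Dfile=kafka_2.10-<hdp-version>.jar -DgroupId=org.apache.kafka -DartifactId=kafka_2.10 -Dversion=<hdp-version> -Dpackaging=jar
mvn install:install-file -Dfile=kafka-clients-<hdp-version>.jar -DgroupId=org.apache.kafka -DartifactId=kafka-clients -Dversion=<hdp-version> -Dpackaging=jar
```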
After repackaging, the topology could finally consume from and send data to Kafka.
Note that the HDP packages you download must match the versions in the environment exactly; HDP releases differ from one another and are not backward compatible the way Java is. At first I downloaded the newest storm-kafka, 1.1.0.3.0.1.3-1, whose KafkaBolt extends BaseTickTupleAwareRichBolt, but the environment's storm-core is 1.0.1.2.5.3.0-37, which has no BaseTickTupleAwareRichBolt class at all.
In storm-kafka 1.0.1.2.5.3.0-37, KafkaBolt extends BaseRichBolt instead.
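A quick way to catch this kind of mismatch before submitting a topology is to probe the classpath for the base class your storm-kafka build compiles against. This small check is my own illustration, not part of either code base:

```java
public class StormApiCheck {
    public static void main(String[] args) {
        try {
            // BaseTickTupleAwareRichBolt only exists in storm-core 1.1.x and later
            Class.forName("org.apache.storm.topology.base.BaseTickTupleAwareRichBolt");
            System.out.println("storm-core 1.1.x+ API present; storm-kafka 1.1.0.x builds will load");
        } catch (ClassNotFoundException e) {
            System.out.println("older storm-core (e.g. 1.0.1.2.5.3.0-37); "
                    + "use a storm-kafka build whose KafkaBolt extends BaseRichBolt");
        }
    }
}
```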