本质上来讲,console consumer启动时会建立一个KafkaStream(能够简单翻译成Kafak流),该stream会不停地等待可消费的新消息——具体作法就是经过LinkedBlockingQueue阻塞队列来实现,后续会有详细描述。针对上面启动的顺序列表,咱们在ConsoleConsumer.scala中逐一进行代码走读:shell
1 // REQUIRED表示这是一个必需要指定的参数
2 val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
3 "Multiple URLS can be given to allow fail-over.").withRequiredArg.describedAs("urls").ofType(classOf[String])
2. 生成group.id
1 // 若是没有显式指定group.id,那么代码就本身合成一个 2 // 具体格式: console-consumer-[10万之内的一个随机数] 3 // 10万是一个很大的数,所以只有很是低的概率会碰到多个console consumer的group id相同的状况
4 if(!consumerProps.containsKey("group.id")) { 5 consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000)) 6 groupIdPassed=false
7 }
3. 建立ConsumerConfig对象封装配置
肯定了consumer的group.id以后console consumer须要把传入参数封装进ConsumerConfig类中并把后者传给Consumer的create方法以构造一个ConsumerConnector——即初始化consumer了,具体逻辑见下面的代码:apache
1 val config = new ConsumerConfig(consumerProps) // 封装ConsumerConfig配置类
2 val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
4. 建立默认的消息格式化类,其定义的writeTo方法会默认将消息输出到控制台
1 val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) // 建立消息格式类,用于最后的输出显示
2 val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt)) 3 val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
5. 建立ZookeeperConsumerConnector
ZookeeperConsumerConnector很是重要,它实现了ConsumerConnector接口(该接口定义了建立KafkaStream和提交位移的操做,如createMessageStreams、commitOffsets等)。Kakfa官网把这个接口称为high level的consumer API。对于大多数consumer来讲,这个high level的consumer API提供的功能已经足够了。不过不少用户可能须要对位移有更大的控制,这个时候Kafka推荐用户使用被称为low level的consumer API—— SimpleConsumer。你们参考这篇文章来深刻学习high level API的用法。目前为止,咱们只须要知道Kafka经过下面的语句构建了ConsumerConnector这个consumer的核心接口:
1 val connector = Consumer.create(config) // 建立ConsumerConnector,Consumer核心接口
1 val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0) 2 val iter = if(maxMessages >= 0) 3 stream.slice(0, maxMessages) 4 else
5 stream
1 for(messageAndTopic <- iter) { 2 try { 3 formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) // 输出到控制台
4 numMessages += 1
5 } catch { ... } 6 ... 7 }
好了,至此咱们按照启动顺序概述了console consumer启动时的各个阶段。不过,ZookeeperConsumerConnector和建立和迭代器的实现咱们并未详细展开,这部份内容将做为后面续篇的内容呈现给你们。敬请期待!缓存