再看 Kafka Lag

在《Kafka的Lag计算误区及正确实现》一文中说起了kafka.admin.ConsumerGroupCommand.PartitionAssignmentState没法被外部访问,故要将PartitionAssignmentState前的protected修饰符去掉java

能够直接将describeGroup返回的结果转换成JSON而后传至监控页面(supported by YANGliiN oba)。代码以下:git

String[] agrs = {"--describe", "--bootstrap-server", brokers, "--group", groupId};
ConsumerGroupCommand.ConsumerGroupCommandOptions options =
        new ConsumerGroupCommand.ConsumerGroupCommandOptions(agrs);
ConsumerGroupCommand.KafkaConsumerGroupService kafkaConsumerGroupService =
        new ConsumerGroupCommand.KafkaConsumerGroupService(options);
ObjectMapper mapper = new ObjectMapper();
//1. 使用jackson-module-scala_2.12
mapper.registerModule(new DefaultScalaModule());
//2. 反序列化时忽略对象不存在的属性
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
//3. 将Scala对象序列化成JSON字符串
String source = mapper.writeValueAsString(kafkaConsumerGroupService.describeGroup()._2.get());
复制代码

这里须要采用的是jackson-module-scala的包实现,若是直接用普通的JSON序列化方式那么会达不到想要的效果,jackson以及jackson-module-scala对应的Maven库以下:github

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.9.4</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.12</artifactId>
    <version>2.9.5</version>
</dependency>
复制代码

注意若是本地安装的Scala版本与所配置的jackson-module-scala版本不一致的话会报出一些异常。发散一下思惟:既然能够序列化为JSON,那么彻底能够经过JSON再反序列化会对象,只不过经过JSON做为中间媒介,将访问受限的Scala对象转变为Java对象,上面剩余代码以下:编程

//4. 将JSON字符串反序列化成Java对象
List<PartitionAssignmentState> target = mapper.readValue(source,
        getCollectionType(mapper,List.class,PartitionAssignmentState.class));
//5. 排序
target.sort((o1, o2) -> o1.getPartition() - o2.getPartition());
//6. 打印
printPasList(target);
复制代码

如此就能够达到与前面几篇文章中关于获取消费者详情功能一样的效果。这里有两个注意要点:bootstrap

  1. PartitionAssignmentState中的coordinator是Node类型,这个类型须要自定义,Kafka原生的会报错。
  2. 反序列化时Node会有一个empty的属性不识别,解决方案参考代码中的步骤2.

代码更多细节请参考:代码bash

经过JSON的序列化和反序列化操做实现了本来不能为之的事情,那么思惟再发散一下,也能够序列化成字节流,好比经过ByteBuffer进行转换,只不过编程逻辑变得复杂了。app

上面这段陈述有可能会让人以为Scala与Java之间的互操做起来不容易,其实否则,上面这段陈述只是用来补充一下如何获取消费者详情的另外一种方法,Scala与Java之间的互操做仍是比较简单的,通常状况下均可以直接使用对方的类。对于集合而言,Scala中还有用于Scala与Java集合的互转的scala.collection.JavaConverters(scala2.8.1开始引入),与此雷同的scala.collection.JavaConversions已被标注为@Deprecated(since 2.12.0)。在scala代码中若是须要集合转换,首先引入scala.collection.JavaConverters._,进而显示调用asJava或者asScala方法完成转型。关于Scala与Java集合互转的介绍会在下一篇文章中呈现。post

相关文章
相关标签/搜索