前言
最近业务开发部门给咱们部门提了一个需求,由于他们开发环境和测试环境共用一套kafka,他们但愿咱们部门能帮他们实现自动给kafka的topic加上环境前缀,好比开发环境,则topic为dev_topic,测试环境,则topic为test_topic,他们kafka客户端是使用spring-kafka。一开始接到这个需求的时候,我内心是拒绝的,为啥开发环境和测试环境不分别部署一套kafka,还要那么麻烦。但老大都答应接这个需求了,做为小罗罗也只能接了java
实现思路
一、生产者端
能够经过生产者拦截器,来给topic加前缀git
二、实现步骤
a、编写一个生产者拦截器github
@Slf4j public class KafkaProducerInterceptor implements ProducerInterceptor<String, MessageDTO> { /** * 运行在用户主线程中,在消息被序列化以前调用 * @param record * @return */ @Override public ProducerRecord<String, MessageDTO> onSend(ProducerRecord<String, MessageDTO> record) { log.info("原始topic:{}",record.topic()); return new ProducerRecord<String, MessageDTO>(TOPIC_KEY_PREFIX + record.topic(), record.partition(),record.timestamp(),record.key(), record.value()); } /** * 在消息被应答以前或者消息发送失败时调用,一般在producer回调逻辑触发以前,运行在produer的io线程中 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { log.info("实际topic:{}",metadata.topic()); } /** * 清理工做 */ @Override public void close() { } /** * 初始化工做 * @param configs */ @Override public void configure(Map<String, ?> configs) { }
b、配置拦截器spring
kafka: producer: # 生产者拦截器配置 properties: interceptor.classes: com.github.lybgeek.kafka.producer.interceptor.KafkaProducerInterceptor
c、测试springboot
二、消费者端
这个就稍微有点难搞了,由于业务开发部门他们是直接用@KafkaListener的注解,形以下ide
@KafkaListener(id = "msgId",topics = {Constant.TOPIC})
像这种也没啥好的办法,就只能经过源码了,经过源码能够发如今以下地方工具
KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization
会把@KafkaListener的值赋值给消费者,若是对spring有了解的朋友,可能会知道postProcessAfterInitialization是spring后置处理器的方法,主要用来bean初始化后的一些操做,既然咱们知道@KafkaListener会在bean初始化后再进行赋值,那咱们就能够在bean初始化前,修改掉@KafkaListener的值。具体实现以下post
@Component public class KafkaListenerFactoryBeanPostProcesser implements BeanFactoryPostProcessor { @SneakyThrows @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { List<String> packageNames = AutoConfigurationPackages.get(beanFactory); for (String packageName : packageNames) { Reflections reflections = new Reflections(new ConfigurationBuilder() .forPackages(packageName) // 指定路径URL .addScanners(new SubTypesScanner()) // 添加子类扫描工具 .addScanners(new FieldAnnotationsScanner()) // 添加 属性注解扫描工具 .addScanners(new MethodAnnotationsScanner() ) // 添加 方法注解扫描工具 .addScanners(new MethodParameterScanner() ) // 添加方法参数扫描工具 ); Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class); if(!CollectionUtils.isEmpty(methodSet)){ for (Method method : methodSet) { KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class); changeTopics(kafkaListener); } } } } private void changeTopics(KafkaListener kafkaListener) throws Exception{ InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener); Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues"); memberValuesField.setAccessible(true); Map<String,Object> memberValues = (Map<String,Object>)memberValuesField.get(invocationHandler); String[] topics = (String[])memberValues.get("topics"); System.out.println("修改前topics:" + Lists.newArrayList(topics)); for (int i = 0; i < topics.length; i++) { topics[i] = Constant.TOPIC_KEY_PREFIX + topics[i]; } memberValues.put("topics", topics); System.out.println("修改后topics:" + Lists.newArrayList(kafkaListener.topics())); } }
测试测试
总结
虽然实现了动态修改topic,但我仍是以为topic不要随便改变,有条件的话,kafka仍是得基于物理环境隔离,其次真的客观条件不容许,要动态变动topic,则需作好topic动态变动宣导以及相关wiki的编写,否则很容易掉坑ui
demo连接
https://github.com/lyb-geek/springboot-learning/tree/master/springboot-mq-idempotent-consume