Our department recently needed to ship the various logs produced by our services to Kafka for analysis, so I wrote a Kafka message-logging utility class that saves log entries to Kafka so they can be queried later.
1. Add the dependencies to pom.xml
```xml
<!-- HH: Kafka support -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.0.0.RELEASE</version>
</dependency>
<!-- HH: fastjson for JSON serialization -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.43</version>
</dependency>
```
2. application.yml configuration
```yaml
server:
  port: 8081

spring:
  application:
    name: HAVENT-SPRING-BOOT-DEMO
  kafka:
    producer:
      bootstrap-servers: IP_ADDRESS_1:9092,IP_ADDRESS_2:9092,IP_ADDRESS_3:9092
    template:
      topic: mobile-service

logging:
  level:
    root: info
```
3. The com.havent.logger.config.KafkaConfiguration configuration class
```java
package com.havent.logger.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfiguration {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String serverAddress;

    public Map<String, Object> producerConfigs() {
        System.out.println("HH > serverAddress: " + serverAddress);
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);
        // If a request fails, the producer retries it automatically. We allow one retry here;
        // note that enabling retries opens the possibility of duplicate messages.
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        // Batch records together to reduce the number of requests; this is the batch size in bytes.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        /*
         * linger.ms tells the producer to wait a little before sending, in the hope that more
         * records arrive to fill a not-yet-full batch -- similar in spirit to Nagle's algorithm
         * in TCP. With the value below, a request may be delayed by up to 1000 ms while the batch
         * fills. Under heavy load records tend to batch anyway, even with linger.ms=0; under
         * light load a value greater than 0 trades a small amount of latency for fewer, more
         * efficient requests.
         */
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        /*
         * buffer.memory controls the total memory the producer may use for buffering. If records
         * are produced faster than they can be delivered to the broker, this buffer fills up;
         * further send() calls then block for up to max.block.ms before a TimeoutException is thrown.
         */
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```
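As a side note (not part of the original setup), spring-kafka's KafkaTemplate also supports a default topic, which would let the service in step 5 call sendDefault(...) instead of carrying the topic name around. A minimal sketch, assuming the same `${spring.kafka.template.topic}` property, could replace the `kafkaTemplate()` bean above:

```java
// Sketch only (assumption): wire the topic into the template via setDefaultTopic,
// so callers can use kafkaTemplate.sendDefault(message) without naming the topic.
@Bean
public KafkaTemplate<String, String> kafkaTemplate(
        @Value("${spring.kafka.template.topic}") String topic) {
    KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());
    template.setDefaultTopic(topic); // setDefaultTopic/sendDefault are standard spring-kafka API
    return template;
}
```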
4. The com.havent.logger.request.LoggerMessageReq message class
```java
package com.havent.logger.request;

import java.sql.Timestamp;

public class LoggerMessageReq {
    private String appName;
    private Object message;
    private String loggerType;
    private String loggerLevel;
    private Timestamp timestamp;

    public LoggerMessageReq(String appName, Object message) {
        this.appName = appName;
        this.message = message;
        this.timestamp = new Timestamp(System.currentTimeMillis());
    }

    public String getAppName() {
        return appName;
    }

    public Object getMessage() {
        return message;
    }

    public void setMessage(Object message) {
        this.message = message;
    }

    public String getLoggerLevel() {
        return loggerLevel;
    }

    public void setLoggerLevel(String loggerLevel) {
        this.loggerLevel = loggerLevel;
    }

    public String getLoggerType() {
        return loggerType;
    }

    public void setLoggerType(String loggerType) {
        this.loggerType = loggerType;
    }

    public Timestamp getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Timestamp timestamp) {
        this.timestamp = timestamp;
    }
}
```
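To make the wire format concrete, here is a small standalone sketch (the class name LoggerMessageReqDemo is made up for illustration) of what `JSON.toJSONString(...)` produces for this object. By default fastjson orders fields alphabetically and writes java.sql.Timestamp as epoch milliseconds, so the exact output may differ with other serializer settings:

```java
import com.alibaba.fastjson.JSON;
import com.havent.logger.request.LoggerMessageReq;

public class LoggerMessageReqDemo {
    public static void main(String[] args) {
        LoggerMessageReq req = new LoggerMessageReq("HAVENT-SPRING-BOOT-DEMO", "test message");
        req.setLoggerLevel("info");
        req.setLoggerType("request");

        // Roughly: {"appName":"HAVENT-SPRING-BOOT-DEMO","loggerLevel":"info","loggerType":"request",
        //           "message":"test message","timestamp":1520000000000}
        System.out.println(JSON.toJSONString(req));
    }
}
```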
5. The com.havent.logger.service.KafkaService service class
```java
package com.havent.logger.service;

import com.alibaba.fastjson.JSON;
import com.havent.logger.request.LoggerMessageReq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

/**
 * Kafka message-publishing service.
 * @author havent.liu
 */
@Component
public class KafkaService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.template.topic}")
    private String topic;

    @Value("${spring.application.name}")
    private String appName;

    public void trace(Object msg, String loggerType) { this.sendMessage("trace", loggerType, msg); }
    public void trace(Object msg)                    { this.sendMessage("trace", "", msg); }

    public void debug(Object msg, String loggerType) { this.sendMessage("debug", loggerType, msg); }
    public void debug(Object msg)                    { this.sendMessage("debug", "", msg); }

    public void info(Object msg, String loggerType)  { this.sendMessage("info", loggerType, msg); }
    public void info(Object msg)                     { this.sendMessage("info", "", msg); }

    public void warn(Object msg, String loggerType)  { this.sendMessage("warn", loggerType, msg); }
    public void warn(Object msg)                     { this.sendMessage("warn", "", msg); }

    public void error(Object msg, String loggerType) { this.sendMessage("error", loggerType, msg); }
    public void error(Object msg)                    { this.sendMessage("error", "", msg); }

    private void sendMessage(String loggerLevel, String loggerType, Object msg) {
        LoggerMessageReq loggerMessage = new LoggerMessageReq(appName, msg);
        loggerMessage.setLoggerLevel(loggerLevel);
        loggerMessage.setLoggerType(loggerType);

        String message = JSON.toJSONString(loggerMessage);
        this.sendMessage(message);
    }

    /**
     * Send the serialized message to Kafka.
     */
    private void sendMessage(String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(
                result -> System.out.println("kafka > message sent successfully: " + message),
                throwable -> System.out.println("kafka > message send failed: " + message));
    }
}
```
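The callback above is fire-and-forget. If some log entries must not be lost silently, a blocking variant is an option; the sketch below (sendMessageSync is a hypothetical addition to KafkaService, and requires an extra import of java.util.concurrent.ExecutionException) waits for the broker acknowledgement before returning:

```java
// Sketch only: block until the send completes so failures surface to the caller.
private void sendMessageSync(String message) {
    try {
        kafkaTemplate.send(topic, message).get(); // waits for the SendResult
        System.out.println("kafka > message sent successfully: " + message);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
        System.out.println("kafka > message send failed: " + e.getCause());
    }
}
```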
6. Usage example: com.havent.controller.HelloController
```java
package com.havent.controller;

import com.alibaba.fastjson.JSON;
import com.havent.logger.request.LoggerMessageReq;
import com.havent.logger.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {
    @Autowired
    private KafkaService logger;

    @RequestMapping("/")
    public String index() {
        logger.info("test info");
        logger.trace(this.getClass());
        logger.warn(new LoggerMessageReq("testApp", "test message"));
        logger.error(JSON.parse("{id:111,name:'test',content:'something wrong!'}"));

        return "Hello World";
    }
}
```
Execution result:
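The original post showed the runtime output only as a screenshot. Based on the success callback in KafkaService, each request to http://localhost:8081/ should print four lines roughly like the following (field order and timestamp are illustrative, not captured output):

```
kafka > message sent successfully: {"appName":"HAVENT-SPRING-BOOT-DEMO","loggerLevel":"info","loggerType":"","message":"test info","timestamp":1520000000000}
```

with similar lines for the trace, warn, and error calls.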