title: Customizing log4j2 to Send Logs to Kafka
tags: log4j2,kafka
To feed each project team's logs into the company's big data platform without requiring any changes on their side, I did some research and was pleasantly surprised to find that log4j2 ships with built-in support for sending logs to Kafka. I immediately dug into log4j's implementation and discovered that the default implementation sends synchronously: if the Kafka service goes down, it blocks the application's normal log output. So, using the source code as a reference, I made some modifications.
The log4j logging workflow
log4j2 offers a significant performance improvement over log4j; the official site already documents and benchmarks this, so I won't dwell on it. To use it effectively, though, it is worth understanding its internal workflow. Below is a class diagram from the official log4j documentation.
Applications using the Log4j 2 API will request a Logger with a specific name from the LogManager. The LogManager will locate the appropriate LoggerContext and then obtain the Logger from it. If the Logger must be created it will be associated with the LoggerConfig that contains either a) the same name as the Logger, b) the name of a parent package, or c) the root LoggerConfig. LoggerConfig objects are created from Logger declarations in the configuration. The LoggerConfig is associated with the Appenders that actually deliver the LogEvents.
The official docs already explain how these classes relate, so I won't describe each class's role in detail here. Today's focus is the Appender class, since it determines where logs are delivered.
The ability to selectively enable or disable logging requests based on their logger is only part of the picture. Log4j allows logging requests to print to multiple destinations. In log4j speak, an output destination is called an Appender. Currently, appenders exist for the console, files, remote socket servers, Apache Flume, JMS, remote UNIX Syslog daemons, and various database APIs. See the section on Appenders for more details on the various types available. More than one Appender can be attached to a Logger.
Core configuration
The figure above shows the core classes log4j2 uses to send logs to Kafka. The key one is KafkaAppender; the other classes handle the connection to the Kafka service.
```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;

// KafkaManager is assumed to live in the same package.
@Plugin(name = "Kafka", category = "Core", elementType = "appender", printObject = true)
public final class KafkaAppender extends AbstractAppender {

    private static final long serialVersionUID = 1L;

    @PluginFactory
    public static KafkaAppender createAppender(
            @PluginElement("Layout") final Layout<? extends Serializable> layout,
            @PluginElement("Filter") final Filter filter,
            @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
            @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
            @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
            @PluginElement("Properties") final Property[] properties) {
        final KafkaManager kafkaManager = new KafkaManager(name, topic, properties);
        return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
    }

    private final KafkaManager manager;

    private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter,
            final boolean ignoreExceptions, final KafkaManager manager) {
        super(name, filter, layout, ignoreExceptions);
        this.manager = manager;
    }

    @Override
    public void append(final LogEvent event) {
        // Guard against recursive logging from the Kafka client itself.
        if (event.getLoggerName().startsWith("org.apache.kafka")) {
            LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
        } else {
            try {
                if (getLayout() != null) {
                    manager.send(getLayout().toByteArray(event));
                } else {
                    manager.send(event.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8));
                }
            } catch (final Exception e) {
                LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
                throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
            }
        }
    }

    @Override
    public void start() {
        super.start();
        manager.startup();
    }

    @Override
    public void stop() {
        super.stop();
        manager.release();
    }
}
```
```xml
<?xml version="1.0" encoding="UTF-8"?>
...
<Appenders>
  <Kafka name="Kafka" topic="log-test">
    <PatternLayout pattern="%date %message"/>
    <Property name="bootstrap.servers">localhost:9092</Property>
  </Kafka>
</Appenders>
<Loggers>
  <Root level="DEBUG">
    <AppenderRef ref="Kafka"/>
  </Root>
  <Logger name="org.apache.kafka" level="INFO" /> <!-- avoid recursive logging -->
</Loggers>
```
The name attribute of @Plugin corresponds to the Kafka tag in the XML configuration file, and it can be customized. If you rename the tag, say to MyKafka, you must also change the name attribute of @Plugin to MyKafka. The configuration then looks like this:
```xml
<MyKafka name="Kafka" topic="log-test">
```
Custom configuration
Sometimes we need attributes that the default KafkaAppender does not support, so some rewriting is required. Fortunately the change is straightforward: just read the values from the Property[] array (the kafkaProps) passed into the constructor. To meet our project's requirements, I defined two extra attributes, platform and serviceName.
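A minimal sketch of that idea, using a plain java.util.Properties in place of log4j2's Property[] (the names platform and serviceName are our custom additions; everything left over would be passed through to the Kafka producer):

```java
import java.util.Properties;

public class CustomPropsDemo {
    // Pulls our custom fields out of the appender properties, leaving
    // only genuine Kafka producer settings behind.
    static String[] extractCustomProps(Properties props) {
        String platform = (String) props.remove("platform");
        String serviceName = (String) props.remove("serviceName");
        return new String[] { platform, serviceName };
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("platform", "bigdata");
        props.setProperty("serviceName", "order-service");

        String[] custom = extractCustomProps(props);
        System.out.println(custom[0] + "," + custom[1]);   // bigdata,order-service
        System.out.println(props.containsKey("platform")); // false: only producer config remains
    }
}
```

The custom values can then be stamped onto every outgoing message (for example, as fields in the serialized log event), while the remaining properties configure the producer.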
As the KafkaAppender source shows, it sends messages in a synchronous, blocking fashion. In testing, once the Kafka service went down, the project's normal log output was affected, and that is not something I wanted to see, so I modified it to a certain degree.
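One way to decouple the logging thread from Kafka, sketched below with only the JDK (the class and method names are illustrative, not the real KafkaManager API): events go into a bounded in-memory queue, a background thread (omitted here) drains the queue to Kafka, and the logging thread uses offer(), which fails fast when the queue is full instead of blocking the application.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class AsyncSendSketch {
    private final BlockingQueue<byte[]> buffer;

    public AsyncSendSketch(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Called on the logging thread: never blocks, even if Kafka is
    // down and the drainer cannot make progress.
    public boolean send(byte[] msg) {
        return buffer.offer(msg); // false => buffer full; fall back to a local file
    }

    public static void main(String[] args) {
        AsyncSendSketch sketch = new AsyncSendSketch(2);
        System.out.println(sketch.send("a".getBytes())); // true
        System.out.println(sketch.send("b".getBytes())); // true
        System.out.println(sketch.send("c".getBytes())); // false: full, but no blocking
    }
}
```

The trade-off is bounded memory for lost-or-spilled messages under backpressure, which is exactly why a local-file fallback matters.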
Features:
- All messages are also written to a local file.
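A minimal sketch of that local-file safety net (the class name and file path are illustrative): every message is appended to a local file, so even when Kafka is unavailable no log line is lost.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LocalFileFallback {
    private final Path logFile;

    public LocalFileFallback(Path logFile) {
        this.logFile = logFile;
    }

    // Appends one message per line, creating the file on first use.
    public void write(String message) throws IOException {
        Files.write(logFile,
                (message + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("kafka-appender-fallback", ".log");
        LocalFileFallback fallback = new LocalFileFallback(tmp);
        fallback.write("event-1");
        fallback.write("event-2");
        System.out.println(Files.readAllLines(tmp)); // [event-1, event-2]
    }
}
```

In production you would likely hand this file to an existing RollingFileAppender rather than manage rotation yourself.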