前面写了个cassandra-appender,一个基于cassandra的logback插件。正是cassandra的分布式数据库属性才合适做为akka-cluster-sharding分布式应用的logger。因此,cassandra-appender核心功能就是对logback消息的存写部分了。一样,基于ES的logback-appender核心部分就是对ES的存写过程了。在ES里这个过程还附带了索引indexing过程。未来对历史消息的搜索、分析会更加方便。直接看看消息存写这部分elastic4代码:java
def writeLog(event: ILoggingEvent)(client: ElasticClient, idx: String)(appName: String, ip: String, hostName: String, default: String) = { var content: List[(String,Any)] = List( APP_NAME -> appName, HOST_IP -> ip, HOST_NAME -> hostName, LOGGER_NAME -> event.getLoggerName(), LEVEL -> event.getLevel().toString, THREAD_NAME -> event.getThreadName(), LOG_DATE -> logDate, LOG_TIME -> logTime ) try { val callerData = event.getCallerData() if (callerData.nonEmpty) { content = content ++ List( CLASS_NAME -> callerData.head.getClassName(), FILE_NAME -> callerData.head.getFileName(), LINE_NUMBER -> callerData.head.getLineNumber().toString, METHOD_NAME -> callerData.head.getMethodName() ) } } catch {case e: Throwable => println(s"logging event error: ${e.getMessage}")} try { if (event.getThrowableProxy() != null) { val throwableStrs = event.getThrowableProxy().getSuppressed().asInstanceOf[List[IThrowableProxy]] val throwableStr = throwableStrs.foldLeft("") { case (b, t) => b + "," + t.getMessage() } content = content :+ (THROWABLE_STR -> throwableStr) } } catch {case e: Throwable => println(s"logging event error: ${e.getMessage}")} var logmsgs = event.getMessage() try { val logMap = fromJson[Map[String,String]](logmsgs) logMap.foreach ( m => content = content :+ (m._1 -> m._2)) } catch { case e: Throwable => content = content :+ (MESSAGE -> logmsgs) try { val dftMap = fromJson[Map[String,String]](default) dftMap.foreach ( m => content = content :+ (m._1 -> m._2)) } catch { case e: Throwable => } } val newRecord = indexInto(idx) .fields( content ).createOnly(true) client.execute(newRecord) //.await
}
能够看到,咱们先判断了一下event.getMessage()消息是不是json格式的:若是是正确的json格式,那么解析成为字段名和字段值,不然就直接写入log_msg字段 + 一串默认的字段和值。干什么呢?要知道这个elastic-appender是一个通用的logback-plugin,是能够在任何软件中使用的。由于各类软件对运行状态跟踪目标、方式的要求不一样,为了知足这些要求,那么经过用户自定义跟踪目标字段的方式应该是一个好的解决方案。从测试例子里能够理解:数据库
var loggedItems = Map[String,String]() loggedItems = loggedItems ++ Map( ("app_customer" -> "logback.com"), ("app_device" -> "9101"), ("log_msg" -> "specific message for elastic ...")) log.debug(toJson(loggedItems)) //logback.xml
<appender name="elasticLogger" class="com.datatech.logback.ElasticAppender">
<host>http://localhost</host>
<port>9200</port>
<appName>ESLoggerDemo</appName>
<defaultFieldValues>{"app_customer":"中心书城","app_device":"9013"}</defaultFieldValues>
<indexName>applog</indexName>
</appender>
上面代码里定义了app_customer,app_device,log_msg这几个自定义字段和值。这样作的意思是:logback只定义了log.info(msg)里msg一个字段。若是存放在数据库里咱们只能在msg一个字段里进行分类、查询了。但既然已经使用了数据库做为存储咱们更但愿用更多的字段来表明一条消息,如用户号,机器号,店号等等。这样跟踪起来方便不少。因此,对于内部的用户能够要求把因应特殊须要额外增长的字段-值加密成json,而后传递给ElasticAppender去处理。对于应用中引用三方软件所产生的logback-msg,咱们可没办法要求他们按照这个格式来传递消息,但仍然会存进ES,因此就用logback.xml中defaultFieldValaues定义的默认字段-值来填写这些额外的信息了。json
这一篇咱们主要讨论一下这个特别的elastic-appender,它的使用方法。那么先重复一下logback的工做原理:app
首先认识一下logback:感受须要重点了解的logging运做核心应该是消息等级level的操做。消息等级是指logback根据不一样的消息等级来筛选须要记录的消息。logback支持下面几个消息等级,按照各自记录动做覆盖面由弱到强排列,包括: TRACE -> DEBUG -> INFO -> WARN -> ERROR 分别对应记录函数 trace(msg),debug(msg),info(msg),warn(msg),error(msg) logback按消息等级进行记录筛选的规则以下: 假设记录函数为p,某个class的消息等级level为q:当p>=q时选择记录消息。换言之调用函数error(msg)时logback会记录全部等级消息,反之trace(msg)只能记录TRACE级别的消息。logback手册中以下表示: TRACE DEBUG INFO WARN ERROR OFF trace() YES NO NO NO NO NO debug() YES YES NO NO NO NO info() YES YES YES NO NO NO warn() YES YES YES YES NO NO error() YES YES YES YES YES NO logback中每一个类的默认消息等级能够按照类型继承树结构继承。当一个子类没有定义消息等级时,它继承对上父类的消息等级,即:X.Y.Z中Z的默认消息等级从Y继承。
再看看下面logback.xml例子:分布式
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<Pattern>
%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n </Pattern>
</encoder>
</appender>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<!-- path to your log file, where you want to store logs -->
<file>~/logback.log</file>
<append>false</append>
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="cassandraLogger" class="com.datatech.logback.CassandraAppender">
<appName>POCServer</appName>
<defaultFieldValues>{"app_customer":"999999","app_device":"9999"}</defaultFieldValues>
<keyspaceName>applog</keyspaceName>
<columnFamily>txnlog</columnFamily>
</appender>
<appender name="elasticLogger" class="com.datatech.logback.ElasticAppender">
<host>http://localhost</host>
<port>9200</port>
<appName>ESLoggerDemo</appName>
<defaultFieldValues>{"app_customer":"中心书城","app_device":"9013"}</defaultFieldValues>
<indexName>applog</indexName>
</appender>
<logger name="com.datatech" level="info" additivity="false">
<appender-ref ref="cassandraLogger" />
<appender-ref ref="elasticLogger" />
<appender-ref ref="STDOUT" />
</logger>
<logger name="com.datatech.sdp" level="info" additivity="false">
<appender-ref ref="cassandraLogger" />
<appender-ref ref="elasticLogger" />
<appender-ref ref="STDOUT" />
</logger>
<root level="info">
<appender-ref ref="cassandraLogger" />
<appender-ref ref="elasticLogger" />
<appender-ref ref="STDOUT" />
</root>
<shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
</configuration>
上面配置文件中定义了包括STDOUT,FILE,cassandraLoggeer,elasticLogger几个appender。首先,不一样level可使用不一样的appender。cassandraLogger,elasticLogger是咱们自定义的appender。在elasticLogger段落里定义了ES终端链接参数如host,port。在ElasticAppender类源码中的elastic终端链接和关闭以下:ide
override def start(): Unit = { if(! _hosts.isEmpty) { connectES() super.start() } } override def stop(): Unit = { if(optESClient.isDefined) { (optESClient.get).close() optESClient = None } super.stop() } def connectES(): Unit = { try { val url = _hosts + ":" + _port.toString val esjava = JavaClient(ElasticProperties(url)) val client = ElasticClient(esjava) optESClient = Some(client) } catch { case e: Throwable => optESClient = None } }
注意,假如host在logback.xml里定义了那么在ElasticAppender实例化时系统会自动直接链接,不然须要手工调用logger.start()来链接ES。xml文件里的属性是经过getter来获取的,以下:函数
private var _hosts: String = "" def setHost(host: String): Unit = _hosts = host def getHost : String = _hosts private var _port: Int = 9200 def setPort(port: Int): Unit = _port = port private var _idxname: String = "applog" def setIndexName(indexName: String): Unit = _idxname = indexName private var _username: String = "" def setUsername(username: String): Unit = _username = username private var _password: String = "" def setPassword(password: String): Unit = _password = password private var _defaultFieldValues: String = "" def setDefaultFieldValues(defaultFieldValues: String) = _defaultFieldValues = defaultFieldValues
下面是ElasticAppender的使用示范:(先把logback_persist.jar放入lib目录)测试
import scala.concurrent.ExecutionContext.Implicits.global import com.sksamuel.elastic4s.ElasticDsl._ import com.sksamuel.elastic4s.http.JavaClient import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties} import ch.qos.logback.classic.Logger import ch.qos.logback.core.{ConsoleAppender, FileAppender} import com.datatech.logback.{CassandraAppender,ElasticAppender, JsonConverter} import ch.qos.logback.classic.spi.ILoggingEvent import org.slf4j.LoggerFactory import ch.qos.logback.classic.LoggerContext import java.time._ import java.time.format._ import java.util.Locale object ElasticAppenderDemo extends App with JsonConverter { val log: Logger = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME).asInstanceOf[Logger] val elasticAppender = log.getAppender("elasticLogger").asInstanceOf[ElasticAppender] val stdoutAppender = log.getAppender("STDOUT").asInstanceOf[ConsoleAppender[ILoggingEvent]] val fileAppender = log.getAppender("FILE").asInstanceOf[FileAppender[ILoggingEvent]] val cassAppender = log.getAppender("cassandraLogger").asInstanceOf[CassandraAppender] //stop other appenders
if (stdoutAppender != null) stdoutAppender.stop() if (fileAppender != null) fileAppender.stop() if (cassAppender != null) cassAppender.stop() //check if host not set in logback.xml
if(elasticAppender != null) { if (elasticAppender.getHost.isEmpty) { elasticAppender.setHost("http://localhost") elasticAppender.setPort(9200) elasticAppender.start() } } val dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS", Locale.US) val now = LocalDateTime.now.format(dateTimeFormatter) (1 to 100).foreach { idx => log.info(s"************this is a info message $idx ") } log.debug("***********debugging message here ..." + now) log.debug(toJson(loggedItems)) //stop the logger
val loggerContext = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext] loggerContext.stop() }
在Appender实例化时getAppender("elasticLogger")中这个elasticLogger是xml文件中appender段落名称。若是host,port没在xml文件中定义的话能够手工用setter setHost,setPort在程序里设置。loggerContext.stop()一次性关闭全部appender,包括它们链接的数据库。也能够用elasticAppender.stop()来关闭独立的appender。ui
咱们能够用elastic4自定义一个表结构mapping, 以下:this
val esjava = JavaClient(ElasticProperties("http://localhost:9200")) val client = ElasticClient(esjava) //删除索引
val rspExists = client.execute(indexExists("applog")).await
if (rspExists.result.exists) client.execute(deleteIndex("applog")).await
//构建索引
val idxCreate = client.execute(createIndex("applog") .shards(1).replicas(1)).await
//建立表结构
if(idxCreate.isSuccess) { val applogMapping = client.execute( putMapping("applog").fields( textField("class_name"), textField("file_name"), ipField("host_ip"), textField("host_name"), keywordField("level"), keywordField("line_number"), keywordField("logger_name"), keywordField("method_name"), keywordField("thread_name"), textField("throwable_str_rep"), dateField("log_date").format("basic_date").ignoreMalformed(true), dateField("log_time").format("basic_date_time").ignoreMalformed(true), textField("log_msg"), keywordField("app_name"), keywordField("app_customer"), keywordField("app_device") )).await
if(applogMapping.isSuccess) println(s"mapping successfully created.") else println(s"mapping creation error: ${applogMapping.error.reason}") } else { println(s"index creation error: ${idxCreate.error.reason}") } client.close()
依赖引用在build.sbt里:
name := "logback-persist-demo" version := "0.1" scalaVersion := "2.12.9" val elastic4sVersion = "7.6.0" libraryDependencies ++= Seq( "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0", "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0", "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion, // for the default http client "com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % elastic4sVersion, "ch.qos.logback" % "logback-classic" % "1.2.3", "org.typelevel" %% "cats-core" % "2.0.0-M1", "org.json4s" %% "json4s-native" % "3.6.1", "org.json4s" %% "json4s-jackson" % "3.6.7", "org.json4s" %% "json4s-ext" % "3.6.7" )