Apache Griffin is positioned as a data quality monitoring tool for big data. It supports the batch data sources Hive, text files, and Avro files, as well as the streaming data source Kafka. Projects that store their data in relational databases such as MySQL or Oracle also need a configurable data quality monitoring tool, so extending Griffin with a MySQL data source gives such projects one more option for data quality monitoring.
The previous article on Apache Griffin covered its features, execution flow, and architecture. This article walks through its code structure and a simple way to extend its data sources. First, the code structure:
The code is split into three parts: measure, service, and ui. measure contains the Spark job code; service is the Spring Boot code behind the web configuration and monitoring interface; ui holds the Angular JS front-end code and resources.
The code for extending data sources lives mainly in the measure module. Using the Griffin demo that reads an Avro source in batch mode as an example, let's look at how Griffin reads its configuration and selects a data source.
Environment configuration file: env-batch.json
```json
{
  # Spark configuration
  "spark": {
    "log.level": "WARN",
    "config": {
      "spark.master": "local[*]"
    }
  },
  # sinks for the comparison results: console, HDFS, Elasticsearch
  "sinks": [
    {
      "type": "CONSOLE",
      "config": {
        "max.log.lines": 10
      }
    },
    {
      "type": "HDFS",
      "config": {
        "path": "hdfs://localhost/griffin/batch/persist",
        "max.persist.lines": 10000,
        "max.lines.per.file": 10000
      }
    },
    {
      "type": "ELASTICSEARCH",
      "config": {
        "method": "post",
        "api": "http://10.148.181.248:39200/griffin/accuracy",
        "connection.timeout": "1m",
        "retry": 10
      }
    }
  ],
  "griffin.checkpoint": []
}
```
Data source configuration file: config-batch.json
```json
{
  # job name
  "name": "accu_batch",
  # job type: batch or streaming
  "process.type": "batch",
  # configuration of the data source and the comparison target
  "data.sources": [
    {
      "name": "source",
      "baseline": true,
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.name": "src/test/resources/users_info_src.avro"
          }
        }
      ]
    },
    {
      "name": "target",
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.name": "src/test/resources/users_info_target.avro"
          }
        }
      ]
    }
  ],
  # validation rules; here an accuracy comparison is chosen
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "out.dataframe.name": "accu",
        "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code"
      }
    ]
  },
  # where the comparison results go: console and Elasticsearch
  "sinks": ["CONSOLE", "ELASTICSEARCH"]
}
```
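For comparison, a MySQL source would only need its own connector block in the same place. The `type` value and the config keys below (`url`, `table.name`, `user`, `password`) are purely an assumption for illustration; Griffin does not ship such a connector out of the box, which is exactly the gap this article works toward:

```json
{
  "name": "source",
  "baseline": true,
  "connectors": [
    {
      # hypothetical connector type, to be wired up in the measure code later
      "type": "mysql",
      "version": "5.7",
      "config": {
        "url": "jdbc:mysql://localhost:3306/test",
        "table.name": "users_info_src",
        "user": "griffin",
        "password": "123456"
      }
    }
  ]
}
```

With the environment and data source configuration in place, the entry class of the measure module, Application, reads both files and dispatches to a batch or streaming job: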
```scala
package org.apache.griffin.measure

import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, GriffinConfig, Param}
import org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReaderFactory
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.launch.batch.BatchDQApp
import org.apache.griffin.measure.launch.streaming.StreamingDQApp

/**
  * application entrance
  */
object Application extends Loggable {

  def main(args: Array[String]): Unit = {
    info(args.toString)
    if (args.length < 2) {
      error("Usage: class <env-param> <dq-param>")
      sys.exit(-1)
    }

    // runtime parameter files: env-batch.json and config-batch.json
    val envParamFile = args(0)
    val dqParamFile = args(1)

    info(envParamFile)
    info(dqParamFile)

    // read param files
    val envParam = readParamFile[EnvConfig](envParamFile) match {
      case Success(p) => p
      case Failure(ex) =>
        error(ex.getMessage, ex)
        sys.exit(-2)
    }
    val dqParam = readParamFile[DQConfig](dqParamFile) match {
      case Success(p) => p
      case Failure(ex) =>
        error(ex.getMessage, ex)
        sys.exit(-2)
    }
    // combine the environment config and the data source config into the Griffin config
    val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)

    // choose the application type from the config;
    // process.type in config-batch.json is "batch" here
    val procType = ProcessType(allParam.getDqConfig.getProcType)
    val dqApp: DQApp = procType match {
      case BatchProcessType => BatchDQApp(allParam)
      case StreamingProcessType => StreamingDQApp(allParam)
      case _ =>
        error(s"${procType} is unsupported process type!")
        sys.exit(-4)
    }

    startup

    // initialize the Griffin job execution environment;
    // see the next code block: it mainly creates the SparkSession and registers Griffin's custom Spark UDFs
    dqApp.init match {
      case Success(_) =>
        info("process init success")
      case Failure(ex) =>
        error(s"process init error: ${ex.getMessage}", ex)
        shutdown
        sys.exit(-5)
    }

    // run the job; with this configuration it is a batch job
    val success = dqApp.run match {
      case Success(result) =>
        info("process run result: " + (if (result) "success" else "failed"))
        result
      case Failure(ex) =>
        error(s"process run error: ${ex.getMessage}", ex)
        if (dqApp.retryable) {
          throw ex
        } else {
          shutdown
          sys.exit(-5)
        }
    }

    // close the job
    dqApp.close match {
      case Success(_) =>
        info("process end success")
      case Failure(ex) =>
        error(s"process end error: ${ex.getMessage}", ex)
        shutdown
        sys.exit(-5)
    }

    shutdown

    // exit the application
    if (!success) {
      sys.exit(-5)
    }
  }

  private def readParamFile[T <: Param](file: String)(implicit m: ClassTag[T]): Try[T] = {
    val paramReader = ParamReaderFactory.getParamReader(file)
    paramReader.readConfig[T]
  }

  private def startup(): Unit = {
  }

  private def shutdown(): Unit = {
  }
}
```
The batch job class:
```scala
package org.apache.griffin.measure.launch.batch

import java.util.Date

import scala.util.Try

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, SQLContext}

import org.apache.griffin.measure.configuration.dqdefinition._
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context._
import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent

case class BatchDQApp(allParam: GriffinConfig) extends DQApp {

  val envParam: EnvConfig = allParam.getEnvConfig
  val dqParam: DQConfig = allParam.getDqConfig

  val sparkParam = envParam.getSparkParam
  val metricName = dqParam.getName
  // val dataSourceParams = dqParam.dataSources
  // val dataSourceNames = dataSourceParams.map(_.name)
  val sinkParams = getSinkParams

  var sqlContext: SQLContext = _
  implicit var sparkSession: SparkSession = _

  def retryable: Boolean = false

  // initialize: create the SparkSession and register Griffin's custom UDFs
  def init: Try[_] = Try {
    // build spark 2.0+ application context
    val conf = new SparkConf().setAppName(metricName)
    conf.setAll(sparkParam.getConfig)
    conf.set("spark.sql.crossJoin.enabled", "true")
    sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
    sqlContext = sparkSession.sqlContext

    // register udf
    GriffinUDFAgent.register(sqlContext)
  }

  // the job execution method
  def run: Try[Boolean] = Try {
    // start time
    val startTime = new Date().getTime

    val measureTime = getMeasureTime
    val contextId = ContextId(measureTime)

    // get data sources
    // build the data sources from the data.sources section of config-batch.json;
    // here both "source" and "target" read avro files
    val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)
    // initialize the data sources
    dataSources.foreach(_.init)

    // create the Griffin execution context
    val dqContext: DQContext = DQContext(
      contextId, metricName, dataSources, sinkParams, BatchProcessType
    )(sparkSession)

    // start the sinks configured for this job: console and Elasticsearch
    val applicationId = sparkSession.sparkContext.applicationId
    dqContext.getSink().start(applicationId)

    // build the data quality comparison job
    val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)

    // execute the comparison job following the steps configured on the web side;
    // the demo mainly runs the rule SQL from the config and writes the result to the sinks
    val result = dqJob.execute(dqContext)

    // log the end time and duration of this check
    val endTime = new Date().getTime
    dqContext.getSink().log(endTime, s"process using time: ${endTime - startTime} ms")

    // clean up the Griffin context
    dqContext.clean()

    // emit the finish mark
    dqContext.getSink().finish()

    result
  }

  def close: Try[_] = Try {
    sparkSession.close()
    sparkSession.stop()
  }
}
```
That is a quick walk through the execution order of the measure code. If you read it carefully, you will find the flow is not complicated and the code logic is fairly clear.
The data source creation this article cares about happens in the BatchDQApp line `val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)`.
Let's look at the DataSourceFactory class:
```scala
package org.apache.griffin.measure.datasource

import scala.util.Success

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}

object DataSourceFactory extends Loggable {

  def getDataSources(sparkSession: SparkSession,
                     ssc: StreamingContext,
                     dataSources: Seq[DataSourceParam]
                    ): Seq[DataSource] = {
    dataSources.zipWithIndex.flatMap { pair =>
      val (param, index) = pair
      getDataSource(sparkSession, ssc, param, index)
    }
  }

  private def getDataSource(sparkSession: SparkSession,
                            ssc: StreamingContext,
                            dataSourceParam: DataSourceParam,
                            index: Int
                           ): Option[DataSource] = {
    val name = dataSourceParam.getName
    val connectorParams = dataSourceParam.getConnectors
    val timestampStorage = TimestampStorage()

    // cache client for streaming data
    val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
      sparkSession.sqlContext, dataSourceParam.getCheckpointOpt, name, index, timestampStorage)

    // build the data connectors
    val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
      // obtain each connector from the connector factory
      DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
        timestampStorage, streamingCacheClientOpt) match {
          case Success(connector) => Some(connector)
          case _ => None
        }
    }

    Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
  }

}
```
DataConnectorFactory, the data connector factory:
```scala
package org.apache.griffin.measure.datasource.connector

import scala.util.Try

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.datasource.connector.batch._
import org.apache.griffin.measure.datasource.connector.streaming._

object DataConnectorFactory extends Loggable {

  val HiveRegex = """^(?i)hive$""".r
  val AvroRegex = """^(?i)avro$""".r
  val TextDirRegex = """^(?i)text-dir$""".r
  val KafkaRegex = """^(?i)kafka$""".r
  val CustomRegex = """^(?i)custom$""".r

  /**
    * create data connector
    * @param sparkSession             spark env
    * @param ssc                      spark streaming env
    * @param dcParam                  data connector param
    * @param tmstCache                same tmst cache in one data source
    * @param streamingCacheClientOpt  for streaming cache
    * @return data connector
    */
  def getDataConnector(sparkSession: SparkSession,
                       ssc: StreamingContext,
                       dcParam: DataConnectorParam,
                       tmstCache: TimestampStorage,
                       streamingCacheClientOpt: Option[StreamingCacheClient]
                      ): Try[DataConnector] = {
    val conType = dcParam.getType
    val version = dcParam.getVersion
    Try {
      // map the configured connector type to a connector implementation
      conType match {
        case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
        case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
        case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
        case CustomRegex() =>
          getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
        case KafkaRegex() =>
          getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
        case _ => throw new Exception("connector creation error!")
      }
    }
  }

  private def getStreamingDataConnector(sparkSession: SparkSession,
                                        ssc: StreamingContext,
                                        dcParam: DataConnectorParam,
                                        tmstCache: TimestampStorage,
                                        streamingCacheClientOpt: Option[StreamingCacheClient]
                                       ): StreamingDataConnector = {
    if (ssc == null) throw new Exception("streaming context is null!")
    val conType = dcParam.getType
    val version = dcParam.getVersion
    conType match {
      case KafkaRegex() =>
        getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
      case _ => throw new Exception("streaming connector creation error!")
    }
  }

  // resolve a custom (user-defined) connector class by reflection
  private def getCustomConnector(session: SparkSession,
                                 context: StreamingContext,
                                 param: DataConnectorParam,
                                 storage: TimestampStorage,
                                 maybeClient: Option[StreamingCacheClient]): DataConnector = {
    val className = param.getConfig("class").asInstanceOf[String]
    val cls = Class.forName(className)
    if (classOf[BatchDataConnector].isAssignableFrom(cls)) {
      val ctx = BatchDataConnectorContext(session, param, storage)
      val meth = cls.getDeclaredMethod("apply", classOf[BatchDataConnectorContext])
      meth.invoke(null, ctx).asInstanceOf[BatchDataConnector]
    } else if (classOf[StreamingDataConnector].isAssignableFrom(cls)) {
      val ctx = StreamingDataConnectorContext(session, context, param, storage, maybeClient)
      val meth = cls.getDeclaredMethod("apply", classOf[StreamingDataConnectorContext])
      meth.invoke(null, ctx).asInstanceOf[StreamingDataConnector]
    } else {
      throw new ClassCastException(s"$className should extend BatchDataConnector or StreamingDataConnector")
    }
  }

  private def getKafkaDataConnector(sparkSession: SparkSession,
                                    ssc: StreamingContext,
                                    dcParam: DataConnectorParam,
                                    tmstCache: TimestampStorage,
                                    streamingCacheClientOpt: Option[StreamingCacheClient]
                                   ): KafkaStreamingDataConnector = {
    val KeyType = "key.type"
    val ValueType = "value.type"
    val config = dcParam.getConfig
    val keyType = config.getOrElse(KeyType, "java.lang.String").toString
    val valueType = config.getOrElse(ValueType, "java.lang.String").toString

    (keyType, valueType) match {
      case ("java.lang.String", "java.lang.String") =>
        KafkaStreamingStringDataConnector(
          sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
      case _ => throw new Exception("not supported type kafka data connector")
    }
  }

}
```
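Note that the factory already ships one generic extension point: the `custom` type. Its getCustomConnector method loads a connector class by reflection from the `class` key of the connector config and calls that class's `apply(BatchDataConnectorContext)` (or streaming) factory method, so a home-grown connector can in principle be plugged in without touching the factory at all. A hedged sketch of such a connector block, with a made-up class name:

```json
{
  # the built-in "custom" type resolves the class named below by reflection
  "type": "custom",
  "config": {
    # hypothetical fully qualified name of your own BatchDataConnector implementation
    "class": "com.example.griffin.connector.MyBatchDataConnector"
  }
}
```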
By now the way data sources are created should be clear: the factory maps the configured connector type to a connector implementation, and that connector supplies the data at run time. The demo uses the avro type, so let's look at the implementation of AvroBatchDataConnector:
```scala
package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.HdfsUtil
import org.apache.griffin.measure.utils.ParamUtil._

/**
  * batch data connector for avro file
  */
case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
                                  dcParam: DataConnectorParam,
                                  timestampStorage: TimestampStorage
                                 ) extends BatchDataConnector {

  val config = dcParam.getConfig

  val FilePath = "file.path"
  val FileName = "file.name"

  val filePath = config.getString(FilePath, "")
  val fileName = config.getString(FileName, "")

  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName

  private def pathPrefix(): Boolean = {
    filePath.nonEmpty
  }

  private def fileExist(): Boolean = {
    HdfsUtil.existPath(concreteFileFullPath)
  }

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
      val dfOpt = Some(df)
      val preDfOpt = preProcess(dfOpt, ms)
      preDfOpt
    } catch {
      case e: Throwable =>
        error(s"load avro file ${concreteFileFullPath} fails", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
}
```
Following the code, you can see that AvroBatchDataConnector implements the DataConnector interface; its core is the data method, which loads data from a file with `val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)`.
Similarly, let's look at how Griffin's default data source, Hive, is implemented:
```scala
package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._

/**
  * batch data connector for hive table
  */
case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
                                  dcParam: DataConnectorParam,
                                  timestampStorage: TimestampStorage
                                 ) extends BatchDataConnector {

  val config = dcParam.getConfig

  val Database = "database"
  val TableName = "table.name"
  val Where = "where"

  val database = config.getString(Database, "default")
  val tableName = config.getString(TableName, "")
  val whereString = config.getString(Where, "")

  val concreteTableName = s"${database}.${tableName}"
  val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      val dtSql = dataSql
      info(dtSql)
      val df = sparkSession.sql(dtSql)
      val dfOpt = Some(df)
      val preDfOpt = preProcess(dfOpt, ms)
      preDfOpt
    } catch {
      case e: Throwable =>
        error(s"load hive table ${concreteTableName} fails: ${e.getMessage}", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }

  private def tableExistsSql(): String = {
    // s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but not work for spark sql
    s"tableName LIKE '${tableName}'"
  }

  private def metaDataSql(): String = {
    s"DESCRIBE ${concreteTableName}"
  }

  private def dataSql(): String = {
    val tableClause = s"SELECT * FROM ${concreteTableName}"
    if (wheres.length > 0) {
      val clauses = wheres.map { w =>
        s"${tableClause} WHERE ${w}"
      }
      clauses.mkString(" UNION ALL ")
    } else tableClause
  }

}
```
The Hive connector looks even simpler: it builds the query SQL from the configured database, table name, and where clauses, executes it with `val df = sparkSession.sql(dtSql)`, and returns the resulting DataFrame as the data to compare. If you are familiar with Spark, you have probably already guessed that extending Griffin with a MySQL data source is not difficult, because Spark SQL can read MySQL through its JDBC data source.
For various reasons, the full implementation code and a demo will be covered in a follow-up post.
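In the meantime, here is a minimal sketch of what such a connector might look like, modeled on HiveBatchDataConnector above. It is only an illustration under assumptions: the class, its config keys (`url`, `table.name`, `user`, `password`, `where`), and the `com.mysql.jdbc.Driver` dependency are not part of Griffin, and wiring it up would still require either a new `mysql` case in DataConnectorFactory or the `custom` route shown earlier.

```scala
package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._

/**
  * hypothetical batch data connector for a mysql table (sketch only, not part of Griffin)
  */
case class MysqlBatchDataConnector(@transient sparkSession: SparkSession,
                                   dcParam: DataConnectorParam,
                                   timestampStorage: TimestampStorage
                                  ) extends BatchDataConnector {

  val config = dcParam.getConfig

  // assumed config keys for this sketch
  val url = config.getString("url", "")
  val tableName = config.getString("table.name", "")
  val user = config.getString("user", "")
  val password = config.getString("password", "")
  val whereString = config.getString("where", "")

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      // Spark SQL reads mysql through its generic jdbc data source;
      // the mysql-connector-java driver has to be on the classpath
      val df = sparkSession.read.format("jdbc")
        .option("url", url)
        .option("dbtable", tableName)
        .option("user", user)
        .option("password", password)
        .option("driver", "com.mysql.jdbc.Driver")
        .load()
      // apply the optional where clause, then Griffin's usual pre-processing
      val filtered = if (whereString.nonEmpty) df.where(whereString) else df
      preProcess(Some(filtered), ms)
    } catch {
      case e: Throwable =>
        error(s"load mysql table ${tableName} fails: ${e.getMessage}", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
}
```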