Computation Engine: Overall Design Example

Overview: the system is data-centric and covers the entire pipeline from data collection to visualization. It offers two real-time processing modes, streaming and offline, for working the data, provides rich visualizations of the processed results, and also lets users build their own customized data displays.

Overall Design

Flow: 1. data collection → 2. data preprocessing → 3. real-time aggregation (per micro-batch) → 4. data storage → 5. data query → 6. data aggregation → 7. data presentation

Main modules:

Metadata configuration (collection items, statistics items, reports): MetaData Service, WebApp

Data collection/processing/storage (1. data collection, 2. data preprocessing, 3. real-time aggregation, 4. data storage) ----- streaming real-time computation: Spark Streaming/SQL, MetaData Service, ExpressionParser

Visualization (5. data query, 6. data aggregation, 7. data presentation) ----- offline real-time computation: Report Service, MetaData Service, WebApp

Logging: Log Service

Monitoring: Monitor Service

Related technologies: Scala, Spark, Akka, Spray, Slick, Kafka, AngularJS, React, Cassandra, MySQL, Redis

 

WebAPI: REST API

The HTTP server is built with spray-can on top of an Akka ActorSystem. When the ActorSystem starts, it instantiates ApiRouterActor; when the API is accessed over HTTP, ApiRouterActor routes the request to the appropriate actor, which in turn calls the corresponding service.
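
For reference, here is a minimal sketch of how such a spray-can HttpServer can be bootstrapped on an ActorSystem. The project's actual startup code is not shown in this post, so the system name, actor name, and port below are assumptions.

import akka.actor.ActorSystem
import akka.io.IO
import spray.can.Http

object WebApiBoot extends App {
  // the ActorSystem that hosts ApiRouterActor and all route actors
  implicit val system = ActorSystem("compute-engine")

  // instantiate the routing actor described below
  val apiRouter = system.actorOf(ApiRouterActor.instance, "api")

  // ask spray-can to bind an HTTP listener and hand incoming connections to apiRouter
  IO(Http) ! Http.Bind(apiRouter, interface = "0.0.0.0", port = 8080)
}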

 

ApiRouterActor: the API routing actor; dispatches URL requests to different actors according to routing rules.

    SparkActor: starts SparkStreamActor, tracks its state via synchronous messages, and queries real-time data.

        SparkStreamActor: the streaming computation actor; receives data from Kafka in micro-batches, preprocesses it, and then performs real-time aggregation. Along the way both the preprocessed data and the computed results are sent to Kafka; the downstream storage service consumes them from Kafka and saves them to Cassandra.

    ReportActor: the report actor; handles requests from the visual reports, retrieves data via the Storage Service, starts SparkOfflineActor and tracks its state via synchronous messages, and returns the computed results to the report.

        SparkOfflineActor: the offline computation actor; receives the queried data from ReportActor, performs real-time aggregation according to the report configuration, and returns the results to the report via ReportActor.

ApiRouterActor

object ApiRouterActor {
   
  val routes = Map(
    "inputdto" -> "cn.xiaoneng.actors.routes.InputDtoRouteActor",
    "dashboarddto" -> "cn.xiaoneng.actors.routes.DashboardDtoRouteActor",
    "dtoproperty" -> "cn.xiaoneng.actors.routes.Dto_PropertyRouteActor",
    "statisdto" -> "cn.xiaoneng.actors.routes.StatisDtoRouteActor",
    "eventgroup" -> "cn.xiaoneng.actors.routes.EventGroupRouteActor",
    "spark"->"cn.xiaoneng.actors.routes.SparkActor",
    "report"->"cn.xiaoneng.actors.routes.ReportActor")
  def instance: Props =  Props(new ApiRouterActor(routes))
}
 
class ApiRouterActor(routes: Map[String,String]) extends HttpServiceActor
  with ActorLogging {
 
  override def  preStart = {
    for ((key, value) <- ApiRouterActor.routes) actorRefFactory.actorOf(Props(Class.forName(value).asInstanceOf[Class[Actor]]), key)
  }
 
  def receive = runRoute {
    compressResponseIfRequested() {
      pathPrefix("api") {
        respondWithHeader(RawHeader("Access-Control-Allow-Origin", "*")) {
          (options) {
            respondWithHeaders(List(RawHeader("Access-Control-Allow-Methods", "POST, GET, DELETE, PUT, PATCH, OPTIONS"), RawHeader("Access-Control-Allow-Headers", "Content-Type, Authorization, Accept,X-Requested-With"))) {
              complete(StatusCodes.NoContent)
            }
          } ~
          pathPrefix("person") { ctx => actorRefFactory.actorSelection("person") ! ctx } ~
          pathPrefix("order") { ctx => actorRefFactory.actorSelection("order") ! ctx } ~
          pathPrefix("inputdto") { ctx => actorRefFactory.actorSelection("inputdto") ! ctx } ~
          pathPrefix("dashboarddto") { ctx => actorRefFactory.actorSelection("dashboarddto") ! ctx } ~
          pathPrefix("dtoproperty") { ctx => actorRefFactory.actorSelection("dtoproperty") ! ctx } ~
          pathPrefix("statisdto") { ctx => actorRefFactory.actorSelection("statisdto") ! ctx }~
          pathPrefix("spark") { ctx => actorRefFactory.actorSelection("spark") ! ctx }~
          pathPrefix("report") { ctx => actorRefFactory.actorSelection("report") ! ctx }~
          pathPrefix("eventgroup") { ctx => actorRefFactory.actorSelection("eventgroup") ! ctx }
        }
      } ~
        getFromResourceDirectory("app")
    }
 
  }
 
}
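
Each `pathPrefix(...) { ctx => ... ! ctx }` branch above forwards the spray RequestContext to the route actor created in preStart, with the "api/<prefix>" segments already consumed. The concrete route actors (InputDtoRouteActor and friends) are not shown in this post; the following is only a hypothetical minimal sketch of what one might look like, with made-up paths and responses.

import spray.routing.HttpServiceActor

// Hypothetical minimal route actor; the real cn.xiaoneng.actors.routes.InputDtoRouteActor differs.
class InputDtoRouteActor extends HttpServiceActor {
  // runRoute also handles RequestContext messages forwarded by another HttpServiceActor,
  // so only the path segments remaining after "api/inputdto" are matched here.
  def receive = runRoute {
    pathEndOrSingleSlash {
      get { complete("""[{"id":1,"name":"demo"}]""") }
    } ~
    path(IntNumber) { id =>
      get { complete(s"""{"id":$id}""") }
    }
  }
}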


SparkActor

class SparkActor extends HttpServiceActor with SprayJsonSupport {
  var sparkStreamActor = None: Option[ActorRef]
 
  def receive = runRoute {
    path(Segment) { (actionName: String) =>
        actionName match {
          case "start_stream" =>
            sparkStreamActor = Some(actorRefFactory.actorOf(Props(new SparkStreamActor()),"stream"))
            sparkStreamActor.get ! "start"
            complete { HttpResponse(OK, "{\"state\":\"starting\"}") }
          case "stop_stream" =>
            if(!sparkStreamActor.isEmpty) sparkStreamActor.get ! "stop"
            complete { HttpResponse(OK, "{\"state\":\"stoping\"}") }
          case x :String => {
            complete { HttpResponse(OK, "{\"result\":\"unknowed command\"}") }
          }
        }
    }
  }
}
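
With the routing above, an HTTP request to /api/spark/start_stream creates the SparkStreamActor (named "stream") and sends it the "start" message, while /api/spark/stop_stream asks it to stop. The JSON response only reports that the state change has been initiated, since the actual start/stop happens asynchronously inside the actor.
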
SparkStreamActor

class SparkStreamActor extends Actor {
 
  val ssc = innerSparkStream.init
 
  def receive = {
    case "start" => {
      ssc.start()
      sender() ! "starting"
    }
    case "stop" => {
      ssc.stop(true, true)
      sender() ! "stoping"
    }
    case "state" => sender() ! ssc.getState()
    case "running_metrics" => sender() ! innerSparkStream.statisResultMap
    case _ =>
  }
 
}

innerSparkStream
object innerSparkStream extends  Serializable {
  def init(): StreamingContext = {
    val sparkConf = new SparkConf().setMaster("spark://spark129:7077").setAppName("streaming-query").set("spark.driver.host", "192.168.30.172")
    val sc = new SparkContext(sparkConf)
    // the original excerpt omits the StreamingContext construction; the batch interval here is an assumed example value
    val ssc = new StreamingContext(sc, Seconds(10))
  
    val topics = Set("test")
    val brokers = "spark129:9092,spark130:9092,spark131:9092"
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "org.apache.spark.serializer.KryoSerializer")
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
  
    val sparkSqlContext = new SQLContext(sc)
    import sparkSqlContext.implicits._
    kafkaStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        val jsonDF = sparkSqlContext.read.json(rdd)
        mapGlobalStatis.foreach(item => {
          var df = jsonDF.filter(jsonDF("name") isin (item._2.event_source.split(","): _*))
          if (df.count() > 0) {
            df = preProcess(df, item._2, secondInputResultMap, sparkSqlContext)
 
            if (df.count() > 0) {
 
              println("加工后的数据:")
              df.cache()
              df.show()
 
              // send the preprocessed data to Kafka for secondary processing
              df.toJSON.foreach(row => MyKafkaProducerSingleton.send(new KeyedMessage[String, String]("testspark", row)))
 
              // the computation engine continues with the aggregation step (groupExp)
              var colDimensions = item._2.properties.filter(p => !p.propertyType.isEmpty && p.propertyType.get == 1).map(_.name)
              colDimensions = Array.concat(Array("time_key"), colDimensions)
              val colMetrics = item._2.properties.filter(p => !p.propertyType.isEmpty && p.propertyType.get == 2).map(p => {
                p.groupSumExpresion match {
                  case Some("sum") => sum(p.name).alias(p.name)
                  case Some("max") => max(p.name).alias(p.name)
                  case Some("min") => min(p.name).alias(p.name)
                  case Some("avg") => avg(p.name).alias(p.name)
                  case _ => col(p.name)
                }
              })
              val getTimeKeyStr = udf((event_time: Long, format: String) => getGroupType(event_time, format))
              df = df.withColumn("time_key", getTimeKeyStr($"event_time", lit(item._2.properties.filter(_.name == "event_time").head.groupType.getOrElse(""))))
              df = df.groupBy(colDimensions.head, colDimensions.tail: _*).agg(colMetrics.head, colMetrics.tail: _*)
              df.cache()
              df.show()
 
              // send the aggregated results to Kafka for serialization and storage
              df.withColumn("name", lit(item._1)).toJSON.collect().foreach(row => MyKafkaProducerSingleton.send(new KeyedMessage[String, String]("computed_data", row)))
 
              println("计算后的数据:")
              if (statisResultMap.contains(item._1)) {
                val oldresultDF = statisResultMap.apply(item._1)
                df = oldresultDF.unionAll(df) //.groupBy(groupByStr: _*).agg(sum("num").alias("num"))
                df.cache()
                statisResultMap.update(item._1, df)
              } else {
                //df.cache()
                statisResultMap.put(item._1, df)
              }
              df.show()
            }
          }
        })
 
      } else {
        println("no data")
      }
    })
 
    ssc
  }
  
  // preprocess the collected data according to the business logic (outputExp)
  def preProcess(jsonDF: DataFrame, input: InputDtoNew, secondInputResultMap: mutable.HashMap[String, DataFrame], sqlContext: SQLContext): DataFrame = {
 
    // extract data from the current collection event and compute on it
    var df = jsonDF
    df.show()
 
    val fields = input.properties.map(prop => StructField(prop.name, getDataType(prop.`type`), true))
    var st = StructType.apply(fields)
 
    val rddRow = df.rdd.map(row => extractData(row, input))
    df = sqlContext.createDataFrame(rddRow, st)
    df.cache()
    df.show()
 
    // append the current batch's result to the in-memory set of unfinished state events, preparing for the merge below
    val mapKey = input.name
    if (secondInputResultMap.contains(mapKey)) {
      val oldresultDF = secondInputResultMap.apply(mapKey)
      df = oldresultDF.union(df)
      df.cache()
      df.show()
    } else {
      secondInputResultMap.put(mapKey, df)
    }
 
    // merge multiple collection events into new rows; the extra cnt column marks whether an event's state is complete
    df = mergeExtractedData(df, input)
    df.cache()
    df.show()
 
    // keep the unfinished state events in memory so they participate in the next batch
    import sqlContext.implicits._
    val eventCount = input.event_source.split(",").length
    println("Unfinished state events:")
    val dfEventCurrentState = df.where($"cnt" < eventCount).select(input.properties.map(p => col(p.name)).toArray: _*)
    dfEventCurrentState.cache()
    secondInputResultMap.update(mapKey, dfEventCurrentState)
    dfEventCurrentState.show()
 
    // take the completed state events and run secondary processing by triggering the beforecompute event
    //println("Completed state events:")
    df = df.where($"cnt" >= eventCount)
    val tmpRDDRow = df.rdd.map(row => computeExtractedData(row, input))
    df = sqlContext.createDataFrame(tmpRDDRow, st)
    //df.cache()
    //df.show()
 
    df
  }
}
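
The streaming code above refers to several helpers that the post does not show: the shared state maps statisResultMap and secondInputResultMap (presumably members of innerSparkStream, since SparkStreamActor reads innerSparkStream.statisResultMap), the getGroupType time-bucketing function, and MyKafkaProducerSingleton. mapGlobalStatis (statistic name to InputDtoNew metadata, presumably loaded via the MetaData Service) is also elided. The sketch below is only an assumption about their shape, for readability; the real definitions may differ.

import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import scala.collection.mutable
import org.apache.spark.sql.DataFrame
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object StreamingHelpersSketch {
  // running aggregation results, keyed by statistic name (read back via "running_metrics")
  val statisResultMap = mutable.HashMap[String, DataFrame]()

  // partially matched (unfinished) state events, keyed by input name
  val secondInputResultMap = mutable.HashMap[String, DataFrame]()

  // buckets an event timestamp (assumed epoch milliseconds) using the configured grouping format,
  // e.g. "yyyyMMddHH" for hourly buckets
  def getGroupType(eventTime: Long, format: String): String =
    if (format.isEmpty) eventTime.toString
    else new SimpleDateFormat(format).format(new Date(eventTime))
}

// assumed wrapper around the old Scala producer API that KeyedMessage belongs to
object MyKafkaProducerSingleton {
  private lazy val producer: Producer[String, String] = {
    val props = new Properties()
    props.put("metadata.broker.list", "spark129:9092,spark130:9092,spark131:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    new Producer[String, String](new ProducerConfig(props))
  }

  def send(message: KeyedMessage[String, String]): Unit = producer.send(message)
}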

ReportActor

class ReportActor extends HttpServiceActor with SprayJsonSupport with  SyncMessage {
 
  import cn.xiaoneng.report.model.reportJsonProtocol._
 
  private val svc = ReportService
 
  var sparkOfflineActors = ArrayBuffer[ActorRef]()
 
  def receive = runRoute {
    path(Segment) { (actionName: String) =>
      actionName match {
        case "running_metrics" =>
          val sparkStreamActor = actorRefFactory.actorSelection("../spark/stream")
          onComplete(send(actionName, sparkStreamActor)) {
            case Success(value) => {
              val result = value match {
                case map: scala.collection.mutable.HashMap[String, org.apache.spark.sql.DataFrame] => {
                  val sb = new StringBuilder()
                  sb.append("{")
                  map.foreach(kv => {
                    sb.append("\"" + kv._1 + "\":[")
                    sb.append(kv._2.toJSON.collect().mkString(","))
                    sb.append("],")
                  })
                  if (map.size > 0) sb.length -= 1
                  sb.append("}")
                  sb.toString()
                }
                case _ => value.toString
              }
              complete(HttpResponse(OK, result))
            }
            case Failure(ex) => complete(StatusCodes.ExpectationFailed)
          }
        case "stream_state" =>
          val sparkStreamActor = actorRefFactory.actorSelection("../spark/stream")
          onComplete(send("state", sparkStreamActor)) {
            case Success(value) => complete(HttpResponse(OK, "{\"state\":\"" + value.toString + "\"}"))
            case Failure(ex) => complete(StatusCodes.ExpectationFailed)
          }
        case "second_input_data" => complete(svc.getSecondInputData)
        case "grid" => complete(svc.grid)
        case "chart" => complete(svc.chart())
        case "grid_spark" => {
          val actorid = "offline" + sparkOfflineActors.length.toString
          val q = new Query(new QueryItem("name", "String", "totalchat", "="))
          val actor = actorRefFactory.actorOf(Props(new SparkOfflineActor()), actorid)
          sparkOfflineActors += actor
          onComplete(query(q, actor)) {
            case Success(value) => complete(HttpResponse(OK, "{\"state\":\"" + value.toString + "\", \"id\":\"" + actorid + "\"}"))
            case Failure(ex) => complete(StatusCodes.ExpectationFailed)
          }
          //sparkOfflineActors -= actor;
        }
        case "chart_spark" => complete(svc.chart_spark())
        case actorid: String if actorid.startsWith("offline") => {
          //println(actorid)
          val actor = actorRefFactory.actorSelection(actorid)
          onComplete(send("query", actor)) {
            case Success(value) => {
              value match {
                case msg: String => complete(HttpResponse(OK, "{\"state\":\"" + msg.toString + "\"}"))
                case rpt: ReportBase => {
                  import cn.xiaoneng.report.model.reportJsonProtocol._
                  sparkOfflineActors.remove(sparkOfflineActors.indexWhere(_.path.name.endsWith(actorid)))
                  complete(HttpResponse(OK, "{\"state\":\"ok\",\"result\":" + rpt.toJson.compactPrint + "}"))
                }
                case _ => complete(HttpResponse(OK, "{\"state\":\"" + value.toString + "\", \"id\":\"" + sparkOfflineActors.length + "\"}"))
              }
            }
            case Failure(ex) => complete(StatusCodes.ExpectationFailed)
          }
        }
        case x: String => complete(HttpResponse(OK, s"actionName: $x"))
      }
    }
  }
}
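
A note on the actor paths used above: ApiRouterActor creates its child actors with the route keys as names, so SparkActor is the sibling "../spark" relative to ReportActor, and the streaming actor it spawns is named "stream". actorRefFactory.actorSelection("../spark/stream") therefore resolves to the running SparkStreamActor, provided start_stream has been called first.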
 
trait SyncMessage {
  def send(msg: String, actorSel: ActorSelection): Future[Any] = {
 
    /**
      * Sending a message to an actor with the ask pattern requires an implicit `akka.util.Timeout`.
      **/
    implicit val timeout = Timeout(20 seconds)
    if (actorSel == null) {
      Future {
        msg
      }
    } else {
      actorSel.ask(msg)
    }
  }
 
  def query(q: Query, actorRef: ActorRef): Future[Any] = {
    implicit val timeout = Timeout(8 seconds)
    if (actorRef == null) {
      Future {
        q
      }
    } else {
      actorRef.ask(q)
    }
  }
}
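
Both helpers use Akka's ask pattern: the returned Future completes with the first reply sent back by the target actor, or fails with an AskTimeoutException once the implicit timeout expires, which is what surfaces as the Failure(ex) branches in ReportActor.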

SparkOfflineActor

class SparkOfflineActor extends Actor with SparkConfTrait {
 
  val timeout = 1 * 1000 // polling sleep interval in milliseconds
  var status = "inited"
  var result: Option[ReportBase] = None
 
  def receive = {
    case q: Query => {
      status = "starting"
      Future {
        getData(q)
      }
      sender() ! status
    }
    case cmd: String => {
      var m = 0
      while (m < 5) {
        if (status == "executed") {
          m = 5
        } else {
          Thread.sleep(timeout)
        }
        m = m + 1
      }
      val msg = if (m == 5) status else result.get
      sender() ! msg
    }
    case _ =>
  }
 
  def getData(q: Query) {
 
    val reportBase = ReportService.grid_spark(q)
    val pos = reportBase.columns.indexWhere(_.name == "chatnumber")
 
    // aggregate the queried data with Spark
    val sc = SparkContext.getOrCreate(sparkConf)
 
    status = "executing"
 
    import cn.xiaoneng.report.model.reportJsonProtocol._
    try {
      val colMTC = reportBase.columns.filter(_.columnType == "mtc")
      var colDMS = reportBase.columns.filter(_.columnType == "dms")
      var colIndexDMS = colDMS.map(_.index)
 
      val rdd = sc.parallelize(reportBase.rows)
 
      var tmpRDD = rdd
      // walk the dimension columns from the innermost level outward, grouping and summing to build the row hierarchy
      for (i <- colDMS.length - 1 to 0 by -1) {
        val colIndex = colDMS.filter(_.index<=i).map(_.index)
        println("colIndex last= "+colIndex.last)
        tmpRDD = tmpRDD.groupBy(row => row.cells.filter(cell => colIndex.contains(cell.index)).mkString(",")).map(kv => {
          val row = kv._2.head.copy()
          colMTC.foreach(col => {
            val cnt = kv._2.map(_.cells.apply(col.index).value.get.asInstanceOf[Long]).sum
            row.cells.update(col.index, ReportCell(pos, Some(cnt)))
          })
          row.rowKey = colDMS.apply(i).name
          if(colDMS.length - 1 > i) row.children = Some(kv._2.toArray)
          row
        })
        val cnt = tmpRDD.count() // count() forces this level's computation to run
      }
 
      tmpRDD = tmpRDD.sortBy(row => row.cells.apply(0).value.get.toString).groupBy(row => "totalsum").map(kv => {
        val row = kv._2.head.copy()
        colMTC.foreach(col => {
          val cnt = kv._2.map(_.cells.apply(col.index).value.get.asInstanceOf[Long]).sum
          row.cells.update(col.index, ReportCell(pos, Some(cnt)))
        })
        row.rowKey = "totalsum"
        row.children = Some(kv._2.toArray)
        row
      })
      result = Some(reportBase.copy(rows = tmpRDD.collect()))
      status = "executed"
    } catch {
      case ex: Exception => {
        var msg = ex.getMessage
        msg = msg.substring(0, msg.indexOf("\n\t"))
        status = "error:" + msg
      }
    }
  }
 
}
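
SparkConfTrait, mixed in above to supply sparkConf, is not shown in the post. A minimal sketch of what it presumably provides (the master URL is reused from the streaming job; the application name is an assumption):

import org.apache.spark.SparkConf

// assumed shape of SparkConfTrait; it only needs to expose a SparkConf for SparkContext.getOrCreate
trait SparkConfTrait {
  val sparkConf: SparkConf = new SparkConf()
    .setMaster("spark://spark129:7077") // same cluster the streaming job uses
    .setAppName("offline-report")       // example application name (assumption)
}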

WebApp: metadata configuration management and data visualization (reports). The current version uses AngularJS; it will later be switched to an in-house client-side framework built on top of React.

UI components: https://ant.design/

Chart components: https://g2.alipay.com/

Reference: "Not Just for Building UIs: A Complete Analysis of Facebook React"

 

Logging: ELK Stack

 

Expression parser: ExpressionParser. References: "A C# Calculator Modeled on the Query Analyzer", "The Busy Java Developer's Guide to Scala: Building a Calculator"
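
Following the calculator articles referenced above, a parser of this kind is commonly built with the scala-parser-combinators library. The sketch below only illustrates that approach with an assumed arithmetic grammar; the project's actual ExpressionParser (presumably used to evaluate the expressions configured for collection and statistics items) is not shown and certainly differs.

import scala.util.parsing.combinator.JavaTokenParsers

// Hypothetical minimal arithmetic parser in the spirit of the referenced calculator articles.
object ExpressionParserSketch extends JavaTokenParsers {
  // expr ::= term { ("+" | "-") term }
  def expr: Parser[Double] = term ~ rep(("+" | "-") ~ term) ^^ {
    case t ~ ops => ops.foldLeft(t) {
      case (acc, "+" ~ v) => acc + v
      case (acc, "-" ~ v) => acc - v
    }
  }

  // term ::= factor { ("*" | "/") factor }
  def term: Parser[Double] = factor ~ rep(("*" | "/") ~ factor) ^^ {
    case f ~ ops => ops.foldLeft(f) {
      case (acc, "*" ~ v) => acc * v
      case (acc, "/" ~ v) => acc / v
    }
  }

  // factor ::= floatingPointNumber | "(" expr ")"
  def factor: Parser[Double] = floatingPointNumber ^^ (_.toDouble) | "(" ~> expr <~ ")"

  def evaluate(input: String): Double = parseAll(expr, input).get

  // example: evaluate("1 + 2 * (3 - 1)") == 5.0
}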