基于 Algebird 谈一谈代数数据类型在数据聚合中的应用

此文已由做者肖乃同受权网易云社区发布。
java

欢迎访问网易云社区,了解更多网易技术产品运营经验。android




代数数据类型是指知足必定数学特性的数据类型, 这些特性使得计算可以很方便的并行化,在Scalding和 Spark等数据计算框架中有着普遍的应用。代数数据类型是一个通用的概念, 其实现不限于Algebird, 本文主要结合近期处理的一个数据任务, 介绍一下这一技术及Algebird这个函数库。 文中代码示例都是基于Scala, 若有纰漏欢迎指正。git

应用场景:云阅读用户流失模型特征体取

近期接到这样一个任务, 提取一个特定时间窗口登录用户的95个特征,用于训练预测用户流失的模型。抽取 任何一个单独特征并不复杂, 不过特征众多,数据分布在多个数据源。我计划延续前期的代码, 使用scalding 处理这一任务。 这些特征大体分为四类:github

  1. 功能使用的次数sql

  2. 功能使用的天数express

  3. 用户某一属性最新的非空值apache

  4. 用户某一属性的集合数量api

使用次数就是常规的数值累加,2和4都要考虑集合的去重,3是时间上的maxBy同时要考虑空值处理问题。 为了方便统一处理,我考虑将这些数据都转化成可加的代数数据类型, 而后基于这些类型作聚合。安全

Scalding 及 Spark 类型安全聚合接口介绍

先来看一下Scalding提供的聚合接口,直接使用Algebird提供的聚合器:bash

import com.twitter.algebird.Aggregator.count
  val users: TypedPipe[User] = ???
  users.groupBy(_.userId).aggregate(count)复制代码

Spark在2.0后增长了DataSet这一新的API, 简单讲就是类型安全的DataFrame (基本等同于与scalding type safe api 之于 field based api)。

import org.apache.spark.sql.expressions.scalalang.count
 spark.createDataset(Seq(1,2,3))
      .groupByKey(_)
      .agg(count)复制代码

Spark 没有直接使用 algebird, 其参考algebird代码(看了一下spark的代码注释),写了一套相似接口。 后面会讲到如何在Spark使用Algebird。

Aggregator提供了可复用的聚合组件,再也不限于特定的字段。Algebird的Aggregator是基于半群和幺半群的代数数据类型聚合。

代数数据类型理论

先来补习一下数学知识,群是一个二元操做下知足必定特征的集合:

  1. 闭包性: 集合中的任意两个元素A和B, A op B 结果依然是集合的元素

  2. 结合律: 集合中的任意两个元素A、B和C, (A op B) op C 等价于 A op (B op C)

  3. 幺元(也能够叫零元 Unit):集合存在元素e, 使得任意的元素A有 e op A 等价于 A op e 等价于 A

  4. 逆元: 任意元素A, 存在集合元素B, 使得 A op B = e (e 为幺元)

知足所有条件的是群, 知足1和2的是半群, 知足一、2和3的是幺半群(有幺元存在)

举例几个具体的例子说明一下:

  • 天然数在加法下是一个群,知足闭包和结合律,0是幺元,负数是逆元

  • 偶数在加法下是一个群, 奇数不是, 不知足闭包性,奇数相加为偶数

  • 奇数在乘法下是一个幺半群, 不存在逆元

  • 正整数在加法下是一个半群, 不存在幺元,0不属于正整数

Algebird 实现介绍

Algebird 是twitter开源的scala的抽象代数库,实现了常见数据类型的半群、幺半群等支持, 是从scalding分离出来的通用库。

经过例子比较好理解:

import com.twitter.algebird._ import com.twitter.algebird.Operators._ Max(3) + Max(5) + Max(10)  // result: Max(10)
 Map(1 -> 2) + Map(1 -> 3) // result: Map(1 -> 5) 
 Map(1 -> Max(3), 2 -> Max(7)) + Map(1 -> Max(-10), 2 -> Max(20)) 
 // result: Map(1 -> Max(3), 2 -> Max(20))复制代码

Max是个Semigroup(半群), Map是个Monoid(幺半群), Algebird有很大的灵活性,从上面示例能够看到Map的值是半群,能够实现相同key的值的聚合。

Algebird除了基本Semigroup和Monoid, Map、IndexedSeq、Tuple等高阶的群 (参数类型是群的群,我这样称谓),能够组合出很是灵活的使用。

用户流失模型中的应用

回到我要处理的问题上来, 须要按照用户去计算4类不一样的特征值, 这些值很稀疏, 能够把上述问题转化成聚合问题。

以搜索这个事件来讲明, 假设要统计用户搜索的次数、天数、关键词数量,那么

Map("搜索次数" -> 1)
  Map("搜索天数" -> date)
  Map("搜索关键词数量" -> keyword)复制代码

Map是幺半群,须要值类型是半群, date和keyword须要转换成半群的数据结构。 关键词数量须要去重, 能够使用Set来作, 使用Set求集合, 最后取集合数量, 聚合器以下:

import com.twitter.algebird.Aggregator.{const, toSet, prepareMonoid => sumAfter}

  val searchCountAgg = sumAfter[MdaEvent, Map[String, Int]](_.searchCount)

  val keywordCountAgg = toSet[String]
   .composePrepare[ClientEvent](_.keyword)
   .andThenPresent(_.size)复制代码

搜索天数统计,日期也能够使用上述集合,不过天数的统计很是多, 集合开销比较大, 我把它转成一个bitset, , 我统计的窗口只有1个月, 因此用Long型记下相对于开始日期, 这一天是否是有使用:

import com.twitter.algebird.Monoid
  import com.github.nscala_time.time.Imports._
  import org.joda.time.Days

  class Bits(val value: Long) extends AnyVal {
    def count: Int = java.lang.Long.bitCount(value)
    def get(b: Int): Int = if((value & (1 << b)) > 0) 1 else 0
    override def toString: String = value.toBinaryString
  }

  object BitsMonoid extends Monoid[Bits] {
    override def zero = new Bits(0L)
    override def plus(left: Bits, right: Bits) = new Bits(left.value | right.value)
    override def sumOption(iter: TraversableOnce[Bits]): Option[Bits] = {
      if(iter.isEmpty) None
      Some(iter.reduce((a, b) => new Bits(a.value | b.value)))
    }
  }

  def dateDiffToBits(fromDate: DateTime): Long => Bits = {
    val base = fromDate.withTimeAtStartOfDay()
    (timestamp: Long) => {
      val theDay = new DateTime(timestamp).withTimeAtStartOfDay()
      val days = Days.daysBetween(base, theDay).getDays
      require(days < 64, s"only 64 bits long is supported, got day diff: $days")
      new Bits(1 << days)
    }
  }

  val toBitsFun = dateDiffToBits(sampleStartDate)

  val searchDaysAgg = {
      implicit val m = BitsMonoid
      sumAfter[MdaEvent, Map[String, Bits]] { event =>
        searchTime(event).mapValues(toBitsFun)
      }.andThenPresent(_.mapValues(b => b.count))
    }复制代码

最后来处理第三类特征非空最新属性,这个属性是取按时间的最大值,空值须要特别处理一下, 使用Max, 把排序函数修改一下:

import com.twitter.algebird.Aggregator.max
  def latestStringProperty[U <: ClientEvent](fn: U => String): Aggregator[U, U, String] = {
      import com.twitter.algebird.Aggregator.max
      implicit val ordU = Ordering.by { u: U =>
        val p = fn(u)
        val isEmpty = if (p.isEmpty) 0 else 1
        (isEmpty, u.opTime) // empty property always be covered by value property
      }
      max[U].andThenPresent(e => fn(e))
  }复制代码

最后就可以使用这些聚合器, 提取所需的特征值了

val multiOps = MultiAggregator(
    searchCountAgg,
    keywordCountAgg,
    searchDaysAgg,
    latestStringProperty(_.productVersion)
 )
  val daReport = daEvents.groupBy(_.userId).aggregate(multiOps)复制代码

如何在Spark中的使用

最后来说讲若是在Spark中使用Algebird聚合器, 这个特征提取原本应该在Spark处理更为方便, 来研究了一下Spark的聚合器。

Spark没有直接使用Algebird, 但其聚合器基本参照Algebird的, 我写了一个适配的类来方便直接在 Spark中使用上述的聚合器:

import com.twitter.algebird.MonoidAggregator
  import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
  import org.apache.spark.sql.expressions.Aggregator
  import org.apache.spark.sql.{Encoder, SparkSession, TypedColumn}

  implicit class MonoidToTypedColumn[-A,B: Encoder,C: Encoder](val m: MonoidAggregator[A,B,C]) {
    def toColumn: TypedColumn[A,C] = new MonoidAggregatorAdaptor(m).toColumn
  }

  class MonoidAggregatorAdaptor[-A,B: Encoder,C: Encoder](val m: MonoidAggregator[A,B,C]) extends  Aggregator[A,B,C] {
      override def zero = m.monoid.zero
      override def reduce(b: B, a: A) = m.reduce(b, m.prepare(a))
      override def finish(reduction: B) =  m.present(reduction)
      override def merge(b1: B, b2: B) = m.reduce(b1, b2)

      override def bufferEncoder = implicitly[Encoder[B]]
      override def outputEncoder = implicitly[Encoder[C]]
 }复制代码

这里只贴了适配Monoid的聚合器, Semigroup的会稍麻烦,代码比较多, 基本参考org.apache.spark.sql.expressions.ReduceAggregator。

最后咱们就能够直接再Spark使用Algebird:

val latest = maxBy[DeviceEvent, Long](_.timestamp).toColumn.name("latest")
  val count = size.toColumn.name("count")

  spark.createDataset(Seq(DeviceEvent("a", "iphone", 10L), DeviceEvent("a", "android", 100L), DeviceEvent("a", "iphone", 123L)))
      .groupByKey(_.id)
      .agg(count, latest)
      .collect复制代码

总结

使用代数数据类型, 咱们数据计算的代码更接近于问题描述语言, 表达力更强,避免了命令式的操做,bug更少。


网易云免费体验馆,0成本体验20+款云产品!

更多网易技术、产品、运营经验分享请点击




相关文章:
【推荐】 Android TV 开发 (1)

相关文章
相关标签/搜索