不懂算法的数据开发者不是一个好的算法工程师,还记得研究生时候,导师讲过的一些数据挖掘算法,很有兴趣,可是无奈工做后接触少了,数据工程师的鄙视链,模型>实时>离线数仓>ETL工程师>BI工程师(不喜勿喷哈),如今作的工做主要是离线数仓,固然前期也作过一些ETL的工做,为了职业的长远发展,拓宽本身的技术边界,有必要逐步深刻实时和模型,因此从本篇文章开始,也是列个FLAG,深刻学习实时和模型部分。git
改变本身,从提高本身不擅长领域的事情开始。github
K-Means算法是无监督的聚类算法,它实现起来比较简单,聚类效果也不错,所以应用很普遍,算法
K-means算法,也称为K-平均或者K-均值,通常做为掌握聚类算法的第一个算法。sql
这里的K为常数,需事先设定,通俗地说该算法是将没有标注的 M 个样本经过迭代的方式汇集成K个簇。apache
在对样本进行汇集的过程每每是以样本之间的距离做为指标来划分。app
核心:K-means聚类算法是一种迭代求解的聚类分析算法,其步骤是随机选取K个对象做为初始的聚类中心,而后计算每一个对象与各个种子聚类中心之间的距离,把每一个对象分配给距离它最近的聚类中心。聚类中心以及分配给它们的对象就表明一个聚类。每分配一个样本,聚类的聚类中心会根据聚类中现有的对象被从新计算。这个过程将不断重复直到知足某个终止条件。终止条件能够是没有(或最小数目)对象被从新分配给不一样的聚类,没有(或最小数目)聚类中心再发生变化,偏差平方和局部最小dom
退出循环的条件:函数
1.指定循环次数学习
2.全部的中心点几乎再也不移动(即中心点移动的距离总和小于咱们给定的一个常熟,好比0.00001)大数据
K值的选择: k 值对最终结果的影响相当重要,而它却必需要预先给定。给定合适的 k 值,须要先验知识,凭空估计很困难,或者可能致使效果不好。
异常点的存在:K-means算法在迭代的过程当中使用全部点的均值做为新的质点(中心点),若是簇中存在异常点,将致使均值误差比较严重。 好比一个簇中有二、四、六、八、100五个数据,那么新的质点为24,显然这个质点离绝大多数点都比较远;在当前状况下,使用中位数6可能比使用均值的想法更好,使用中位数的聚类方式叫作K-Mediods聚类(K中值聚类)
初值敏感:K-means算法是初值敏感的,选择不一样的初始值可能致使不一样的簇划分规则。为了不这种敏感性致使的最终结果异常性,能够采用初始化多套初始节点构造不一样的分类规则,而后选择最优的构造规则。针对这点后面所以衍生了:二分K-Means算法、K-Means++算法、K-Means||算法、Canopy算法等
实现简单、移动、伸缩性良好等优势使得它成为聚类中最经常使用的算法之一。
连接:https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ提取码:hell
复制这段内容后打开百度网盘手机App,操做更方便哦
鸢尾花数据集,数据集包含3类共150调数据,每类含50个数据,每条记录含4个特征:花萼长度、花萼宽度、花瓣长度、花瓣宽度
过这4个 特征,将花聚类,假设将K取值为3,看看与实际结果的差异
没有使用mlb库,而是使用scala原生实现
package com.hoult.work import org.apache.commons.lang3.math.NumberUtils import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import scala.collection.mutable.ListBuffer import scala.math.{pow, sqrt} import scala.util.Random object KmeansDemo { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .master("local[*]") .appName(this.getClass.getCanonicalName) .getOrCreate() val sc = spark.sparkContext val dataset = spark.read.textFile("data/lris.csv") .rdd.map(_.split(",").filter(NumberUtils.isNumber _).map(_.toDouble)) .filter(!_.isEmpty).map(_.toSeq) val res: RDD[(Seq[Double], Int)] = train(dataset, 3) res.sample(false, 0.1, 1234L) .map(tp => (tp._1.mkString(","), tp._2)) .foreach(println) } // 定义一个方法 传入的参数是 数据集、K、最大迭代次数、代价函数变化阈值 // 其中 最大迭代次数和代价函数变化阈值是设定了默认值,能够根据须要作相应更改 def train(data: RDD[Seq[Double]], k: Int, maxIter: Int = 40, tol: Double = 1e-4) = { val sc: SparkContext = data.sparkContext var i = 0 // 迭代次数 var cost = 0D //初始的代价函数 var convergence = false //判断收敛,即代价函数变化小于阈值tol // step1 :随机选取 k个初始聚类中心 var initk: Array[(Seq[Double], Int)] = data.takeSample(false, k, Random.nextLong()).zip(Range(0, k)) var res: RDD[(Seq[Double], Int)] = null while (i < maxIter && !convergence) { val bcCenters = sc.broadcast(initk) val centers: Array[(Seq[Double], Int)] = bcCenters.value val clustered: RDD[(Int, (Double, Seq[Double], Int))] = data.mapPartitions(points => { val listBuffer = new ListBuffer[(Int, (Double, Seq[Double], Int))]() // 计算每一个样本点到各个聚类中心的距离 points.foreach { point => // 计算聚类id以及最小距离平方和、样本点、1 val cost: (Int, (Double, Seq[Double], Int)) = centers.map(ct => { ct._2 -> (getDistance(ct._1.toArray, point.toArray), point, 1) }).minBy(_._2._1) // 将该样本归属到最近的聚类中心 listBuffer.append(cost) } listBuffer.toIterator }) // val mpartition: Array[(Int, (Double, Seq[Double]))] = clustered .reduceByKey((a, b) => { val cost = a._1 + b._1 //代价函数 val count = a._3 + b._3 // 每一个类的样本数累加 val newCenters = a._2.zip(b._2).map(tp => tp._1 + tp._2) // 新的聚类中心点集 (cost, newCenters, count) }) .map { case (clusterId, (costs, point, count)) => clusterId -> (costs, point.map(_ / count)) // 新的聚类中心 } .collect() val newCost = mpartition.map(_._2._1).sum // 代价函数 convergence = math.abs(newCost - cost) <= tol // 判断收敛,即代价函数变化是否小于小于阈值tol // 变换新的代价函数 cost = newCost // 变换初始聚类中心 initk = mpartition.map(tp => (tp._2._2, tp._1)) // 聚类结果 返回样本点以及所属类的id res = clustered.map(tp=>(tp._2._2,tp._1)) i += 1 } // 返回聚类结果 res } def getDistance(x:Array[Double],y:Array[Double]):Double={ sqrt(x.zip(y).map(z=>pow(z._1-z._2,2)).sum) } }
完整代码:https://github.com/hulichao/bigdata-spark/blob/master/src/main/scala/com/hoult/work/KmeansDemo.scala
结果截图:
吴邪,小三爷,混迹于后台,大数据,人工智能领域的小菜鸟。
更多请关注