Alink 是阿里巴巴基于实时计算引擎 Flink 研发的新一代机器学习算法平台,是业界首个同时支持批式算法、流式算法的机器学习平台。本文和下文将介绍在线学习算法FTRL在Alink中是如何实现的,但愿对你们有所帮助。html
由于 Alink 实现的是 LR + FTRL,因此咱们须要从逻辑回归 LR 开始介绍。java
Logistic Regression 虽然被称为回归,但其其实是分类模型,并经常使用于二分类。Logistic 回归的本质是:假设数据服从这个分布,而后使用极大似然估计作参数的估计。python
逻辑回归的思路是,先拟合决策边界(不局限于线性,还能够是多项式),再创建这个边界与分类的几率联系,从而获得了二分类状况下的几率。算法
咱们从线性回归开始提及。某些状况下,使用线性的函数来拟合规律后取阈值的办法是行不通的。行不通的缘由在于拟合的函数太直,离群值(也叫异常值)对结果的影响过大。可是咱们的总体思路是没有错的,错的是用太"直"的拟合函数,若是咱们用来拟合的函数是非线性的,不这么直,是否是就好一些呢?机器学习
因此咱们下面来作两件事:分布式
对于第一件事,咱们用sigmod函数把回归函数掰弯。ide
对于二分类问题,1表示正例,0表示负例。逻辑回归是在线性函数输出预测实际值的基础上,寻找一个假设函数函数h_θ(x) = g(θ,x),将实际值映射到到0,1之间。逻辑回归中选择对数概率函数(logistic function)做为激活函数,对数概率函数是Sigmoid函数(形状为S的函数)的重要表明。函数
对于第二件事,咱们选定阈值是0.5。学习
意思就是,当我选阈值为0.5,那么小于0.5的必定是负例,哪怕他是0.49。此时咱们判断一个样本为负例必定是准确的吗?其实不必定,由于它仍是有49%的几率为正例的。可是,即使它是正例的几率为0.1,则咱们随机选择1w个样原本作预测,仍是会有接近100个预测它是负例结果它实际是正例的偏差。不管怎么选,偏差都是存在的,因此咱们选定阈值就是在选择能够接受偏差的程度。测试
到这里,逻辑回归的由来咱们就基本理清楚了,咱们知道逻辑回归的判别函数就是
如何求解逻辑回归?也就是如何找到一组可让 h(z) 全都预测正确几率最大的W。
求解逻辑回归的方法有很是多,咱们这里主要聊下梯度降低和牛顿法。优化的主要目标是找到一个方向,参数朝这个方向移动以后使得损失函数的值可以减少,这个方向每每由一阶偏导或者二阶偏导各类组合求得。
梯度降低是经过 J(w) 对 w 的一阶导数来找降低方向,而且以迭代的方式来更新参数。
牛顿法的基本思路是,在现有极小点估计值的附近对 J(w) 作二阶泰勒展开,进而找到极小点的下一个估计值。
当样本数据里N很大的时候,一般采用的是随机梯度降低法,算法以下所示:
while { for i in range(0,m): w_j = w_j + a * g_j }
随机梯度降低的好处是能够实现分布式并行化,具体计算流程是:
从逻辑回归的求解方法中咱们能够发现这些算法都是须要计算梯度的,所以逻辑回归的并行化最主要的就是对目标函数梯度计算的并行化。
咱们看到目标函数的梯度向量计算中只须要进行向量间的点乘和相加,能够很容易将每一个迭代过程拆分红相互独立的计算步骤,由不一样的节点进行独立计算,而后归并计算结果。
因此并行 LR 实际上就是在求解损失函数最优解的过程当中,针对寻找损失函数降低方向中的梯度方向计算做了并行化处理,而在利用梯度肯定降低方向的过程当中也能够采用并行化。
若是将样本矩阵按行划分,将样本特征向量分布到不一样的计算节点,由各计算节点完成本身所负责样本的点乘与求和计算,而后将计算结果进行归并,则实现了“ 按行 并行的LR”。
按行并行的LR解决了样本数量的问题,可是实际状况中会存在针对高维特征向量进行逻辑回归的场景(如广告系统中的特征维度高达上亿),仅仅按行进行并行处理,没法知足这类场景的需求,所以还须要 按列 将高维的特征向量拆分红若干小的向量进行求解。
传统的机器学习开发流程基本是如下步骤:
这种方式主要存在两个瓶颈:
好比,传统Batch算法中每次迭代对全体训练数据集进行计算(例如计算全局梯度),优势是精度和收敛还能够,缺点是没法有效处理大数据集(此时全局梯度计算代价太大),且无法应用于数据流作在线学习。
针对这些问题,通常而言有两种解决方式:
在线学习 ( OnlineLearningOnlineLearning ) 表明了一系列机器学习算法,特色是每来一个样本就能训练,可以根据线上反馈数据,实时快速地进行模型调整,使得模型及时反映线上的变化,提升线上预测的准确率。
传统的训练方法在模型训练上线后,通常是静态的,不会与线上的情况有任何的互动,加入预测错误,只能在下一次更新的时候完成修正,可是这个更新的时间通常比较长。
Online Learning训练方法不一样,会根据线上的预测结果动态调整模型,加入模型预测错误,从而及时作出修正,所以Online Learning可以更加及时地反应线上变化。
Online Learning的优化目标是使得总体的损失函数最小化,它须要快速求解目标函数的最优解。
在线学习算法的特色是:每来一个训练样本,就用该样本产生的loss和梯度对模型迭代一次,一个一个数据地进行训练,所以能够处理大数据量训练和在线训练。经常使用的有在线梯度降低(OGD)和随机梯度降低(SGD)等,本质思想是对上面【问题描述】中的未加和的单个数据的loss函数 L(w,zi)作梯度降低,由于每一步的方向并非全局最优的,因此总体呈现出来的会是一个看似随机的降低路线。
FTR是FTRL的前身,思想是每次找到让以前全部样本的损失函数之和最小的参数。
FTRL,即 Follow The Regularized Leader,是在以前的几个工做上产生的,主要出发点就是为了提升稀疏度且知足精度要求。FTRL 在FTL的优化目标的基础上,加入了正则化,防止过拟合。
FTRL的损失函数通常也不容易求解,这种状况下,通常须要找一个代理的损失函数。
代理损失函数须要知足如下条件:
为了衡量条件2中的两个解的差距,引入regret的概念。
通常对于在线学习来讲,咱们致力于解决两个问题: 下降 regret 和提升 sparsity。其中 regret 的定义为:
其中 t 表示总共 T 轮中的第 t 轮迭代,ℓt 表示损失函数,w 表示要学习的参数。Regret 表示 "代理函数求出来的解" 离 "真正损失函数求出来的解" 的损失差距。
第二项 表示获得了全部样本后损失函数的最优解,由于在线学习一次只能根据少数几个样本更新参数,随机性较大,因此须要一种稳健的优化方式,而 regret 字面意思是 “后悔度”,意即更新完不后悔。
在理论上能够证实,若是一个在线学习算法能够保证其 regret 是 t 的次线性函数,则:
那么随着训练样本的增多,在线学习出来的模型无限接近于最优模型。即随着训练样本的增长,代理损失函数和原损失函数求出来的参数的实际损失值差距愈来愈小。而绝不意外的,FTRL 正是知足这一特性。
另外一方面,现实中对于 sparsity,也就是模型的稀疏性也很看重。上亿的特征并不鲜见,模型越复杂,须要的存储、时间资源也随之升高,而稀疏的模型会大大减小预测时的内存和复杂度。另外稀疏的模型相对可解释性也较好,这也正是一般所说的 L1 正则化的优势。
Per-Coordinate 意思是FTRL是对w每一维分开训练更新的,每一维使用的是不一样的学习速率,也是上面代码中lamda2以前的那一项。与w全部特征维度使用统一的学习速率相比,这种方法考虑了训练样本自己在不一样特征上分布的不均匀性,若是包含w某一个维度特征的训练样本不多,每个样本都很珍贵,那么该特征维度对应的训练速率能够独自保持比较大的值,每来一个包含该特征的样本,就能够在该样本的梯度上前进一大步,而不须要与其余特征维度的前进步调强行保持一致。
咱们再看看下一时刻的特征权重的更新公式,增长理解(我我的以为找到的这个解释相对易于理解):
式中第一项是对损失函数的贡献的一个估计,第二项是控制w(也就是model)在每次迭代中变化不要太大,第三项表明L1正则(得到稀疏解)。
咱们采用的就是Alink官方示例代码。咱们能够看到大体分为几部分:
你大概已经看出来了,为了剖析FTRL,我前面作了不少工做......
public class FTRLExample { public static void main(String[] args) throws Exception { ...... // setup feature engineering pipeline Pipeline featurePipeline = new Pipeline() .add( new StandardScaler() // 标准缩放 .setSelectedCols(numericalColNames) ) .add( new FeatureHasher() // 特征哈希 .setSelectedCols(selectedColNames) .setCategoricalCols(categoryColNames) .setOutputCol(vecColName) .setNumFeatures(numHashFeatures) ); // 构建特征工程流水线 // fit feature pipeline model PipelineModel featurePipelineModel = featurePipeline.fit(trainBatchData); // prepare stream train data CsvSourceStreamOp data = new CsvSourceStreamOp() .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-ctr-train-8M.csv") .setSchemaStr(schemaStr) .setIgnoreFirstLine(true); // 对于流数据源进行实时切分获得原始训练数据和原始预测数据 // split stream to train and eval data SplitStreamOp splitter = new SplitStreamOp().setFraction(0.5).linkFrom(data); // 训练出一个逻辑回归模型做为FTRL算法的初始模型,这是为了系统冷启动的须要。 // train initial batch model LogisticRegressionTrainBatchOp lr = new LogisticRegressionTrainBatchOp() .setVectorCol(vecColName) .setLabelCol(labelColName) .setWithIntercept(true) .setMaxIter(10); BatchOperator<?> initModel = featurePipelineModel.transform(trainBatchData).link(lr); // 在初始模型基础上进行FTRL在线训练 // ftrl train FtrlTrainStreamOp model = new FtrlTrainStreamOp(initModel) .setVectorCol(vecColName) .setLabelCol(labelColName) .setWithIntercept(true) .setAlpha(0.1) .setBeta(0.1) .setL1(0.01) .setL2(0.01) .setTimeInterval(10) .setVectorSize(numHashFeatures) .linkFrom(featurePipelineModel.transform(splitter)); // 在FTRL在线模型的基础上,链接预测数据进行预测 // ftrl predict FtrlPredictStreamOp predictResult = new FtrlPredictStreamOp(initModel) .setVectorCol(vecColName) .setPredictionCol("pred") .setReservedCols(new String[]{labelColName}) .setPredictionDetailCol("details") .linkFrom(model, featurePipelineModel.transform(splitter.getSideOutput(0))); // 对预测结果流进行评估 // ftrl eval predictResult .link( new EvalBinaryClassStreamOp() .setLabelCol(labelColName) .setPredictionCol("pred") .setPredictionDetailCol("details") .setTimeInterval(10) ) .link( new JsonValueStreamOp() .setSelectedCol("Data") .setReservedCols(new String[]{"Statistics"}) .setOutputCols(new String[]{"Accuracy", "AUC", "ConfusionMatrix"}) .setJsonPath(new String[]{"$.Accuracy", "$.AUC", "$.ConfusionMatrix"}) ) .print(); StreamOperator.execute(); } }
用问题来引导剖析比较好,如下是咱们容易想到的一些问题。
后续咱们会一一探究这些问题。
在线训练是在 FtrlTrainStreamOp 类中实现的,其 linkFrom 函数实现了基本逻辑。
主要逻辑是:
代码摘要是:
@Override public FtrlTrainStreamOp linkFrom(StreamOperator<?>... inputs) { ...... // 3)获取切分信息 final int[] splitInfo = getSplitInfo(featureSize, hasInterceptItem, parallelism); DataStream<Row> initData = inputs[0].getDataStream(); // 4)切分高维向量。 // Tuple5<SampleId, taskId, numSubVec, SubVec, label> DataStream<Tuple5<Long, Integer, Integer, Vector, Object>> input = initData.flatMap(new SplitVector(splitInfo, hasInterceptItem, vectorSize, vectorTrainIdx, featureIdx, labelIdx)) .partitionCustom(new CustomBlockPartitioner(), 1); // train data format = <sampleId, subSampleTaskId, subNum, SparseVector(subSample), label> // feedback format = Tuple7<sampleId, subSampleTaskId, subNum, SparseVector(subSample), label, wx, timeStamps> // 5)构建一个 IterativeStream.ConnectedIterativeStreams iteration,这样会构建(或者说链接)两个数据流:反馈流和训练流; IterativeStream.ConnectedIterativeStreams<Tuple5<Long, Integer, Integer, Vector, Object>, Tuple7<Long, Integer, Integer, Vector, Object, Double, Long>> iteration = input.iterate(Long.MAX_VALUE) .withFeedbackType(TypeInformation .of(new TypeHint<Tuple7<Long, Integer, Integer, Vector, Object, Double, Long>>() {})); // 6)用iteration来构建迭代体 iterativeBody,其包括两部分:CalcTask,ReduceTask; DataStream iterativeBody = iteration.flatMap( new CalcTask(dataBridge, splitInfo, getParams())) .keyBy(0) .flatMap(new ReduceTask(parallelism, splitInfo)) .partitionCustom(new CustomBlockPartitioner(), 1); // 7)result = iterativeBody.filter;基本是以时间间隔为标准来判断(也能够认为是时间驱动),"时间未过时&向量有意义" 的数据将被发送回反馈数据流,继续迭代,回到步骤 6),进入flatMap2; DataStream<Tuple7<Long, Integer, Integer, Vector, Object, Double, Long>> result = iterativeBody.filter( new FilterFunction<Tuple7<Long, Integer, Integer, Vector, Object, Double, Long>>() { @Override public boolean filter(Tuple7<Long, Integer, Integer, Vector, Object, Double, Long> t3) throws Exception { // if t3.f0 > 0 && t3.f2 > 0 then feedback return (t3.f0 > 0 && t3.f2 > 0); } }); // 8)output = iterativeBody.filter;符合标准(时间过时了)的数据将跳出迭代,而后算法会调用WriteModel将LineModelData转换为多条Row,转发给下游operator(也就是在线预测阶段);即定时把模型更新给在线预测阶段。 DataStream<Row> output = iterativeBody.filter( new FilterFunction<Tuple7<Long, Integer, Integer, Vector, Object, Double, Long>>() { @Override public boolean filter(Tuple7<Long, Integer, Integer, Vector, Object, Double, Long> value) throws Exception { /* if value.f0 small than 0, then output */ return value.f0 < 0; } }).flatMap(new WriteModel(labelType, getVectorCol(), featureCols, hasInterceptItem)); // 指定了某个流将成为迭代程序的结束,而且这个流将做为输入的第二部分(second input)被反馈回迭代 iteration.closeWith(result); TableSchema schema = new LinearModelDataConverter(labelType).getModelSchema(); ...... this.setOutput(output, names, types); return this; }
为了方便阅读,咱们给出流程图以下(这里省略了split 训练数据集/测试数据集):
原谅我用这种办法画图,由于我最讨厌看到一篇好文,结果发现图没了…
-------------------------------------------------------------------------------------------- │ 初始模型训练阶段 │ │ │ ┌─────────────────┐ ┌─────────────────┐ │ trainBatchData │ │ trainStreamData │ └─────────────────┘ └─────────────────┘ │ │ │ │ ┌──────────────────┐ │ │ featurePipeline │ │ └──────────────────┘ │ │ │ │ │ ┌─────────────┐ │ │ 线性回归模型 │ │ └─────────────┘ │ │ │ │ │ -------------------------------------------------------------------------------------------- │ 在线训练阶段 │ │ │ ┌─────────────┐ ┌──────────────────┐ │ dataBridge │ 加载初始化模型 │ featurePipeline │ └─────────────┘ └──────────────────┘ │ │ │ │ │ │ ┌─────────────┐ ┌──────────────────────────┐ │ 获取切分信息 │ getSplitInfo │ inputs[0].getDataStream()│ └─────────────┘ └──────────────────────────┘ │ │ │ │ │ │ │ SplitInfo │ │ │ │ │ ┌──────────────────────────┐ 特征向量 │ │ SplitVector │ <--------------------------│ └──────────────────────────┘ │ │ 解析输入,获得DataStream<Tuple5<SampleId, taskId, numSubVec, SubVec, label>> input │ │ ┌───────────────────────────┐ │ <Tuple5,Tuple7> iteration │ 迭代构建,两个输入train data Tuple5<>,feedback data Tuple7<> └───────────────────────────┘ │ │ CalcTask从逻辑上分红两个模块:flatMap1, flatMap2 │ │ ┌───────────────────┐ ┌───────────────────┐ │ CalcTask.flatMap1 │ 输入Tuple5<> │CalcTask.flatMap2 │ 输入Tuple7 <--------------- └───────────────────┘ └───────────────────┘ │ │ 分布计算FTRL算法中的predict部分 │ 分布处理反馈数据/更新参数/累积参数到期后发出 │ │ │ │ │ │ │ │<----------------------------------------- │ │ 以上两个flatmap都输出到下面ReduceTask │ │ │ │ │ ┌──────────────────────┐ │ │ ReduceTask.flatMap │ 1. 若是时间过时&所有收集完成,归并/输出模型(value.f0 < 0) │ └──────────────────────┘ 2. 未过时,归并每一个CalcTask计算的predict,造成一个 lable y │ │ │ │ │ ┌────────────────────┐ │ │ result = filter │ if t3.f0 > 0 && t3.f2 > 0 or not ? │ └────────────────────┘ │ │ │ │ │ │ │ │ if t3.f0 > 0 && t3.f2 > 0 then ┌───────────────────┐ │ │------------------------------------------>│CalcTask.flatMap2 │输出Tuple7 --------- │ "时间未过时&向量有意义" 将送回反馈,继续迭代 └───────────────────┘ │ │ │ 若是未造成反馈数据流,则继续过滤 │ │ ┌────────────────────┐ │ output = filter │ if value.f0 small than 0 or not ? └────────────────────┘ │ │ │ if value.f0 small than 0, then output │ 符合标准(时间过时了)的数据将跳出迭代,输出模型 │ │ ┌────────────┐ │ WriteModel │ 由于filter out,因此按期输出模型 └────────────┘ │ │ -------------------------------------------------------------------------------------------- │ 在线预测阶段 │ │ ┌─────────────────┐ │ │ testStreamData │ │ └─────────────────┘ │ │ │ │ │ │ ┌──────────────┐ ┌──────────────────┐ │ FTRL Predict │ <----------------------------│ featurePipeline │ └──────────────┘ └──────────────────┘
由于上图在手机上会变形,如下这个图为你们在手机上浏览。
在线机器学习FTRL(Follow-the-regularized-Leader)算法介绍