Deep Interest Network(DIN)是阿里妈妈精准定向检索及基础算法团队在2017年6月提出的。其针对电子商务领域(e-commerce industry)的CTR预估,重点在于充分利用/挖掘用户历史行为数据中的信息。python
本系列文章解读论文以及源码,顺便梳理一些深度学习相关概念和TensorFlow的实现。git
本文经过DIN源码 https://github.com/mouna99/dien 分析,来深刻展开看看embedding层如何自动更新。github
在上文中,咱们分析了embedding层的做用,可是留了一个问题还没有解答:算法
即DIN代码中,以下变量怎么更新:编程
self.uid_embeddings_var = tf.get_variable("uid_embedding_var", [n_uid, EMBEDDING_DIM]) self.mid_embeddings_var = tf.get_variable("mid_embedding_var", [n_mid, EMBEDDING_DIM]) self.cat_embeddings_var = tf.get_variable("cat_embedding_var", [n_cat, EMBEDDING_DIM])
由于在DIN中,只有这一处初始化 embeddings 的地方,没有找到迭代更新的代码,这会给初学者带来一些困扰。网络
先简要说一下答案,embedding层经过 optimizer 进行更新(自动求导),经过 session.run 进行调用更新。session
通常意义的 embedding 大可能是神经网络倒数第二层的参数权重,只具备总体意义和相对意义,不具有局部意义和绝对含义,这与 embedding 的产生过程有关,app
任何 embedding 一开始都是一个随机数,而后随着优化算法,不断迭代更新,最后网络收敛中止迭代的时候,网络各个层的参数就相对固化,获得隐层权重表(此时就至关于获得了咱们想要的 embedding),而后在经过查表能够单独查看每一个元素的 embedding。框架
DIN中对应代码以下:dom
# 优化更新(自动求导) self.optimizer = tf.train.AdamOptimizer(learning_rate=self.lr).minimize(self.loss) ...... # 经过 session.run 进行调用更新 def train(self, sess, inps): if self.use_negsampling: loss, accuracy, aux_loss, _ = sess.run([self.loss, self.accuracy, self.aux_loss, self.optimizer], feed_dict={ self.uid_batch_ph: inps[0], self.mid_batch_ph: inps[1], self.cat_batch_ph: inps[2], self.mid_his_batch_ph: inps[3], self.cat_his_batch_ph: inps[4], self.mask: inps[5], self.target_ph: inps[6], self.seq_len_ph: inps[7], self.lr: inps[8], self.noclk_mid_batch_ph: inps[9], self.noclk_cat_batch_ph: inps[10], }) return loss, accuracy, aux_loss else: loss, accuracy, _ = sess.run([self.loss, self.accuracy, self.optimizer], feed_dict={ self.uid_batch_ph: inps[0], self.mid_batch_ph: inps[1], self.cat_batch_ph: inps[2], self.mid_his_batch_ph: inps[3], self.cat_his_batch_ph: inps[4], self.mask: inps[5], self.target_ph: inps[6], self.seq_len_ph: inps[7], self.lr: inps[8], }) return loss, accuracy, 0
这涉及的部分不少,咱们须要一一阐释。
大多数机器学习(深度学习)任务就是最小化损失,在损失函数定义好的状况下,使用一种优化器进行求解最小损失。
而为了让loss降低,深度学习框架常见的优化方式通常采用的是梯度降低(Gradient Descent)算法,这要求对loss公式上的每一个op都须要求偏导,而后使用链式法则结合起来。
给定一个可微函数,理论上能够用解析法找到它的最小值:函数的最小值是导数为 0 的点,所以你只需找到全部导数为 0 的点,而后计算函数在其中哪一个点具备最小值。
将这一方法应用于神经网络,就是用解析法求出最小损失函数对应的全部权重值。能够经过对方程 gradient(f)(W) = 0 求解 W 来实现这一方法。
即便用基于梯度的优化方式进行求解,基于当前在随机数据批量上的损失,一点一点地对参数进行调节。因为处理的是一个可微函数,你能够计算出它的梯度,而后沿着梯度的反方向更新权重,损失每次都会变小一点。
W -= step * gradient
,从而使这批数据这就叫做小批量随机梯度降低(mini-batch stochastic gradient descent,又称为小批量SGD)。
术语随机(stochastic)是指每批数据都是随机抽取的(stochastic 是random在科学上的同义词)。
反向传播 算法的训练过程则是根据网络计算获得的 Y_out 和实际的真实结果 Y_label 来计算偏差,而且沿着网络反向传播来调整公式中的全部 Wi 和 bi,使偏差达到最小。强调一下,深度学习里面 BP 的本质目标是让偏差达到最小,因此要用偏差对中间出现过的全部影响因素求偏导。
经过反向传播算法优化神经网络是一个迭代的过程。
前向求导是从第一层开始,逐层计算梯度 ∂ / ∂X 到最后一层。反向求导是从最后一层开始,逐层计算梯度 ∂Z / ∂ 到第一层。前向求导关注的是输入是怎么影响到每一层的,反向求导则是关注于每一层是怎么影响到最终的输出结果的。
自动求导就是每个op/layer本身依据本身的输入和输出作前向计算/反向求导,而框架则负责组装调度这些op/layer,表现出来就是你经过框架去定义网络/计算图,框架自动前向计算并自动求导。
常见的深度学习框架里每一个op(op指的是最小的计算单元,caffe里叫layer)都预先定义好了 forward 和backward(或者叫grad)两个函数,这里的 backward 也就是求导。也就是说每一个op的求导都是预先定义好的,或者说是人手推的。
当你定义好了一个神经网络,常见的深度学习框架将其解释为一个dag(有向无环图),dag里每一个节点就是op,从loss function这个节点开始,经过链式法则一步一步从后往前计算每一层神经网络的梯度,整个dag梯度计算的最小粒度就是op的 backward 函数(这里是手动的),而链式法则则是自动的。
TensorFlow也是如此。
TensorFlow 提供的是声明式的编程接口,用户不须要关心求导的细节,只须要定义好模型获得一个loss方程,而后使用TensorFlow实现的各类Optimizer来进行运算便可。
这要求TensorFlow自己提供了每一个op的求偏导方法,并且虽然咱们使用的是Python的加减乘除运算符,其实是TensorFlow重载了运算符实际上会建立“Square”这样的op,能够方便用户更容易得构建表达式。
所以TensorFlow的求导,其实是先提供每个op求导的数学实现,而后使用链式法则求出整个表达式的导数。
具体咱们能够参见RegisterGradient的实现,以及nn_grad.py,math_grad.py等几个文件
这些文件的全部的函数都用RegisterGradient装饰器包装了起来,这些函数都接受两个参数,op和grad。其余的只要注册了op的地方也有各类使用这个装饰器,例如batch。
RegisterGradient使用举例以下:
@ops.RegisterGradient("Abs") def _AbsGrad(op, grad): x = op.inputs[0] return grad * math_ops.sign(x)
RegisterGradient定义以下,就是注册op梯度函数的装饰器:
class RegisterGradient(object): def __init__(self, op_type): if not isinstance(op_type, six.string_types): raise TypeError("op_type must be a string") self._op_type = op_type def __call__(self, f): """Registers the function `f` as gradient function for `op_type`.""" _gradient_registry.register(f, self._op_type) return f
道理说着还不错,可是神经网络是究竟怎么反向传递更新呢?这就须要看Optimizer了。
回到 TensorFlow 的 Python 代码层面,自动求导的部分是靠各类各样的 Optimizer 串起来的:
minimize()
方法就会自动完成反向部分的数据流图构建。在DIEN这里,代码以下:
ctr_loss = - tf.reduce_mean(tf.log(self.y_hat) * self.target_ph) self.loss = ctr_loss if self.use_negsampling: self.loss += self.aux_loss self.optimizer = tf.train.AdamOptimizer(learning_rate=self.lr).minimize(self.loss)
TF的optimizer都继承自Optimizer这个类,这个类的方法很是多,几个重要方法是 minimize、compute_gradients、apply_gradients、slot系列。
Optimizer 基类的这个方法为每一个实现子类预留了_create_slots()
,_prepare()
,_apply_dense()
,_apply_sparse()
四个接口出来,后面新构建的 Optimizer 只须要重写或者扩展 Optimizer 类的某几个函数便可;
整个反向传播过程可分为三步,这三步仅需经过一个minimize()函数完成:
compute_gradients()
;apply_gradients();
即往最小化 loss 的方向更新 var_list 中的每个参数;代码以下:
def minimize(self, loss, global_step=None, var_list=None, gate_gradients=GATE_OP, aggregation_method=None, colocate_gradients_with_ops=False, name=None, grad_loss=None): grads_and_vars = self.compute_gradients( loss, var_list=var_list, gate_gradients=gate_gradients, aggregation_method=aggregation_method, colocate_gradients_with_ops=colocate_gradients_with_ops, grad_loss=grad_loss) vars_with_grad = [v for g, v in grads_and_vars if g is not None] return self.apply_gradients(grads_and_vars, global_step=global_step, name=name)
该函数用于计算loss对于可训练变量val_list的梯度,最终返回的是元组列表,即 [(gradient, variable),...]。
参数含义:
tf.Variable
to update to minimize loss
. Defaults to the list of variables collected in the graph under the key GraphKeys.TRAINABLE_VARIABLES
.基本逻辑以下:
其中,_get_processor函数可理解为一种快速更新variables的方法,每一个processor都会包含一个update_op这样的函数来进行variable更新操做。
变量更新公式:
代码以下:
def compute_gradients(self, loss, var_list=None, gate_gradients=GATE_OP, aggregation_method=None, colocate_gradients_with_ops=False, grad_loss=None): self._assert_valid_dtypes([loss]) if grad_loss is not None: self._assert_valid_dtypes([grad_loss]) if var_list is None: var_list = ( variables.trainable_variables() + ops.get_collection(ops.GraphKeys.TRAINABLE_RESOURCE_VARIABLES)) else: var_list = nest.flatten(var_list) var_list += ops.get_collection(ops.GraphKeys._STREAMING_MODEL_PORTS) processors = [_get_processor(v) for v in var_list] var_refs = [p.target() for p in processors] grads = gradients.gradients( loss, var_refs, grad_ys=grad_loss, gate_gradients=(gate_gradients == Optimizer.GATE_OP), aggregation_method=aggregation_method, colocate_gradients_with_ops=colocate_gradients_with_ops) if gate_gradients == Optimizer.GATE_GRAPH: grads = control_flow_ops.tuple(grads) grads_and_vars = list(zip(grads, var_list)) return grads_and_vars
gradients 的实际定义在 tensorflow/python/ops/gradients_impl.py
中。把整个求导过程抽象成一个 ys=f(xs) 的函数。
简单说,它就是为了计算一组输出张量ys = [y0, y1, ...]
对输入张量xs = [x0, x1, ...]
的梯度,对每一个xi
有grad_i = sum[dy_j/dx_i for y_j in ys]
。默认状况下,grad_loss
是None
,此时grad_ys
被初始化为全1向量。
gradients 部分参数以下:
grad_ys
存储计算出的梯度;gate_gradients
是一个布尔变量,指示全部梯度是否在使用前被算出,若是设为True
,能够避免竞争条件;这个方法会维护两个重要变量
queue
,队列里存放计算图里全部出度为0的操做符grads
,字典的键是操做符自己,值是该操做符每一个输出端收到的梯度列表反向传播求梯度时,每从队列中弹出一个操做符,都会把它输出变量的梯度加起来(对应全微分定理)获得out_grads
,而后获取对应的梯度计算函数grad_fn
。操做符op
自己和out_grads
会传递给grad_fn
作参数,求出输入的梯度。
基本逻辑以下:
具体代码以下:
def gradients(ys, xs, grad_ys=None, name="gradients", colocate_gradients_with_ops=False, gate_gradients=False, aggregation_method=None, stop_gradients=None): to_ops = [t.op for t in ys] from_ops = [t.op for t in xs] grads = {} # Add the initial gradients for the ys. for y, grad_y in zip(ys, grad_ys): _SetGrad(grads, y, grad_y) # Initialize queue with to_ops. queue = collections.deque() # Add the ops in 'to_ops' into the queue. to_ops_set = set() for op in to_ops: ready = (pending_count[op._id] == 0) if ready and op._id not in to_ops_set: to_ops_set.add(op._id) queue.append(op) while queue: # generate gradient subgraph for op. op = queue.popleft() with _maybe_colocate_with(op, colocate_gradients_with_ops): if loop_state: loop_state.EnterGradWhileContext(op, before=True) out_grads = _AggregatedGrads(grads, op, loop_state, aggregation_method) if loop_state: loop_state.ExitGradWhileContext(op, before=True) if has_out_grads and (op._id not in stop_ops): if is_func_call: func_call = ops.get_default_graph()._get_function(op.type) grad_fn = func_call.python_grad_func else: try: grad_fn = ops.get_gradient_function(op) for i, (t_in, in_grad) in enumerate(zip(op.inputs, in_grads)): if in_grad is not None: if (isinstance(in_grad, ops.Tensor) and t_in.dtype != dtypes.resource): try: in_grad.set_shape(t_in.get_shape()) _SetGrad(grads, t_in, in_grad) if loop_state: loop_state.ExitGradWhileContext(op, before=False)
该函数的做用是将compute_gradients()
返回的值做为输入参数对variable进行更新,即根据前面求得的梯度,把梯度进行方向传播给weights和biases进行参数更新。
那为何minimize()
会分开两个步骤呢?缘由是由于在某些状况下咱们须要对梯度作必定的修正,例如为了防止梯度消失(gradient vanishing)或者梯度爆炸(gradient explosion),咱们须要事先干预一下以避免程序出现Nan的尴尬状况;有的时候也许咱们须要给计算获得的梯度乘以一个权重或者其余乱七八糟的缘由,因此才分开了两个步骤。
基本逻辑以下:
grad, var, processor in converted_grads_and_vars
,应用 ops.colocate_with(var),做用是保证每一个参数var的更新都在同一个device上;update_ops.append(processor.update_op(self, grad))
,若是有global_step
的话,global_step需加个1。train_op
。train_op
是一般训练过程当中,client为session的fetches提供的参数之一,也就是这个Operation被执行以后,模型的参数将会完成更新,并开始下一个batch的训练。那么这也就意味着,这个方法中涉及到的计算图将会实现说明文档中的训练逻辑。具体代码是:
def apply_gradients(self, grads_and_vars, global_step=None, name=None): grads_and_vars = tuple(grads_and_vars) # Make sure repeat iteration works. converted_grads_and_vars = [] for g, v in grads_and_vars: if g is not None: # Convert the grad to Tensor or IndexedSlices if necessary. g = ops.convert_to_tensor_or_indexed_slices(g) p = _get_processor(v) converted_grads_and_vars.append((g, v, p)) converted_grads_and_vars = tuple(converted_grads_and_vars) var_list = [v for g, v, _ in converted_grads_and_vars if g is not None] with ops.control_dependencies(None): self._create_slots([_get_variable_for(v) for v in var_list]) update_ops = [] with ops.name_scope(name, self._name) as name: self._prepare() for grad, var, processor in converted_grads_and_vars: if grad is None: continue scope_name = var.op.name if context.in_graph_mode() else "" with ops.name_scope("update_" + scope_name), ops.colocate_with(var): update_ops.append(processor.update_op(self, grad)) if global_step is None: apply_updates = self._finish(update_ops, name) else: with ops.control_dependencies([self._finish(update_ops, "update")]): with ops.colocate_with(global_step): apply_updates = state_ops.assign_add(global_step, 1, name=name).op train_op = ops.get_collection_ref(ops.GraphKeys.TRAIN_OP) if apply_updates not in train_op: train_op.append(apply_updates) return apply_updates
DIEN使用的是AdamOptimizer优化器。
Adam 这个名字来源于自适应矩估计(Adaptive Moment Estimation),也是梯度降低算法的一种变形,可是每次迭代参数的学习率都有必定的范围,不会由于梯度很大而致使学习率(步长)也变得很大,参数的值相对比较稳定。
几率论中矩的含义是:若是一个随机变量 X 服从某个分布,X 的一阶矩是 E(X),也就是样本平均值,X 的二阶矩就是 E(X^2),也就是样本平方的平均值。
Adam 算法利用梯度的一阶矩估计和二阶矩估计动态调整每一个参数的学习率。TensorFlow提供的tf.train.AdamOptimizer可控制学习速度,通过偏置校订后,每一次迭代学习率都有个肯定范围,使得参数比较平稳。
在利用计算好的导数对权重进行修正时,对Embedding矩阵的梯度进行特殊处理,只更新局部,见optimization.py中Adagrad.update函数。
在_prepare
函数中经过convert_to_tensor
方法来存储了输入参数的 Tensor 版本。
def _prepare(self): self._lr_t = ops.convert_to_tensor(self._lr, name="learning_rate") self._beta1_t = ops.convert_to_tensor(self._beta1, name="beta1") self._beta2_t = ops.convert_to_tensor(self._beta2, name="beta2") self._epsilon_t = ops.convert_to_tensor(self._epsilon, name="epsilon")
_create_slots
函数用来建立参数,好比 _beta1_power,_beta2_power
def _create_slots(self, var_list): first_var = min(var_list, key=lambda x: x.name) create_new = self._beta1_power is None if not create_new and context.in_graph_mode(): create_new = (self._beta1_power.graph is not first_var.graph) if create_new: with ops.colocate_with(first_var): self._beta1_power = variable_scope.variable(self._beta1, name="beta1_power", trainable=False) self._beta2_power = variable_scope.variable(self._beta2, name="beta2_power", trainable=False) # Create slots for the first and second moments. for v in var_list: self._zeros_slot(v, "m", self._name) self._zeros_slot(v, "v", self._name)
函数_apply_dense
和_resource_apply_dense
的实现中分别使用了training_ops.apply_adam
和training_ops.resource_apply_adam
方法。
函数_apply_sparse
和_resource_apply_sparse
主要用在稀疏向量的更新操做上,而具体的实现是在函数_apply_sparse_shared
中。
_apply_sparse_shared
函数,首先获取所须要的参数值并存储到变量里,接着按照 Adam 算法的流程,首先计算学习率,接着计算两个 Momentum ,因为是稀疏 tensor 的更新,因此在算出更新值以后要使用
scatter_add
来完成加法操做, 最后将var_update
和m_t
、v_t
的更新操做放进control_flow_ops.group
中。
优化器已经搭建好,剩下就是调用 session.run
进行更新。
调用一次 run 是执行一遍数据流图, 在 TensorFlow 的训练代码中一般是在一个循环中屡次调用 sess.run()
,一次 run 即为训练过程当中的一步。
fetches 是 run 方法的一个输入参数,这个参数能够是不少种形式的数据,run 最后的 返回值也会和 fetches 有相同的结构。
至此,DIN分析暂时告一段落,下篇开始 DIEN 的分析,敬请期待。
TensorFlow SyncReplicasOptimizer 解读
tensorflow中有向图(计算图、Graph)、上下文环境(Session)和执行流程
TensorFlow 拆包(一):Session.Run ()
tensorflow源码分析(五)session.run()
Tensorflow中优化器--AdamOptimizer详解
【TensorFlow】优化器AdamOptimizer的源码分析
TensorFlow中Session、Graph、Operation以及Tensor详解
TensorFlow 拆包(二):TF 的数据流模型实现以及自动求导
分布式Tensorflow中同步梯度更新tf.train.SyncReplicasOptimizer解读(backup_worker的用法)
TensorFlow学习笔记之--[compute_gradients和apply_gradients原理浅析]