集智专栏
资源加载中,请稍后...
集智专栏

零基础深度学习Part IV:梯度下降与反向传播

原文: Deep Learning From Scratch IV: Gradient Descent and Backpropagation

翻译:孙一萌



梯度下降

一般来说,要找到函数的最小值,我们可以把设其导数为 0,然后求解参数。然而实际上要找到 $W$ 和 $b$ 的闭式解是不可能的,于是我们就要通过梯度下降的方法,迭代地寻找最小值。

想象一下你站在一座高山上,如果你想用最短的时间下山到达地面,那你就要选择山最陡峭的方向。

梯度下降是一样的道理,它先在参数的取值范围内随便取一个数,然后迭代地去寻找使损失 $J$ 最小的情况。迭代的每一步,都会找一个 $J$ 的值下降最陡峭的方向,然后沿这个方向继续迭代一步。

这个过程的一维情况如下图:

梯度下降的一维情况梯度下降的一维情况

在某个特定点上,下降最陡峭的方向,是由函数在该点的梯度决定的:梯度的相反数(因为是下降)就是损失下降最快的方向。

这样的话,如何最小化 $J$ 我们就有些思路了:

  1. 先给 $W$ 和 $b$ 一个随机值
  2. 计算 $J$ 的关于 $W$ 和 $b$ 的梯度
  3. 以梯度相反数为方向,再向前迭代一步
  4. 回到第 2 步

我们来写一个 operation,通过梯度下降计算一个节点的最小值。通过参数 learning_rate 可以设置每一步的步长。

程序说明
梯度下降计算节点最小值
预处理代码
import numpy as np class Operation: """Represents a graph node that performs a computation. An `Operation` is a node in a `Graph` that takes zero or more objects as input, and produces zero or more objects as output. """ def __init__(self, input_nodes=[]): """Construct Operation """ self.input_nodes = input_nodes # Initialize list of consumers (i.e. nodes that receive this operation's output as input) self.consumers = [] # Append this operation to the list of consumers of all input nodes for input_node in input_nodes: input_node.consumers.append(self) # Append this operation to the list of operations in the currently active default graph _default_graph.operations.append(self) def compute(self): """Computes the output of this operation. "" Must be implemented by the particular operation. """ pass class Graph: """Represents a computational graph """ def __init__(self): """Construct Graph""" self.operations = [] self.placeholders = [] self.variables = [] def as_default(self): global _default_graph _default_graph = self class placeholder: """Represents a placeholder node that has to be provided with a value when computing the output of a computational graph """ def __init__(self): """Construct placeholder """ self.consumers = [] # Append this placeholder to the list of placeholders in the currently active default graph _default_graph.placeholders.append(self) class Variable: """Represents a variable (i.e. an intrinsic, changeable parameter of a computational graph). """ def __init__(self, initial_value=None): """Construct Variable Args: initial_value: The initial value of this variable """ self.value = initial_value self.consumers = [] # Append this variable to the list of variables in the currently active default graph _default_graph.variables.append(self) class add(Operation): """Returns x + y element-wise. """ def __init__(self, x, y): """Construct add Args: x: First summand node y: Second summand node """ super().__init__([x, y]) def compute(self, x_value, y_value): """Compute the output of the add operation Args: x_value: First summand value y_value: Second summand value """ return x_value + y_value class matmul(Operation): """Multiplies matrix a by matrix b, producing a * b. """ def __init__(self, a, b): """Construct matmul Args: a: First matrix b: Second matrix """ super().__init__([a, b]) def compute(self, a_value, b_value): """Compute the output of the matmul operation Args: a_value: First matrix value b_value: Second matrix value """ return a_value.dot(b_value) class Session: """Represents a particular execution of a computational graph. """ def run(self, operation, feed_dict={}): """Computes the output of an operation Args: operation: The operation whose output we'd like to compute. feed_dict: A dictionary that maps placeholders to values for this session """ # Perform a post-order traversal of the graph to bring the nodes into the right order nodes_postorder = traverse_postorder(operation) # Iterate all nodes to determine their value for node in nodes_postorder: if type(node) == placeholder: # Set the node value to the placeholder value from feed_dict node.output = feed_dict[node] elif type(node) == Variable: # Set the node value to the variable's value attribute node.output = node.value else: # Operation # Get the input values for this operation from node_values node.inputs = [input_node.output for input_node in node.input_nodes] # Compute the output of this operation node.output = node.compute(*node.inputs) # Convert lists to numpy arrays if type(node.output) == list: node.output = np.array(node.output) # Return the requested node value return operation.output def traverse_postorder(operation): """Performs a post-order traversal, returning a list of nodes in the order in which they have to be computed Args: operation: The operation to start traversal at """ nodes_postorder = [] def recurse(node): if isinstance(node, Operation): for input_node in node.input_nodes: recurse(input_node) nodes_postorder.append(node) recurse(operation) return nodes_postorder red_points = np.random.randn(50, 2) - 2*np.ones((50, 2)) blue_points = np.random.randn(50, 2) + 2*np.ones((50, 2)) class negative(Operation): """ 逐元素计算负数 """ def __init__(self, x): """ 构造负运算 参数列表: x: 输入节点 """ super().__init__([x]) def compute(self, x_value): """ 计算负运算 operation 的输出 参数列表: x_value: 输入值 """ return -x_value class log(Operation): """ 对每一个元素进行对数运算 """ def __init__(self, x): """ 构造 log 参数列表: x: 输入节点 """ super().__init__([x]) def compute(self, x_value): """ 计算对数 operation 的输出 参数列表: x_value: 输入值 """ return np.log(x_value) class multiply(Operation): """ 对每一个元素,返回 x * y 的值 """ def __init__(self, x, y): """ 构造乘法 参数列表: x: 第一个乘数的输入节点 y: 第二个乘数的输入节点 """ super().__init__([x, y]) def compute(self, x_value, y_value): """ 计算乘法 operation 的输出 Args: x_value: 第一个乘数的值 y_value: 第二个乘数的值 """ return x_value * y_value class negative(Operation): """ 逐元素计算负数 """ def __init__(self, x): """ 构造负运算 参数列表: x: 输入节点 """ super().__init__([x]) def compute(self, x_value): """ 计算负运算 operation 的输出 参数列表: x_value: 输入值 """ return -x_value class reduce_sum(Operation): """ 计算张量中元素延某一或某些维度的总和 """ def __init__(self, A, axis=None): """ 构造 reduce_sum 参数列表: A: 要进行 reduce 运算的张量 axis: 需要 reduce 的维度,如果 `None`(即缺省值),则延所有维度 reduce """ super().__init__([A]) self.axis = axis def compute(self, A_value): """ 计算 reduce_sum operation 的输出值 参数列表: A_value: 输入的张量值 """ return np.sum(A_value, self.axis) class softmax(Operation): """返回 a 的 softmax 函数结果. """ def __init__(self, a): """构造 softmax 参数列表: a: 输入节点 """ super().__init__([a]) def compute(self, a_value): """计算 softmax operation 的输出值 参数列表: a_value: 输入值 """ return np.exp(a_value) / np.sum(np.exp(a_value), axis=1)[:, None]
示例代码
from queue import Queue class GradientDescentOptimizer: def __init__(self, learning_rate): self.learning_rate = learning_rate def minimize(self, loss): learning_rate = self.learning_rate class MinimizationOperation(Operation): def compute(self): # 计算梯度 grad_table = compute_gradients(loss) # 遍历所有节点 for node in grad_table: if type(node) == Variable: # 找到节点对应的梯度 grad = grad_table[node] # 沿着负梯度的方向进行一步迭代 node.value -= learning_rate * grad return MinimizationOperation()
程序验证过程
True

下图描绘了一次梯度下降的迭代过程。我们先随机找一条分割线(标序号 1),前进一步,找到一条稍微好一点的(标 2)。依此一步一步地前进,直到找到一条好的分割线。

迭代寻找“好”分割线迭代寻找“好”分割线


反向传播

在上面梯度下降的实现中,我们用了 compute_gradient(loss) 函数,来计算计算图中,loss operation 关于其他节点 n 的输出梯度(即让损失增加最快的方向,梯度的相反数是下降最快的方向)。

考虑以下计算图:

计算图计算图

根据导数的链式法则,我们可以得到:

$$ \frac{\partial e}{\partial a} = \frac{\partial e}{\partial b} \cdot \frac{\partial b}{\partial a} = \frac{\partial e}{\partial c} \cdot \frac{\partial c}{\partial b} \cdot \frac{\partial b}{\partial a} = \frac{\partial e}{\partial d} \cdot \frac{\partial d}{\partial c} \cdot \frac{\partial c}{\partial b} \cdot \frac{\partial b}{\partial a} $$

由此可见,要想算出 $e$ 关于 $a$ 的梯度,我们要从 $e$ 开始,一路倒推回 $a$,对于一路上的每个节点,计算它的输出关于输入的梯度,一个接一个,一直算到 $a$。然后把所有梯度相乘。

再考虑这个情景:

包含多路径的计算图包含多路径的计算图

这种情况下,从 $a$ 到 $e$ 有两条路径:$a, b, d, e$ 和 $a, c, d, e$。因此,$e$ 关于 $a$ 的全微分求法如下:

$$ \frac{\partial e}{\partial a} = \frac{\partial e}{\partial d} \cdot \frac{\partial d}{\partial a} = \frac{\partial e}{\partial d} \cdot (\frac{\partial d}{\partial b} \cdot \frac{\partial b}{\partial a} + \frac{\partial d}{\partial c} \cdot \frac{\partial c}{\partial a}) = \frac{\partial e}{\partial d} \cdot \frac{\partial d}{\partial b} \cdot \frac{\partial b}{\partial a} + \frac{\partial e}{\partial d} \cdot \frac{\partial d}{\partial c} \cdot \frac{\partial c}{\partial a} $$

由此我们就可以推导出一个通用的算法,用来算出损失关于一个节点的梯度:从代表损失的节点开始,反向做广度优先搜索。每访问一个节点 n,就算出损失关于 n 的输出的梯度,这里梯度要通过对节点 n 的每一个消费节点 c (消费节点即节点 c 以节点 n 的输出作为输入)做如下操作:

  • 取得损失关于 c 输出的梯度 G
  • G 乘上 c 的输出关于 n 的输出的梯度

之后我们把这些梯度相加。

实现反向传播有一个先决条件,我们需要为每个 operation 指定一个函数,通过关于 operation 的输出的梯度,计算关于 operation 的每个输入的梯度。

为此我们先来定义一个装饰器 @RegisterGradient(operation_name)

程序说明
装饰器
示例代码
# operation 到对应梯度计算函数的映射 _gradient_registry = {} class RegisterGradient: """装饰器,用来给 op type 注册梯度计算函数 """ def __init__(self, op_type): """创建一个装饰器实例,以 op_type 作为 Operation type 参数列表: op_type: operation 的名字 """ self._op_type = eval(op_type) def __call__(self, f): """把函数 `f` 注册为 `op_type`的梯度计算函数""" _gradient_registry[self._op_type] = f return f
程序验证过程
True

现在假设 _gradient_registry 字典内容已经完整了,于是我们来实现反向传播:

程序说明
反向传播
示例代码
from queue import Queue def compute_gradients(loss): # 可以通过 grad_table[node] 取出损失关于节点输出的梯度 grad_table = {} # 损失关于损失的梯度是 1 grad_table[loss] = 1 # 从损失节点开始,反向进行广度优先搜索 visited = set() queue = Queue() visited.add(loss) queue.put(loss) while not queue.empty(): node = queue.get() # 如果节点不是损失节点 if node != loss: # # 计算损失关于节点输出的梯度 # grad_table[node] = 0 # 遍历所有消费节点 for consumer in node.consumers: # 取出损失关于消费节点的输出的梯度 lossgrad_wrt_consumer_output = grad_table[consumer] # 取出根据关于消费者节点的输出的梯度,计算关于消费者节点的输入的梯度的函数 consumer_op_type = consumer.__class__ bprop = _gradient_registry[consumer_op_type] # 得到损失关于消费节点所有的输入的梯度 lossgrads_wrt_consumer_inputs = bprop(consumer, lossgrad_wrt_consumer_output) if len(consumer.input_nodes) == 1: # 如果消费节点只有一个输入节点,lossgrads_wrt_consumer_inputs 就是标量 grad_table[node] += lossgrads_wrt_consumer_inputs else: # 否则,lossgrads_wrt_consumer_inputs 是对各个输入节点的梯度的数组 # 取得该节点在消费节点的输入节点中的序列 node_index_in_consumer_inputs = consumer.input_nodes.index(node) # 得到损失关于节点的梯度 lossgrad_wrt_node = lossgrads_wrt_consumer_inputs[node_index_in_consumer_inputs] # 加到关于这个节点的总梯度中 grad_table[node] += lossgrad_wrt_node # # 把每个输入节点加入队列 # if hasattr(node, "input_nodes"): for input_node in node.input_nodes: if not input_node in visited: visited.add(input_node) queue.put(input_node) # 返回所有关于已访问过节点的梯度 return grad_table
程序验证过程
True

每个 operation 对应的梯度计算

对于每一个 operation,我们要定义一个函数,把损失关于 operation 的输出的梯度,转化为损失关于 operation 的每个输入节点的梯度,将它们存入列表中。计算一个关于矩阵的梯度很没意思,因此我们忽略这个过程,我直接把结果给出。跳过本节的学习不会影响你对整体的理解。

如果你想知道怎么得到这个结果,大致过程是这样的:

  • 找到各个输出值关于各个输入值的偏导数(可能是一个阶数大于 2 的张量,也就是说既不是标量,也不是向量,也不是矩阵,涉及大量的求和)
  • 运用链式法则,通过关于节点的输出的梯度,计算出关于节点的输入的梯度。得到的结果将会是一个和输入张量形态相同的张量,也就是说如果输入是一个矩阵,得到的结果也会是一个矩阵。
  • 把结果写成一连串的矩阵 operation 来提高运算效率。这一步有点取巧。

负运算对应的梯度计算

已知关于 $-x$ 的梯度 $G$,那么关于 $x$ 的梯度就是 $-G$。

程序说明
负运算
示例代码
@RegisterGradient("negative") def _negative_gradient(op, grad): """计算 `negative` 的梯度 参数列表: op: 我们要单独处理的 `negative` Operation grad: 关于这个 `negative` operation 的梯度 返回值: 关于 `negative` 的输入的梯度 """ return -grad
程序验证过程
True

对数对应的梯度计算

已知关于 $log(x)$ 的梯度 $G$,则关于 $x$ 的梯度就是 $\frac{G}{x}$。

程序说明
对数运算
示例代码
@RegisterGradient("log") def _log_gradient(op, grad): """计算 `log` 的梯度。 参数列表: op: 要处理的 `log` Operation grad: 关于 `log` operationa 的输出的梯度 返回值: 关于 `log` 的输入的梯度 """ x = op.inputs[0] return grad/x
程序验证过程
True

乘法对应的梯度计算

已知关于$A \odot B$ 的梯度 $G$,则关于 $A$ 的梯度为 $G \odot B$,关于 $B$ 的梯度是 $G \odot A$。

程序说明
乘法运算
示例代码
@RegisterGradient("multiply") def _multiply_gradient(op, grad): """计算 `multiply` 的梯度。 参数列表: op: 要处理的 `multiply` Operation grad: 关于 `multiply` operationa 的输出的梯度 返回值: 关于 `multiply` 的输入的梯度 """ A = op.inputs[0] B = op.inputs[1] return [grad * B, grad * A]
程序验证过程
True

matmul 对应的梯度运算

已知关于$AB$ 的梯度 $G$,则关于 $A$ 的梯度为 $GB^T$,关于 $B$ 的梯度是 $A^TG$。

程序说明
matmul
示例代码
@RegisterGradient("matmul") def _matmul_gradient(op, grad): """计算 `matmul` 的梯度。 参数列表: op: 要处理的 `matmul` Operation grad: 关于 `matmul` operationa 的输出的梯度 返回值: 关于 `matmul` 的输入的梯度 """ A = op.inputs[0] B = op.inputs[1] return [grad.dot(B.T), A.T.dot(grad)]
程序验证过程
True

add 对应的梯度运算

已知关于 $a + b$ 的梯度是 $G$,则关于 $a$ 的梯度为 $G$,关于 $b$ 的梯度也是 $G$。因此 $a$ 和 $b$ 的形状是相同的。

如果 $a$ 和 $b$ 形状不同,比如矩阵 $a$ 行数是 100,向量 $b$ 的行数为 1,那么 $a + b$ 就是将 $b$ 加到 $a$ 的每一行上。在这种情况下,梯度的计算会复杂一点,此处我们就不详细展开了。

程序说明
add
示例代码
@RegisterGradient("add") def _add_gradient(op, grad): """计算 `add` 对应的梯度 参数列表: op: 要处理的 `add` Operation grad: 关于 `add` operationa 的输出的梯度 返回值: 关于 `add` 的输入的梯度 """ a = op.inputs[0] b = op.inputs[1] grad_wrt_a = grad while np.ndim(grad_wrt_a) > len(a.shape): grad_wrt_a = np.sum(grad_wrt_a, axis=0) for axis, size in enumerate(a.shape): if size == 1: grad_wrt_a = np.sum(grad_wrt_a, axis=axis, keepdims=True) grad_wrt_b = grad while np.ndim(grad_wrt_b) > len(b.shape): grad_wrt_b = np.sum(grad_wrt_b, axis=0) for axis, size in enumerate(b.shape): if size == 1: grad_wrt_b = np.sum(grad_wrt_b, axis=axis, keepdims=True) return [grad_wrt_a, grad_wrt_b]
程序验证过程
True

reduce_sum 对应的梯度运算

已知关于 $reduce_sum$ 的输出的梯度 $G$,则关于输入 $A$ 的梯度为沿着指定轴重复 $G$。

示例代码
@RegisterGradient("reduce_sum") def _reduce_sum_gradient(op, grad): """计算 `reduce_sum` 对应的梯度 参数列表: op: 要处理的 `reduce_sum` Operation grad: 关于 `reduce_sum` operationa 的输出的梯度 返回值: 关于 `reduce_sum` 的输入的梯度 """ A = op.inputs[0] output_shape = np.array(A.shape) output_shape[op.axis] = 1 tile_scaling = A.shape // output_shape grad = np.reshape(grad, output_shape) return np.tile(grad, tile_scaling)
程序验证过程
True

softmax 对应的梯度运算

示例代码
@RegisterGradient("softmax") def _softmax_gradient(op, grad): """计算 `softmax` 对应的梯度 参数列表: op: 要处理的 `softmax` Operation grad: 关于 `softmax` operationa 的输出的梯度 返回值: 关于 `softmax` 的输入的梯度 """ softmax = op.output return (grad - np.reshape( np.sum(grad * softmax, 1), [-1, 1] )) * softmax
程序验证过程
True

示例

现在来试试上面的代码,看看我们能得出的感知机最佳权重是多少。

程序说明
示例
示例代码
# 创建一个新 Graph Graph().as_default() X = placeholder() c = placeholder() # 随机初始化权重 W = Variable(np.random.randn(2, 2)) b = Variable(np.random.randn(2)) # 搭建感知机 p = softmax(add(matmul(X, W), b)) # 搭建交叉熵损失 J = negative(reduce_sum(reduce_sum(multiply(c, log(p)), axis=1))) # 搭建最小化 operation minimization_op = GradientDescentOptimizer(learning_rate=0.01).minimize(J) # 搭建占位输入 feed_dict = { X: np.concatenate((blue_points, red_points)), c: [[1, 0]] * len(blue_points) + [[0, 1]] * len(red_points) } # 创建会话 session = Session() # 执行 100 步梯度下降迭代 for step in range(100): J_value = session.run(J, feed_dict) if step % 10 == 0: print("Step:", step, " Loss:", J_value) session.run(minimization_op, feed_dict) # 打印最终结果 W_value = session.run(W) print("Weight matrix:\n", W_value) b_value = session.run(b) print("Bias:\n", b_value)
程序验证过程
True

注意,现在我们一开始损失相当高,之后让损失迅速下降。

最后我们来画出最终的分割线:

程序说明
画出最终分割线
预处理代码
import matplotlib.pyplot as plt
示例代码
W_value = np.array([[ 1.27496197 -1.77251219], [ 1.11820232 -2.01586474]]) b_value = np.array([-0.45274057 -0.39071841]) # 画出直线 y = -x x_axis = np.linspace(-4, 4, 100) y_axis = -W_value[0][0]/W_value[1][0] * x_axis - b_value[0]/W_value[1][0] plt.plot(x_axis, y_axis) # 把红 / 蓝点画上去 plt.scatter(red_points[:,0], red_points[:,1], color='red') plt.scatter(blue_points[:,0], blue_points[:,1], color='blue') plt.show()
程序验证过程
True

如果你有什么问题,请在评论里提出来。

您也许喜欢这些文章

如何用Dropout降低过拟合风险

发表至系列教程
系列第二篇,介绍了什么是“过拟合”以及如何避免。降低过拟合风险的方法有很多,本文以Dropout为例,图解其基本原理,并以Kaggle入门竞赛“手写数字识别”为例,搭建卷积神经网络在线运行,通过对比两种不同的网络架构,体现Dropout的作用。

玩儿懂深度学习Part 4:深度学习的特征工程

发表至系列教程
这是《玩儿懂深度学习》系列文章的第四篇,主要举例介绍了深度学习计算过程中的一些数据预处理方法。

[Scikit-learn教程] 01 快速入门

发表至系列教程
从本篇文章开始,我们将在一系列的文章中介绍使用scikit-learn——一个著名的机器学习库件来完成机器学习过程的各种方法,包括算法的选择、模型的构建与训练、训练结果的评估等等。同时提供丰富的案例与小测试,帮助读者深刻掌握机器学习与数据挖掘领域中的各类基础知识与技能。

文章评论(0)