[npnet] 线性回归
我们现在已经知道机器学习三个重要的模块是模型 (model)、标准 (criterion) 以及 优化 (optimization)。在这篇笔记中我们讨论各个模块必要的基本功能,然后用 Python 给出模块的基类 (base class)。 最后我们继承这些基类实现上一篇笔记的线性模型 (linear model) 以及均方误差 (mean square error)。
import numpy as np
import matplotlib.pyplot as plt
%run import_npnet.py
npnet = import_npnet(1)
模型
一个模型抽象来看是一个函数 \(f(x; a)\),其中 \(a\) 是模型的参数。因此一个模型必须要保存参数其自身的参数,以及实现函数 \(f(x; a)\) 的功能 (也叫正向传播/前馈 (feedforward))。另外,我们还需要损失函数对参数的偏导数来进行梯度下降,所以我们还需要在模型中实现反向传播 (backpropagation) 并保存计算好的偏导数。
# import
class Model:
def __init__(self, **kwargs):
self.kwargs = kwargs
self.parameters = {}
self.gradients = {}
@property
def _parameters(self):
if isinstance(self.parameters, list):
return self.parameters
return [self.parameters]
@property
def _gradients(self):
if isinstance(self.gradients, list):
return self.gradients
return [self.gradients]
def _init_gradients(self):
for params, grads in zip(self._parameters, self._gradients):
for name in params:
grads[name] = np.zeros_like(params[name])
def forward(self, input):
raise NotImplementedError()
def backward(self, grad):
raise NotImplementedError()
def __call__(self, *args, **kwargs):
return self.forward(*args, **kwargs)
def __repr__(self):
return f"<{self.__class__.__name__}>"
Model
中定义了两个属性 (property) Model._parameters
和 Model._gradients
。这两个属性返回对应属性的列表 (list) 形式。比如 Model._parameters
检查 Model.parameters
是否为列表,若是则直接返回,若不是则返回只有其一个元素的列表。这两个属性是为了方便以后的实现。我们还定义了 Model._init_gradients
。当我们定义好 Model.parameters
之后,我们可以用这个方法初始化所有偏导数为0。
线性回归
线性回归模型的数学表达式是
其中 \(x\) 是长度为 \(m\) 的行向量 (row vector),\(y\) 是长度为 \(n\) 的行向量,\(W\) 是 \(m \times n\) 的矩阵 (matrix),\(b\) 是长度为 \(n\) 的行向量,最后 \(x W\) 是矩阵乘法,\(+\) 是向量加法。在这个模型中,\(W\) 和 \(b\) 是模型参数。在 numpy
中,我们使用 @
代表矩阵乘法。知道这点,我们很容易写出 Linear.forward
的代码。难点在于如何正确地用 numpy
计算参数 W
和 b
对于损失函数的偏导数。
在实际应用中,\(x\) 并只是一个行向量,而是由若干个行向量形成的矩阵。假设 \(x\) 是一个 \(l \times m\) 矩阵。这说的是 \(x\) 是由 \(l\) 个长度为 \(m\) 的行向量构成的。因为 numpy
的广播 (broadcasting) 机制,我们可以将这样一个 \(l \times m\) 矩阵 \(x\) 作为输入,得到一个 \(l \times n\) 的矩阵 \(y\)。\(y\) 每一行是 \(x\) 相对应的那行代入上述线性公式得到的。这样我们相当于可以同时计算多个输入。这是便利,也为我们的反向传播的实现增加了难度。
假设观测值也组成一个 \(l \times n\) 矩阵 \(\tilde{y}\)。我们记 \(x = (x_{ij}), y = (y_{ij}), W = (W_{ij}), b = (b_1, \dots, b_n)\). 展开 \(y = xW + b\) 我们得到
根据反向传播的过程,我们会有一个 \(l \times n\) 矩阵 \(g = (g_{ij})\) 作为后面传递回来的梯度。这个传递回来的梯度的矩阵大小是跟我们模型的矩阵大小是一致的。假设最终的损失是 \(L\)。那么对于每个 \(b_h\),我们有
这里 \(\delta_{ij} = 1\) 当且仅当 \(i = j\),否则等于0。同样地,我们对每个 \(W_{uv}\),
通过上面的计算,我们发现对于 \(b\) 的偏导数我们可以将 \(g\) 的每一行加起来得到,而对于 \(W\) 的偏导数,我们可以简单地用矩阵乘法 \(\frac{\partial L}{\partial W} = x^T g\) 来表示。其中 \(x^T\) 是 \(x\) 的转置矩阵。至此我们已经成功推出参数的偏导数公式。但是我们还需要求出对于输入 \(x\) 的偏导数,作为梯度传递到前面去以继续反向传播的过程。对每个 \(x_{uv}\),
也就是说需要反向传播回去的梯度是 \(\frac{\partial L}{\partial x} = g W^T\)。
根据上面的推导公式,我们可以得到一下线性模型的实现。值得注意的是参数 \(W\) 的初始化,我使用的是 He initialization 。选择这样的初始化是因为后面我们常用的是 ReLU 激活函数。在使用 ReLU 时,这样的初始化有助于模型更快地收敛。
# import
class Linear(Model):
def __init__(self, in_features, out_features):
super().__init__(in_features=in_features,
out_features=out_features)
m, n = in_features, out_features
params = self.parameters
params['W'] = np.random.randn(m, n)
params['W'] *= np.sqrt(2.0 / (m + n))
params['b'] = np.zeros(n)
self._init_gradients()
def forward(self, input):
params = self.parameters
self.input = input
output = input @ params['W'] + params['b']
return output
def backward(self, grad):
self.gradients['W'] += self.input.T @ grad
self.gradients['b'] += np.sum(grad, axis=0)
return grad @ self.parameters['W'].T
标准
# import
class Criterion:
def __init__(self, *args, **kwargs):
pass
def forward(self, output, target):
raise NotImplementedError()
def backward(self, grad=1.0):
raise NotImplementedError()
def __call__(self, *args, **kwargs):
return self.forward(*args, **kwargs)
def __repr__(self):
return f"<{self.__class__.__name__}>"
均方误差
假设 \(y\) 是预测值,\(\tilde{y}\) 是观测值。均方误差的数学公式是
这里的 \(|x|_2^2\) 是求 \(x\) 的 L2 范式 (L2 norm) 的平方,是平方函数的向量推广。如果 \(x = (x_1, \dots, x_n)\), 那么
我们需要计算 \(\frac{\partial L}{\partial y}\):
# import
class MSELoss(Criterion):
def forward(self, output, target):
self.output = output
self.target = target
n = output.shape[0]
loss = np.sum((output - target) ** 2)
loss /= (2 * n)
self.loss = loss
return loss
def backward(self, grad=1.0):
n = self.output.shape[0]
return (self.output - self.target) * grad / n
我们发现 MSELoss.forward
和 MSELoss.backward
都除以了 n = output.shape[0]
。这是因为 targt
是将所有观测到的 y
放在一起形成的矩阵。target
的每一行代表一个观测点的 y
值。同理,output
的每一行代表对一个观测点的预测。按照之前我们的讨论,总的误差是所有观测点误差的平均值,所以要除以观测点的个数。
优化器
我们在后面的笔记中实现类似 torch.nn.Sequential
的简单神经网络。其本质就是将一些简单的模型串起来形成一个更大更深的模型。考虑到这种串型网络,我们将一个基本模型看作包含一个基本模型的串型网络。优化器 (Optimizer) 要将所有模型的参数以及它们的偏导数放在各自的列表来获取统一处理的便利。
# import
class Optimizer:
def __init__(self, model, lr=1e-3, *args, **kwargs):
self.model = model
self.parameters = model._parameters
self.gradients = model._gradients
self.lr = lr
def step(self):
raise NotImplementedError()
def zero_grad(self):
for grads in self.gradients:
for g in grads.values():
g.fill(0.0)
def __repr__(self):
return f"<{self.__class__.__name__}>"
随机梯度下降
我们之前说的梯度下降是使用所有的观测点来计算各个参数的偏导数,才进行一次参数更新。而随机梯度下降 (stochastic gradient descent) 是每次使用一个观测点去进行梯度下降法,也就是只使用一个观测点来计算各个参数的偏导数。随机梯度下降每次计算的梯度必然没有梯度下降计算的梯度准确,但是它使用了更小的内存,而且被证明能达到梯度下降的效果。
在我们的实现中,优化与梯度的计算是分开的,所以随机梯度下降与梯度下降在实现中并没有区别。我选择了 SGD
这个更常见的名字来命名这个实现。
# import
class SGD(Optimizer):
def step(self):
lr = self.lr
for params, grads in zip(self.parameters, self.gradients):
for name in params.keys():
params[name] -= lr * grads[name]
导数验证
每当我们实现一个模型时,我们最担心的就是能否正确地计算偏导数。所以我们需要用数值方法计算导数来验证 Model.backward
的正确性。给定一个函数 \(f(x)\),我们可以通过以下公式估算 \(f^\prime(x)\):
这个想法能推广到多元函数。
# import
class GradientCheck:
def __init__(self, model, input=None, input_shape=None,
sample=100, h=1e-4, allowed_error=1e-7):
self.model = model
self.input = input
if input is None:
self.input = np.random.randn(*input_shape)
self.sample = sample
self.h = h
self.allowed_error = allowed_error
self.criterion = MSELoss()
output = self.forward(self.input)
self.target = np.zeros_like(output)
self.criterion(output, self.target)
grad = self.criterion.backward()
self.grad = model.backward(grad)
def check(self):
check_gradient = self.check_gradient
if not check_gradient(self.input, self.grad):
return False
parameters = self.model._parameters
gradients = self.model._gradients
for params, grads in zip(parameters, gradients):
for name in params.keys():
if not check_gradient(params[name], grads[name]):
return False
return True
def check_gradient(self, comp, grad):
shape = comp.shape
input = self.input
h, sample = self.h, self.sample
criterion, model = self.criterion, self.model
target, allowed_error = self.target, self.allowed_error
for _ in range(sample):
point = tuple(np.random.randint(n) for n in shape)
comp[point] += h
high = criterion(self.forward(input), target)
comp[point] -= 2 * h
low = criterion(self.forward(input), target)
comp[point] += h
numerical_grad_at_point = (high - low) / (2 * h)
analytic_grad_at_point = grad[point]
error = np.abs(numerical_grad_at_point - analytic_grad_at_point)
if error > allowed_error:
return False
return True
def forward(self, *args, **kwargs):
return self.model.forward(*args, **kwargs)
linear = Linear(10, 20)
GradientCheck(linear, input_shape=(10, 10)).check()
True
直线拟合
在上一篇笔记里我们已经用两种方法做过了简单的直线拟合。我们现在使用 Linear
、MSELoss
和 SGD
来重新做一遍直线拟合。首先我们先生成数据。
x = np.arange(0, 5, 0.5).reshape(-1, 1)
z = 3 * x - 1
y = z + np.random.randn(len(x)).reshape(-1, 1)
plt.scatter(x, y, c='r')
plt.plot(x, z)
plt.show()
因为我们将机器学习的三个重要组成部分模块化了,所以使用起来也非常方便以及统一。我将这个过程用注释标记在下面的代码例子中。
#建立模型
model = Linear(1, 1)
# 记录下当前模型的初始值供后面的使用
a = model.parameters['W'][0][0]
b = model.parameters['b'][0]
# 选定超参数 (hyperparameter)
lr = 0.1
# 选择标准以及优化器
criterion = MSELoss()
optim = SGD(model, lr=lr)
# 训练模型
for _ in range(100):
output = model(x) # 预测
loss = criterion(output=output, target=y) # 计算损失
grad = criterion.backward() # 得到反向传播的初始梯度
optim.zero_grad() # 清除之前的偏导数
model.backward(grad) # 通过反向传播计算参数的偏导数
optim.step() # 使用优化器更新参数
# 检测模型
plt.scatter(x, y, c='r')
plt.plot(x, z, 'b', label='truth')
plt.plot(x, model(x), 'g', label='prediction')
plt.legend()
plt.show()
最后,我们重新用上一遍笔记的梯度下降法计算 \(y = ax + b\) 的参数 \(a\) 和 \(b\)。我们使用跟 model
一样的参数初值,通过同样100次迭代得到了跟 model.parameters
一样的参数,这从另一个侧面验证我们的实现是正确的。
n = len(x)
sx, sy, sxy, sx2 = np.sum(x), np.sum(y), np.sum(x * y), np.sum(x ** 2)
for _ in range(100):
ga = (sx2 * a + sx * b - sxy) / n
gb = (sx * a - sy) / n + b
a -= lr * ga
b -= lr * gb
print(f'a: {a:.8f}, b: {b:.8f}')
print(model.parameters)
a: 2.85006934, b: -0.60110195 {'W': array([[2.85006934]]), 'b': array([-0.60110195])}