import numpy as np
import matplotlib.pyplot as plt

%run import_npnet.py
npnet = import_npnet(7)


# RNN

RNN 的主要想法是在模型的内部存储一个隐藏的状态来提取之前所有输入的特征。这个隐藏的状态与当前的输入共同决定输出，并根据当前的输入变更。假设在时间点 $t$ 的输入是 $x_t$，隐藏状态是 $h_t$，输出是 $y_t$。这些 $x_t, y_t, h_t$ 都是行向量。初始时，$h_0=0$ 为零向量。那么对于 $t \ge 1$

\begin{align*} h_{t} &= \tanh \left( x_t W_{xh} + h_{t-1}W_{hh} + b_h \right) \\ y_{t} &= h_{t} W_{hy} + b_y. \end{align*}

设 $L$ 为损失函数。由输出层 $y_t = h_t W_{hy} + b_y$ 直接可得

\begin{align*} \frac{\partial L}{\partial b_{y}} &= \frac{\partial L}{\partial y_t}, \\ \frac{\partial L}{\partial W_{hy}} &= h_t^T \cdot \frac{\partial L}{\partial y_t}. \end{align*}

$$\frac{\partial L}{\partial h_t} = \frac{\partial L}{\partial y_t} \cdot W_{hy}^T + \left[\frac{\partial L}{\partial h_{t+1}} \odot \left(1 - h_{t+1} \odot h_{t+1}\right)\right] \cdot W_{hh}^T.$$

其中记 $H_t = \frac{\partial L}{\partial h_t} \odot \left(1 - h_t \odot h_t\right)$，上式可简写为

$$\frac{\partial L}{\partial h_t} = \frac{\partial L}{\partial y_t} \cdot W_{hy}^T + H_{t+1} \cdot W_{hh}^T.$$

\begin{align*} \frac{\partial L}{\partial b_h} &= H_t, \\ \frac{\partial L}{\partial W_{hh}} &= h_{t-1}^T \cdot H_t, \\ \frac{\partial L}{\partial W_{xh}} &= x_t^T \cdot H_t. \\ \end{align*}

$$\frac{\partial L}{\partial x_t} = H_t \cdot W_{xh}^T.$$

# import
class RNN(npnet.Model):
    """Vanilla RNN: h_t = tanh(x_t Wxh + h_{t-1} Whh + bh), y_t = h_t Why + by."""

    def __init__(self, input_size, hidden_size, output_size):
        """Allocate the five parameter arrays and the hidden-state history.

        Weight matrices use Xavier-style scaling (based on fan-in/fan-out);
        both biases are drawn from a standard normal.
        """
        super().__init__(input_size=input_size,
                         hidden_size=hidden_size,
                         output_size=output_size)
        p = self.parameters
        n_in, n_hid, n_out = input_size, hidden_size, output_size
        p['Wxh'] = np.random.randn(n_in, n_hid) * np.sqrt(2.0 / (n_in + n_hid))
        p['Whh'] = np.random.randn(n_hid, n_hid) * np.sqrt(1.0 / n_hid)
        p['Why'] = np.random.randn(n_hid, n_out) * np.sqrt(2.0 / (n_hid + n_out))
        p['bh'] = np.random.randn(1, n_hid)
        p['by'] = np.random.randn(1, n_out)

        # Hidden-state history; because h_0 exists, hiddens[t] is h_t, so
        # index handling needs care in forward/backward.
        self.hiddens = [np.zeros_like(p['bh'])]

def forward(self, input):
    """Run every row of `input` through the RNN, one time step per row.

    Returns the stacked outputs y_1..y_T. The hidden states for the whole
    sequence are stored on the instance for the backward pass; only the last
    hidden state of the previous call is carried over as the new h_0.
    """
    self.input = input
    params = self.parameters
    # Keep only the final hidden state from the previous computation.
    states = [self.hiddens[-1]]
    outputs = []
    for step_x in input:
        pre_act = step_x @ params['Wxh'] + states[-1] @ params['Whh'] + params['bh']
        states.append(np.tanh(pre_act))
        step_y = states[-1] @ params['Why'] + params['by']
        outputs.append(step_y[0])  # (1, m) row -> flat (m,) vector
    self.hiddens = states
    return np.array(outputs)

# NOTE(review): this is a fragment of RNN.backward -- the `def backward(self,
# grad):` header and several lines appear to have been dropped by the notebook
# export: `dy` and `pgrad` are never assigned here, and the per-step
# accumulations for `by`, `Why`, `bh` and `Whh` implied by the formulas above
# are missing. Presumably each iteration did `dy = grad[t]` and `pgrad` is the
# parameter-gradient container of npnet.Model; confirm against the original.
params = self.parameters
x = self.input
dx = [None] * len(x)
h = self.hiddens
# Running contribution from the future time step (H_{t+1} in the notes);
# zero past the last step.
H = np.zeros_like(h[0])

for t in range(len(grad)-1, -1, -1):
# Mind the index convention of the hidden-state list: h[0] is the carried-over
# h_0, so h[t+1] is the state produced at loop step t.

# dL/dh for the current step: direct path through y plus the recurrent path.
dh = dy @ params['Why'].T + H @ params['Whh'].T
H = dh * (1 - h[t+1] * h[t+1]) # H_t, the tanh backprop term
pgrad['Wxh'] += x[t: t+1].T @ H
dx[t] = (H @ params['Wxh'].T)[0]

return np.array(dx)

def clear_hidden_states(self):
    """Discard all stored hidden states and restart from h_0 = 0."""
    zero_state = np.zeros_like(self.parameters['bh'])
    self.hiddens = [zero_state]


# import

def forward(self, *args, **kwargs):
    """Reset the wrapped model's hidden state, then delegate the forward pass.

    Ensures every gradient-check evaluation starts from the same h_0 = 0, so
    numerical and analytic gradients are computed on identical trajectories.
    """
    model = self.model
    model.clear_hidden_states()
    return model.forward(*args, **kwargs)


# Gradient-check the RNN on a sequence of ten 10-dim inputs. `RNNGradientCheck`
# is defined in an earlier notebook cell that is not part of this export;
# presumably it is the gradient checker carrying the state-resetting forward()
# wrapper above -- confirm against the original notebook.
RNNGradientCheck(RNN(10, 20, 10), input_shape=(10, 10)).check()

# NOTE(review): the bare `True` below is the captured notebook output of the
# gradient check, not an intentional statement.
True

# 拟合 sin 函数

# Task: predict sin at the next step from sin at the current step, so the
# training target is the input sequence shifted by one.
x = np.arange(200).reshape(-1, 1)
y = np.sin(x)

sin = RNN(input_size=1, hidden_size=100, output_size=1)
train_x = y[:-1]
train_y = y[1:]

epoch = 100
learning_rate = 1e-4
optim = npnet.RMSprop(sin, lr=learning_rate)
criterion = npnet.MSELoss()
batch_size = 2
# Ceiling division: the final, possibly shorter, batch still counts.
batch_num = (len(train_x) + batch_size - 1) // batch_size

# Accumulated loss per epoch, for the plot below.
losses = []
for ep in range(epoch):
loss = 0
for i in range(batch_num):
# Slice the current mini-batch; the last slice may be shorter than batch_size.
b = i * batch_size
e = b + batch_size
inp = train_x[b: e]
tar = train_y[b: e]
out = sin.forward(inp)
loss += criterion.forward(out, tar)
# NOTE(review): no backward pass is visible between computing the loss and
# stepping the optimizer -- a `criterion.backward()` / `sin.backward(...)`
# call was probably lost in the notebook export. As written, `optim.step()`
# would update from stale or zero gradients; confirm against the original.
optim.step()
losses.append(loss)

# Training curve: one accumulated-loss value per epoch. With a single
# argument, plt.plot uses the index range as the x-axis, which is exactly
# np.arange(len(losses)).
plt.plot(losses)
plt.title('Loss vs Epoch')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()


# NOTE(review): `test_y` and `pred_y` are not defined anywhere in this export --
# the evaluation cell that computes predictions was presumably dropped. The
# scatter compares true against predicted values; the red diagonal is y = x,
# so points on the line are perfect predictions.
plt.scatter(test_y, pred_y)
plt.plot(np.arange(-1.2, 1.2, 0.1), np.arange(-1.2, 1.2, 0.1), color='r')
plt.xlabel('true sin values')
plt.ylabel('predicted sin values')
plt.show()