Naive RNN

As a code monkey down in the trenches, some things never feel solid until I've written them out once myself.

I hadn't written an RNN from scratch yet, so I worked from the ancient classic:

https://karpathy.github.io/2015/05/21/rnn-effectiveness/

I tried it on all sorts of texts (Sherlock Holmes, Dream of the Red Chamber, Zhu Xi's Family Maxims, the Three Character Classic), and none of the results were impressive. But the model clearly does learn some structure; at the very least it picked up the structure of the Three Character Classic:

人教平,記於字。稻再農,擇句成。凡梁賢,當為興。光太官,錐刺嬴。爾教序,習仁星。趙教學,終機子。頭五者,作四西。爾于子,有公緒。彼弟老,老聰人。遼人常,書一警。教秦歲,人則建。古二子,宜北人。除總常,受春民。朝不子,學訓茲。湯為廷,至易信。

This came from training on the Three Character Classic downloaded from Project Gutenberg. At least it learned three characters then a comma, three more then a period... That pattern is absent early in training; it stabilizes after roughly 30 passes over the text.

So I'll tentatively assume my code has no major bugs...

The main annoyance is having to store the whole history of the forward pass in order to compute the gradients. And the gradients themselves take a roundabout path, which makes it easy to get confused.
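
To keep that straight, here is the recurrence the code below implements, with the biases folded into the Linear layers (a_t is hts in the code, h_t is hs, p_t is ys):

    a_t = x_t·Wx + h_{t-1}·Wh
    h_t = tanh(a_t)
    p_t = softmax(h_t·Wy)
    loss_t = -log p_t[target_t]

The roundabout part is the backward pass: each h_t collects gradient from two directions, its own output through Wy and the next timestep through Wh. In the row-vector convention the code uses:

    dh_t = (p_t - onehot(target_t))·Wy^T + (dh_{t+1} ⊙ (1 - h_{t+1}²))·Wh^T

So backprop walks t from T-1 down to 0, threading dh_{t+1} along as it goes; that is exactly the dh_from_last argument in CharNN.backward below.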

import numpy as np
import sys


def load_file(file):
    # Read the corpus and collect its character vocabulary.
    with open(file) as f:
        lines = [line.strip() for line in f]
    char_set = set(''.join(lines))
    return lines, char_set


def one_hot(char, chars_table):
    # Encode a character as a (1, vocab_size) one-hot row vector.
    en = np.zeros((1, len(chars_table)))
    en[0, chars_table[char]] = 1
    return en


lines, chars = load_file(sys.argv[1])
char2idx = dict([(char, i) for i, char in enumerate(sorted(chars))])
idx2char = dict([(i, char) for char, i in char2idx.items()])
db = ''.join(lines)  # The whole corpus as one string.


class Node(object):
    # Tiny base class: calling a layer object delegates to its forward().
    def __call__(self, *args):
        return self.forward(*args)


class Linear(Node):
    def __init__(self, in_, out_):
        self.w = np.random.randn(in_, out_) * 0.01
        self.b = np.zeros((1, out_))

    def forward(self, x):
        return np.dot(x, self.w) + self.b

    def backward(self, x, error):
        # dL/dw = x^T · error, dL/db = error (batch size is 1 throughout),
        # and the error pushed back to the input is error · w^T.
        grad_w = np.dot(x.T, error)
        grad_b = np.sum(error, axis=0, keepdims=True)
        back_error = np.dot(error, self.w.T)
        return back_error, grad_w, grad_b

    def update(self, steps):
        gw, gb = steps
        self.w -= gw
        self.b -= gb


class Tanh(Node):
    def forward(self, x):
        return np.tanh(x)

    def backward(self, x, error):
        # d tanh(x)/dx = 1 - tanh(x)^2; x here is the pre-activation saved
        # during the forward pass (recomputed rather than cached).
        grad = 1 - np.tanh(x)**2
        return grad * error


class Softmax(Node):
    def forward(self, x):
        # Shift by the max before exponentiating for numerical stability;
        # the result is mathematically unchanged.
        e = np.exp(x - np.max(x))
        return e / np.sum(e)

    def backward(self, p, target):
        # Combined softmax + cross-entropy gradient: dL/dlogits = p - onehot(target).
        ret = np.copy(p)
        ret[target] -= 1
        return ret


class CrossEntropy(Node):
    # No backward here: its gradient is folded into Softmax.backward above.
    def forward(self, probs, target):
        return -np.log(probs[target])


class CharNN(Node):
    def __init__(self, vocab_size, hidden_size):
        self.wh = Linear(hidden_size, hidden_size)
        self.wx = Linear(vocab_size, hidden_size)
        self.wy = Linear(hidden_size, vocab_size)
        self.tanh = Tanh()
        self.softmax = Softmax()
        self.reset_history()
        self.hidden_size = hidden_size

    def reset_history(self):
        self.xs = []
        self.h_prevs = []
        self.hts = []
        self.hs = []
        self.ys = []
        self.t = 0

    def forward(self, x, h_prev):
        self.t += 1
        # For wx backprop
        self.xs.append(x)
        # For wh backprop: the hidden state actually fed into this step
        self.h_prevs.append(h_prev)
        ht = self.wx(x) + self.wh(h_prev)
        # For tanh backprop
        self.hts.append(ht)
        hidden = self.tanh(ht)
        # For wy backprop
        self.hs.append(hidden)
        y = self.wy(hidden)

        # For softmax backprop
        y = self.softmax(y)
        self.ys.append(y)

        return y, hidden

    def backward(self, target, dh_from_last):
        self.t -= 1
        y_error = self.softmax.backward(self.ys[self.t], target)
        dh_from_y, wy_grad_w, wy_grad_b = self.wy.backward(self.hs[self.t], y_error)
        # h_t feeds both this step's output and the next step's hidden state,
        # so its gradient is the sum of those two contributions.
        dh = dh_from_last + dh_from_y
        tanh_error = self.tanh.backward(self.hts[self.t], dh)
        wx_error, wx_grad_w, wx_grad_b = self.wx.backward(self.xs[self.t], tanh_error)

        # The wh gradient needs the hidden state that was actually fed in at
        # step t. At the start of a chunk that is the state carried over from
        # the previous chunk, not zeros, so it has to be saved during forward.
        wh_error, wh_grad_w, wh_grad_b = self.wh.backward(self.h_prevs[self.t], tanh_error)
        dh_to_prev = wh_error

        return [wh_grad_w, wh_grad_b, wx_grad_w, wx_grad_b, wy_grad_w, wy_grad_b], dh_to_prev

    def update(self, steps):
        swh = steps[:2]
        swx = steps[2:4]
        swy = steps[4:6]
        self.wh.update(swh)
        self.wx.update(swx)
        self.wy.update(swy)
        self.reset_history()


def accumulate(dws_t, dws):
    if dws is None:
        return [np.copy(t) for t in dws_t]
    else:
        return [t + w for t, w in zip(dws_t, dws)]
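
# Because the same wx/wh/wy are used at every timestep, the chunk's total
# gradient is the sum of every timestep's contribution; accumulate() above
# does that summing.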


def train_partial_sequence(nn, h, inputs, targets):
    criterion = CrossEntropy()
    loss = 0
    for i, target in zip(inputs, targets):
        input_ = one_hot(i, char2idx)
        pred, h = nn(input_, h)
        t_idx = char2idx[target]
        loss += criterion(pred, (0, t_idx))

    # Truncated BPTT: walk the targets in reverse, threading dh from each
    # timestep back to the previous one and summing parameter gradients.
    dh = np.zeros_like(h)
    dw = None
    for t in reversed(targets):
        t_idx = char2idx[t]
        dw_t, dh = nn.backward((0, t_idx), dh)
        dw = accumulate(dw_t, dw)

    # Clip to keep exploding gradients in check (same ±5 range as min-char-rnn).
    clipped_dw = [np.clip(w, -5, 5) for w in dw]
    return loss, clipped_dw, h


def sample(nn, h, idx):
    # Roll the model forward 120 steps, drawing each next character from the
    # softmax distribution and feeding it back in as the next input.
    en = np.zeros((1, len(chars)))
    en[0, idx] = 1
    ret = [idx]
    for i in range(120):
        pred, h = nn(en, h)
        ix = np.random.choice(range(len(chars)), p=pred.ravel())
        en = np.zeros((1, len(chars)))
        en[0, ix] = 1
        ret.append(ix)
    # Sampling pushed 120 steps onto the history; clear it so it cannot leak
    # into the next BPTT pass.
    nn.reset_history()
    return ret


def train(nn, lr):
    seq_len = 40
    p = 0
    n = 0
    epochs = 0
    h = np.zeros((1, nn.hidden_size))

    smooth_loss = None
    while True:
        n += 1
        if p + seq_len >= len(db):
            epochs += 1
            print("Epoch %d finished." % epochs)
            # Seed sampling with the first character of the corpus so this
            # works for any training text, not just one containing '人'.
            print(''.join([idx2char[i] for i in sample(nn, h, char2idx[db[0]])]))
            p = 0
            h = np.zeros((1, nn.hidden_size))
            nn.reset_history()

        inputs = db[p:p + seq_len]
        targets = db[p + 1:p + 1 + seq_len]
        p += seq_len
        loss, ws, h = train_partial_sequence(nn, h, inputs, targets)
        if smooth_loss is None:
            smooth_loss = loss
        else:
            smooth_loss = 0.999*smooth_loss + 0.001*loss
        if n % 100 == 0:
            print('iter %d, loss: %f, raw loss: %f' % (n, smooth_loss, loss))
        # Plain SGD; update() also resets the stored history for the next chunk.
        steps = [w * lr for w in ws]
        nn.update(steps)


model = CharNN(len(chars), 100)
train(model, 0.01)
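
Rather than just trusting the samples, a centered finite-difference check is a cheap way to confirm the BPTT code. The sketch below is my own helper, not part of the script proper; it probes one entry of wy.w (wy receives gradient at every timestep, so the entry won't be trivially zero) and assumes the chunk is short enough that the ±5 clipping in train_partial_sequence never fires:

def grad_check(nn, inputs, targets, eps=1e-5):
    # Analytic gradient via the backward pass above. Order of the list:
    # wh_w, wh_b, wx_w, wx_b, wy_w, wy_b.
    h0 = np.zeros((1, nn.hidden_size))
    _, grads, _ = train_partial_sequence(nn, h0, inputs, targets)
    nn.reset_history()
    analytic = grads[4][0, 0]

    criterion = CrossEntropy()

    def loss_only():
        # Forward-only loss over the same chunk, from the same initial state.
        loss, h = 0.0, h0
        for c, t in zip(inputs, targets):
            pred, h = nn(one_hot(c, char2idx), h)
            loss += criterion(pred, (0, char2idx[t]))
        nn.reset_history()
        return loss

    # Centered difference on a single weight entry.
    old = nn.wy.w[0, 0]
    nn.wy.w[0, 0] = old + eps
    l_plus = loss_only()
    nn.wy.w[0, 0] = old - eps
    l_minus = loss_only()
    nn.wy.w[0, 0] = old

    numeric = (l_plus - l_minus) / (2 * eps)
    print('wy.w[0,0]: analytic %e vs numeric %e' % (analytic, numeric))

Calling grad_check(model, db[:10], db[1:11]) before kicking off train() should print two numbers that agree to several significant digits.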