深度有趣 | 23 歌词古诗自动生成

简介

使用RNN实现歌词和古诗的自动生成git

RNN多用于处理序列数据,经过学习数据上下文之间的关系,能够在给定若干个连续数据点的基础上,预测下一个可能的数据点github

如下是最基础的RNN公式,固然也能够使用LSTM(Long Short-Term Memory)或GRU(Gated Recurrent Unit)生成序列算法

h_t=tanh(W_{hh}h_{t-1}+W_{xh}x_t)+b_h
y_t=W_{hy}h_t+b_y

准备

一些序列数据,这里咱们主要使用文本,例如歌词和古诗等数据库

手动版

先来个最手动的版本,用numpy实现歌词生成。歌词爬取自网络,主要参考了如下代码,gist.github.com/karpathy/d4…json

加载库和歌词,去掉英文占比较多的歌词(可能为英文歌),还剩36616首歌缓存

# -*- coding: utf-8 -*-

import numpy as np

sentences = []
with open('../lyrics.txt', 'r', encoding='utf8') as fr:
    lines = fr.readlines()
    for line in lines:
        line = line.strip()
        count = 0
        for c in line:
            if (c >= 'a' and c <= 'z') or (c >= 'A' and c <= 'Z'):
                count += 1
        if count / len(line) < 0.1:
            sentences.append(line)

print('共%d首歌' % len(sentences))
复制代码

整理字和id之间的映射,共10131个字bash

chars = {}
for sentence in sentences:
    for c in sentence:
        chars[c] = chars.get(c, 0) + 1
chars = sorted(chars.items(), key=lambda x:x[1], reverse=True)
chars = [char[0] for char in chars]
vocab_size = len(chars)
print('共%d个字' % vocab_size, chars[:20])

char2id = {c: i for i, c in enumerate(chars)}
id2char = {i: c for i, c in enumerate(chars)}
复制代码

定义一些训练参数和模型参数,整理训练数据网络

hidden_size = 100
maxlen = 25
learning_rate = 0.1

X_data = []
Y_data = []
for sentence in sentences:
    for i in range(0, len(sentence) - maxlen - 1, maxlen):
        X_data.append([char2id[c] for c in sentence[i: i + maxlen]])
        Y_data.append([char2id[c] for c in sentence[i + 1: i + maxlen + 1]])

print(len(X_data))

Wxh = np.random.randn(hidden_size, vocab_size) * 0.01
Whh = np.random.randn(hidden_size, hidden_size) * 0.01
Why = np.random.randn(vocab_size, hidden_size) * 0.01
bh = np.zeros((hidden_size, 1))
by = np.zeros((vocab_size, 1))
复制代码

损失函数app

def lossFun(inputs, targets, hprev):
    xs, hs, ys, ps = {}, {}, {}, {}
    hs[-1] = np.copy(hprev)
    loss = 0

    # forward pass
    for t in range(len(inputs)):
        xs[t] = np.zeros((vocab_size, 1))
        xs[t][inputs[t]] = 1
        hs[t] = np.tanh(np.dot(Wxh, xs[t]) + np.dot(Whh, hs[t - 1]) + bh)
        ys[t] = np.dot(Why, hs[t]) + by
        ps[t] = np.exp(ys[t]) / np.sum(np.exp(ys[t]))
        loss += -np.log(ps[t][targets[t], 0])
    
    # backward pass
    dWxh, dWhh, dWhy = np.zeros_like(Wxh), np.zeros_like(Whh), np.zeros_like(Why)
    dbh, dby = np.zeros_like(bh), np.zeros_like(by)
    dhnext = np.zeros_like(hs[0])
    for t in reversed(range(len(inputs))):
        dy = np.copy(ps[t])
        dy[targets[t]] -= 1
        dWhy += np.dot(dy, hs[t].T)
        dby += dy
        dh = np.dot(Why.T, dy) + dhnext
        dhraw = (1 - hs[t] * hs[t]) * dh
        dbh += dhraw
        dWxh += np.dot(dhraw, xs[t].T)
        dWhh += np.dot(dhraw, hs[t-1].T)
        dhnext = np.dot(Whh.T, dhraw)
    
    for dparam in [dWxh, dWhh, dWhy, dbh, dby]:
        np.clip(dparam, -5, 5, out=dparam)

    return loss, dWxh, dWhh, dWhy, dbh, dby, hs[len(inputs) - 1]
复制代码

样本生成函数,每通过若干轮迭代就调用一次dom

def sample(h, seed_ix, n):
    x = np.zeros((vocab_size, 1))
    x[seed_ix] = 1
    ixes = []
    for t in range(n):
        h = np.tanh(np.dot(Wxh, x) + np.dot(Whh, h) + bh)
        y = np.dot(Why, h) + by
        p = np.exp(y) / np.sum(np.exp(y))
        ix = np.random.choice(range(vocab_size), p=p.ravel())
        ixes.append(ix)
        
        x = np.zeros((vocab_size, 1))
        x[ix] = 1
    
    return ixes
复制代码

初始化训练变量,这里使用Adagrad优化算法,因此须要一些额外的缓存变量

n = 0
mWxh, mWhh, mWhy = np.zeros_like(Wxh), np.zeros_like(Whh), np.zeros_like(Why)
mbh, mby = np.zeros_like(bh), np.zeros_like(by)
smooth_loss = -np.log(1.0 / vocab_size) * maxlen
复制代码

训练模型,会一直循环进行

while True:
    if n == 0 or n == len(X_data): 
        hprev = np.zeros((hidden_size, 1))
        n = 0
    
    X = X_data[n]
    Y = Y_data[n]

    loss, dWxh, dWhh, dWhy, dbh, dby, hprev = lossFun(X, Y, hprev)
    smooth_loss = smooth_loss * 0.999 + loss * 0.001

    for param, dparam, mem in zip([Wxh, Whh, Why, bh, by], [dWxh, dWhh, dWhy, dbh, dby], [mWxh, mWhh, mWhy, mbh, mby]):
        mem += dparam * dparam
        param += -learning_rate * dparam / np.sqrt(mem + 1e-8)

    if n % 100 == 0:
        print('iter %d, loss: %f' % (n, smooth_loss))
        sample_ix = sample(hprev, X[0], 200)
        txt = ''.join(id2char[ix] for ix in sample_ix)
        print(txt)

    n += 1
复制代码

通过54W次迭代后,生成了这么一段话,虽然并不通顺,但彷佛确实学习到了一些词语和句法

颜悲 心已中雨著街眼泪不知 留在这时祈忘的本身同样无常 你个人欢 当时是你能止学了绽开瞥袖 前朝来去勇气 让你是一双睡过之后  由于你飞雪中的街音里飞   此模糊的爱 只有谁要再多少时 管只是无度美醉不给主题衬  曾流盲双脚一片城自己边 来并肩常与满是一点和缺 好爱得也还记得证着多梦 愛 作人来 这吃碎 咱们精神蹲着你的门 口不信心终究理想透完了谁几度 我都在凭营力的光体 卖爱不说 爱你是个人好
复制代码

Keras

Keras官方提供了使用LSTM生成文本的示例

github.com/fchollet/ke…

简单地改一下,数据仍是使用以前的歌词

加载库

# -*- coding: utf-8 -*-

from keras.models import Sequential
from keras.layers import Dense, LSTM, Embedding
from keras.callbacks import LambdaCallback
import numpy as np
import random
import sys
import pickle
复制代码

加载数据,整理字和id之间的映射

sentences = []
with open('../lyrics.txt', 'r', encoding='utf8') as fr:
    lines = fr.readlines()
    for line in lines:
        line = line.strip()
        count = 0
        for c in line:
            if (c >= 'a' and c <= 'z') or (c >= 'A' and c <= 'Z'):
                count += 1
        if count / len(line) < 0.1:
            sentences.append(line)
print('共%d首歌' % len(sentences))

chars = {}
for sentence in sentences:
    for c in sentence:
        chars[c] = chars.get(c, 0) + 1
chars = sorted(chars.items(), key=lambda x:x[1], reverse=True)
chars = [char[0] for char in chars]
vocab_size = len(chars)
print('共%d个字' % vocab_size, chars[:20])

char2id = {c: i for i, c in enumerate(chars)}
id2char = {i: c for i, c in enumerate(chars)}

with open('dictionary.pkl', 'wb') as fw:
    pickle.dump([char2id, id2char], fw)
复制代码

整理训练数据,定义模型并编译

maxlen = 10
step = 3
embed_size = 128
hidden_size = 128
vocab_size = len(chars)
batch_size = 64
epochs = 20

X_data = []
Y_data = []
for sentence in sentences:
    for i in range(0, len(sentence) - maxlen, step):
        X_data.append([char2id[c] for c in sentence[i: i + maxlen]])
        y = np.zeros(vocab_size, dtype=np.bool)
        y[char2id[sentence[i + maxlen]]] = 1
        Y_data.append(y)
X_data = np.array(X_data)
Y_data = np.array(Y_data)
print(X_data.shape, Y_data.shape)

model = Sequential()
model.add(Embedding(input_dim=vocab_size, output_dim=embed_size, input_length=maxlen))
model.add(LSTM(hidden_size, input_shape=(maxlen, embed_size)))
model.add(Dense(vocab_size, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam')
复制代码

定义序列样本生成函数

def sample(preds, diversity=1.0):
    preds = np.asarray(preds).astype('float64')
    preds = np.log(preds + 1e-10) / diversity
    exp_preds = np.exp(preds)
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)
复制代码

定义每轮训练结束后的回调函数

def on_epoch_end(epoch, logs):
    print('-' * 30)
    print('Epoch', epoch)

    index = random.randint(0, len(sentences))
    for diversity in [0.2, 0.5, 1.0]:
        print('----- diversity:', diversity)
        sentence = sentences[index][:maxlen]
        print('----- Generating with seed: ' + sentence)
        sys.stdout.write(sentence)

        for i in range(400):
            x_pred = np.zeros((1, maxlen))
            for t, char in enumerate(sentence):
                x_pred[0, t] = char2id[char]

            preds = model.predict(x_pred, verbose=0)[0]
            next_index = sample(preds, diversity)
            next_char = id2char[next_index]

            sentence = sentence[1:] + next_char

            sys.stdout.write(next_char)
            sys.stdout.flush()
复制代码

训练模型并保存

model.fit(X_data, Y_data, batch_size=batch_size, epochs=epochs, callbacks=[LambdaCallback(on_epoch_end=on_epoch_end)])
model.save('song_keras.h5')
复制代码

使用如下代码调用模型生成歌词,需提供一句起始歌词

# -*- coding: utf-8 -*-

from keras.models import load_model
import numpy as np
import pickle
import sys

maxlen = 10
model = load_model('song_keras.h5')

with open('dictionary.pkl', 'rb') as fr:
    [char2id, id2char] = pickle.load(fr)

def sample(preds, diversity=1.0):
    preds = np.asarray(preds).astype('float64')
    preds = np.log(preds + 1e-10) / diversity
    exp_preds = np.exp(preds)
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)

sentence = '能不能给我一首歌的时间'
sentence = sentence[:maxlen]

diversity = 1.0
print('----- Generating with seed: ' + sentence)
print('----- diversity:', diversity)
sys.stdout.write(sentence)

for i in range(400):
    x_pred = np.zeros((1, maxlen))
    for t, char in enumerate(sentence):
        x_pred[0, t] = char2id[char]

    preds = model.predict(x_pred, verbose=0)[0]
    next_index = sample(preds, diversity)
    next_char = id2char[next_index]

    sentence = sentence[1:] + next_char

    sys.stdout.write(next_char)
    sys.stdout.flush()
复制代码

生成结果以下,比以前的结果彷佛好一些,有意义的词语和短句更多

能不能给我一首歌的时间 要去人还有古年 你表明我所的 只愿为你作下一个成熟 从那个歌声中  你的别思量 写你的画面走过了西陌上雨张 小水没忘了 我欲再感觉   我终于你开心哭过心事流出了我心痛 就看口提幽纹太多 独自一直行 你也在想 我感到最此的第一次 只想要闲想 穿行多高楼的星云 看见鞍上云 青竹琼楼又新叶 人潮春涌成度过 幸福呜 风雪落入丽筝凄凄 万顷枯枝回伸离袖弦  不幸以潮 到底必经认来我不变 都想你 这星辰 暮鼓 WA Lsevemusich hey Live 走进不在意 不肯天涯 如此温柔 不够支离 多巧认真和你还太平行 哎呀呀呀 呀呀呀呀呀呀呀啊嘿 饿很差去哪儿呀 那个人聪明? 王王之如下 下也难改徒有爱还能敢相离 拨开你的嘴角 相识的一见 到你的世界所世 才发现我也不会躲藏 让我决定有人担忧善良 像一我的世界心里长着  夜晚需来又头 与我专车征 战天几天不懂配游戏 也是本身应吗 你给我来的狠也
复制代码

TensorFlow

换一下工具和数据,使用TensorFlow实现古诗生成,使用如下数据,github.com/chinese-poe…

加载库

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
import glob
import json
from collections import Counter
from tqdm import tqdm
from snownlp import SnowNLP
复制代码

加载数据,共105336首诗

poets = []
paths = glob.glob('chinese-poetry/json/poet.*.json')
for path in paths:
    data = open(path, 'r').read()
    data = json.loads(data)
    for item in data:
        content = ''.join(item['paragraphs'])
        if len(content) >= 24 and len(content) <= 32:
            content = SnowNLP(content)
            poets.append('[' + content.han + ']')

poets.sort(key=lambda x: len(x))
print('共%d首诗' % len(poets), poets[0], poets[-1])
复制代码

整理字和id之间的映射,共8072个不一样的字

chars = []
for item in poets:
    chars += [c for c in item]
print('共%d个字' % len(chars))

chars = sorted(Counter(chars).items(), key=lambda x:x[1], reverse=True)
print('共%d个不一样的字' % len(chars))
print(chars[:10])

chars = [c[0] for c in chars]
char2id = {c: i + 1 for i, c in enumerate(chars)}
id2char = {i + 1: c for i, c in enumerate(chars)}
复制代码

整理训练数据

batch_size = 64
X_data = []
Y_data = []

for b in range(len(poets) // batch_size):
    start = b * batch_size
    end = b * batch_size + batch_size
    batch = [[char2id[c] for c in poets[i]] for i in range(start, end)]
    maxlen = max(map(len, batch))
    X_batch = np.full((batch_size, maxlen - 1), 0, np.int32)
    Y_batch = np.full((batch_size, maxlen - 1), 0, np.int32)

    for i in range(batch_size):
        X_batch[i, :len(batch[i]) - 1] = batch[i][:-1]
        Y_batch[i, :len(batch[i]) - 1] = batch[i][1:]
    
    X_data.append(X_batch)
    Y_data.append(Y_batch)
    
print(len(X_data), len(Y_data))
复制代码

定义模型结构和优化器

hidden_size = 256
num_layer = 2
embedding_size = 256

X = tf.placeholder(tf.int32, [batch_size, None])
Y = tf.placeholder(tf.int32, [batch_size, None])
learning_rate = tf.Variable(0.0, trainable=False)

cell = tf.nn.rnn_cell.MultiRNNCell(
    [tf.nn.rnn_cell.BasicLSTMCell(hidden_size, state_is_tuple=True) for i in range(num_layer)], 
    state_is_tuple=True)
initial_state = cell.zero_state(batch_size, tf.float32)

embeddings = tf.Variable(tf.random_uniform([len(char2id) + 1, embedding_size], -1.0, 1.0))
embedded = tf.nn.embedding_lookup(embeddings, X)

# outputs: batch_size, max_time, hidden_size
# last_states: 2 tuple(two LSTM), 2 tuple(c and h)
# batch_size, hidden_size
outputs, last_states = tf.nn.dynamic_rnn(cell, embedded, initial_state=initial_state)

outputs = tf.reshape(outputs, [-1, hidden_size])                # batch_size * max_time, hidden_size
logits = tf.layers.dense(outputs, units=len(char2id) + 1)       # batch_size * max_time, len(char2id) + 1
logits = tf.reshape(logits, [batch_size, -1, len(char2id) + 1]) # batch_size, max_time, len(char2id) + 1
probs = tf.nn.softmax(logits)                                   # batch_size, max_time, len(char2id) + 1

loss = tf.reduce_mean(tf.contrib.seq2seq.sequence_loss(logits, Y, tf.ones_like(Y, dtype=tf.float32)))
params = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(loss, params), 5)
optimizer = tf.train.AdamOptimizer(learning_rate).apply_gradients(zip(grads, params))
复制代码

训练模型,共训练50轮

sess = tf.Session()
sess.run(tf.global_variables_initializer())

for epoch in range(50):
    sess.run(tf.assign(learning_rate, 0.002 * (0.97 ** epoch)))
    
    data_index = np.arange(len(X_data))
    np.random.shuffle(data_index)
    X_data = [X_data[i] for i in data_index]
    Y_data = [Y_data[i] for i in data_index]
    
    losses = []
    for i in tqdm(range(len(X_data))):
        ls_,  _ = sess.run([loss, optimizer], feed_dict={X: X_data[i], Y: Y_data[i]})
        losses.append(ls_)
    
    print('Epoch %d Loss %.5f' % (epoch, np.mean(losses)))
复制代码

保存模型,以便在单机上使用

saver = tf.train.Saver()
saver.save(sess, './poet_generation_tensorflow')

import pickle
with open('dictionary.pkl', 'wb') as fw:
    pickle.dump([char2id, id2char], fw)
复制代码

在单机上使用模型生成古诗,可随机生成或生成藏头诗

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
import pickle

with open('dictionary.pkl', 'rb') as fr:
    [char2id, id2char] = pickle.load(fr)

batch_size = 1
hidden_size = 256
num_layer = 2
embedding_size = 256

X = tf.placeholder(tf.int32, [batch_size, None])
Y = tf.placeholder(tf.int32, [batch_size, None])
learning_rate = tf.Variable(0.0, trainable=False)

cell = tf.nn.rnn_cell.MultiRNNCell(
    [tf.nn.rnn_cell.BasicLSTMCell(hidden_size, state_is_tuple=True) for i in range(num_layer)], 
    state_is_tuple=True)
initial_state = cell.zero_state(batch_size, tf.float32)

embeddings = tf.Variable(tf.random_uniform([len(char2id) + 1, embedding_size], -1.0, 1.0))
embedded = tf.nn.embedding_lookup(embeddings, X)

outputs, last_states = tf.nn.dynamic_rnn(cell, embedded, initial_state=initial_state)

outputs = tf.reshape(outputs, [-1, hidden_size])
logits = tf.layers.dense(outputs, units=len(char2id) + 1)
probs = tf.nn.softmax(logits)
targets = tf.reshape(Y, [-1])

loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits, labels=targets))
params = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(loss, params), 5)
optimizer = tf.train.AdamOptimizer(learning_rate).apply_gradients(zip(grads, params))

sess = tf.Session()
sess.run(tf.global_variables_initializer())

saver = tf.train.Saver()
saver.restore(sess, tf.train.latest_checkpoint('./'))

def generate():
    states_ = sess.run(initial_state)
    
    gen = ''
    c = '['
    while c != ']':
        gen += c
        x = np.zeros((batch_size, 1))
        x[:, 0] = char2id[c]
        probs_, states_ = sess.run([probs, last_states], feed_dict={X: x, initial_state: states_})
        probs_ = np.squeeze(probs_)
        pos = int(np.searchsorted(np.cumsum(probs_), np.random.rand() * np.sum(probs_)))
        c = id2char[pos]
    
    return gen[1:]

def generate_with_head(head):
    states_ = sess.run(initial_state)
    
    gen = ''
    c = '['
    i = 0
    while c != ']':
        gen += c
        x = np.zeros((batch_size, 1))
        x[:, 0] = char2id[c]
        probs_, states_ = sess.run([probs, last_states], feed_dict={X: x, initial_state: states_})
        probs_ = np.squeeze(probs_)
        pos = int(np.searchsorted(np.cumsum(probs_), np.random.rand() * np.sum(probs_)))

        if (c == '[' or c == '。' or c == ',') and i < len(head):
            c = head[i]
            i += 1
        else:
            c = id2char[pos]
    
    return gen[1:]

print(generate())
print(generate_with_head('深度学习'))
复制代码

生成结果以下,字数和标点符号都对上了,内容也像那么回事,反正也看不太懂

百计无意魄可无,知君又到两家书。自知君子有天禄,天下名通赤子虚。
深山宜数月交驰,度世曾徒有客期。学子今来能入楚,习家不瘿莫辞卑。
复制代码

参考

视频讲解课程

深度有趣(一)

相关文章
相关标签/搜索