一、案例介绍
- 这是一项使用GRU模型的文本生成任务,文本生成任务是NLP领域最具有挑战性的任务之一,我们将以一段文本或字符为输入,使用模型预测之后可能出现的文本内容,我们希望这些文本内容符合语法并能保持语义连贯性。
- 但是到目前为止,这是一项艰巨的任务,因此从实用角度出发,更多的尝试在与艺术类文本相关的任务中。
- 这里我们使用周杰伦的歌词进行文本生成任务
- 数据格式如下:
二、代码
1. 下载数据集并做文本预处理
from __future__ import absolute_import, division, print_function, unicode_literals
import tensorflow as tf
# 打印tensorflow版本
print("Tensorflow Version:", tf.__version__)
import numpy as np
import os
import time
# 读取数据
path = './data/jay_chou.txt'
# path = './data/song.txt'
with open(path, 'rb') as f:
text = f.read().decode(encoding='utf-8')
# 统计字符个数并查看前250个字符
print('Length of text: {} characters'.format(len(text)))
print(text[:250])
# 统计文本中非重复字符数量
vocab = sorted(set(text))
print ('{} unique characters'.format(len(vocab)))
lines = text.strip().split('\n')
lines[:5]
# 按照字的粒度切割
# 对字符进行数值映射,将创建两个映射表:字符映射成数字,数字映射成字符
char2idx = {u:i for i, u in enumerate(vocab)}
idx2char = np.array(vocab)
# 使用字符到数字的映射表示所有文本
text_as_int = np.array([char2idx[c] for c in text])
# 按照字的粒度切割
# 对字符进行数值映射,将创建两个映射表:字符映射成数字,数字映射成字符
char2idx = {u:i for i, u in enumerate(vocab)}
idx2char = np.array(vocab)
# 使用字符到数字的映射表示所有文本
text_as_int = np.array([char2idx[c] for c in text])
# 生成训练数据
seq_length = 100
# 获取样本总数
examples_per_epoch = len(text)//seq_length
# 将数值映射后的文本转换成dataset对象方便后续处理
char_dataset = tf.data.Dataset.from_tensor_slices(text_as_int)
# 通过dataset的take方法以及映射表查看前5个字符
for i in char_dataset.take(5):
print(idx2char[i.numpy()])
sequences = char_dataset.batch(seq_length+1, drop_remainder=True)
for item in sequences.take(5):
print(repr(''.join(idx2char[item.numpy()])))
def split_input_target(chunk):
"""划分输入序列和目标序列函数"""
# 前100个字符为输入序列,第二个字符开始到最后为目标序列
input_text = chunk[:-1]
target_text = chunk[1:]
return input_text, target_text
# 使用map方法调用该函数对每条序列进行划分
dataset = sequences.map(split_input_target)
# 查看划分后的第一批次结果
for input_example, target_example in dataset.take(1):
print ('Input data: ', repr(''.join(idx2char[input_example.numpy()])))
print ('Target data:', repr(''.join(idx2char[target_example.numpy()])))
print(dataset)
# 查看将要输入模型中的每个时间步的输入和输出(以前五步为例)
# 循环每个字符,并打印每个时间步对应的输入和输出
for i, (input_idx, target_idx) in enumerate(zip(input_example[:5], target_example[:5])):
print("Step {:4d}".format(i))
print(" input: {} ({:s})".format(input_idx, repr(idx2char[input_idx])))
print(" expected output: {} ({:s})".format(target_idx, repr(idx2char[target_idx])))
# 定义批次大小为64
BATCH_SIZE = 64
# 设定缓冲区大小,以重新排列数据集
# 缓冲区越大数据混乱程度越高,所需内存也越大
BUFFER_SIZE = 10000
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
# 打印数据集对象查看数据张量形状
print(dataset)
2. 构建模型并训练模型
# 获得词汇集大小
vocab_size = len(vocab)
# 定义词嵌入维度
embedding_dim = 256
# 定义GRU的隐层节点数量
rnn_units = 1024
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
model = tf.keras.Sequential([
tf.keras.layers.Embedding(vocab_size, embedding_dim,
batch_input_shape=[batch_size, None]),
tf.keras.layers.GRU(rnn_units,
return_sequences=True,
stateful=True,
recurrent_initializer='glorot_uniform'),
tf.keras.layers.Dense(vocab_size)
])
return model
# 传入超参数构建模型
model = build_model(
vocab_size = len(vocab),
embedding_dim=embedding_dim,
rnn_units=rnn_units,
batch_size=BATCH_SIZE)
# 使用一个批次的数据作为输入
# 查看通过模型后的结果形状是否满足预期
for input_example_batch, target_example_batch in dataset.take(1):
example_batch_predictions = model(input_example_batch)
print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")
# 查看模型参数情况
model.summary()
# 使用random categorical
sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=6)
print(sampled_indices)
# squeeze表示消减一个维度
sampled_indices = tf.squeeze(sampled_indices,axis=-1).numpy()
print(sampled_indices)
# 也将输入映射成文本内容
print("Input: \n", repr("".join(idx2char[input_example_batch[0]])))
print()
# 映射这些索引查看对应的文本
# 在没有训练之前,生成的文本没有任何规律
print("Next Char Predictions: \n", repr("".join(idx2char[sampled_indices ])))
# 添加损失函数
def loss(labels, logits):
return tf.keras.losses.sparse_categorical_crossentropy(labels, logits, from_logits=True)
# 使用损失函数
example_batch_loss = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("scalar_loss: ", example_batch_loss.numpy().mean())
model.compile(optimizer='adam', loss=loss)
# 检查点保存至的目录
checkpoint_dir = './data/jay_chou/training_checkpoints'
# 检查点的文件名
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
# 创建检测点保存的回调对象
checkpoint_callback=tf.keras.callbacks.ModelCheckpoint(
filepath=checkpoint_prefix,
save_weights_only=True)
EPOCHS=100
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])
3. 使用模型生成文本内容
# 恢复模型结构
model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)
# 从检测点中获得训练后的模型参数
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
# 构建生成函数
def generate_text(model, start_string):
"""
:param model: 训练后的模型
:param start_string: 任意起始字符串
"""
# 要生成的字符个数
num_generate = 500
# 将起始字符串转换为数字(向量化)
input_eval = [char2idx[s] for s in start_string]
# 扩展维度满足模型输入要求
input_eval = tf.expand_dims(input_eval, 0)
# 空列表用于存储结果
text_generated = []
# 设定“温度参数”,根据tf.random_categorical方法特点,
# 温度参数能够调节该方法的输入分布中概率的差距,以便控制随机被选中的概率大小
temperature = 1.0
# 初始化模型参数
model.reset_states()
# 开始循环生成
for i in range(num_generate):
# 使用模型获得输出
predictions = model(input_eval)
# 删除批次的维度
predictions = tf.squeeze(predictions, 0)
# 使用“温度参数”和tf.random.categorical方法生成最终的预测字符索引
predictions = predictions / temperature
predicted_id = tf.random.categorical(predictions, num_samples=1)[-1,0].numpy()
# 将预测的输出再扩展维度作为下一次的模型输入
input_eval = tf.expand_dims([predicted_id], 0)
# 将该次输出映射成字符存到列表中
text_generated.append(idx2char[predicted_id])
# 最后将初始字符串和生成的字符进行连接
return (start_string + ''.join(text_generated))
print(generate_text(model, start_string="刮风"))
三、输出
四、模型链接
密码: f6oh checkpoint
五、总结与思考
- 此模型运行了100个epoch,使用了全部的歌词数据
- 存在如下问题:
- 个别语句不通顺,不连贯;
- 在某些情况下会出现大量生僻字和字母;
- 某些歌词并不是真正创作出来的,和原歌词一模一样;
- 歌词和歌词间存在硬拼接,无法表达一首歌曲的核心思想和内涵;
- 原因分析:
- 数据量过少,能够学习的文本特征较少
- 共5862行数据,64806个汉字,存在过拟合问题
- 结合更多歌词,也许效果会更好
- 模型比较简单
- GRU可以捕捉上文的语意并预测下文
- 但不能结合上下文重点、提练核心思想
- 这应该也是目前NLP的瓶颈之一
- 数据量过少,能够学习的文本特征较少