NLP

周杰伦歌词的文本生成任务

整体介绍、架构图

Posted by 新宇 on September 17, 2020

一、案例介绍

  • 这是一项使用GRU模型的文本生成任务,文本生成任务是NLP领域最具有挑战性的任务之一,我们将以一段文本或字符为输入,使用模型预测之后可能出现的文本内容,我们希望这些文本内容符合语法并能保持语义连贯性。
  • 但是到目前为止,这是一项艰巨的任务,因此从实用角度出发,更多的尝试在与艺术类文本相关的任务中。
  • 这里我们使用周杰伦的歌词进行文本生成任务
  • 数据格式如下:

二、代码

1. 下载数据集并做文本预处理

from __future__ import absolute_import, division, print_function, unicode_literals
import tensorflow as tf
# 打印tensorflow版本

print("Tensorflow Version:", tf.__version__)
import numpy as np
import os
import time

# 读取数据

path = './data/jay_chou.txt'
# path = './data/song.txt'

with open(path, 'rb') as f:
    text = f.read().decode(encoding='utf-8')
    # 统计字符个数并查看前250个字符

    print('Length of text: {} characters'.format(len(text)))
    print(text[:250])
    # 统计文本中非重复字符数量

    vocab = sorted(set(text))
    print ('{} unique characters'.format(len(vocab)))

lines = text.strip().split('\n')
lines[:5]

# 按照字的粒度切割
# 对字符进行数值映射,将创建两个映射表:字符映射成数字,数字映射成字符

char2idx = {u:i for i, u in enumerate(vocab)}
idx2char = np.array(vocab)
# 使用字符到数字的映射表示所有文本

text_as_int = np.array([char2idx[c] for c in text])

# 按照字的粒度切割
# 对字符进行数值映射,将创建两个映射表:字符映射成数字,数字映射成字符

char2idx = {u:i for i, u in enumerate(vocab)}
idx2char = np.array(vocab)
# 使用字符到数字的映射表示所有文本

text_as_int = np.array([char2idx[c] for c in text])

# 生成训练数据

seq_length = 100
# 获取样本总数

examples_per_epoch = len(text)//seq_length
# 将数值映射后的文本转换成dataset对象方便后续处理

char_dataset = tf.data.Dataset.from_tensor_slices(text_as_int)

# 通过dataset的take方法以及映射表查看前5个字符

for i in char_dataset.take(5):
    print(idx2char[i.numpy()])


sequences = char_dataset.batch(seq_length+1, drop_remainder=True)
for item in sequences.take(5):
    print(repr(''.join(idx2char[item.numpy()])))

def split_input_target(chunk):
    """划分输入序列和目标序列函数"""
    # 前100个字符为输入序列,第二个字符开始到最后为目标序列

    input_text = chunk[:-1]
    target_text = chunk[1:]
    return input_text, target_text

# 使用map方法调用该函数对每条序列进行划分

dataset = sequences.map(split_input_target)

# 查看划分后的第一批次结果

for input_example, target_example in  dataset.take(1):
    print ('Input data: ', repr(''.join(idx2char[input_example.numpy()])))
    print ('Target data:', repr(''.join(idx2char[target_example.numpy()])))

print(dataset)

# 查看将要输入模型中的每个时间步的输入和输出(以前五步为例)
# 循环每个字符,并打印每个时间步对应的输入和输出

for i, (input_idx, target_idx) in enumerate(zip(input_example[:5], target_example[:5])):
    print("Step {:4d}".format(i))
    print("  input: {} ({:s})".format(input_idx, repr(idx2char[input_idx])))
    print("  expected output: {} ({:s})".format(target_idx, repr(idx2char[target_idx])))

# 定义批次大小为64

BATCH_SIZE = 64

# 设定缓冲区大小,以重新排列数据集
# 缓冲区越大数据混乱程度越高,所需内存也越大

BUFFER_SIZE = 10000

dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
# 打印数据集对象查看数据张量形状

print(dataset)

2. 构建模型并训练模型

# 获得词汇集大小

vocab_size = len(vocab)

# 定义词嵌入维度

embedding_dim = 256

# 定义GRU的隐层节点数量

rnn_units = 1024

def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
    model = tf.keras.Sequential([
        tf.keras.layers.Embedding(vocab_size, embedding_dim,
                              batch_input_shape=[batch_size, None]),
          tf.keras.layers.GRU(rnn_units,
                              return_sequences=True,
                              stateful=True,
                              recurrent_initializer='glorot_uniform'),
          tf.keras.layers.Dense(vocab_size)
    ])
    return model 

# 传入超参数构建模型

model = build_model(
    vocab_size = len(vocab),
    embedding_dim=embedding_dim,
    rnn_units=rnn_units,
    batch_size=BATCH_SIZE)

# 使用一个批次的数据作为输入
# 查看通过模型后的结果形状是否满足预期

for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

# 查看模型参数情况

model.summary()

# 使用random categorical

sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=6)
print(sampled_indices)
# squeeze表示消减一个维度

sampled_indices = tf.squeeze(sampled_indices,axis=-1).numpy()
print(sampled_indices)

# 也将输入映射成文本内容

print("Input: \n", repr("".join(idx2char[input_example_batch[0]])))
print()

# 映射这些索引查看对应的文本
# 在没有训练之前,生成的文本没有任何规律

print("Next Char Predictions: \n", repr("".join(idx2char[sampled_indices ])))

# 添加损失函数

def loss(labels, logits):
    return tf.keras.losses.sparse_categorical_crossentropy(labels, logits, from_logits=True)

# 使用损失函数

example_batch_loss  = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("scalar_loss:      ", example_batch_loss.numpy().mean())

model.compile(optimizer='adam', loss=loss)

# 检查点保存至的目录

checkpoint_dir = './data/jay_chou/training_checkpoints'

# 检查点的文件名

checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# 创建检测点保存的回调对象

checkpoint_callback=tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)


EPOCHS=100

history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])


3. 使用模型生成文本内容

# 恢复模型结构

model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)

# 从检测点中获得训练后的模型参数

model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))

# 构建生成函数

def generate_text(model, start_string):
    """
    :param model: 训练后的模型
    :param start_string: 任意起始字符串
    """
    # 要生成的字符个数

    num_generate = 500
    # 将起始字符串转换为数字(向量化)

    input_eval = [char2idx[s] for s in start_string]

    # 扩展维度满足模型输入要求

    input_eval = tf.expand_dims(input_eval, 0)

    # 空列表用于存储结果

    text_generated = []

    # 设定“温度参数”,根据tf.random_categorical方法特点,
    # 温度参数能够调节该方法的输入分布中概率的差距,以便控制随机被选中的概率大小 

    temperature = 1.0

    # 初始化模型参数

    model.reset_states()
    
    # 开始循环生成

    for i in range(num_generate):
        # 使用模型获得输出

        predictions = model(input_eval)
        # 删除批次的维度

        predictions = tf.squeeze(predictions, 0)

        # 使用“温度参数”和tf.random.categorical方法生成最终的预测字符索引

        predictions = predictions / temperature
        predicted_id = tf.random.categorical(predictions, num_samples=1)[-1,0].numpy()

        # 将预测的输出再扩展维度作为下一次的模型输入

        input_eval = tf.expand_dims([predicted_id], 0)

        # 将该次输出映射成字符存到列表中

        text_generated.append(idx2char[predicted_id])

    # 最后将初始字符串和生成的字符进行连接

    return (start_string + ''.join(text_generated))

print(generate_text(model, start_string="刮风"))

三、输出

四、模型链接

密码: f6oh checkpoint

五、总结与思考

  • 此模型运行了100个epoch,使用了全部的歌词数据
  • 存在如下问题:
    • 个别语句不通顺,不连贯;
    • 在某些情况下会出现大量生僻字和字母;
    • 某些歌词并不是真正创作出来的,和原歌词一模一样;
    • 歌词和歌词间存在硬拼接,无法表达一首歌曲的核心思想和内涵;
  • 原因分析:
    • 数据量过少,能够学习的文本特征较少
      • 共5862行数据,64806个汉字,存在过拟合问题
      • 结合更多歌词,也许效果会更好
    • 模型比较简单
      • GRU可以捕捉上文的语意并预测下文
      • 但不能结合上下文重点、提练核心思想
        • 这应该也是目前NLP的瓶颈之一