NLP

文本预处理-下

新闻主题分类任务案例

Posted by 新宇 on September 10, 2020

一、案例介绍

以一段新闻报道中的文本描述内容为输入, 使用模型帮助我们判断它最有可能属于哪一种类型的新闻, 这是典型的文本分类问题, 我们这里假定每种类型是互斥的, 即文本描述有且只有一种类型.

二、优化方案

  • 文本长度规范(已实现)
    • 进入模型前需要对每条文本数值映射后的长度进行规范
      • 超过限制cutlen的语句切割
      • 未超过cutlen的语句进行补齐(重复复制该语句至cutlen)
        • 这里没有使用0填充,因为梯度爆炸了
  • 数据增强(未实现)
    • 回译数据增强法
      • google API只能一条一条翻译,120000条数据全部翻译完成需要26个小时。。。
  • 设置n-gram特征为2
    • 效果不理想

三、代码实现

1. 导入数据并查看数据分布

# 导入相关的torch工具包

import torch
import torchtext
# 导入torchtext.datasets中的文本分类任务

from torchtext.datasets import text_classification
import os

# 定义数据下载路径, 当前路径的data文件夹

load_data_path = "./data"
# 如果不存在该路径, 则创建这个路径

if not os.path.isdir(load_data_path):
    os.mkdir(load_data_path)

# 选取torchtext中的文本分类数据集'AG_NEWS'即新闻主题分类数据, 保存在指定目录下
# 并将数值映射后的训练和验证数据加载到内存中

train_dataset, test_dataset = text_classification.DATASETS['AG_NEWS'](root=load_data_path)

# 以上方法失败,使用下面方法导入数据
# 导入google翻译接口工具
from google_trans_new import google_translator
import pandas as pd 

translator = google_translator()
# 回译方法
def translate(string):
    # 进行第一次翻译, 翻译目标是韩语
    korean = translator.translate(string, lang_src='en',lang_tgt='ko')
    # 最后在翻译回中文, 完成回译全部流程
    english = translator.translate(korean, lang_src='ko',lang_tgt='en')
    return english

def setup_datasets(dataset_tar='./data_new/ag_news_csv.tar.gz', ngrams=1, vocab=None, include_unk=False):
    extracted_files = extract_archive(dataset_tar)

    for fname in extracted_files:
        if fname.endswith('train.csv'):
            train_csv_path = fname
        if fname.endswith('test.csv'):
            test_csv_path = fname
	'''
    优化2: 回译增强法(API太慢了,需要26H才能全部翻译完成)
    实例化翻译对象
    
    train_csv = pd.read_csv(train_csv_path, header=None)
    ori_len = train_csv.shape[0]
    train_csv_new = pd.concat([train_csv,train_csv])
    for i in range(ori_len, train_csv_new.shape[0]):
        train_csv_new.iloc[i,2] = translate(train_csv_new.iloc[0,2])
    train_csv_new.to_csv(train_csv_path)
    print('回译增强完成!')
	'''

    if vocab is None:
        vocab = build_vocab_from_iterator(_csv_iterator(train_csv_path, ngrams))
    else:
        if not isinstance(vocab, Vocab):
            raise TypeError("Passed vocabulary is not of type Vocab")

    train_data, train_labels = _create_data_from_iterator(
        vocab, _csv_iterator(train_csv_path, ngrams, yield_cls=True), include_unk)
    test_data, test_labels = _create_data_from_iterator(
        vocab, _csv_iterator(test_csv_path, ngrams, yield_cls=True), include_unk)
    if len(train_labels ^ test_labels) > 0:
        raise ValueError("Training and test labels don't match")
    return (TextClassificationDataset(vocab, train_data, train_labels),
            TextClassificationDataset(vocab, test_data, test_labels))
# 查看数据分布
# 导入必备工具包

import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
# 设置显示风格

plt.style.use('fivethirtyeight') 

# 分别读取训练tsv和验证tsv

train_data = pd.read_csv("./data_new/ag_news_csv/train.csv", header=None)
valid_data = pd.read_csv("./data_new/ag_news_csv/test.csv", header=None)


# # 获得训练数据标签数量分布

sns.countplot(train_data.columns[0],data=train_data)
plt.title("train_data")
plt.show()


# # 获取验证数据标签数量分布

sns.countplot(valid_data.columns[0], data=valid_data)
plt.title("valid_data")
plt.show()

# 查看语句长度分布

train_data["sentence_length"] = list(map(lambda x: len(x), train_data[train_data.columns[2]]))
sns.countplot("sentence_length", data = train_data)
plt.xticks([])
plt.show()

# 查看测试集语句长度分布

valid_data["sentence_length"] = list(map(lambda x: len(x), valid_data[valid_data.columns[2]]))
sns.countplot("sentence_length", data = valid_data)
plt.xticks([])
plt.show()

# 查看训练集散点分布

sns.stripplot(y='sentence_length',x=train_data.columns[0],data=train_data)

# 查看测试集散点分布
sns.stripplot(y='sentence_length',x=valid_data.columns[0],data=valid_data)

2. 构建带有Embedding层的文本分类模型

# 指定BATCH_SIZE的大小

BATCH_SIZE = 16
# 定义文本分类模型

class TextSentiment(nn.Module):
    # num_embeddings 词向量个数
    # embedding_dim 词嵌入维度

    def __init__(self, num_embeddings, embedding_dim, num_class):
        """
        description: 类的初始化函数
        :param vocab_size: 整个语料包含的不同词汇总数
        :param embed_dim: 指定词嵌入的维度
        :param num_class: 文本分类的类别总数
        """ 

        super(TextSentiment, self).__init__()
        # 定义embedding层

        self.embedding = nn.Embedding(num_embeddings, embedding_dim)
#         self.bn = nn.BatchNorm1d(embedding_dim)
        # 定义全链接层

        self.fc = nn.Linear(embedding_dim, num_class)
        # 初始化权重

        self.init_weights()
    
    def init_weights(self):
        initrange = 0.5
        self.embedding.weight.data.uniform_(-initrange,initrange)
        self.fc.weight.data.uniform_(-initrange,initrange)
        self.fc.bias.data.zero_()
    
    def forward(self, x):
        embedded = self.embedding(x)
        # c可以看作该batch(16个)input_sequence中词的平均数量

        c = embedded.size(0) // BATCH_SIZE
        # 之后再从embedded中取c*BATCH_SIZE个向量得到新的embedded
        # 这个新的embedded中的向量个数可以整除BATCH_SIZE
        # BATCH_SIZE*c之后的数据全部扔掉

        embedded = embedded[:BATCH_SIZE*c]
        # 因为我们想利用平均池化的方法求embedded中指定行数的列的平均数,
        # 但平均池化方法是作用在行上的, 并且需要3维输入
        # 因此我们对新的embedded进行转置并拓展维度

        embedded = embedded.transpose(1,0).unsqueeze(0)
        # 然后就是调用平均池化的方法, 并且核的大小为c
        # 即取每c的元素计算一次均值作为结果

        embedded = F.avg_pool1d(embedded, kernel_size=c)
        # nn.Linear要求的input是一个二维tensor
        # 最后,还需要减去新增的维度, 然后转置回去输送给fc层

        return self.fc(embedded[0].transpose(1,0))

# 获取整个训练集的词汇数量(去重)

VOCAB_SIZE = len(train_dataset.get_vocab())
# 指定词嵌入维度

EMBED_DIM = 32
# 获得类别总数

NUM_CLASS = len(train_dataset.get_labels())
# 实例化模型

model = TextSentiment(VOCAB_SIZE, EMBED_DIM, NUM_CLASS).to(device)

3. 对数据进行batch处理

# 优化1: 根据散点图获得训练集、测试集平均词长度: train:183 test:182,然后截断或填充

from tensorflow.keras.preprocessing import sequence 
def generate_batch(batch, cutlen=183):
    label = torch.tensor([entry[0] for entry in batch ])
    text = []
    for entry in batch:
        # 此方式loss太大
        # t = sequence.pad_sequences([entry[1].numpy().tolist()], cutlen)

        t = entry[1].numpy().tolist()
        while cutlen - len(t) > 0:
            t = t*2
        t = sequence.pad_sequences([t], cutlen)
        text.append(torch.tensor(t[0],dtype=torch.int64))
    text = torch.cat(text)
    return text, label

# 假设一个输入:

batch = [(1, torch.tensor([3, 23, 2, 8,1,2,43,5,67,4,1,2,3,4])), (0, torch.tensor([3, 45, 21, 6,5]))]
res = generate_batch(batch,cutlen=10)
print(res)

4. 构建训练与验证函数

from torch.utils.data import DataLoader

def train(train_data):
    # 初始化训练损失和准确率为0

    train_loss = 0
    train_acc = 0
    
    # 使用数据加载器生成BATCH_SIZE大小的数据进行批次训练
    # data就是N多个generate_batch函数处理后的BATCH_SIZE大小的数据生成器

    data = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True,collate_fn=generate_batch)
    
    # 对data进行循环遍历,使用每个batch的数据进行参数更新

    for i,(text, cls) in enumerate(data):
#         print(f'第i批次的词个数为:',text.size())

        text = text.to(device)
        cls = cls.to(device)
        # 设置优化器的初始梯度为0

        optimizer.zero_grad()
        # 模型输入一个批次的数据,获得输出

        output = model(text)
        # 根据真是标签与模型输出计算损失

        loss = criterion(output, cls)
        # 将该批次的损失加到总损失中

        train_loss+=loss.item()
        # 误差反向传播

        loss.backward()
        # 参数更新

        optimizer.step()
        # 将该批次的准确率加到总准确率中

        train_acc += (output.argmax(1) == cls).sum().item()
    
    # 调整优化学习率

    scheduler.step()
    
    # 返回本轮训练的平均损失和平均准确率

    return train_loss / len(train_data), train_acc / len(train_data)
        
def valid(valid_data):
    """模型验证函数"""
    # 初始化验证损失和准确率为0

    loss = 0
    acc = 0

    # 和训练相同, 使用DataLoader获得训练数据生成器

    data = DataLoader(valid_data, batch_size=BATCH_SIZE, collate_fn=generate_batch)
    
    # 验证阶段, 不再求解梯度

    with torch.no_grad():
        # 按批次取出数据验证

        for text, cls in data:
            text = text.to(device)
            cls = cls.to(device)

            # 使用模型获得输出

            output = model(text)
            # 计算损失

            loss = criterion(output, cls)
            # 将损失和准确率加到总损失和准确率中

            loss += loss.item()
            acc += (output.argmax(1) == cls).sum().item()

    # 返回本轮验证的平均损失和平均准确率

    return loss / len(valid_data), acc / len(valid_data)     

5. 进行模型训练和验证

# 导入时间工具包

import time

# 导入数据随机划分方法工具

from torch.utils.data.dataset import random_split

# 指定训练轮数

N_EPOCHS = 10

# 定义初始的验证损失

min_valid_loss = float('inf')

# 选择损失函数, 这里选择预定义的交叉熵损失函数

criterion = torch.nn.CrossEntropyLoss().to(device)
# 选择随机梯度下降优化器

optimizer = torch.optim.SGD(model.parameters(), lr=4.0)
# 选择优化器步长调节方法StepLR, 用来衰减学习率

scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.9)

# 从train_dataset取出0.95作为训练集, 先取其长度

train_len = int(len(train_dataset) * 0.95)

# 然后使用random_split进行乱序划分, 得到对应的训练集和验证集

sub_train_, sub_valid_ = random_split(train_dataset, [train_len, len(train_dataset) - train_len])

# 开始每一轮训练

for epoch in range(N_EPOCHS):
    # 记录概论训练的开始时间

    start_time = time.time()
    # 调用train和valid函数得到训练和验证的平均损失, 平均准确率

    train_loss, train_acc = train(sub_train_)
    valid_loss, valid_acc = valid(sub_valid_)

    # 计算训练和验证的总耗时(秒)

    secs = int(time.time() - start_time)
    # 用分钟和秒表示

    mins = secs / 60
    secs = secs % 60

    # 打印训练和验证耗时,平均损失,平均准确率

    print('Epoch: %d' %(epoch + 1), " | time in %d minutes, %d seconds" %(mins, secs))
    print(f'\tLoss: {train_loss:.4f}(train)\t|\tAcc: {train_acc * 100:.1f}%(train)')
    print(f'\tLoss: {valid_loss:.4f}(valid)\t|\tAcc: {valid_acc * 100:.1f}%(valid)')

6. 查看embedding层嵌入的词向量

# 打印从模型的状态字典中获得的Embedding矩阵

print(model.state_dict()['embedding.weight'])

7. 查看验证集准确率

def valid_test(test_data):
    """模型验证函数"""
    # 初始化验证损失和准确率为0

    loss = 0
    acc = 0

    # 和训练相同, 使用DataLoader获得训练数据生成器

    data = DataLoader(test_data, batch_size=BATCH_SIZE, collate_fn=generate_batch)
    
    # 验证阶段, 不再求解梯度

    with torch.no_grad():
        # 按批次取出数据验证

        for text, cls in data:
            text = text.to(device)
            cls = cls.to(device)
            # 使用模型获得输出

            output = model(text)
            # 计算损失

            loss = criterion(output, cls)
            # 将损失和准确率加到总损失和准确率中

            loss += loss.item()
            acc += (output.argmax(1) == cls).sum().item()

    # 返回本轮验证的平均损失和平均准确率

    return loss / len(test_data), acc / len(test_data)      

loss, acc = valid_test(test_dataset)
print('测试集loss为: %.5f' % (torch.Tensor.cpu(loss).numpy()))
print('测试集acc为: % .5f' % (acc))

8. 保存模型

# 模型保存 
# 首先设定模型的保存路径 

PATH = './news_topic_model.pth'
# 保存模型的状态字典 

torch.save(model.state_dict(), PATH)

四、测试集Acc

五、模型

密码: c5pu news_topic_model.pth