一、案例介绍
以一段新闻报道中的文本描述内容为输入, 使用模型帮助我们判断它最有可能属于哪一种类型的新闻, 这是典型的文本分类问题, 我们这里假定每种类型是互斥的, 即文本描述有且只有一种类型.
二、优化方案
- 文本长度规范(已实现)
- 进入模型前需要对每条文本数值映射后的长度进行规范
- 超过限制cutlen的语句切割
- 未超过cutlen的语句进行补齐(重复复制该语句至cutlen)
- 这里没有使用0填充,因为梯度爆炸了
- 进入模型前需要对每条文本数值映射后的长度进行规范
- 数据增强(未实现)
- 回译数据增强法
- google API只能一条一条翻译,120000条数据全部翻译完成需要26个小时。。。
- 回译数据增强法
- 设置n-gram特征为2
- 效果不理想
三、代码实现
1. 导入数据并查看数据分布
# 导入相关的torch工具包
import torch
import torchtext
# 导入torchtext.datasets中的文本分类任务
from torchtext.datasets import text_classification
import os
# 定义数据下载路径, 当前路径的data文件夹
load_data_path = "./data"
# 如果不存在该路径, 则创建这个路径
if not os.path.isdir(load_data_path):
os.mkdir(load_data_path)
# 选取torchtext中的文本分类数据集'AG_NEWS'即新闻主题分类数据, 保存在指定目录下
# 并将数值映射后的训练和验证数据加载到内存中
train_dataset, test_dataset = text_classification.DATASETS['AG_NEWS'](root=load_data_path)
# 以上方法失败,使用下面方法导入数据
# 导入google翻译接口工具
from google_trans_new import google_translator
import pandas as pd
translator = google_translator()
# 回译方法
def translate(string):
# 进行第一次翻译, 翻译目标是韩语
korean = translator.translate(string, lang_src='en',lang_tgt='ko')
# 最后在翻译回中文, 完成回译全部流程
english = translator.translate(korean, lang_src='ko',lang_tgt='en')
return english
def setup_datasets(dataset_tar='./data_new/ag_news_csv.tar.gz', ngrams=1, vocab=None, include_unk=False):
extracted_files = extract_archive(dataset_tar)
for fname in extracted_files:
if fname.endswith('train.csv'):
train_csv_path = fname
if fname.endswith('test.csv'):
test_csv_path = fname
'''
优化2: 回译增强法(API太慢了,需要26H才能全部翻译完成)
实例化翻译对象
train_csv = pd.read_csv(train_csv_path, header=None)
ori_len = train_csv.shape[0]
train_csv_new = pd.concat([train_csv,train_csv])
for i in range(ori_len, train_csv_new.shape[0]):
train_csv_new.iloc[i,2] = translate(train_csv_new.iloc[0,2])
train_csv_new.to_csv(train_csv_path)
print('回译增强完成!')
'''
if vocab is None:
vocab = build_vocab_from_iterator(_csv_iterator(train_csv_path, ngrams))
else:
if not isinstance(vocab, Vocab):
raise TypeError("Passed vocabulary is not of type Vocab")
train_data, train_labels = _create_data_from_iterator(
vocab, _csv_iterator(train_csv_path, ngrams, yield_cls=True), include_unk)
test_data, test_labels = _create_data_from_iterator(
vocab, _csv_iterator(test_csv_path, ngrams, yield_cls=True), include_unk)
if len(train_labels ^ test_labels) > 0:
raise ValueError("Training and test labels don't match")
return (TextClassificationDataset(vocab, train_data, train_labels),
TextClassificationDataset(vocab, test_data, test_labels))
# 查看数据分布
# 导入必备工具包
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
# 设置显示风格
plt.style.use('fivethirtyeight')
# 分别读取训练tsv和验证tsv
train_data = pd.read_csv("./data_new/ag_news_csv/train.csv", header=None)
valid_data = pd.read_csv("./data_new/ag_news_csv/test.csv", header=None)
# # 获得训练数据标签数量分布
sns.countplot(train_data.columns[0],data=train_data)
plt.title("train_data")
plt.show()
# # 获取验证数据标签数量分布
sns.countplot(valid_data.columns[0], data=valid_data)
plt.title("valid_data")
plt.show()
# 查看语句长度分布
train_data["sentence_length"] = list(map(lambda x: len(x), train_data[train_data.columns[2]]))
sns.countplot("sentence_length", data = train_data)
plt.xticks([])
plt.show()
# 查看测试集语句长度分布
valid_data["sentence_length"] = list(map(lambda x: len(x), valid_data[valid_data.columns[2]]))
sns.countplot("sentence_length", data = valid_data)
plt.xticks([])
plt.show()
# 查看训练集散点分布
sns.stripplot(y='sentence_length',x=train_data.columns[0],data=train_data)
# 查看测试集散点分布
sns.stripplot(y='sentence_length',x=valid_data.columns[0],data=valid_data)
2. 构建带有Embedding层的文本分类模型
# 指定BATCH_SIZE的大小
BATCH_SIZE = 16
# 定义文本分类模型
class TextSentiment(nn.Module):
# num_embeddings 词向量个数
# embedding_dim 词嵌入维度
def __init__(self, num_embeddings, embedding_dim, num_class):
"""
description: 类的初始化函数
:param vocab_size: 整个语料包含的不同词汇总数
:param embed_dim: 指定词嵌入的维度
:param num_class: 文本分类的类别总数
"""
super(TextSentiment, self).__init__()
# 定义embedding层
self.embedding = nn.Embedding(num_embeddings, embedding_dim)
# self.bn = nn.BatchNorm1d(embedding_dim)
# 定义全链接层
self.fc = nn.Linear(embedding_dim, num_class)
# 初始化权重
self.init_weights()
def init_weights(self):
initrange = 0.5
self.embedding.weight.data.uniform_(-initrange,initrange)
self.fc.weight.data.uniform_(-initrange,initrange)
self.fc.bias.data.zero_()
def forward(self, x):
embedded = self.embedding(x)
# c可以看作该batch(16个)input_sequence中词的平均数量
c = embedded.size(0) // BATCH_SIZE
# 之后再从embedded中取c*BATCH_SIZE个向量得到新的embedded
# 这个新的embedded中的向量个数可以整除BATCH_SIZE
# BATCH_SIZE*c之后的数据全部扔掉
embedded = embedded[:BATCH_SIZE*c]
# 因为我们想利用平均池化的方法求embedded中指定行数的列的平均数,
# 但平均池化方法是作用在行上的, 并且需要3维输入
# 因此我们对新的embedded进行转置并拓展维度
embedded = embedded.transpose(1,0).unsqueeze(0)
# 然后就是调用平均池化的方法, 并且核的大小为c
# 即取每c的元素计算一次均值作为结果
embedded = F.avg_pool1d(embedded, kernel_size=c)
# nn.Linear要求的input是一个二维tensor
# 最后,还需要减去新增的维度, 然后转置回去输送给fc层
return self.fc(embedded[0].transpose(1,0))
# 获取整个训练集的词汇数量(去重)
VOCAB_SIZE = len(train_dataset.get_vocab())
# 指定词嵌入维度
EMBED_DIM = 32
# 获得类别总数
NUM_CLASS = len(train_dataset.get_labels())
# 实例化模型
model = TextSentiment(VOCAB_SIZE, EMBED_DIM, NUM_CLASS).to(device)
3. 对数据进行batch处理
# 优化1: 根据散点图获得训练集、测试集平均词长度: train:183 test:182,然后截断或填充
from tensorflow.keras.preprocessing import sequence
def generate_batch(batch, cutlen=183):
label = torch.tensor([entry[0] for entry in batch ])
text = []
for entry in batch:
# 此方式loss太大
# t = sequence.pad_sequences([entry[1].numpy().tolist()], cutlen)
t = entry[1].numpy().tolist()
while cutlen - len(t) > 0:
t = t*2
t = sequence.pad_sequences([t], cutlen)
text.append(torch.tensor(t[0],dtype=torch.int64))
text = torch.cat(text)
return text, label
# 假设一个输入:
batch = [(1, torch.tensor([3, 23, 2, 8,1,2,43,5,67,4,1,2,3,4])), (0, torch.tensor([3, 45, 21, 6,5]))]
res = generate_batch(batch,cutlen=10)
print(res)
4. 构建训练与验证函数
from torch.utils.data import DataLoader
def train(train_data):
# 初始化训练损失和准确率为0
train_loss = 0
train_acc = 0
# 使用数据加载器生成BATCH_SIZE大小的数据进行批次训练
# data就是N多个generate_batch函数处理后的BATCH_SIZE大小的数据生成器
data = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True,collate_fn=generate_batch)
# 对data进行循环遍历,使用每个batch的数据进行参数更新
for i,(text, cls) in enumerate(data):
# print(f'第i批次的词个数为:',text.size())
text = text.to(device)
cls = cls.to(device)
# 设置优化器的初始梯度为0
optimizer.zero_grad()
# 模型输入一个批次的数据,获得输出
output = model(text)
# 根据真是标签与模型输出计算损失
loss = criterion(output, cls)
# 将该批次的损失加到总损失中
train_loss+=loss.item()
# 误差反向传播
loss.backward()
# 参数更新
optimizer.step()
# 将该批次的准确率加到总准确率中
train_acc += (output.argmax(1) == cls).sum().item()
# 调整优化学习率
scheduler.step()
# 返回本轮训练的平均损失和平均准确率
return train_loss / len(train_data), train_acc / len(train_data)
def valid(valid_data):
"""模型验证函数"""
# 初始化验证损失和准确率为0
loss = 0
acc = 0
# 和训练相同, 使用DataLoader获得训练数据生成器
data = DataLoader(valid_data, batch_size=BATCH_SIZE, collate_fn=generate_batch)
# 验证阶段, 不再求解梯度
with torch.no_grad():
# 按批次取出数据验证
for text, cls in data:
text = text.to(device)
cls = cls.to(device)
# 使用模型获得输出
output = model(text)
# 计算损失
loss = criterion(output, cls)
# 将损失和准确率加到总损失和准确率中
loss += loss.item()
acc += (output.argmax(1) == cls).sum().item()
# 返回本轮验证的平均损失和平均准确率
return loss / len(valid_data), acc / len(valid_data)
5. 进行模型训练和验证
# 导入时间工具包
import time
# 导入数据随机划分方法工具
from torch.utils.data.dataset import random_split
# 指定训练轮数
N_EPOCHS = 10
# 定义初始的验证损失
min_valid_loss = float('inf')
# 选择损失函数, 这里选择预定义的交叉熵损失函数
criterion = torch.nn.CrossEntropyLoss().to(device)
# 选择随机梯度下降优化器
optimizer = torch.optim.SGD(model.parameters(), lr=4.0)
# 选择优化器步长调节方法StepLR, 用来衰减学习率
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.9)
# 从train_dataset取出0.95作为训练集, 先取其长度
train_len = int(len(train_dataset) * 0.95)
# 然后使用random_split进行乱序划分, 得到对应的训练集和验证集
sub_train_, sub_valid_ = random_split(train_dataset, [train_len, len(train_dataset) - train_len])
# 开始每一轮训练
for epoch in range(N_EPOCHS):
# 记录概论训练的开始时间
start_time = time.time()
# 调用train和valid函数得到训练和验证的平均损失, 平均准确率
train_loss, train_acc = train(sub_train_)
valid_loss, valid_acc = valid(sub_valid_)
# 计算训练和验证的总耗时(秒)
secs = int(time.time() - start_time)
# 用分钟和秒表示
mins = secs / 60
secs = secs % 60
# 打印训练和验证耗时,平均损失,平均准确率
print('Epoch: %d' %(epoch + 1), " | time in %d minutes, %d seconds" %(mins, secs))
print(f'\tLoss: {train_loss:.4f}(train)\t|\tAcc: {train_acc * 100:.1f}%(train)')
print(f'\tLoss: {valid_loss:.4f}(valid)\t|\tAcc: {valid_acc * 100:.1f}%(valid)')
6. 查看embedding层嵌入的词向量
# 打印从模型的状态字典中获得的Embedding矩阵
print(model.state_dict()['embedding.weight'])
7. 查看验证集准确率
def valid_test(test_data):
"""模型验证函数"""
# 初始化验证损失和准确率为0
loss = 0
acc = 0
# 和训练相同, 使用DataLoader获得训练数据生成器
data = DataLoader(test_data, batch_size=BATCH_SIZE, collate_fn=generate_batch)
# 验证阶段, 不再求解梯度
with torch.no_grad():
# 按批次取出数据验证
for text, cls in data:
text = text.to(device)
cls = cls.to(device)
# 使用模型获得输出
output = model(text)
# 计算损失
loss = criterion(output, cls)
# 将损失和准确率加到总损失和准确率中
loss += loss.item()
acc += (output.argmax(1) == cls).sum().item()
# 返回本轮验证的平均损失和平均准确率
return loss / len(test_data), acc / len(test_data)
loss, acc = valid_test(test_dataset)
print('测试集loss为: %.5f' % (torch.Tensor.cpu(loss).numpy()))
print('测试集acc为: % .5f' % (acc))
8. 保存模型
# 模型保存
# 首先设定模型的保存路径
PATH = './news_topic_model.pth'
# 保存模型的状态字典
torch.save(model.state_dict(), PATH)
四、测试集Acc
五、模型
密码: c5pu news_topic_model.pth