NLP

Named Entity Review

Posted by 新宇 on October 22, 2020

I. Task Overview

  • The NE review task:
    • Before recognized entities are stored in the database, there is an indispensable step in the pipeline: checking the validity of each recognized entity, known as the named entity (NE) review task. The check uses no contextual information; it judges an entity purely by the way its characters are combined. In essence, it is a short-text binary classification problem.
  • Model choice and rationale:
    • A short-text task does not require capturing long-range dependencies, so we use a traditional RNN, which strikes a good balance between performance and accuracy.
    • Short-text tasks are usually well suited to character embeddings, and if the training set is small and covers a limited set of characters, the character vectors of a pretrained model can be used directly. Here we use the bert-base-chinese pretrained model to obtain vector representations of Chinese characters. (The data format these scripts assume is sketched right after this list.)
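For reference, every CSV these scripts read is headerless and tab-separated, with a 0/1 label column followed by a text column (matching the pd.read_csv(..., header=None, sep="\t") calls in train.py below). Two illustrative rows (the entities are invented, not drawn from the real dataset; the second mimics the off-topic lyric negatives built later from jay_chou.txt):

1	腹部疼痛
0	天青色等烟雨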

II. Optimization Approach

  • Dataset
    • Train/validation split (train:valid = 9:1)
    • Data augmentation
      • Positive samples (not implemented; a sketch of the idea follows this list)
        • Crawling + search; keyword replacement (against a hand-built keyword table)
      • Negative samples
        • Randomly delete characters
        • Randomly replace characters
        • Topic-unrelated negative samples (90%)
  • Model
    • Use an nn.GRU model
  • Parameter optimization
    • Adaptive learning rate, using the Adam optimizer for gradient descent
  • Other:
    • Increase the number of training epochs to epoch=10
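The positive-sample augmentation above was left unimplemented. As a reference, here is a minimal sketch of the keyword-replacement idea, assuming a hand-built keyword table; the table contents and the function name are hypothetical, not part of the project code:

# -*- coding: utf-8 -*-

import random

# Hypothetical hand-built keyword table: keyword -> interchangeable synonyms
keyword_table = {
    "疼痛": ["痛", "疼"],
    "发热": ["发烧"],
}

def augment_positive(text, table):
    """Create a new positive sample by swapping one matched keyword for a random synonym."""
    for keyword, synonyms in table.items():
        if keyword in text:
            # Replace only the first occurrence, keeping the entity well formed
            return text.replace(keyword, random.choice(synonyms), 1)
    # No keyword matched: return the text unchanged
    return text

if __name__ == "__main__":
    print(augment_positive("腹部疼痛", keyword_table))  # e.g. 腹部痛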

III. Training Process

  • Dataset split only: validation Acc = 81% ~ 84%
  • With the nn.GRU model: validation Acc = 83% ~ 86%
  • With negative-sample augmentation: validation Acc = 85% ~ 87%
  • With an adaptive learning rate: validation Acc = 89% ~ 90%
  • With more epochs (epoch=10): validation Acc = 89% ~ 91.8%

IV. Code Implementation

1. bert_chinese_encode.py

# -*- coding: utf-8 -*-

import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = torch.hub.load('huggingface/pytorch-transformers', 'model', 'bert-base-chinese').to(device)
tokenizer = torch.hub.load('huggingface/pytorch-transformers', 'tokenizer', 'bert-base-chinese')

def get_bert_encode_for_single(text):
    """Encode one piece of text with bert-base-chinese and return one
    768-dim vector per character."""
    # Encode and strip the [CLS]/[SEP] special tokens added by the tokenizer
    index_tokens = tokenizer.encode(text)[1:-1]
    tokens_tensor = torch.tensor([index_tokens])
    # BERT is used as a frozen feature extractor, so no gradients are needed
    with torch.no_grad():
        encoded_layers, _ = model(tokens_tensor.to(device))
    # Drop the batch dimension: the result has shape (seq_len, 768)
    encoded_layers = encoded_layers[0]
    return encoded_layers

if __name__=="__main__":
    text = "你好, 卡路里"
    outputs = get_bert_encode_for_single(text)
    print(outputs)
    print(outputs.shape)
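Note: unpacking model(...) into a tuple, as above, matches the older pytorch-transformers interface loaded through torch.hub. Recent transformers releases return a ModelOutput object by default, so if the unpacking raises an error on your setup, an adjustment along these lines should work:

# On newer transformers versions the model returns a ModelOutput rather than
# a tuple, so take the last hidden state explicitly:
with torch.no_grad():
    outputs = model(tokens_tensor.to(device))
encoded_layers = outputs[0][0]  # outputs[0] is the last hidden state; [0] drops the batch dim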

2. RNN_MODEL.py

# -*- coding: utf-8 -*-

import torch
import torch.nn as nn
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class GRU(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        """The three init parameters are the sizes of the last dimension of
        the input tensor, the hidden tensor, and the output tensor"""

        super(GRU, self).__init__()
        self.hidden_size = hidden_size
        # Pre-optimization version (an RNN cell built from a Linear layer):
        # self.linear1 = nn.Linear(input_size+hidden_size, hidden_size)

        # Optimization: use nn.GRU here

        self.gru = nn.GRU(input_size, hidden_size)
        self.linear2 = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=-1)

    def forward(self, input, hidden):
        # Pre-optimization version:
        # combined = torch.cat((input, hidden), 1)
        # hidden = self.linear1(combined)

        # nn.GRU expects (seq_len, batch, input_size), so add a length-1
        # sequence dimension to both the input and the hidden state
        op, hn = self.gru(input.unsqueeze(0), hidden.unsqueeze(0))
        # Classify from the final hidden state and apply LogSoftmax
        output = self.linear2(hn[0])
        output = self.softmax(output)
        return output, hn[0]

    def initHidden(self):
        return torch.zeros(1, self.hidden_size).to(device)
 

if __name__ == "__main__":
    input_size = 768
    hidden_size = 128
    n_categories = 2
    input = torch.rand(1, input_size).to(device)
    hidden = torch.rand(1, hidden_size).to(device)
    rnn = GRU(input_size, hidden_size, n_categories).to(device)
    outputs, hidden = rnn(input, hidden)
    print("outputs:", outputs)
    print("hidden:", hidden)

3. train.py

# -*- coding: utf-8 -*-

# Import the BERT Chinese encoding function

from bert_chinese_encode import get_bert_encode_for_single
from RNN_MODEL import GRU
import random
import torch
import torch.nn as nn
import pandas as pd
import torch.optim as optim

from collections import Counter

# Use the GPU for training when available

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print('device: ', device)

# Load the data

print('--- Loading data and splitting into train and validation sets ---')
# data_path = "./train_data.csv"

'''
Optimization: use the data augmented with new negative samples
'''

train_data_path = "./data_new/train_data_new.csv"
valid_data_path = "./data_new/valid_data_new.csv"

train_data_df = pd.read_csv(train_data_path, header=None, sep="\t")
train_data = train_data_df.values.tolist()
random.shuffle(train_data)
valid_data_df = pd.read_csv(valid_data_path, header=None, sep="\t")
valid_data = valid_data_df.values.tolist()
random.shuffle(valid_data)
# Print the ratio of positive to negative labels

print('Train label counts: ', dict(Counter(train_data_df[0].values)))
print('Validation label counts: ', dict(Counter(valid_data_df[0].values)))

train_data_len = len(train_data)

print('--- Building the model ---')
# Build the RNN model

input_size = 768
hidden_size = 128
n_categories = 2
input = torch.rand(1, input_size).to(device)
hidden = torch.rand(1, hidden_size).to(device)

rnn = GRU(input_size, hidden_size, n_categories)
rnn.to(device)
outputs, hidden = rnn(input, hidden)
print("outputs:", outputs)
print("hidden:", hidden)

# # Path of the saved model parameters to preload
# print('--- Loading model ---')
# MODEL_PATH = './BERT_RNN.pth'
#
# # Hidden size, input size and number of classes must match the values used in training
# n_hidden = 128
# input_size = 768
# n_categories = 2
#
# # Instantiate the RNN model and load the saved parameters
# rnn = GRU(input_size, n_hidden, n_categories).to(device)
# rnn.load_state_dict(torch.load(MODEL_PATH))


# Step 1: build a function that picks a random example

def randomTrainingExample(train_data):
    """Pick one example at random; train_data is the training set as a list"""

    # Choose a random row from train_data

    category, line = random.choice(train_data)
    # Encode the text with BERT to get its tensor representation

    line_tensor = get_bert_encode_for_single(line)
    # Wrap the class label in a tensor

    category_tensor = torch.tensor([int(category)])
    # Return all four results

    return category, line, category_tensor.to(device), line_tensor.to(device)

'''
Optimization: iterate over all of the data for training and validation
'''

def getData(data, index):
    category, line = data[index]
    line_tensor = get_bert_encode_for_single(line)
    # Wrap the class label in a tensor

    category_tensor = torch.tensor([int(category)])
    # Return all four results

    return category, line, category_tensor.to(device), line_tensor.to(device)

# Step 2: build the model training function
# Use NLLLoss() as the loss, which pairs with the model's LogSoftmax output

criterion = nn.NLLLoss()
# Learning rate is 0.005
# learning_rate = 0.005

optimizer = optim.Adam(rnn.parameters(), lr=0.005)

def train(category_tensor, line_tensor):
    """Training function; category_tensor is the class tensor, line_tensor the encoded text tensor"""
    # Initialize the hidden state

    hidden = rnn.initHidden()
    # Zero the model gradients

    rnn.zero_grad()
    # Iterate over the tensor of each character in line_tensor

    for i in range(line_tensor.size()[0]):
        # Feed it to the rnn; the model requires a 2-D input, so expand one
        # dimension, and call the rnn repeatedly until the last character

        output, hidden = rnn(line_tensor[i].unsqueeze(0), hidden)
    # Compute the loss from the rnn output and the true class label

    loss = criterion(output, category_tensor)
    # Backpropagate the error

    loss.backward()

    # Update all model parameters

    optimizer.step()
    # for p in rnn.parameters():
    #     # Update each parameter by adding its gradient scaled by the learning rate
    #     p.data.add_(-learning_rate, p.grad.data)

    # Return the output and the loss value

    return output, loss.item()

# Step 3: the model validation function

def valid(category_tensor, line_tensor):
    """Validation function; category_tensor is the class tensor, line_tensor the encoded text tensor"""

    # Initialize the hidden state

    hidden = rnn.initHidden()
    # Do not compute gradients during validation

    with torch.no_grad():
        # Iterate over the tensor of each character in line_tensor

        for i in range(line_tensor.size()[0]):
            # Feed it to the rnn; the model requires a 2-D input, so expand one
            # dimension, and call the rnn repeatedly until the last character

            output, hidden = rnn(line_tensor[i].unsqueeze(0), hidden)
        # Compute the loss

        loss = criterion(output, category_tensor)
    # Return the output and the loss value

    return output, loss.item()

# Step 4: run the training and validation functions

import time
import math

def timeSince(since):
    """Return the elapsed training time for each print; since is the training start time"""

    # Get the current time

    now = time.time()
    # The difference from the start time is the elapsed training time

    s = now - since
    # Convert the seconds to whole minutes

    m = math.floor(s / 60)
    # Keep the leftover seconds that do not make a full minute

    s -= m * 60
    # Return the elapsed time in "Xm Ys" format

    return '%dm %ds' % (m, s)


# Set the number of epochs

epochs = 10

# Print every 1000 steps

plot_every = 1000


# Accumulators for the train/validation loss and accuracy within one print interval

train_current_loss = 0
train_current_acc = 0
valid_current_loss = 0
valid_current_acc = 0


# Lists holding the average loss and accuracy of each print interval

all_train_losses = []
all_train_acc = []
all_valid_losses = []
all_valid_acc = []

# Record the start timestamp

start = time.time()

# Record mispredicted samples (reserved; not used below)

errors = set()

# Previously: loop n_iters times
# for iter in range(1, n_iters + 1):

print('--- Starting model training --- ')
for epoch in range(epochs):
    print('epoch: %d' % (epoch + 1))
    for iter in range(1, train_data_len + 1):
        # # Previously: call the random sampler twice to get one training and one validation example

        # category, line, category_tensor, line_tensor = randomTrainingExample(train_data)

        # category_, line_, category_tensor_, line_tensor_ = randomTrainingExample(train_data)

        # Optimization: use the split training and validation sets

        category, line, category_tensor, line_tensor = getData(train_data, (iter - 1) % train_data_len)
        category_, line_, category_tensor_, line_tensor_ = getData(valid_data, (iter - 1) % len(valid_data))

        # Call the training and validation functions to get the outputs and losses

        train_output, train_loss = train(category_tensor, line_tensor)
        valid_output, valid_loss = valid(category_tensor_, line_tensor_)
        # Accumulate the training and validation loss and accuracy

        train_current_loss += train_loss
        train_current_acc += (train_output.argmax(1) == category_tensor).sum().item()
        valid_current_loss += valid_loss
        valid_current_acc += (valid_output.argmax(1) == category_tensor_).sum().item()
        # When the iteration count is a multiple of the print interval

        if iter % plot_every == 0:
            # Divide the accumulated loss and accuracy by the interval size to get averages

            train_average_loss = train_current_loss / plot_every
            train_average_acc = train_current_acc / plot_every
            valid_average_loss = valid_current_loss / plot_every
            valid_average_acc = valid_current_acc / plot_every
            # Print the step, elapsed time, train loss/accuracy and validation loss/accuracy

            print("epoch:", epoch + 1, "Iter:", iter, "|", "TimeSince:", timeSince(start))
            print("epoch:", epoch + 1, "Train Loss:", train_average_loss, "|", "Train Acc:", train_average_acc)
            print("epoch:", epoch + 1, "Valid Loss:", valid_average_loss, "|", "Valid Acc:", valid_average_acc)
            # Store the results in the corresponding lists for later plotting

            all_train_losses.append(train_average_loss)
            all_train_acc.append(train_average_acc)
            all_valid_losses.append(valid_average_loss)
            all_valid_acc.append(valid_average_acc)
            # Reset the interval's loss and accuracy accumulators

            train_current_loss = 0
            train_current_acc = 0
            valid_current_loss = 0
            valid_current_acc = 0
        elif iter == train_data_len:
            # Handle the leftover steps at the end of the epoch

            num = train_data_len % plot_every
            # Divide the accumulated loss and accuracy by the leftover step count

            train_average_loss = train_current_loss / num
            train_average_acc = train_current_acc / num
            valid_average_loss = valid_current_loss / num
            valid_average_acc = valid_current_acc / num
            # Print the step, elapsed time, train loss/accuracy and validation loss/accuracy

            print("epoch:", epoch + 1, "Iter:", iter, "|", "TimeSince:", timeSince(start))
            print("epoch:", epoch + 1, "Train Loss:", train_average_loss, "|", "Train Acc:", train_average_acc)
            print("epoch:", epoch + 1, "Valid Loss:", valid_average_loss, "|", "Valid Acc:", valid_average_acc)
            # Store the results in the corresponding lists for later plotting

            all_train_losses.append(train_average_loss)
            all_train_acc.append(train_average_acc)
            all_valid_losses.append(valid_average_loss)
            all_valid_acc.append(valid_average_acc)
            # Reset the interval's loss and accuracy accumulators

            train_current_loss = 0
            train_current_acc = 0
            valid_current_loss = 0
            valid_current_acc = 0

print('--- Plotting the train/validation loss and accuracy curves ---')
# Step 5: plot the train and validation loss and accuracy curves

import matplotlib.pyplot as plt

plt.figure(0)
plt.plot(all_train_losses, label="Train Loss")
plt.plot(all_valid_losses, color="red", label="Valid Loss")
plt.legend(loc='upper left')
plt.savefig("./loss.png")


plt.figure(1)
plt.plot(all_train_acc, label="Train Acc")
plt.plot(all_valid_acc, color="red", label="Valid Acc")
plt.legend(loc='upper left')
plt.savefig("./acc.png")

print('--- Saving the model ---')
# Step 6: save the model
# Save path

MODEL_PATH = './BERT_RNN.pth'
# Save the model parameters

torch.save(rnn.state_dict(), MODEL_PATH)

print('--- Training finished ---')

4. data_strong.py

# -*- coding: utf-8 -*-

import pandas as pd
import random

# Separate the positive and negative samples

def split_pos_neg():
    data_path = "./train_data.csv"
    data = pd.read_csv(data_path, header=None, sep="\t")
    # Write the positive samples to train_data_pos.csv
    data[data[0] == 1].to_csv('./data_new/train_data_pos.csv', header=None, index=None, sep="\t")
    # Write the negative samples to train_data_neg.csv
    data[data[0] == 0].to_csv('./data_new/train_data_neg.csv', header=None, index=None, sep="\t")

# Optimization: generate new negative samples (random character deletion + random character replacement + medically unrelated samples)

def to_neg_data(random_del=0.2, random_replace=0.2, others=0.6):
    assert (random_del + random_replace + others) == 1.0
    pos_path = './data_new/train_data_pos.csv'
    pos_data = pd.read_csv(pos_path, header=None, sep="\t")
    texts = pos_data.iloc[:, 1].values.tolist()
    print(texts[:5])
    # Negatives from random character deletion

    del_len = int(len(texts) * random_del)
    texts_new = []
    for i in range(del_len):
        if len(texts[i]) < 6:
            # If the text is shorter than 6 characters, reverse it

            texts_new.append(''.join(reversed(texts[i])))
        else:
            # Otherwise crop characters off both ends

            num = random.randint(1, len(texts[i]) // 3)
            texts_new.append(''.join(texts[i][num: -num]))
    # Negatives from random character replacement

    replace_len = int(len(texts) * random_replace)
    with open('./data_new/jay_chou.txt', encoding='utf-8') as f:
        jay = f.readlines()
        for i in range(del_len, del_len + replace_len):
            # Splice a random lyric fragment between a prefix and a suffix of the entity

            line = random.choice(jay).strip()
            num = random.randint(0, len(line) + 1)
            texts_new.append(''.join(texts[i][0:random.randint(1, len(texts[i]) // 3 + 1)] +
                                     line[:num] +
                                     texts[i][-random.randint(1, len(texts[i]) // 3 + 1):]
                                     )
                             )

    # Medically unrelated negatives

    others_len = len(texts) - del_len - replace_len
    with open('./data_new/jay_chou.txt', encoding='utf-8') as f:
        jay = f.readlines()
        # This loop yields exactly others_len samples
        for i in range(del_len + replace_len, len(texts)):
            line = random.choice(jay).strip().split(' ')[0]
            texts_new.append(''.join(line))

    print("length of texts_new: ", len(texts_new))
    random.shuffle(texts_new)
    new_df = pd.DataFrame({'labels': [0] * len(pos_data), 'texts': texts_new})
    new_df.to_csv('./data_new/train_data_neg_new.csv', header=None, index=None, sep="\t")

def concat_pos_and_neg(ratio=0.9):
    pos_path = './data_new/train_data_pos.csv'
    neg_path = './data_new/train_data_neg_new.csv'
    pos_data = pd.read_csv(pos_path, header=None, sep="\t")
    neg_data = pd.read_csv(neg_path, header=None, sep="\t")
    train_len = int(len(pos_data) * ratio)
    # Build train_data

    pos_train = pos_data.iloc[:train_len, :]
    neg_train = neg_data.iloc[:train_len, :]
    print(pos_train.shape)
    print(neg_train.shape)
    df_train = pd.concat([pos_train, neg_train], axis=0)
    write_to_csv(df_train, './data_new/train_data_new.csv')

    # Build valid_data

    pos_valid = pos_data.iloc[train_len:, :]
    neg_valid = neg_data.iloc[train_len:, :]
    df_valid = pd.concat([pos_valid, neg_valid], axis=0)
    write_to_csv(df_valid, './data_new/valid_data_new.csv')
    
def write_to_csv(dataframe, path):
    data = dataframe.values.tolist()
    print(len(data))
    random.shuffle(data)
    df = pd.DataFrame({'label': [label for label, _ in data], "text": [text for _, text in data]})
    df.to_csv(path, header=None, index=None, sep="\t")


if __name__=="__main__":
    # split_pos_neg()

    to_neg_data(0.05, 0.05, 0.9)
    concat_pos_and_neg()
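Putting the files together, the intended order of operations (inferred from the paths each step reads and writes) is roughly:

# 1. split_pos_neg()              -> data_new/train_data_pos.csv, data_new/train_data_neg.csv
# 2. to_neg_data(0.05, 0.05, 0.9) -> data_new/train_data_neg_new.csv
# 3. concat_pos_and_neg()         -> data_new/train_data_new.csv, data_new/valid_data_new.csv
# 4. python train.py              -> BERT_RNN.pth, loss.png, acc.png
# 5. python predict.py            -> bad_case/pos_errors.txt, bad_case/neg_errors.txt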

5. predict.py

# -*- coding: utf-8 -*-

import os
import torch
import torch.nn as nn

# Import the RNN model structure

from RNN_MODEL import GRU
# Import the BERT pretrained-model encoding function

from bert_chinese_encode import get_bert_encode_for_single

import pandas as pd
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Path of the saved model parameters to preload

MODEL_PATH = './BERT_RNN.pth'

# Hidden size, input size and number of classes must match the values used in training

n_hidden = 128
input_size = 768
n_categories = 2

# Instantiate the RNN model and load the saved parameters
# (map_location ensures the checkpoint also loads on a CPU-only machine)

rnn = GRU(input_size, n_hidden, n_categories).to(device)
rnn.load_state_dict(torch.load(MODEL_PATH, map_location=device))


def _test(line_tensor):
    """Model test function, used inside the prediction function to run the RNN
    and return its result; line_tensor is the tensor representation of the input text"""
    # Initialize the hidden state tensor

    hidden = rnn.initHidden()
    # As in training, iterate over every character of the input text

    for i in range(line_tensor.size()[0]):
        # Feed each one to the rnn model in turn

        output, hidden = rnn(line_tensor[i].unsqueeze(0), hidden)
    # Return the rnn model's final output

    return output


def predict(input_line):
    """Model prediction function; input_line is the text to classify"""

    # Do not compute gradients

    with torch.no_grad():
        # Encode input_line with the BERT model

        output = _test(get_bert_encode_for_single(input_line))
        # Take the index of the maximum value along dimension 1

        _, topi = output.topk(1, 1)
        # Return the result as a number

        return topi.item()

def batch_predict(input_path, output_path):
    """Batch prediction function; its parameters are the input path of the raw
       files (candidate named entities) and the output path of the filtered
       files (non-entities removed)"""

    # Each input file is a csv named after a disease, and every line of it
    # is a candidate symptom entity for that disease
    # List every csv file name under the input path

    csv_list = os.listdir(input_path)
    # Iterate over the csv files

    for csv in csv_list:
        # Open each csv file for reading

        with open(os.path.join(input_path, csv), "r") as fr:
            # Open a same-named csv file at the output path for writing

            with open(os.path.join(output_path, csv), "w") as fw:
                # Read the csv file line by line

                for input_line in fr:
                    input_line = input_line.strip()
                    # Run the model prediction

                    res = predict(input_line)
                    # If the result is 1

                    if res:
                        # The entity passed review; write it to the output csv

                        fw.write(input_line + "\n")

def bad_case(path):
    print('Running predictions and recording bad-case information')
    data = pd.read_csv(path, header=None, sep="\t")
    labels = data[0].values.tolist()
    texts = data[1].values.tolist()
    pos_errors = []
    neg_errors = []
    for i in range(len(labels)):
        if predict(texts[i]) != labels[i]:
            if labels[i] == 1:
                pos_errors.append(texts[i])
            else:
                neg_errors.append(texts[i])
    # Write the mispredicted positives and negatives to separate files
    pos_out = open('./bad_case/pos_errors.txt', 'w', encoding='utf-8')
    neg_out = open('./bad_case/neg_errors.txt', 'w', encoding='utf-8')
    for i in range(len(pos_errors)):
        pos_out.write(pos_errors[i])
        pos_out.write('\n')
    pos_out.close()
    for i in range(len(neg_errors)):
        neg_out.write(neg_errors[i])
        neg_out.write('\n')
    neg_out.close()


if __name__ == '__main__':
    # input_path = "/data/doctor_offline/structured/noreview/"
    # output_path = "/data/doctor_offline/structured/reviewed/"
    # batch_predict(input_path, output_path)
    # print( predict('耳朵疼'))

    data_path = "./data_new/train_data_new.csv"
    bad_case(data_path)