
Named Entity Recognition (NER) with pytorch + transformers


Named entity recognition is a fundamental NLP task; it serves as a building block for information extraction, relation extraction, knowledge graph construction, and similar downstream tasks.

Model definition, models.py:

# -*- coding: utf-8 -*-
import torch
import torch.nn as nn
from transformers import BertModel, BertConfig
from torchcrf import CRF
import os
class Bert_BiLSTM_CRF(nn.Module):  # Adding the BiLSTM brings little benefit and slows training; the LSTM layer can be dropped
    def __init__(self, tag_to_ix, embedding_dim=768, hidden_dim=256):
        super(Bert_BiLSTM_CRF, self).__init__()
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)
        self.hidden_dim = hidden_dim
        self.embedding_dim = embedding_dim

        self.bert = BertModel.from_pretrained("hfl/chinese-roberta-wwm-ext")
        # self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim//2,
        #                     num_layers=2, bidirectional=True, batch_first=True)
        self.dropout = nn.Dropout(p=0.1)
        # self.linear = nn.Linear(hidden_dim, self.tagset_size)
        self.linear = nn.Linear(embedding_dim, self.tagset_size)
        self.crf = CRF(self.tagset_size, batch_first=True)
    
    def _get_features(self, sentence):
        # BERT runs under no_grad, so its weights stay frozen; only the linear layer and CRF are trained
        with torch.no_grad():
            outputs = self.bert(sentence)
        # enc, _ = self.lstm(outputs.last_hidden_state)
        enc = outputs.last_hidden_state
        enc = self.dropout(enc)
        feats = self.linear(enc)
        return feats

    def forward(self, sentence, tags, mask, is_test=False):
        emissions = self._get_features(sentence)
        if not is_test:  # Training: return the loss
            loss = -self.crf.forward(emissions, tags, mask, reduction='mean')
            return loss
        else:  # Testing: return the decoded tag sequence
            decode = self.crf.decode(emissions, mask)
            return decode
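
As a quick sanity check of the interface, here is a minimal sketch (not part of the original scripts) that runs Bert_BiLSTM_CRF on dummy tensors; the tag set, token ids, and shapes below are made-up placeholders:

# -*- coding: utf-8 -*-
# Minimal sketch: exercise the forward() signature with dummy data.
# The tag_to_ix dict, token ids and tensor shapes are illustrative assumptions.
import torch
from models import Bert_BiLSTM_CRF

tag_to_ix = {'<PAD>': 0, '[CLS]': 1, '[SEP]': 2, 'O': 3, 'B-X': 4, 'I-X': 5}
model = Bert_BiLSTM_CRF(tag_to_ix)

tokens = torch.randint(100, 1000, (2, 16))       # (batch, seq_len) token ids
tags = torch.randint(3, 6, (2, 16))              # (batch, seq_len) tag ids
mask = torch.ones(2, 16, dtype=torch.bool)       # every position is a real token

loss = model(tokens, tags, mask)                 # training path: scalar CRF negative log-likelihood
paths = model(tokens, tags, mask, is_test=True)  # test path: list of decoded tag-id sequences
print(loss.item(), paths[0][:5])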

Utilities (mainly the Dataset class), utils.py:

# -*- coding: utf-8 -*-
import torch
from torch.utils.data import Dataset
from transformers import BertTokenizer
import pandas as pd
tokenizer = BertTokenizer.from_pretrained("hfl/chinese-roberta-wwm-ext")
ner_type = pd.read_csv("model_data/bio_type.txt") # CSV file listing all NER entity types
ners = ner_type["label"].tolist()
VOCAB = []
for n in ners:
    VOCAB.extend(["B-" + n, "I-" + n])
VOCAB.extend(['<PAD>', '[CLS]', '[SEP]', "O"])
tag2idx = {tag: idx for idx, tag in enumerate(VOCAB)}
idx2tag = {idx: tag for idx, tag in enumerate(VOCAB)}
MAX_LEN = 512 - 5
# MAX_LEN = 128 - 2
class NerDataset(Dataset):
    ''' Generate our dataset '''
    def __init__(self, f_path, inference_df = None):
        self.sents = []
        self.tags_li = []
        if inference_df is not None:
            data = inference_df
        else:
            data = pd.read_csv(f_path)
            
        tags =  data["label"].to_list()
        words = data["word"].to_list()
        print("f_path is {} len_word is {}  len tag is {}".format(f_path, len(words), len(tags)))

        word, tag = [], []
        for char, t in zip(words, tags):
            if char != '。':
                word.append(char)
                tag.append(t)
            else:
                if len(word) >= MAX_LEN-2:
                  self.sents.append(['[CLS]'] + word[:MAX_LEN] + [char] + ['[SEP]'])
                  self.tags_li.append(['[CLS]'] + tag[:MAX_LEN] + [t] + ['[SEP]'])
                else:
                  self.sents.append(['[CLS]'] + word + [char] + ['[SEP]'])
                  self.tags_li.append(['[CLS]'] + tag + [t] + ['[SEP]'])
                word, tag = [], []
            
        if word:
            if len(word) >= MAX_LEN-2:
                self.sents.append(['[CLS]'] + word[:MAX_LEN] + ['[SEP]'])
                self.tags_li.append(['[CLS]'] + tag[:MAX_LEN] + ['[SEP]'])
            else:
                self.sents.append(['[CLS]'] + word + ['[SEP]'])
                self.tags_li.append(['[CLS]'] + tag + ['[SEP]'])
            word, tag = [], []

    def __getitem__(self, idx):
        words, tags = self.sents[idx], self.tags_li[idx]
        token_ids = tokenizer.convert_tokens_to_ids(words)
        label_ids = [tag2idx[tag] for tag in tags]
        seqlen = len(label_ids)
        return token_ids, label_ids, seqlen

    def __len__(self):
        return len(self.sents)

def PadBatch(batch):
    maxlen = max([i[2] for i in batch])
    token_tensors = torch.LongTensor([i[0] + [0] * (maxlen - len(i[0])) for i in batch])
    label_tensors = torch.LongTensor([i[1] + [0] * (maxlen - len(i[1])) for i in batch])
    mask = (token_tensors > 0)
    return token_tensors, label_tensors, mask

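To see what the collate step produces, here is a minimal sketch (not part of the original scripts); it reads the training CSV referenced later in ner_main.py, which is assumed to exist:

# -*- coding: utf-8 -*-
# Minimal sketch: wire NerDataset and PadBatch together and inspect one padded batch.
from torch.utils import data
from utils import NerDataset, PadBatch

dataset = NerDataset("model_data/0704_bio_train.csv")
loader = data.DataLoader(dataset, batch_size=8, shuffle=False, collate_fn=PadBatch)

tokens, labels, mask = next(iter(loader))
print(tokens.shape, labels.shape, mask.shape)  # all (batch, longest_seq_in_batch)
print(mask.dtype)                              # torch.bool, True where a real token sits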

The ner_main.py script that runs training, validation, testing, and inference on real data is as follows:

# -*- coding: utf-8 -*-
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils import data
import os
import warnings
import argparse
import numpy as np
from sklearn import metrics
from transformers import AdamW, get_linear_schedule_with_warmup
import pandas as pd
from models import Bert_BiLSTM_CRF
from utils import NerDataset, PadBatch, VOCAB, tokenizer, tag2idx, idx2tag

warnings.filterwarnings("ignore", category=DeprecationWarning)
device = 'cuda' if torch.cuda.is_available() else 'cpu'

def train(e, model, iterator, optimizer, scheduler, criterion, device):
    model.train()
    losses = 0.0
    step = 0
    for i, batch in enumerate(iterator):
        step += 1
        x, y, z = batch
        x = x.to(device)
        y = y.to(device)
        z = z.to(device)
        loss = model(x, y, z)
        losses += loss.item()
        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()
    print("Epoch: {}, Loss:{:.4f}".format(e, losses/step))

def validate(e, model, iterator, device):
    model.eval()
    Y, Y_hat = [], []
    losses = 0
    step = 0
    with torch.no_grad():
        for i, batch in enumerate(iterator):
            step += 1

            x, y, z = batch
            x = x.to(device)
            y = y.to(device)
            z = z.to(device)

            y_hat = model(x, y, z, is_test=True)

            loss = model(x, y, z)
            losses += loss.item()
            # Save prediction
            for j in y_hat:
              Y_hat.extend(j)
            # Save labels
            mask = (z==1)
            y_orig = torch.masked_select(y, mask)
            Y.append(y_orig.cpu())

    Y = torch.cat(Y, dim=0).numpy()
    Y_hat = np.array(Y_hat)
    acc = (Y_hat == Y).mean()*100

    print("Epoch: {}, Val Loss:{:.4f}, Val Acc:{:.3f}%".format(e, losses/step, acc))
    return model, losses/step, acc

def test(model, iterator, device):
    model.eval()
    Y, Y_hat = [], []
    with torch.no_grad():
        for i, batch in enumerate(iterator):
            x, y, z = batch
            x = x.to(device)
            z = z.to(device)
            y_hat = model(x, y, z, is_test=True)
            # Save prediction
            for j in y_hat:
              Y_hat.extend(j)
            # Save labels
            mask = (z==1).cpu()
            y_orig = torch.masked_select(y, mask)
            Y.append(y_orig)

    Y = torch.cat(Y, dim=0).numpy()
    y_true = [idx2tag[i] for i in Y]
    y_pred = [idx2tag[i] for i in Y_hat]

    return y_true, y_pred

if __name__ == "__main__":
      ner_type = pd.read_csv("model_data/type.txt")
      ners = ner_type["label"].tolist()
      labels = []
      for n in ners:
          labels.extend(["B-" + n, "I-" + n])
      print("all type len is {}".format(len(labels)))
      best_model = None
      _best_val_loss = 1e18
      _best_val_acc = 1e-18

      parser = argparse.ArgumentParser()
      parser.add_argument("--batch_size", type=int, default=256)
      parser.add_argument("--lr", type=float, default=0.0005)
      parser.add_argument("--n_epochs", type=int, default=40)
      parser.add_argument("--trainset", type=str, default="model_data/0704_bio_train.csv")
      parser.add_argument("--validset", type=str, default="model_data/0704_bio_test.csv")
      parser.add_argument("--testset", type=str, default="model_data/0704_bio_test.csv")

      ner = parser.parse_args()
      model = Bert_BiLSTM_CRF(tag2idx).cuda()
      print('Initial model Done.')
      train_dataset = NerDataset(ner.trainset)
      print("train data len is {}".format(len(train_dataset)))
      eval_dataset = NerDataset(ner.validset)
      print("validset data len is {}".format(len(eval_dataset)))
      test_dataset = NerDataset(ner.testset)
      print("test_dataset len is {}".format(len(test_dataset)))
      print('Load Data Done.')

      train_iter = data.DataLoader(dataset=train_dataset,
                                    batch_size=ner.batch_size,
                                    shuffle=True,
                                    num_workers=4,
                                    collate_fn=PadBatch)

      eval_iter = data.DataLoader(dataset=eval_dataset,
                                    batch_size=(ner.batch_size)//2,
                                    shuffle=False,
                                    num_workers=4,
                                    collate_fn=PadBatch)

      test_iter = data.DataLoader(dataset=test_dataset,
                                  batch_size=(ner.batch_size)//2,
                                  shuffle=False,
                                  num_workers=4,
                                  collate_fn=PadBatch)

      optimizer = AdamW(model.parameters(), lr=ner.lr, eps=1e-6)

      # Warmup
      len_dataset = len(train_dataset) 
      epoch = ner.n_epochs
      batch_size = ner.batch_size
      total_steps = (len_dataset // batch_size) * epoch if len_dataset % batch_size == 0 else (len_dataset // batch_size + 1) * epoch

      warm_up_ratio = 0.1 # Define 10% steps
      scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps = warm_up_ratio * total_steps, num_training_steps = total_steps)
      criterion = nn.CrossEntropyLoss(ignore_index=0) 

      print('Start Train...')
      for epoch in range(1, ner.n_epochs + 1):
          train(epoch, model, train_iter, optimizer, scheduler, criterion, device)
          candidate_model, loss, acc = validate(epoch, model, eval_iter, device)

          if loss < _best_val_loss and acc > _best_val_acc:
            best_model = candidate_model
            _best_val_loss = loss
            _best_val_acc = acc

          print("=============================================")

      y_test, y_pred = test(best_model, test_iter, device)
      print(metrics.classification_report(y_test, y_pred, labels=labels, digits=3))
      torch.save(best_model.state_dict(), "checkpoint/0704_ner.pt")
      test_data = pd.read_csv("model_data/0704_bio_test.csv")
      y_test_useful = []
      y_pred_useful = []
      for a, b in zip(y_test, y_pred):
          if a not in ['[CLS]', '[SEP]']:
                y_test_useful.append(a)
                y_pred_useful.append(b)
      test_data["labeled"] = y_test_useful
      test_data["pred"] = y_pred_useful
      test_data.to_csv("result_files/bio_test_result.csv", index=False)
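
Since NerDataset also accepts an inference_df argument, inference on new text with the saved checkpoint can be sketched as follows (not part of the original scripts; the example sentence is taken from the sample data below, and the "O" labels are placeholders because true labels are unknown at inference time):

# -*- coding: utf-8 -*-
# Minimal inference sketch: load checkpoint/0704_ner.pt and tag one sentence.
import torch
import pandas as pd
from torch.utils import data
from models import Bert_BiLSTM_CRF
from utils import NerDataset, PadBatch, tag2idx, idx2tag

device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = Bert_BiLSTM_CRF(tag2idx).to(device)
model.load_state_dict(torch.load("checkpoint/0704_ner.pt", map_location=device))
model.eval()

text = "右肺下叶实质性结节占位。"
df = pd.DataFrame({"word": list(text), "label": ["O"] * len(text)})  # placeholder labels
infer_set = NerDataset(f_path=None, inference_df=df)
infer_iter = data.DataLoader(infer_set, batch_size=1, collate_fn=PadBatch)

with torch.no_grad():
    for tokens, labels, mask in infer_iter:
        paths = model(tokens.to(device), labels.to(device), mask.to(device), is_test=True)
        tags = [idx2tag[i] for i in paths[0]]
        print(list(zip(['[CLS]'] + list(text) + ['[SEP]'], tags)))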

The train.csv and test.csv files only need to contain word and label columns; a sample looks like this:

word,label
:,O,O
右,B-空间概念,B-位置
肺,B-解剖结构,B-解剖结构
下,I-解剖结构,I-解剖结构
叶,I-解剖结构,I-解剖结构
实,B-限定语,B-限定语
质,I-限定语,I-限定语
性,I-限定语,I-限定语
结,B-异常发现,B-异常发现
节,I-异常发现,I-异常发现
占,I-异常发现,I-异常发现
位,I-异常发现,I-异常发现
",",O,O

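If the raw annotations are character-level entity spans, building such a word/label CSV can be sketched like this (not part of the original scripts; the span tuples and output path are illustrative assumptions):

# -*- coding: utf-8 -*-
# Minimal sketch: convert a sentence with character-level entity spans into BIO rows.
# The span tuples and the output path are illustrative assumptions.
import pandas as pd

sentence = "右肺下叶实质性结节占位。"
spans = [(1, 4, "解剖结构"), (4, 7, "限定语"), (7, 11, "异常发现")]  # half-open [start, end) character spans

labels = ["O"] * len(sentence)
for start, end, etype in spans:
    labels[start] = "B-" + etype
    for i in range(start + 1, end):
        labels[i] = "I-" + etype

df = pd.DataFrame({"word": list(sentence), "label": labels})
df.to_csv("model_data/bio_sample.csv", index=False)  # same word/label layout as the sample above
print(df)
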
requirements.txt is as follows; the scripts above run without issues on Python 3.8:

torch==1.9.1
transformers==4.12.5
hanziconv==0.3.2
pandas==1.4.2
tqdm==4.64.0
scikit-learn==1.1.1
pytorch-crf==0.7.2
numpy==1.22.4
