A stance detection algorithm based on a Transformer (BERT) model, applied to a Xinjiang-cotton stance dataset; contact the blog author to obtain the data.
import torch
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from transformers import BertTokenizer, BertForSequenceClassification, AdamW, get_linear_schedule_with_warmup
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from tqdm import tqdm
import pandas as pd
from sqlalchemy import create_engine
import pymysql  # MySQL driver behind SQLAlchemy's mysql+pymysql:// URL
def textAll(data):
    # Build the model input by joining target and text (vectorized, so it works
    # for any DataFrame index, including the shuffled train/test splits below)
    data = data.copy()
    data['all'] = data['target'].astype(str) + ',' + data['text'].astype(str)
    return data
BERT_PATH='./my_bert_base/'
maxlen = 200
# Read the train and test data from the database
# Create the database engine
DATABASE_URL = "mysql+pymysql://root:123456@localhost/doubanmovie"
engine = create_engine(DATABASE_URL)
# Read the target, text, and stance columns from the Texts table via SQLAlchemy
query = "SELECT target, text, stance FROM Texts"
df = pd.read_sql(query, engine)
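# Optional sanity check, assuming the Texts table loaded as expected:
# frame size and the raw stance distribution.
print(df.shape)
print(df['stance'].value_counts())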
# Use 80% of the data for training and 20% for testing
train_ratio = 0.8
test_ratio = 0.2
# Shuffle the rows; random_state makes the shuffle reproducible
df = df.sample(frac=1, random_state=42).reset_index(drop=True)
# Split into train and test sets at the 80% mark (separate head/tail calls on
# rounded fractions can drop or double-count rows when the sizes round unevenly)
split_point = int(train_ratio * len(df))
df_train = df.iloc[:split_point].reset_index(drop=True)
df_test = df.iloc[split_point:].reset_index(drop=True)
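# An alternative worth noting: sklearn's train_test_split (imported above) can
# stratify on stance so both splits keep the same label ratio; uncomment to use
# it in place of the positional split:
# df_train, df_test = train_test_split(df, test_size=test_ratio, stratify=df['stance'], random_state=42)
# df_train, df_test = df_train.reset_index(drop=True), df_test.reset_index(drop=True)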
# Alternatively, read the train and test sets from CSV files:
#df_train = pd.read_csv('C:/Users/肖天/Desktop/Xinx.csv',sep=',',engine='python',encoding='utf-8').astype(str)
#df_test = pd.read_csv('C:/Users/肖天/Desktop/Sub_Xinx.csv',sep=',',engine='python',encoding='utf8').astype(str)
df_train = textAll(df_train)
df_test = textAll(df_test)
df_train['long'] = df_train['all'].apply(lambda x: len(x))
df_test['long'] = df_test['all'].apply(lambda x: len(x))
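# Quick check: the share of concatenated inputs longer than maxlen characters;
# note the tokenizer below truncates at 128 tokens, which for mostly
# character-level Chinese tokenization is stricter than 200 characters.
print((df_train['long'] > maxlen).mean())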
input_categories = 'all'
output_categories = 'stance'
# Keep only rows with a valid stance label, in both splits
df_train = df_train[df_train[output_categories].isin(['FAVOR', 'AGAINST', 'NONE'])].reset_index(drop=True)
df_test = df_test[df_test[output_categories].isin(['FAVOR', 'AGAINST', 'NONE'])].reset_index(drop=True)
def label(x):
    # Map stance strings to integer class ids: AGAINST=0, NONE=1, FAVOR=2
    if x == 'FAVOR':
        return 2
    if x == 'NONE':
        return 1
    if x == 'AGAINST':
        return 0
df_train.stance = df_train.stance.apply(label)  # convert stance labels to numeric ids
df_test.stance = df_test.stance.apply(label)
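# Sanity check: the numeric label distribution after mapping (0=AGAINST,
# 1=NONE, 2=FAVOR); a strong imbalance here would argue for a stratified split.
print(df_train['stance'].value_counts())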
# 1. Prepare the data
#train_texts, train_labels, val_texts, val_labels = load_data()  # implement your own loading function here
print('Loading the pretrained BERT tokenizer')
# Pretrained model directory: BertTokenizer reads vocab.txt from BERT_PATH
# itself, so no hand-built token-to-id dictionary is needed here.
tokenizer = BertTokenizer.from_pretrained(BERT_PATH)
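# Optional: inspect how one sample is segmented (assumes df_train is non-empty);
# a Chinese BERT vocab tokenizes CJK text mostly character by character.
print(tokenizer.tokenize(str(df_train[input_categories].iloc[0]))[:20])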
print('Encoding the data')
def tokenize_and_encode(texts, labels, tokenizer, max_length=128):
    input_ids = []
    attention_masks = []
    for text in texts:
        encoded_dict = tokenizer.encode_plus(
            text,                        # sentence to encode
            add_special_tokens=True,     # add '[CLS]' and '[SEP]'
            padding='max_length',        # pad every sentence to max_length
            truncation=True,             # truncate anything longer than max_length
            max_length=max_length,
            return_attention_mask=True,  # construct the attention mask
            return_tensors='pt',         # return PyTorch tensors
        )
        input_ids.append(encoded_dict['input_ids'])
        attention_masks.append(encoded_dict['attention_mask'])
    # truncation=True above guarantees every tensor is already max_length wide,
    # so no manual post-hoc trimming is required before concatenation
    input_ids = torch.cat(input_ids, dim=0)
    attention_masks = torch.cat(attention_masks, dim=0)
    labels = torch.tensor(list(labels))
    return input_ids, attention_masks, labels
train_input_ids, train_attention_masks, train_labels = tokenize_and_encode(df_train[input_categories], df_train[output_categories], tokenizer)
val_input_ids, val_attention_masks, val_labels = tokenize_and_encode(df_test[input_categories], df_test[output_categories], tokenizer)
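# Shape check: input_ids and attention_masks should be (num_examples, 128)
# and labels (num_examples,)
print(train_input_ids.shape, train_attention_masks.shape, train_labels.shape)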
print('Building the data loaders')
train_dataset = torch.utils.data.TensorDataset(train_input_ids, train_attention_masks, train_labels)
val_dataset = torch.utils.data.TensorDataset(val_input_ids, val_attention_masks, val_labels)
train_sampler = RandomSampler(train_dataset)
train_dataloader = DataLoader(train_dataset, sampler=train_sampler, batch_size=16)
val_sampler = SequentialSampler(val_dataset)
val_dataloader = DataLoader(val_dataset, sampler=val_sampler, batch_size=64)
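# Optional: peek at one batch to confirm the (input_ids, attention_mask, labels)
# layout the training loop unpacks; the first dimension is the batch size.
sample_batch = next(iter(train_dataloader))
print([t.shape for t in sample_batch])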
print('Loading the pretrained BERT model')
# num_labels=3 matches the AGAINST/NONE/FAVOR classes; without it the default
# 2-way head would make a 3-class cross-entropy loss impossible
model = BertForSequenceClassification.from_pretrained(BERT_PATH, num_labels=3)
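# Rough size check: a bert-base checkpoint is on the order of 100M parameters
# plus the freshly initialized 3-way classification head (hence fine-tuning).
print(f'{sum(p.numel() for p in model.parameters()) / 1e6:.1f}M parameters')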
print('Training the model')
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
optimizer = AdamW(model.parameters(), lr=2e-5, eps=1e-8)
epochs = 3
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=len(train_dataloader)*epochs)
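# With num_warmup_steps=0 this is pure linear decay: at optimizer step t the
# learning rate is 2e-5 * (1 - t / (len(train_dataloader) * epochs)), reaching
# zero after the last batch of the last epoch; train() below steps it per batch.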
num_batches = len(train_dataloader)
print(f"The training set contains {num_batches} batches.")
val_num_batches = len(val_dataloader)
print(f"The test set contains {val_num_batches} batches.")
# Training function
def train(model, train_dataloader, optimizer, scheduler, epoch, epochs):
    model.train()
    total_loss = 0.0
    pbar = tqdm(train_dataloader, desc=f'Epoch {epoch+1}/{epochs}')
    for i, batch in enumerate(pbar):
        b_input_ids = batch[0].to(device)
        b_attention_mask = batch[1].to(device)
        b_labels = batch[2].to(device)
        model.zero_grad()
        # Passing labels lets the model compute the cross-entropy loss itself,
        # with logits and labels guaranteed to live on the same device
        outputs = model(b_input_ids, attention_mask=b_attention_mask, labels=b_labels)
        loss = outputs.loss
        total_loss += loss.item()
        loss.backward()
        optimizer.step()
        scheduler.step()  # advance the linear learning-rate schedule each batch
        pbar.set_postfix(loss=total_loss / (i + 1))
# Evaluation function
def evaluate(model, data_loader):
    model.eval()
    predictions = []
    true_labels = []
    with torch.no_grad():
        for batch in data_loader:
            b_input_ids = batch[0].to(device)
            b_attention_mask = batch[1].to(device)
            b_labels = batch[2].to(device)
            outputs = model(b_input_ids, attention_mask=b_attention_mask)
            logits = outputs.logits
            preds = torch.argmax(logits, dim=1).flatten().cpu().tolist()
            predictions.extend(preds)
            true_labels.extend(b_labels.cpu().tolist())
    return accuracy_score(true_labels, predictions), classification_report(true_labels, predictions)
# Training loop
for epoch in range(epochs):
    train(model, train_dataloader, optimizer, scheduler, epoch, epochs)
    print(f'Epoch {epoch+1}/{epochs}')
    acc, report = evaluate(model, val_dataloader)
    print(f'Test Accuracy: {acc:.4f}')
    print(report)
torch.save(model, 'model.pth')  # pickles the full model; model.state_dict() is the more portable option
# Evaluate the saved model
model_xt = torch.load('model.pth', map_location=device)
model_xt.eval()  # switch to evaluation mode
acc, report = evaluate(model_xt, val_dataloader)
print(f'Test Accuracy: {acc:.4f}')
print(report)
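# A minimal single-text inference sketch (predict_stance is a hypothetical
# helper added for illustration, not part of the original pipeline); the
# id-to-label mapping mirrors the label() function defined above.
def predict_stance(text, model, tokenizer, max_length=128):
    id2label = {0: 'AGAINST', 1: 'NONE', 2: 'FAVOR'}
    # Calling the tokenizer directly is equivalent to encode_plus here
    enc = tokenizer(text, truncation=True, max_length=max_length, return_tensors='pt').to(device)
    model.eval()
    with torch.no_grad():
        logits = model(**enc).logits
    return id2label[int(torch.argmax(logits, dim=1))]
# Example usage, following the same "target,text" input format as textAll:
# print(predict_stance('some_target,some example text', model_xt, tokenizer))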