news 2026/4/21 8:30:19

基于双向LSTM的中文情感分类实战:从数据预处理到实时预测

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
基于双向LSTM的中文情感分类实战:从数据预处理到实时预测

一、项目简介

本文介绍一个完整的中文文本情感分类项目,使用双向三层LSTM模型,对微博评论进行四分类情绪识别:喜悦、愤怒、厌恶、低落。项目包括数据预处理、词汇表构建、预训练词向量加载、模型训练、验证保存以及实时预测脚本。


二、项目结构

. ├── simplifyweibo_4_moods.csv # 原始数据(标签+文本) ├── save_vocab.py # 构建词汇表并保存为pkl ├── load_dataset.py # 加载数据、填充/截断、生成batch迭代器 ├── TextRNN.py # 模型定义(Embedding + BiLSTM + FC) ├── train_eval_test.py # 训练、验证、保存最佳模型 ├── main.py # 整合流程:加载数据、训练、单句预测示例 ├── predict.py # 独立交互式预测脚本 ├── embedding_Tencent.npz # 预训练腾讯中文词向量(200维) └── best_model.pth # 训练后保存的最佳模型权重

三、环境依赖

torch numpy tqdm pickle

四、数据集说明

文件simplifyweibo_4_moods.csv格式如下(第一行是表头,代码会跳过第一行):

标签映射:

  • 0 → 喜悦

  • 1 → 愤怒

  • 2 → 厌恶

  • 3 → 低落


五、代码详解

1. 构建词汇表(save_vocab.py

作用:统计所有字符出现频率,过滤低频字,添加<UNK><PAD>,保存为simplifyweibo_4_moods.pkl

# 导入进度条库,处理文件时显示进度 from tqdm import tqdm # 导入pickle,用于把字典保存成文件 import pickle as pkl # ===================== 配置参数 ===================== # 词表最大容量:最多保留 4760 个常用字 MAX_VOCAB_SIZE = 4760 # 两个特殊符号: # UNK 代表:不认识的字(生僻字) # PAD 代表:填充符号(把句子补成一样长) UNK, PAD = '<UNK>', '<PAD>' # ===================== 核心函数:构建词表 ===================== # 函数作用:读取CSV文本 → 统计字频 → 生成【字→数字ID】词典 → 保存文件 def build_vocab(file_path, max_size, min_freq): # 定义分词器:把一句话拆成【一个一个的字】 tokenizer = lambda x: [y for y in x] # 创建空字典,用来统计【每个字出现多少次】 vocab_dic = {} # 打开CSV文件,读取微博内容 with open(file_path, 'r', encoding='UTF-8') as f: i = 0 # 用来跳过第一行表头 # 逐行读取文件,tqdm显示进度条 for line in tqdm(f): # 第一行是表头 label,review,直接跳过 if i == 0: i += 1 continue # 去掉每行前2个字符(标签+逗号),并去除空格 # 例如:0,今天很开心 → 变成:今天很开心 lin = line[2:].strip() # 如果这一行是空的,跳过 if not lin: continue # 把句子拆成单个字,然后统计每个字出现次数 for word in tokenizer(lin): # 字计数 +1 vocab_dic[word] = vocab_dic.get(word, 0) + 1 # ===================== 筛选常用字 ===================== # 1. 只保留出现次数 > min_freq 的字 # 2. 按出现次数从高到低排序 # 3. 最多保留 max_size 个字(4760) vocab_list = sorted([_ for _ in vocab_dic.items() if _[1] > min_freq], key=lambda x: x[1], reverse=True)[:max_size] # ===================== 给每个字分配数字ID ===================== # 把字变成字典格式:{字1:0, 字2:1, 字3:2 ...} vocab_dic = {word_count[0]: idx for idx, word_count in enumerate(vocab_list)} # ===================== 添加两个特殊符号 ===================== vocab_dic.update({UNK: len(vocab_dic), PAD: len(vocab_dic) + 1}) # 打印最终的词表 print(vocab_dic) # ===================== 保存词表到文件 ===================== # 保存成pkl文件,给后面训练模型使用 pkl.dump(vocab_dic, open('simplifyweibo_4_moods.pkl', 'wb')) # 打印词表大小 print(f'Vocab size:{len(vocab_dic)}') # 返回生成好的词表 return vocab_dic # ===================== 程序入口:运行这里开始 ===================== if __name__ == "__main__": # 调用函数,生成词表 # 参数:CSV文件路径,最大词数,最少出现次数(3次以上才保留) vocab = build_vocab('simplifyweibo_4_moods.csv', MAX_VOCAB_SIZE, 3) # 打印提示,表示执行完成 print('vocab')

关键点

  • 字符级切分,每个汉字/标点为一个 token。

  • <UNK>索引 = 有效词汇数,<PAD>索引 = 有效词汇数+1,最终词汇表大小为4760+2=4762

输出结果:


2. 数据加载与批处理(load_dataset.py

2.1 加载数据集(完整代码)
from tqdm import tqdm import pickle as pkl import random import torch UNK,PAD ='<UNK>','<PAD>'#未知字,padding符号 def load_dataset(path,pad_size=70):#pad_size表示超过70截断,不超过70用UNK填充 contents=[]#用来存储转换为数值标号的句子 vocab = pkl.load(open('simplifyweibo_4_moods.pkl','rb'))#读取vocab 文作 tokenizers = lambda x:[y for y in x]#创建一个还函数 with open(path,'r',encoding='UTF-8') as f: i = 0 for line in tqdm(f): if i ==0: i +=1 continue if not line:#是不是空行 continue label = int(line[0]) content = line[2:].strip('\n') words_line=[] token = tokenizers(content)# 将每一行的内容进行分字 seq_len = len(token)#获取一行实际内容的长度 if pad_size:#判断每条评论是否超过70 个字 if len(token)<pad_size:# 如果一行的字少于70,则补充<PAD> token.extend([PAD] *(pad_size-len(token))) else:#如果一行的字大于70,则只取前70个字 token = token[:pad_size]#如果一条评论内的字大于或等于70个字,索引的切分 seq_len=pad_size#当前评论的长度 for word in token: words_line.append(vocab.get(word,vocab.get(UNK)))#把每一条评论转换为独热编码 contents.append((words_line,int(label),seq_len))#独热编码,标签值,句子长度 random.shuffle(contents)#打乱顺序 train_data = contents[: int(len(contents)*0.8)]#前80%的评论数据作为训练集 dev_data = contents[int(len(contents)*0.8): int(len(contents)*0.9)]#把80%-90%的评论数据集作为验证数据 test_data = contents[int(len(contents)*0.9):]#90%——最后的数据作为测试数据集 return vocab,train_data,dev_data,test_data class DatasetIterater(object): #将数据batches切分成Batchs_size的包 def __init__(self,batches,batch_size,device): self.batches=batches self.n_batches=len(batches)//batch_size self.residue = False if len(batches)% self.n_batches!=0: self.residue = True self.index=0 self.device=device def _to_tensor(self,datas):#自己定义的一个函数,并不是内置的函数功能 x = torch.LongTensor([_0] for _ in datas).to(self.device)#评论内容 y = torch.LongTensor(_[1] for _ in datas).to(self.device) #pad前的长度 seq_len = torch.LongTensor([_[2] for _ in datas]).to(self.device) return(x,seq_len),y def __next__(self): if self.residue and self.index == self.n_batches: batches = self.batches[self.index * self.bach_size:len(self.batches)] self.index+=1 batches = self._to_tensor(batches) return batches elif self.index >self.n_batches:#当读取完最后一个batch时: self.index = 0 raise StopIteration else: batches = self.batches[self.index * self.batche_size:(self.index+1)*self.batch_size] self.index +=1 batches = self._to_tensor(batches) return batches def __iter__(self): return self def __len__(self): if self.residue: return self.n_batches +1 else: return self.n_batches if __name__=="__main__": vocab,train_data,dev_data,test_data=load_dataset('simplifyweibo_4_moods.csv') print(train_data,dev_data,test_data) print('结束')
  • pad_size=70:所有句子统一长度70,短则补<PAD>,长则截断。

  • 每个样本是(input_ids, label, seq_len)元组。

输出结果:

2.2 批次迭代器
class DatasetIterater: def __init__(self, batches, batch_size, device): self.batches = batches self.batch_size = batch_size self.n_batches = len(batches) // batch_size self.residue = len(batches) % batch_size != 0 self.index = 0 self.device = device def __next__(self): if self.residue and self.index == self.n_batches: batch = self.batches[self.index*self.batch_size:] self.index += 1 return self._to_tensor(batch) elif self.index > self.n_batches: self.index = 0 raise StopIteration else: batch = self.batches[self.index*self.batch_size:(self.index+1)*self.batch_size] self.index += 1 return self._to_tensor(batch) def _to_tensor(self, datas): x = torch.LongTensor([_[0] for _ in datas]).to(self.device) y = torch.LongTensor([_[1] for _ in datas]).to(self.device) seq_len = torch.LongTensor([_[2] for _ in datas]).to(self.device) return (x, seq_len), y
  • 实现了迭代器协议,可以用for (x, seq_len), y in train_iter:遍历。

  • 每个 batch 返回((input_ids, seq_len), labels)


3. 模型定义(TextRNN.py

import torch.nn as nn class Model(nn.Module): def __init__(self,embedding_pretrained,n_vocab,embed,num_classes): super(Model,self).__init__() # 初始化嵌入层:优先使用预训练词向量,无则使用随机初始化 if embedding_pretrained is not None: # 加载预训练词向量,指定填充符索引,设置freeze=False允许微调 self.embedding = nn.Embedding.from_pretrained(embedding_pretrained,padding_idx=n_vocab-1,freeze=False) else: # 随机初始化嵌入层,输入维度为词汇表大小,输出维度为词向量维度 self.embedding = nn.Embedding(n_vocab,embed,padding_idx=n_vocab-1) # 定义双向LSTM层:输入维度=词向量维度,隐藏层维度=128,3层,批量优先, dropout=0.3防止过拟合 self.lstm = nn.LSTM( input_size=embed, # 词向量维度 hidden_size=128, # 记忆单元大小 num_layers=3, # 3层LSTM bidirectional=True, # 双向LSTM batch_first=True, dropout=0.3 # 防止过拟合 ) # 定义全连接层:输入维度=双向LSTM输出维度(128*2),输出维度=类别数量(四分类) self.fc = nn.Linear(128*2,num_classes) # 前向传播:定义模型的计算流程 def forward(self,x): # 提取输入文本的索引部分(x为元组,第一元素为文本索引) x,_ = x # 文本嵌入:将文本索引转换为词向量 out=self.embedding(x) # LSTM特征提取:输出包含所有时间步特征,此处暂不使用隐藏状态 out = self.lstm(out) # 取最后一个时间步的特征输入全连接层,输出分类结果 out=self.fc(out[:,-1,:]) return out

关键点

  • 使用预训练词向量(腾讯200维),freeze=True表示不更新。

  • LSTM 隐藏单元128,双向所以输出256维。

  • 取最后一个时间步的输出作为句子表示(LSTM已记住全句信息)。


4. 训练与验证(train_eval_test.py

# coding: UTF-8 import numpy as np import torch import torch.nn as nn import torch.nn.functional as F from sklearn import metrics #机器学习 import time def evaluate(class_list, model, data_iter, test=False):#验证集的处理, model.eval()#进入测试模型,将model的w设置为只读模式,w中的值都没被修改的权限,保护模型不被修改。 loss_total = 0 predict_all = np.array([], dtype=int) labels_all = np.array([], dtype=int) with torch.no_grad(): #一个上下文管理器,关闭梯度计算。当你确认不会调用Tensor.backward()的时候。这可以减少计算所用内存消耗。 for texts, labels in data_iter: outputs = model(texts)#它就是输出。 loss = F.cross_entropy(outputs, labels) loss_total += loss labels = labels.data.cpu().numpy() predic = torch.max(outputs.data, 1)[1].cpu().numpy()#代表的就是输出的结果 labels_all = np.append(labels_all, labels) predict_all = np.append(predict_all, predic) acc = metrics.accuracy_score(labels_all, predict_all) if test: report = metrics.classification_report(labels_all, predict_all, target_names=class_list, digits=4) return acc, loss_total / len(data_iter), report return acc, loss_total / len(data_iter) def test(model, test_iter , class_list): # test # model.load_state_dict(torch.load('TextRNN.ckpt')) model.eval()#进入测试模式,w只有读的权限 start_time = time.time()#当前的时间 test_acc, test_loss, test_report = evaluate(class_list, model, test_iter, test=True)# msg = 'Test Loss: {0:>5.2}, Test Acc: {1:>6.2%}' print(msg.format(test_loss, test_acc)) print(test_report) def train(model, train_iter, dev_iter, test_iter,class_list): model.train() #进入训练模式,允许model训练的权限,w optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)#优化器 total_batch = 0 # 记录进行到多少batch dev_best_loss = float('inf') #表示无穷大,inf 无穷大 last_improve = 0 # 记录上次验证集loss下降的batch数 flag = False # 记录是否很久没有效果提升 epochs = 20 #设置训练次数 for epoch in range(epochs): #训练次数 print('Epoch [{}/{}]'.format(epoch + 1, epochs)) for i, (trains, labels) in enumerate(train_iter):#(([23,34,..,13],79),2)__getitem__,里面包含__next__ #经过DatasetIterater中的_to_tensor返回的数据格式为:(x, seq_len), y outputs = model(trains) loss = F.cross_entropy(outputs, labels)# loss_fn = nn·)创建一个交叉熵损失函数层 model.zero_grad() loss.backward() optimizer.step() if total_batch % 100 == 0: # 每多少轮输出在训练集和验证集上的效果 predic = torch.max(outputs.data, 1)[1].cpu() train_acc = metrics.accuracy_score(labels.data.cpu(), predic)# dev_acc, dev_loss = evaluate(class_list, model, dev_iter) #将验证数据集传入模型,获得验证结果 if dev_loss < dev_best_loss: dev_best_loss = dev_loss #保存最优模型 torch.save(model.state_dict(), 'TextRNN.ckpt') last_improve = total_batch #保存最优模型的batch值 800batchs 21000 msg = 'Iter: {0:>6}, Train Loss: {1:>5.2}, Train Acc: {2:>6.2%}, Val Loss: {3:>5.2}, Val Acc: {4:>6.2%}' print(msg.format(total_batch, loss.item(), train_acc, dev_loss, dev_acc)) model.train() total_batch += 1 if total_batch - last_improve > 10000: # 验证集loss超过1000batch没下降,结束训练 4000 14001 print("No optimization for a long time, auto-stopping...") flag = True if flag: break # writer.close() test(model, test_iter,class_list)
  • 每个 epoch 后计算验证集损失和准确率,保存最佳模型。


5. 主流程(main.py

# 导入所需库和自定义模块 import torch # 深度学习核心库 import numpy as np # 数值计算库 import load_dataset,TextRNN # 自定义数据加载、模型模块 from train_eval_test import train # 导入训练函数 # 自动选择运行设备(GPU优先,无则用CPU) device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu" # 设置随机种子,保证实验结果可复现 np.random.seed(1)# 设置numpy随机种子 torch.manual_seed(1)# 设置torch随机种子 torch.cuda.manual_seed_all(1)# 设置所有CUDA设备随机种子 torch.backends.cudnn.deterministic=True# 固定cudnn算法,确保结果一致 # 加载预处理后的数据集(词汇表、训练/验证/测试集) vocab,train_data,dev_data,test_data = load_dataset.load_dataset('simplifyweibo_4_moods.csv') # 构建批次迭代器,每批128条数据,适配模型批量训练 train_iter = load_dataset.DatasetIterater(train_data,128,device)# 训练集迭代器 dev_iter = load_dataset.DatasetIterater(dev_data,128,device)# 验证集迭代器 test_iter = load_dataset.DatasetIterater(test_data,128,device)# 测试集迭代器 # 加载腾讯预训练词向量,转换为torch张量 embedding_pretrained = torch.tensor(np.load('embedding_Tencent.npz')["embedding"].astype('float32')) # 配置词向量维度(优先用预训练维度,无则默认200维) embed = embedding_pretrained.size(1) if embedding_pretrained is not None else 200# 词向量维度 # 定义情感类别,计算类别数量(四分类) class_list=['喜悦','愤怒','厌恶','低落'] num_classes = len(class_list) # 初始化TextRNN模型,转移到指定设备 model = TextRNN.Model(embedding_pretrained,len(vocab),embed,num_classes).to(device) # 启动模型训练、验证与测试 train(model,train_iter,dev_iter,test_iter,class_list)

6. 独立预测脚本(predict.py

加载保存的模型和词汇表,实现交互式预测:

import torch import pickle as pkl import numpy as np import TextRNN device = 'cuda' if torch.cuda.is_available() else 'cpu' def load_model_and_vocab(model_path, vocab_path='simplifyweibo_4_moods.pkl'): # 加载词汇表 with open(vocab_path, 'rb') as f: vocab = pkl.load(f) # 加载预训练词向量(需与训练时一致) embedding_pretrained = torch.tensor(np.load('embedding_Tencent.npz')['embeddings'].astype(np.float32)) embed = embedding_pretrained.size(1) num_classes = 4 model = TextRNN.Model(embedding_pretrained, len(vocab), embed, num_classes).to(device) model.load_state_dict(torch.load(model_path, map_location=device)) model.eval() return model, vocab def predict(sentence, model, vocab, pad_size=70): tokenizer = lambda x: [y for y in x] tokens = tokenizer(sentence) if len(tokens) < pad_size: tokens.extend(['<PAD>'] * (pad_size - len(tokens))) else: tokens = tokens[:pad_size] unk_idx = vocab.get('<UNK>', len(vocab)-2) input_ids = [vocab.get(word, unk_idx) for word in tokens] x = torch.LongTensor([input_ids]).to(device) seq_len = torch.LongTensor([len(tokens)]).to(device) with torch.no_grad(): outputs = model((x, seq_len)) pred = outputs.argmax(dim=1).item() class_list = ['喜悦', '愤怒', '厌恶', '低落'] return class_list[pred] if __name__ == '__main__': # 请根据实际保存的模型路径修改 model_path = 'D:/兰智/dlproject/RNN/best_model.pth' model, vocab = load_model_and_vocab(model_path) while True: text = input("请输入一句话(输入 q 退出):") if text.lower() == 'q': break result = predict(text, model, vocab) print(f"预测情感:{result}\n")

结果展示:


六、核心概念解析

1.<UNK><PAD>的作用

  • <UNK>:当遇到词汇表中不存在的字符时,用该标记代替,保证模型输入始终有效。

  • <PAD>:将所有句子填充到相同长度(如70),以便批量输入 LSTM。在嵌入层设置padding_idx可使这些位置的梯度为0,不影响训练。

2. 为什么取 LSTM 最后一个时间步?

LSTM 具有记忆能力,最后一个隐藏状态理论上已经编码了整句话的信息。对于情感分类这类序列级任务,取最后一步是简单有效的做法。若句子较长,可改用全局平均池化或注意力机制。

3. 预训练词向量 vs 随机初始化

预训练词向量(如腾讯800万词向量)在大规模语料上学习到词的语义关系,能显著提升小数据集上的泛化能力。本项目中使用200维版本,freeze=True保持其固定。


七、运行步骤

  1. 准备数据:将simplifyweibo_4_moods.csv放在项目根目录。

  2. 构建词汇表python save_vocab.py→ 生成simplifyweibo_4_moods.pkl

  3. 准备预训练向量:下载腾讯200维词向量,保存为embedding_Tencent.npz(需自己转换格式,保证embeddings键对应的数组)。

  4. 训练模型python main.py→ 训练5个epoch,保存最佳模型为best_model.pth

  5. 交互预测python predict.py→ 输入句子实时得到情感。


八、总结

本项目完整实现了一个基于双向LSTM的中文情感分类系统,涵盖:

  • 字符级词汇表构建

  • 序列填充与批处理迭代器

  • 预训练词向量加载

  • 多层双向LSTM模型

  • 训练、验证、模型保存

  • 单句与交互式预测

该框架稍作修改即可用于其他文本分类任务(如垃圾邮件识别、主题分类等)。希望这篇博客能帮助你理解LSTM文本分类的全流程。


附:改进方向

  • 使用pack_padded_sequence忽略填充位置

  • 引入注意力机制代替最后时间步

  • 尝试不同词向量(如BERT)

  • 超参数调优(学习率、LSTM层数、dropout等)

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/21 8:24:15

从数组到Switch:两种C51代码实现按键控制LED,哪种更适合你的项目?

从数组到Switch&#xff1a;两种C51代码实现按键控制LED的工程化思考 在嵌入式开发中&#xff0c;按键控制LED是最基础的人机交互实现方式之一。当我们需要依次点亮8个LED时&#xff0c;新手开发者往往会纠结于实现方案的选择——是用数组查表法简洁明了&#xff0c;还是采用sw…

作者头像 李华
网站建设 2026/4/21 8:20:20

Zotero-GPT深度解析:AI驱动的文献智能处理技术架构与实战指南

Zotero-GPT深度解析&#xff1a;AI驱动的文献智能处理技术架构与实战指南 【免费下载链接】zotero-gpt GPT Meet Zotero. 项目地址: https://gitcode.com/gh_mirrors/zo/zotero-gpt Zotero-GPT是一个基于GPT技术的Zotero插件&#xff0c;通过AI大模型实现文献摘要生成、…

作者头像 李华
网站建设 2026/4/21 8:19:53

GTE-large实战教程:Prometheus+Grafana监控GPU显存/请求延迟/错误率

GTE-large实战教程&#xff1a;PrometheusGrafana监控GPU显存/请求延迟/错误率 1. 监控需求与方案概述 在现代AI应用部署中&#xff0c;实时监控系统状态至关重要。对于基于GTE-large文本向量模型的多任务Web应用&#xff0c;我们需要重点关注三个核心指标&#xff1a; GPU显…

作者头像 李华
网站建设 2026/4/21 8:13:21

哔哩下载姬DownKyi:5分钟掌握B站视频下载与处理的完整方案

哔哩下载姬DownKyi&#xff1a;5分钟掌握B站视频下载与处理的完整方案 【免费下载链接】downkyi 哔哩下载姬downkyi&#xff0c;哔哩哔哩网站视频下载工具&#xff0c;支持批量下载&#xff0c;支持8K、HDR、杜比视界&#xff0c;提供工具箱&#xff08;音视频提取、去水印等&a…

作者头像 李华
网站建设 2026/4/21 8:11:57

lory.js 测试与调试:确保轮播组件稳定运行

lory.js 测试与调试&#xff1a;确保轮播组件稳定运行 【免费下载链接】lory ☀ Touch enabled minimalistic slider written in vanilla JavaScript. 项目地址: https://gitcode.com/gh_mirrors/lo/lory lory.js 是一款轻量级的原生 JavaScript 轮播组件&#xff0c;以…

作者头像 李华