A Summary of Chinese Text Classification Algorithms in Python
This post pulls together all kinds of machine learning and deep learning algorithms and applies them to text classification on a JD (Jingdong) review dataset. It covers a range of machine learning algorithms (implemented with sklearn) as well as common text classification models (implemented with keras, keras-bert and fastNLP). It is a summary of what I can do so far; from now on, this is my go-to toolkit for Chinese text classification.
Project source code:
GitHub:
https://github.com/yingdajun/TextClassifierMLAndDLByBertOrFastnlpDemo
Gitee:
https://gitee.com/www.ydj.com/TextClassifierMLAndDLByBertOrFastnlpDemo
Tools used:
Jupyter Notebook under Anaconda, jieba 0.39, sklearn 0.22, keras 2.3.1, torch 1.0.1, fastNLP 0.5.5
# Machine Learning
1. Data preprocessing
The functions below read the data and clean it; this preprocessing is geared toward the machine learning models.
import numpy as np
import re
import pandas as pd
# clean useless characters
'''
html_clean = ['& ldquo ;', '& hellip ;', '& rdquo ;', '& yen ;']
punctuation_replace = '[,。!?]+'
strange_num = ['①','②','③','④']
'''
punctuation_remove = '[:;……()『』《》【】~!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~]+'
def clean(sent):
    sent = re.sub(r'ldquo', "", sent)
    sent = re.sub(r'hellip', "", sent)
    sent = re.sub(r'rdquo', "", sent)
    sent = re.sub(r'yen', "", sent)
    sent = re.sub(r'⑦', "7", sent)
    sent = re.sub(r'(, ){2,}', "", sent)
    sent = re.sub(r'(! ){2,}', "", sent)   # delete runs of !, ?, 。 and the like
    sent = re.sub(r'(\? ){2,}', "", sent)
    sent = re.sub(r'(。 ){2,}', "", sent)
    sent = re.sub(punctuation_remove, "", sent)  # delete punctuation
    s = ' '.join(sent.split())  # collapse extra whitespace
    return s
def sent_filter(l):
    l_new = []
    for k in l:
        if len(k) > 2:
            l_new.append(k)
    return l_new
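A quick sanity check of the two helpers on a few made-up review strings (the samples are invented, not from the dataset):
sample = ['这件 衣服 质量 不错 ! ! ! & ldquo ; 很 满意 & rdquo ;', '差', '物流 太 慢 了 , , , 不 推荐']
cleaned = [clean(s) for s in sample]   # strip HTML-entity leftovers, repeated punctuation and extra spaces
cleaned = sent_filter(cleaned)         # drop reviews shorter than three characters
print(cleaned)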
# read the dataset for the deep learning models
def dl_load_data_and_labels(good_data_file, bad_data_file, mid_data_file):
#load reviews and save them in the list
good_examples = list(open(good_data_file, "r", encoding='utf-8').readlines())
good_examples = [s.strip() for s in good_examples]
bad_examples = list(open(bad_data_file, "r", encoding='utf-8').readlines())
bad_examples = [s.strip() for s in bad_examples]
mid_examples = list(open(mid_data_file, "r", encoding='utf-8').readlines())
mid_examples = [s.strip() for s in mid_examples]
#Call the clean () and sent_filter () functions to process the comments, save them in the x_text list
good_examples = [clean(sent) for sent in good_examples]
bad_examples = [clean(sent) for sent in bad_examples]
mid_examples = [clean(sent) for sent in mid_examples]
good_examples = [i.strip() for i in good_examples]
bad_examples = [i.strip() for i in bad_examples]
mid_examples = [i.strip() for i in mid_examples]
good_examples = sent_filter(good_examples)
bad_examples = sent_filter(bad_examples)
mid_examples = sent_filter(mid_examples)
x_text = good_examples + bad_examples + mid_examples
#Add a label for each comment and save it in y
good_labels = [[1, 0, 0] for _ in good_examples]
bad_labels = [[0, 1, 0] for _ in bad_examples]
mid_labels = [[0, 0, 1] for _ in mid_examples]
y = np.concatenate([good_labels, bad_labels, mid_labels], 0)
return [x_text, y]
# read the dataset for the machine learning models
def ml_load_data_and_labels(good_data_file, bad_data_file, mid_data_file):
#load reviews and save them in the list
good_examples = list(open(good_data_file, "r", encoding='utf-8').readlines())
good_examples = [s.strip() for s in good_examples]
bad_examples = list(open(bad_data_file, "r", encoding='utf-8').readlines())
bad_examples = [s.strip() for s in bad_examples]
mid_examples = list(open(mid_data_file, "r", encoding='utf-8').readlines())
mid_examples = [s.strip() for s in mid_examples]
#Call the clean () and sent_filter () functions to process the comments, save them in the x_text list
good_examples = [clean(sent) for sent in good_examples]
bad_examples = [clean(sent) for sent in bad_examples]
mid_examples = [clean(sent) for sent in mid_examples]
good_examples = [i.strip() for i in good_examples]
bad_examples = [i.strip() for i in bad_examples]
mid_examples = [i.strip() for i in mid_examples]
good_examples = sent_filter(good_examples)
bad_examples = sent_filter(bad_examples)
mid_examples = sent_filter(mid_examples)
x_text = good_examples + bad_examples + mid_examples
#Add a label for each comment and save it in y
good_labels = [0 for _ in good_examples]
bad_labels = [1 for _ in bad_examples]
mid_labels = [2 for _ in mid_examples]
y = np.concatenate([good_labels, bad_labels, mid_labels], 0)
return [x_text, y]
# when you use tensorflow, you need to generate batches yourself; this function may help you
def batch_iter(data, batch_size, num_epochs, shuffle=True):
"""
Generates a batch iterator for a dataset.
"""
data = np.array(data)
data_size = len(data)
num_batches_per_epoch = int((len(data)-1)/batch_size) + 1
for epoch in range(num_epochs):
# Shuffle the data at each epoch
if shuffle:
shuffle_indices = np.random.permutation(np.arange(data_size))
shuffled_data = data[shuffle_indices]
else:
shuffled_data = data
for batch_num in range(num_batches_per_epoch):
start_index = batch_num * batch_size
end_index = min((batch_num + 1) * batch_size, data_size)
yield shuffled_data[start_index:end_index]
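batch_iter is not used again in this post; a minimal sketch of how it would be consumed, shown on made-up toy data:
toy_data = list(zip(['a b c', 'd e', 'f g h'], [0, 1, 2]))  # toy stand-in for (text, label) pairs
for batch in batch_iter(toy_data, batch_size=2, num_epochs=1):
    x_batch, y_batch = zip(*batch)   # unzip the shuffled batch back into texts and labels
    print(x_batch, y_batch)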
2. Loading the data
good_data_file = "./data/good_cut_jieba.txt"
bad_data_file = "./data/bad_cut_jieba.txt"
mid_data_file = "./data/mid_cut_jieba.txt"
x_text, y = ml_load_data_and_labels(good_data_file, bad_data_file, mid_data_file)
print(y)
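The loops below rely on x_train, x_test, y_train and y_test; the split itself only appears later in the fastNLP section, so the same call is shown here up front (80/20 split with random_state=2017, matching the later one):
from sklearn.model_selection import train_test_split
x_train, x_test, y_train, y_test = train_test_split(x_text, y, test_size=0.2, random_state=2017)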
3. Importing the machine learning packages and the stop words
import sklearn
# machine learning models
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier,BaggingClassifier,AdaBoostClassifier
from sklearn.svm import SVC,LinearSVC
from sklearn.naive_bayes import BernoulliNB,MultinomialNB,GaussianNB
from sklearn.neighbors import KNeighborsClassifier
# feature extraction
from sklearn.feature_extraction.text import CountVectorizer,TfidfVectorizer
from sklearn.model_selection import train_test_split
# a Pipeline is built from a list of (key, value) pairs, where key is the name you give the step and value is an estimator object
from sklearn.pipeline import Pipeline
# accuracy, precision, recall, f1
from sklearn.metrics import accuracy_score,precision_score,recall_score,f1_score,classification_report
import xgboost as xgb
import joblib
# load the stop-word list
stwlist=[line.strip() for line in open('stopword.txt','r',encoding='utf-8').readlines()]
4. Text vectorization tools
# create the CountVectorizer() and TF-IDF vectorizers
cv=CountVectorizer(min_df=3,
                   max_df=0.5,
                   ngram_range=(1,2),
                   stop_words=stwlist)
tdf=TfidfVectorizer()
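Either vectorizer maps the whitespace-tokenized reviews to a sparse document-term matrix; a minimal usage sketch (demo_matrix is just an illustrative name):
demo_matrix = cv.fit_transform(x_train)   # learn the vocabulary on the training split
print(demo_matrix.shape)                  # (n_train_documents, n_features), stored as a sparse matrix
print(cv.transform(x_test).shape)         # the test split must reuse the fitted vocabulary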
5. The pipeline models
Note that GaussianNB cannot handle large high-dimensional matrices directly, so the vectorized matrix has to be densified with .todense() first, which is very slow.
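That is why GaussianNB is left out of the model list below; if you want to try it anyway, a minimal sketch would look like this (only practical for a small vocabulary, since densifying is memory-hungry):
gb_demo = GaussianNB()
train_dense = cv.fit_transform(x_train).toarray()   # densify the sparse matrix
test_dense = cv.transform(x_test).toarray()
gb_demo.fit(train_dense, y_train)
print(accuracy_score(y_test, gb_demo.predict(test_dense)))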
#%%
# CountVectorizer / TfidfVectorizer --> classifier
# classification models
# 1. logistic regression
lr=LogisticRegression()
# naive Bayes
# 2. multinomial naive Bayes
mb=MultinomialNB()
gb=GaussianNB()
# 3. Bernoulli naive Bayes
bb=BernoulliNB()
# support vector machines
# 4. SVC with different kernels
svc=SVC(kernel='rbf')
svc1=SVC(kernel='linear')
svc2=SVC(kernel='poly')
svc3=SVC(kernel='sigmoid')
# 5. linear SVM
linearsvc=LinearSVC()
# 6. decision tree
dtc=DecisionTreeClassifier(random_state=22)
# 7. random forest
rfc=RandomForestClassifier(random_state=22)
# 8. KNN classifier
knn=KNeighborsClassifier()
modelList=[lr,mb,bb,svc,svc1,svc2,svc3,linearsvc,dtc,rfc,knn]
# 11 models in total
m_len=len(modelList)
# # each model is combined with the 2 feature extractors (5 metrics were originally planned)
# # feature extractors
textVectoriser=[cv,tdf]
textv_len=len(textVectoriser)
new_ticks = []
name=[]
# modelNamelist=['Logistic regression','Multinomial NB','Bernoulli NB','Gaussian NB','RBF-kernel SVM'
#                ,'Linear-kernel SVM','Polynomial-kernel SVM','Sigmoid-kernel SVM'
#                ,'LinearSVC','Decision tree','Random forest','KNN']
# modelNamelist2=['lr','mb','gb','bb','svc','svc1','svc2','svc3','l'+'\n'+'svc','dtc','rfc','knn']
modelNamelist=['Logistic regression','Multinomial NB','Bernoulli NB','RBF-kernel SVM'
               ,'Linear-kernel SVM','Polynomial-kernel SVM','Sigmoid-kernel SVM'
               ,'LinearSVC','Decision tree','Random forest','KNN']
modelNamelist2=['lr','mb','bb','svc','svc1','svc2','svc3','l'+'\n'+'svc','dtc','rfc','knn']
# textVectorNamelist = ['Bag of words','TF-IDF']
for i in range(m_len):
    new_ticks.append([modelNamelist2[i]])
    name.append(modelNamelist[i])
name_dict={"name":modelNamelist,"model":modelList}
label_dict={"name":modelNamelist2,"model":modelList}
accuracy_score_list=[]
# the make_pipeline version would not work for me, so the vectorizer and the model are fit separately
for i in range(m_len):
    for j in range(textv_len):
        # pipeline = make_pipeline(textVectoriser[j], modelList[i])
        train_vec=textVectoriser[j].fit_transform(x_train)
        test_vec=textVectoriser[j].transform(x_test)
        # train_vec_dense=train_vec.todense()
        # test_vec_dense=test_vec.todense()
        modelList[i].fit(train_vec,y_train)
        # pred=pipeline.predict(x_test)
        pred=modelList[i].predict(test_vec)
        print('='*150)
        if(j==0):
            print('Model:',modelNamelist[i],'| vectorizer: bag of words',"| accuracy:",round(accuracy_score(y_test,pred),5))
        if(j==1):
            print('Model:',modelNamelist[i],'| vectorizer: TF-IDF',"| accuracy:",round(accuracy_score(y_test,pred),5))
        accuracy_score_list.append(round(accuracy_score(y_test,pred),5))
        # precision/recall/f1 with their default settings only support binary classification,
        # so only accuracy is collected here:
        # precision_score_list.append(precision_score(y_test,pred))
        # f1_score_list.append(f1_score(y_test,pred))
        # recall_score_list.append(recall_score(y_test,pred))
        # ROC curves and classification reports were left out for the same reason:
        # fpr[i], tpr[i], _ = roc_curve(y_test, pred)
        # roc_auc[i] = auc(fpr[i], tpr[i])
        # classification_report_list.append(classification_report(y_test,pred))
        # modelClass.append(pipeline)
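Pipeline is imported above but never used; as an alternative sketch, any single vectorizer/classifier pair can be wrapped into a pipeline (the step names 'vect' and 'clf' are arbitrary):
from sklearn.pipeline import Pipeline
pipe = Pipeline([('vect', TfidfVectorizer()), ('clf', LogisticRegression())])
pipe.fit(x_train, y_train)                          # fits the vectorizer and the classifier in one call
print(accuracy_score(y_test, pipe.predict(x_test)))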
XGBoost can be used in the same way.
import xgboost as xgb
# algorithm parameters
# set up for multi-class classification
params = {
'booster': 'gbtree',
'objective': 'multi:softmax',
'num_class': 3,
'gamma': 0.1,
'max_depth': 6,
'lambda': 2,
'subsample': 0.7,
'colsample_bytree': 0.75,
'min_child_weight': 3,
'silent': 0,
'eta': 0.1,
'seed': 1,
'nthread': 4,
}
for j in range(textv_len):
    train_vec=textVectoriser[j].fit_transform(x_train)
    test_vec=textVectoriser[j].transform(x_test)
    plst = list(params.items())
    dtrain = xgb.DMatrix(train_vec, y_train)      # build the DMatrix data format
    num_rounds = 500
    model = xgb.train(plst, dtrain, num_rounds)   # train the xgboost model
    # predict on the test set
    dtest = xgb.DMatrix(test_vec)
    pred = model.predict(dtest)
    # compute the accuracy
    accuracy = accuracy_score(y_test, pred)
    print('='*150)
    if(j==0):
        print('Model: xgboost','| vectorizer: bag of words',"| accuracy:",round(accuracy,5))
    if(j==1):
        print('Model: xgboost','| vectorizer: TF-IDF',"| accuracy:",round(accuracy,5))
    accuracy_score_list.append(round(accuracy,5))
    print("accuracy: %.2f%%" % (accuracy*100.0))
modelNamelist.append('xgboost')
Plotting the results
import matplotlib.pyplot as plt
from pylab import *
mpl.rcParams['font.sans-serif'] = ['SimHei']
# draw a bar chart of the accuracies collected above; linewidth sets the bar edge width and can be omitted
# plt.plot(input_values,squares,linewidth=5)
plt.figure(figsize=(12,5),dpi=80)
plt.bar(range(len(accuracy_score_list)),accuracy_score_list,linewidth=5)
# chart title
plt.title("Accuracy of the different pipeline models",fontsize = 24)
# axis labels
plt.xlabel("Model",fontsize = 14)
plt.ylabel("Accuracy",fontsize = 14)
# tick label size
plt.tick_params(axis='both',labelsize = 14)
# x-axis tick labels, then open the matplotlib viewer and show the figure
plt.xticks(range(len(new_ticks)),[t[0] for t in new_ticks])
plt.show()
# Deep Learning Models
Data processing
# pad_sequences keeps every mapped sequence the same length
from keras.preprocessing.sequence import pad_sequences
# text preprocessing
from keras.preprocessing.text import Tokenizer
# converts class labels into the required one-hot format
from keras.utils.np_utils import to_categorical
# concatenation layer
from keras.layers.merge import concatenate
# model containers
from keras.models import Sequential, Model
# individual layers
from keras.layers import Dense, Embedding, Activation, Input
from keras.layers import Convolution1D, Flatten, Dropout, MaxPool1D
from keras.layers import BatchNormalization
from keras.layers import Conv1D,MaxPooling1D
# the full set of imports used below
from keras.preprocessing.sequence import pad_sequences
from keras.preprocessing.text import Tokenizer
from keras.layers.merge import concatenate
from keras.models import Sequential, Model
from keras.layers import Dense, Embedding, Activation, merge, Input, Lambda, Reshape
from keras.layers import Convolution1D, Flatten, Dropout, MaxPool1D, GlobalAveragePooling1D
from keras.layers import LSTM, GRU, TimeDistributed, Bidirectional
from keras.utils.np_utils import to_categorical
from keras import initializers
from keras import backend as K
from keras.engine.topology import Layer
# from sklearn.naive_bayes import MultinomialNB
# from sklearn.linear_model import SGDClassifier
# from sklearn.feature_extraction.text import TfidfVectorizer
# import pandas as pd
# import numpy as np
# data helpers
# from data_helper_ml import load_data_and_labels
# visualization
import matplotlib.pyplot as plt
# number of target classes
NUM_CLASS=3
# input sequence length
INPUT_SIZE=64
# # LENTH=100
# Tokenizer vectorizes text, i.e. turns each text into a sequence of integer ids
tokenizer = Tokenizer(filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n',lower=True,split=" ")
tokenizer.fit_on_texts(x_text)
vocab = tokenizer.word_index
# map the words to integer ids
x_train_word_ids = tokenizer.texts_to_sequences(x_train)
x_test_word_ids = tokenizer.texts_to_sequences(x_test)
# pad so that all sequences share the same length
x_train_padded_seqs = pad_sequences(x_train_word_ids, maxlen=INPUT_SIZE)
x_test_padded_seqs = pad_sequences(x_test_word_ids, maxlen=INPUT_SIZE)
CNN
def cnn():
    model = Sequential()
    model.add(Embedding(len(vocab) + 1, 300, input_length=INPUT_SIZE))  # the Embedding layer turns each word id into a word vector
    model.add(Conv1D(256, 5, padding='same'))
    model.add(MaxPooling1D(3, 3, padding='same'))
    model.add(Conv1D(128, 5, padding='same'))
    model.add(MaxPooling1D(3, 3, padding='same'))
    model.add(Conv1D(64, 3, padding='same'))
    model.add(Flatten())
    model.add(Dropout(0.1))
    model.add(BatchNormalization())  # batch normalization layer
    model.add(Dense(256, activation='relu'))
    model.add(Dropout(0.1))
    model.add(Dense(NUM_CLASS, activation='softmax'))
    model.summary()
    return model
textCNN
def textCNN():
    main_input = Input(shape=(INPUT_SIZE,), dtype='float64')
    # word embedding (would normally use pre-trained word vectors)
    embedder = Embedding(len(vocab) + 1, 300, input_length=INPUT_SIZE, trainable=False)
    embed = embedder(main_input)
    # convolution windows of size 3, 4 and 5
    cnn1 = Conv1D(256, 3, padding='same', strides=1, activation='relu')(embed)
    cnn1 = MaxPooling1D(pool_size=48)(cnn1)
    cnn2 = Conv1D(256, 4, padding='same', strides=1, activation='relu')(embed)
    cnn2 = MaxPooling1D(pool_size=47)(cnn2)
    cnn3 = Conv1D(256, 5, padding='same', strides=1, activation='relu')(embed)
    cnn3 = MaxPooling1D(pool_size=46)(cnn3)
    # concatenate the outputs of the three branches
    cnn = concatenate([cnn1, cnn2, cnn3], axis=-1)
    flat = Flatten()(cnn)
    drop = Dropout(0.2)(flat)
    main_output = Dense(NUM_CLASS, activation='softmax')(drop)
    model = Model(inputs=main_input, outputs=main_output)
    model.summary()
    return model
# TextCNN with pre-trained Word2Vec word vectors
# w2v_model=Word2Vec.load('sentiment_analysis/w2v_model.pkl')
# # words missing from the pre-trained vectors get a zero vector
# embedding_matrix = np.zeros((len(vocab) + 1, 300))
# for word, i in vocab.items():
#     try:
#         embedding_vector = w2v_model[str(word)]
#         embedding_matrix[i] = embedding_vector
#     except KeyError:
#         continue
# # build the TextCNN model
# def TextCNN_model_2():
#     # structure: embedding - 3 x (conv + pooling) - concatenate - dense - dropout - dense
#     main_input = Input(shape=(INPUT_SIZE,), dtype='float64')
#     # word embedding (using the pre-trained vectors)
#     embedder = Embedding(len(vocab) + 1, 300, input_length=INPUT_SIZE, weights=[embedding_matrix], trainable=False)
#     #embedder = Embedding(len(vocab) + 1, 300, input_length=50, trainable=False)
#     embed = embedder(main_input)
#     # convolution windows of size 3, 4 and 5
#     cnn1 = Conv1D(256, 3, padding='same', strides=1, activation='relu')(embed)
#     cnn1 = MaxPooling1D(pool_size=38)(cnn1)
#     cnn2 = Conv1D(256, 4, padding='same', strides=1, activation='relu')(embed)
#     cnn2 = MaxPooling1D(pool_size=37)(cnn2)
#     cnn3 = Conv1D(256, 5, padding='same', strides=1, activation='relu')(embed)
#     cnn3 = MaxPooling1D(pool_size=36)(cnn3)
#     # concatenate the outputs of the three branches
#     cnn = concatenate([cnn1, cnn2, cnn3], axis=-1)
#     flat = Flatten()(cnn)
#     drop = Dropout(0.2)(flat)
#     main_output = Dense(3, activation='softmax')(drop)
#     model = Model(inputs=main_input, outputs=main_output)
#     model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
#     one_hot_labels = keras.utils.to_categorical(y_train, num_classes=NUM_CLASS)  # convert labels to one-hot encoding
#     # model.fit(x_train_padded_seqs, one_hot_labels, batch_size=800, epochs=20)
#     # #y_test_onehot = keras.utils.to_categorical(y_test, num_classes=3)  # convert labels to one-hot encoding
#     # result = model.predict(x_test_padded_seqs)  # predicted class probabilities
#     # result_labels = np.argmax(result, axis=1)   # label with the highest probability
#     # y_predict = list(map(str, result_labels))
#     # print('accuracy', metrics.accuracy_score(y_test, y_predict))
#     # print('weighted f1-score:', metrics.f1_score(y_test, y_predict, average='weighted'))
RNN
def rnn():
    # structure: embedding - LSTM - dense
    model = Sequential()
    model.add(Embedding(len(vocab)+1, 300, input_length=INPUT_SIZE))
    model.add(LSTM(256, dropout=0.2, recurrent_dropout=0.1))
    model.add(Dense(NUM_CLASS, activation='softmax'))
    model.summary()
    return model
Bi-GRU
def digru():
    # structure: embedding - 2 x bidirectional GRU - dense
    model = Sequential()
    # INPUT_SIZE (64) is the sequence length
    model.add(Embedding(len(vocab)+1, 300, input_length=INPUT_SIZE))
    model.add(Bidirectional(GRU(256, dropout=0.2, recurrent_dropout=0.1, return_sequences=True)))
    model.add(Bidirectional(GRU(256, dropout=0.2, recurrent_dropout=0.1)))
    model.add(Dense(NUM_CLASS, activation='softmax'))
    model.summary()
    return model
CNN + RNN in series
def clstm():
    # structure: embedding - conv + pooling - 2 x GRU - dense
    model = Sequential()
    model.add(Embedding(len(vocab)+1, 300, input_length=INPUT_SIZE))
    model.add(Convolution1D(256, 3, padding='same', strides = 1))
    model.add(Activation('relu'))
    model.add(MaxPool1D(pool_size=2))
    model.add(GRU(256, dropout=0.2, recurrent_dropout=0.1, return_sequences = True))
    model.add(GRU(256, dropout=0.2, recurrent_dropout=0.1))
    model.add(Dense(NUM_CLASS, activation='softmax'))
    model.summary()
    return model
CNN + RNN in parallel
def blstm():
    # structure: embedding, then a conv + pooling + dense branch and a bidirectional GRU + dense branch,
    # concatenated and followed by a final dense layer
    main_input = Input(shape=(INPUT_SIZE,), dtype='float64')
    embed = Embedding(len(vocab)+1, 300, input_length=INPUT_SIZE)(main_input)
    cnn = Convolution1D(256, 3, padding='same', strides = 1, activation='relu')(embed)
    cnn = MaxPool1D(pool_size=4)(cnn)
    cnn = Flatten()(cnn)
    cnn = Dense(256)(cnn)
    rnn = Bidirectional(GRU(256, dropout=0.2, recurrent_dropout=0.1))(embed)
    rnn = Dense(256)(rnn)
    con = concatenate([cnn,rnn], axis=-1)
    main_output = Dense(NUM_CLASS, activation='softmax')(con)
    model = Model(inputs = main_input, outputs = main_output)
    return model
fasttext
# model structure: n-gram word embedding - global average pooling - dense
# generate the n-gram combinations (up to 3-grams here)
ngram = 3
# collect the n-grams so they can be added to the vocabulary
def create_ngram(sent, ngram_value):
    return set(zip(*[sent[i:] for i in range(ngram_value)]))
ngram_set = set()
for sentence in x_train_padded_seqs:
    for i in range(2, ngram+1):
        set_of_ngram = create_ngram(sentence, i)
        ngram_set.update(set_of_ngram)
# assign an id to every n-gram
start_index = len(vocab) + 2
token_indice = {v: k + start_index for k, v in enumerate(ngram_set)}
indice_token = {token_indice[k]: k for k in token_indice}
max_features = np.max(list(indice_token.keys())) + 1
# append the n-gram ids to the end of each input sequence
def add_ngram(sequences, token_indice, ngram_range):
    new_sequences = []
    for sent in sequences:
        new_list = sent[:]
        for i in range(len(new_list) - ngram_range + 1):
            for ngram_value in range(2, ngram_range + 1):
                ngram = tuple(new_list[i:i + ngram_value])
                if ngram in token_indice:
                    new_list.append(token_indice[ngram])
        new_sequences.append(new_list)
    return new_sequences
x_train_ngram = add_ngram(x_train_word_ids, token_indice, ngram)
x_test_ngram = add_ngram(x_test_word_ids, token_indice, ngram)
# x_train_ngram = pad_sequences(x_train_ngram, maxlen=25)
# x_test_ngram = pad_sequences(x_test_ngram, maxlen=25)
x_train_padded_seqs = pad_sequences(x_train_ngram, maxlen=INPUT_SIZE)
x_test_padded_seqs = pad_sequences(x_test_ngram, maxlen=INPUT_SIZE)
def fasttext():
    model = Sequential()
    model.add(Embedding(max_features, 300, input_length=INPUT_SIZE))
    model.add(GlobalAveragePooling1D())
    model.add(Dense(NUM_CLASS, activation='softmax'))
    model.summary()
    return model
Training the models:
model=cnn()  # or rnn(), digru(), clstm(), blstm(), textCNN(), fasttext()
model.compile(loss='categorical_crossentropy',optimizer='adam',metrics=['accuracy'])
one_hot_labels = to_categorical(y_train, num_classes=NUM_CLASS)  # convert the labels to one-hot encoding
# one_hot_labels=y_train
model.fit(x_train_padded_seqs, one_hot_labels,epochs=5, batch_size=800)
y_predict = model.predict_classes(x_test_padded_seqs)  # predicts the class index directly
y_predict = list(map(str, y_predict))
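Note that predict_classes only exists on Sequential models; for the functional-API models (textCNN, blstm) a minimal evaluation sketch along the lines of the commented-out Word2Vec example above would be:
from sklearn import metrics
result = model.predict(x_test_padded_seqs)   # class probabilities, works for both Sequential and Model
result_labels = np.argmax(result, axis=1)    # pick the most probable class
print('accuracy:', metrics.accuracy_score(y_test, result_labels))
print('weighted f1-score:', metrics.f1_score(y_test, result_labels, average='weighted'))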
keras_bert
import pandas as pd
import codecs, gc
import numpy as np
from sklearn.model_selection import KFold
from keras_bert import load_trained_model_from_checkpoint, Tokenizer
from keras.metrics import top_k_categorical_accuracy
from keras.layers import *
from keras.callbacks import *
from keras.models import Model
import keras.backend as K
from keras.optimizers import Adam
from keras.utils import to_categorical
maxlen = INPUT_SIZE  # sequence length; it must not exceed 512
# paths of the pre-trained BERT model (kept in the working directory here;
# they could also be joined onto a directory with os.path.join)
config_path = 'bert_config.json'
checkpoint_path = 'bert_model.ckpt'
dict_path = 'vocab.txt'
# turn the vocabulary file into a token -> id dictionary
token_dict = {}
with codecs.open(dict_path, 'r', 'utf8') as reader:
    for line in reader:
        token = line.strip()
        token_dict[token] = len(token_dict)
# override the tokenizer
class OurTokenizer(Tokenizer):
    def _tokenize(self, text):
        R = []
        for c in text:
            if c in self._token_dict:
                R.append(c)
            elif self._is_space(c):
                R.append('[unused1]')  # use [unused1] for whitespace characters
            else:
                R.append('[UNK]')      # characters missing from the vocabulary become [UNK]
        return R
tokenizer = OurTokenizer(token_dict)
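As a quick illustration, keras_bert's encode() returns the token-id and segment-id sequences that BERT expects (the sentence below is a made-up example):
x1_demo, x2_demo = tokenizer.encode(first='这件 衣服 很 好')  # token ids and segment ids
print(len(x1_demo), len(x2_demo))                            # both include the [CLS] and [SEP] positions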
# pad every sequence in a batch to the same length with zeros
def seq_padding(X, padding=0):
    L = [len(x) for x in X]
    ML = max(L)
    return np.array([
        np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X
    ])
# data_generator is simply a memory-saving way to feed the data
class data_generator:
def __init__(self, data, batch_size=32, shuffle=True):
self.data = data
self.batch_size = batch_size
self.shuffle = shuffle
self.steps = len(self.data) // self.batch_size
if len(self.data) % self.batch_size != 0:
self.steps += 1
def __len__(self):
return self.steps
def __iter__(self):
while True:
idxs = list(range(len(self.data)))
if self.shuffle:
np.random.shuffle(idxs)
X1, X2, Y = [], [], []
for i in idxs:
d = self.data[i]
text = d[0][:maxlen]
x1, x2 = tokenizer.encode(first=text)
y = d[1]
X1.append(x1)
X2.append(x2)
Y.append([y])
if len(X1) == self.batch_size or i == idxs[-1]:
X1 = seq_padding(X1)
X2 = seq_padding(X2)
Y = seq_padding(Y)
yield [X1, X2], Y[:, 0, :]
[X1, X2, Y] = [], [], []
# build the BERT model
def build_bert(nclass):
    bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None)  # load the pre-trained model
    for l in bert_model.layers:
        l.trainable = True
    x1_in = Input(shape=(None,))
    x2_in = Input(shape=(None,))
    x = bert_model([x1_in, x2_in])
    x = Lambda(lambda x: x[:, 0])(x)  # take the vector at the [CLS] position for classification
    p = Dense(nclass, activation='softmax')(x)
    model = Model([x1_in, x2_in], p)
    model.compile(loss='categorical_crossentropy',
                  optimizer=Adam(1e-5),  # use a sufficiently small learning rate
                  metrics=['accuracy', acc_top2])
    print(model.summary())
    return model
# top-k accuracy: the prediction counts as correct if the target class is among the k highest-scoring classes
def acc_top2(y_true, y_pred):
    return top_k_categorical_accuracy(y_true, y_pred, k=2)
# convert the training texts, test texts and labels into the model's input format
DATA_LIST = []
for text, label in zip(x_train, y_train):
    DATA_LIST.append((text, to_categorical(label, NUM_CLASS)))
DATA_LIST = np.array(DATA_LIST)
DATA_LIST_TEST = []
for text in x_test:
    DATA_LIST_TEST.append((text, to_categorical(0, NUM_CLASS)))
DATA_LIST_TEST = np.array(DATA_LIST_TEST)
# train and evaluate the model with cross-validation
def run_cv(nfold, data, data_labels, data_test):
    kf = KFold(n_splits=nfold, shuffle=True, random_state=520).split(data)
    train_model_pred = np.zeros((len(data), 3))
    test_model_pred = np.zeros((len(data_test), 3))
    for i, (train_fold, test_fold) in enumerate(kf):
        X_train, X_valid = data[train_fold, :], data[test_fold, :]
        model = build_bert(NUM_CLASS)
        early_stopping = EarlyStopping(monitor='val_acc', patience=3)  # early stopping to limit overfitting
        plateau = ReduceLROnPlateau(monitor="val_acc", verbose=1, mode='max', factor=0.5, patience=2)  # lower the learning rate when the metric stops improving
        checkpoint = ModelCheckpoint('./bert_dump/' + str(i) + '.hdf5', monitor='val_acc', verbose=2, save_best_only=True, mode='max', save_weights_only=True)  # keep the best weights
        train_D = data_generator(X_train, shuffle=True)
        valid_D = data_generator(X_valid, shuffle=True)
        test_D = data_generator(data_test, shuffle=False)
        # train the model
        model.fit_generator(
            train_D.__iter__(),
            steps_per_epoch=len(train_D),
            epochs=2,
            validation_data=valid_D.__iter__(),
            validation_steps=len(valid_D),
            callbacks=[early_stopping, plateau, checkpoint],
        )
        # model.load_weights('./bert_dump/' + str(i) + '.hdf5')
        # return model
        train_model_pred[test_fold, :] = model.predict_generator(valid_D.__iter__(), steps=len(valid_D), verbose=1)
        test_model_pred += model.predict_generator(test_D.__iter__(), steps=len(test_D), verbose=1)
        del model
        gc.collect()       # free memory
        K.clear_session()  # clear the Keras session
        # break
    return train_model_pred, test_model_pred
# import os
# print(os.getcwd())  # print the current working directory
# this can easily run out of memory
# n-fold cross-validation
train_model_pred, test_model_pred = run_cv(2, DATA_LIST, None, DATA_LIST_TEST)
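run_cv returns class probabilities rather than labels; assuming the test order is preserved (test_D is built with shuffle=False), a minimal sketch of turning them into predicted labels and scoring them against y_test:
test_pred_labels = np.argmax(test_model_pred, axis=1)  # most probable class per test review
print(accuracy_score(y_test, test_pred_labels))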
(3) The fastNLP part:
First the texts are written out in the format fastNLP expects.
# train/test split
from sklearn.model_selection import train_test_split
# pipelines
from sklearn.pipeline import Pipeline,make_pipeline
# split the data
x_train, x_test, y_train, y_test = train_test_split(x_text, y, test_size=0.2, random_state=2017)
data_dict1={"raw_words":x_train,"target":y_train}
data_dict2={"raw_words":x_test}
df_train=pd.DataFrame(data_dict1)
df_train.head()
# write out the training file for fastNLP
df_train.to_csv('train.txt',sep='\t', index=False,header=None,encoding='utf-8')
df_test=pd.DataFrame(data_dict2)
df_test.head()
# write out the test file for fastNLP
df_test.to_csv('test.txt',sep='\t', index=False,header=None,encoding='utf-8')
# import PyTorch
import torch
import torch.nn as nn
from fastNLP.io.loader import CSVLoader
dataset_loader = CSVLoader(headers=('raw_words','target'), sep='\t')
testset_loader = CSVLoader(headers=['raw_words'], sep='\t')
# the first column of each row in the CSV file goes into the 'raw_words' field and the second into the 'target' field;
# the columns are separated by '\t'
train_path=r'train.txt'
test_path=r'test.txt'
dataset = dataset_loader._load(train_path)
testset = testset_loader._load(test_path)
# split each sentence into words; see the DataSet.apply() method
import jieba
from itertools import chain
print(jieba.__version__)
# from itertools import chain
# '''
# @params:
#     data: a list of data items, each a [text string, 0/1 label] pair
# @return: the list of segmented texts, each element being the word sequence after segmentation
# '''
def get_tokenized(data,words=True):
    def tokenizer(text):
        return [tok for tok in jieba.cut(text, cut_all=False)]
    if words:
        # encode by word
        return tokenizer(data)
    else:
        # encode by character
        return [tokenizer(review) for review in data]
print(dataset)
dataset.apply(lambda ins:get_tokenized(ins['raw_words']), new_field_name='words', is_input=True)
print(dataset)
dataset.apply(lambda ins: len(ins['words']) ,new_field_name='seq_len', is_input=True)
print(dataset)
dataset.apply(lambda x: int(x['target']), new_field_name='target', is_target=True)
print(dataset)
#testset.apply(lambda ins: list(chain.from_iterable(get_tokenized(ins['raw_words']))), new_field_name='words', is_input=True)
testset.apply(lambda ins: get_tokenized(ins['raw_words']), new_field_name='words', is_input=True)
testset.apply(lambda ins: len(ins['words']) ,new_field_name='seq_len',is_input=True)
print(testset)
###
from fastNLP import Vocabulary
# split the DataSet by ratio and return two DataSets
# ratio (float) -- 0 < ratio < 1; the first returned DataSet holds (1 - ratio) of the data, the second holds ratio of it
train_data, dev_data = dataset.split(0.1, shuffle=False)
print(train_data)
print(len(train_data),len(dev_data),len(testset))
vocab = Vocabulary(min_freq=2).from_dataset(dataset, field_name='words')
vocab.index_dataset(train_data, dev_data, testset, field_name='words', new_field_name='words')
from fastNLP.embeddings import StaticEmbedding,StackEmbedding
fastnlp_embed = StaticEmbedding(vocab, model_dir_or_name='cn-char-fastnlp-100d',min_freq=2)
# # I could not figure out how to use ESIM here
# from fastNLP.models import ESIM
# # this one did not work well
# model_scim=ESIM(fastnlp_embed,num_labels=2, dropout_rate=0.3, dropout_embed=0.1)
# print(model_scim)
from fastNLP.models.star_transformer import STSeqCls
# this one did not work well either
model_stsc=STSeqCls(fastnlp_embed,num_cls=3, hidden_size=300
                    , num_layers=4, num_head=8
                    , head_dim=32, max_len=512, cls_hidden_size=600, emb_dropout=0.1, dropout=0.1)
# ESIM(fastnlp_embed,num_labels=2, dropout_rate=0.3, dropout_embed=0.1)
print(model_stsc)
from fastNLP.models import CNNText
model_CNN = CNNText(fastnlp_embed, num_classes=3,dropout=0.1)
print(model_CNN)
from fastNLP import Trainer, CrossEntropyLoss, AccuracyMetric,BCELoss
trainer_CNN = Trainer(model=model_CNN, train_data=train_data, dev_data=dev_data,loss=CrossEntropyLoss(), metrics=AccuracyMetric())
trainer_CNN.train()
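A minimal sketch of checking the trained CNNText model with fastNLP's Tester (the exported test.txt has no labels, so dev_data is used here):
from fastNLP import Tester, AccuracyMetric
tester = Tester(data=dev_data, model=model_CNN, metrics=AccuracyMetric())
tester.test()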
Original article: https://blog.csdn.net/yingdajun/article/details/107078481