隐马尔科夫模型的例子实现
文章目录
4.例子
4.1.解释
隐藏状态和观察状态的概率转移如下:
求观测状态为normal,cold,dizzy时的隐藏状态。
第一步:
P(“cold”|newState):隐藏状态下观察状态的概率,隐藏状态到观测状态的发射概率
第二步
P(oldState):上一步的结果
P_trans(oldState-newState):旧的隐藏状态到新的隐藏状态的概率
P(“cold”|newState):新的隐藏状态下观察状态的概率,隐藏状态到观测状态的发射概率
第三步
选择概率最高的路径。
对于每个healthy和fever的隐藏状态之间连线,选择概率最高的。
P(H->H)>P(F->H),P(H|Day2)=0.084
P(H->F)>P(F->F),P(F|Day2)=0.027
第四步
第五步
对于每个healthy和fever的隐藏状态之间连线,选择概率最高的。
P(H->H)>P(F->H),P(H|Day2)=0.00588
P(H->F)>P(F->F),P(F|Day2)=0.01512
最终的结果是:
P(start)->P(H)->P(H)->P(F)
4.2.代码
# coding: utf-8
states = ('Healthy', 'Fever')
observations = ('normal', 'cold', 'dizzy')
start_probability = {'Healthy': 0.6, 'Fever': 0.4}
transition_probability = {
'Healthy': {'Healthy': 0.7, 'Fever': 0.3},
'Fever': {'Healthy': 0.4, 'Fever': 0.6},
}
emission_probability = {
'Healthy': {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
'Fever': {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6},
}
def print_dptable(V):
print('节点的概率')
print(" ",end='')
for i in range(len(V)):
print("%7d" % i,end='')
print()
for y in V[0].keys():
print("%.5s: " % y,' ',end='')
for t in range(len(V)):
print("%.7s" % ("%f" % V[t][y]),' ',end='')
print()
print()
def viterbi(obs, states, start_p, trans_p, emit_p):
"""
viterbi算法
:param obs: ('normal', 'cold', 'dizzy')
:param states: ('Healthy', 'Fever')
:param start_p: {'Healthy': 0.6, 'Fever': 0.4}
:param trans_p: {
'Healthy': {'Healthy': 0.7, 'Fever': 0.3},
'Fever': {'Healthy': 0.4, 'Fever': 0.6},
}
:param emit_p: {
'Healthy': {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
'Fever': {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6},
}
:return:
"""
V = [{}] # 节点的概率
path = {} # key:当前节点的隐藏状态,value:前面所有节点的最好隐藏状态
# obs只有一个的情况
for y in states: # ('Healthy', 'Fever')
V[0][y] = start_p[y] * emit_p[y][obs[0]]
path[y] = [y]
# obs大于一个的情况
for t in range(1, len(obs)): # 每个观测状态的计算
V.append({})
newpath = {}
print('path', path)
for y in states: # 计算每个隐藏状态下的概率
# 前一个隐藏状态的节点概率*前一个隐藏状态到新隐藏状态的转移概率*发射概率,旧隐藏状态
# 计算所有前一个隐藏状态的所有情况
# 保留前一个隐藏状态下到此隐藏状态下最大的概率,旧隐藏状态
(prob, state) = max([(V[t - 1][y0] * trans_p[y0][y] * emit_p[y][obs[t]], y0) for y0 in states])
# print(prob, state)
V[t][y] = prob # 保存此观测状态下和隐藏状态下的节点概率
newpath[y] = path[state] + [y]
print('newpath', newpath)
# Don't need to remember the old paths
path = newpath
print_dptable(V)
# 找到最后一个状态的最大的节点的概率的隐藏状态
(prob, state) = max([(V[len(obs) - 1][y], y) for y in states])
return (prob, path[state])
def example():
return viterbi(observations,
states,
start_probability,
transition_probability,
emission_probability)
if __name__ == '__main__':
print(example())
path {‘Healthy’: [‘Healthy’], ‘Fever’: [‘Fever’]}
newpath {‘Healthy’: [‘Healthy’, ‘Healthy’]}
newpath {‘Healthy’: [‘Healthy’, ‘Healthy’], ‘Fever’: [‘Healthy’, ‘Fever’]}
path {‘Healthy’: [‘Healthy’, ‘Healthy’], ‘Fever’: [‘Healthy’, ‘Fever’]}
newpath {‘Healthy’: [‘Healthy’, ‘Healthy’, ‘Healthy’]}
newpath {‘Healthy’: [‘Healthy’, ‘Healthy’, ‘Healthy’], ‘Fever’: [‘Healthy’, ‘Healthy’, ‘Fever’]}
节点的概率
0 1 2
Healt: 0.30000 0.08400 0.00588
Fever: 0.04000 0.02700 0.01512
(0.01512, [‘Healthy’, ‘Healthy’, ‘Fever’])
5.中文分词
5.1.解释
下面结合中文分词来说明HMM,HMM的典型介绍就是这个模型是一个五元组:
- StatusSet: 状态值集合(隐状态)
- ObservedSet: 观察值集合(输出文字集合)
- TransProbMatrix: 转移概率矩阵(隐状态)
- EmitProbMatrix: 发射概率矩阵(隐状态表现为显状态的概率)
- InitStatus: 初始状态概率(隐状态)
结合参数说明HMM解决的三种问题:
(1)参数(StatusSet, TransProbMatrix, EmitRobMatrix, InitStatus)已知的情况下,求解观察值序列。(Forward-backward算法)
(2)参数(ObservedSet, TransProbMatrix, EmitRobMatrix, InitStatus)已知的情况下,求解状态值序列。(Viterbi算法)
(3)参数(ObservedSet)已知的情况下,求解(TransProbMatrix, EmitRobMatrix, InitStatus)。(Baum-Welch算法)
其中,第二种问题是Viterbi算法求解状态值序列最常用,语音识别、中文分词、新词发现、词性标注都有它的一席之地。
五元组参数在中文分词中的具体含义,直接给五元组参数赋予具体含义:
StatusSet & ObservedSet
状态值集合为(B, M, E, S): {B:begin, M:middle, E:end, S:single}
分别代表每个状态代表的是该字在词语中的位置,B代表该字是词语中的起始字,M代表是词语中的中间字,E代表是词语中的结束字,S则代表是单字成词。
观察值集合为就是所有汉字字符串(“小明硕士毕业于中国科学院计算所”),甚至包括标点符号所组成的集合。
状态值也就是我们要求的值,在HMM模型中文分词中,我们的输入是一个句子(也就是观察值序列),输出是这个句子中每个字的状态值。
举个栗子:
小明硕士毕业于中国科学院计算所
输出的状态序列为:
BEBEBMEBEBMEBES
根据这个状态序列我们可以进行切词:
BE/BE/BME/BE/BME/BE/S
所以切词结果如下:
小明/硕士/毕业于/中国/科学院/计算/所
例子:小明硕士毕业于中国科学院计算所
同时我们可以注意到:B后面只可能接(M or E),不可能接(B or S)。而M后面也只可能接(M or E),不可能接(B, S)。
没错,就是这么简单,现在输入输出都明确了,下文讲讲输入和输出之间的具体过程,里面究竟发生了什么不可告人的秘密?
上面只介绍了五元组中的两元【StatusSet, ObservedSet】,下面介绍剩下的三元【InitStatus, TransProbMatrix, EmitProbMatrix】。
这五元的关系是通过一个叫Viterbi的算法串接起来, ObservedSet序列值是Viterbi的输入, 而StatusSet序列值是Viterbi的输出, 输入和输出之间Viterbi算法还需要借助三个模型参数,分别是InitStatus, TransProbMatrix, EmitProbMatrix, 接下来一一讲解
5.1.1.InitStatus
初始状态概率分布是最好理解的,可以示例如下:
#B
-0.26268660809250016
#E
-3.14e+100
#M
-3.14e+100
#S
-1.4652633398537678
PS:示例数值是对概率值取对数之后的结果(可以让概率相乘的计算变成对数相加),其中-3.14e+100作为负无穷,也就是对应的概率值是0。下同。
也就是句子的第一个字属于{B,E,M,S}这四种状态的概率,如上可以看出,E和M的概率都是0,这和实际相符合,开头的第一个字只可能是词语的首字(B),或者是单字成词(S)。
5.1.2.TransProbMatrix
转移概率是马尔科夫链很重要的一个知识点,大学里面学过概率论的人都知道,马尔科夫链最大的特点就是当前T=i时刻的状态Status(i),只和T=i时刻之前的n个状态有关。也就是:
{Status(i-1), Status(i-2), Status(i-3), … Status(i - n)}
更进一步的说,HMM模型有三个基本假设作为模型的前提,其中有个“有限历史性假设”,也就是马尔科夫链的n=1。即Status(i)只和Status(i-1)相关,这个假设能大大简化问题。
回过头看TransProbMatrix,其实就是一个4x4(4就是状态值集合的大小)的二维矩阵,示例如下:
矩阵的横坐标和纵坐标顺序是BEMS x BEMS。(数值是概率求对数后的值,别忘了。)
比如TransProbMatrix[0][0]代表的含义就是从状态B转移到状态B的概率,由TransProbMatrix[0][0] = -3.14e+100可知,这个转移概率是0,这符合常理。由状态各自的含义可知,状态B的下一个状态只可能是ME,不可能是BS,所以不可能的转移对应的概率都是0,也就是对数值负无穷,在此记为-3.14e+100。
由上TransProbMatrix矩阵可知,对于各个状态可能转移的下一状态,且转移概率对应如下:
#B
#E:-0.510825623765990,M:-0.916290731874155
#E
#B:-0.5897149736854513,S:-0.8085250474669937
#M
#E:-0.33344856811948514,M:-1.2603623820268226
#S
#B:-0.7211965654669841,S:-0.6658631448798212
5.1.3.EmitProbMatrix
这里的发射概率(EmitProb)其实也是一个条件概率而已,根据HMM模型三个基本假设里的“观察值独立性假设”,观察值只取决于当前状态值,也就是:
P(Observed[i], Status[j]) = P(Status[j]) * P(Observed[i]|Status[j])
其中P(Observed[i]|Status[j])这个值就是从EmitProbMatrix中获取。
EmitProbMatrix示例如下:
#B
耀:-10.460283,涉:-8.766406,谈:-8.039065,伊:-7.682602,洞:-8.668696,…
#E
耀:-9.266706,涉:-9.096474,谈:-8.435707,伊:-10.223786,洞:-8.366213,…
#M
耀:-8.47651,涉:-10.560093,谈:-8.345223,伊:-8.021847,洞:-9.547990,…
#S
蘄:-10.005820,涉:-10.523076,唎:-15.269250,禑:-17.215160,洞:-8.369527…
5.2.训练HMM模型
根据分好的语料库,获得发射矩阵,开始矩阵和转移矩阵。
emit
{‘B’: {‘中’: 0.009226290787680802, ‘儿’: 0.00033344568220249873, ‘踏’: 4.393128858391452e-05, 。。。。
trans
{‘B’: {‘B’: 0.0, ‘M’: 0.1167175117318146, ‘E’: 0.8832824882681853, ‘S’: 0.0}, ‘M’: {‘B’: 0.0, ‘M’: 0.2777743117140081, ‘E’: 0.7222256882859919, ‘S’: 0.0}, ‘E’: {‘B’: 0.46893265693552616, ‘M’: 0.0, ‘E’: 0.0, ‘S’: 0.5310673430644739}, ‘S’: {‘B’: 0.3503213832274479, ‘M’: 0.0, ‘E’: 0.0, ‘S’: 0.46460125869921165}}
start
{‘B’: 0.5820149148537713, ‘M’: 0.0, ‘E’: 0.0, ‘S’: 0.41798844132394497}
Pi_dic #
{‘B’: 0.5820149148537713, ‘M’: 0.0, ‘E’: 0.0, ‘S’: 0.41798844132394497}
Count_dic
{‘B’: 1388532, ‘M’: 224398, ‘E’: 1388532, ‘S’: 1609916}
#-*-coding:utf-8
import sys
import codecs
import numpy as np
state_M = 4
word_N = 0
trans_dic = {}
emit_dic = {}
Count_dic = {} # 隐藏状态的个数
Pi_dic = {} # 位于词首的概率
word_set = set()
state_list = ['B','M','E','S']
line_num = -1
INPUT_DATA = "RenMinData.txt_utf8"
PROB_START = "prob_start.npy" #
PROB_EMIT = "prob_emit.npy" # 发射概率 # B->我# {'B':{'我':0.34, '门':0.54},'S':{'我':0.34, '门':0.54}}
PROB_TRANS = "prob_trans.npy" # 转移概率 BMES*BMES
# PROB_START {'B': 0.5820149148537713, 'M': 0.0, 'E': 0.0, 'S': 0.41798844132394497}
def init():
global state_M
global word_N
for state in state_list:
trans_dic[state] = {}
for state1 in state_list:
trans_dic[state][state1] = 0.0
for state in state_list:
Pi_dic[state] = 0.0
emit_dic[state] = {}
Count_dic[state] = 0
def getList(input_str):
"""
:param input_str: 过年,一个词,
:return: BE,BMMMS,S,BMMS
"""
outpout_str = []
if len(input_str) == 1:
outpout_str.append('S')
elif len(input_str) == 2:
outpout_str = ['B','E']
else:
M_num = len(input_str) -2
M_list = ['M'] * M_num
outpout_str.append('B')
outpout_str.extend(M_list)
outpout_str.append('S')
return outpout_str
def Output():
# start_fp = codecs.open(PROB_START,'a', 'utf-8')
# emit_fp = codecs.open(PROB_EMIT,'a', 'utf-8')
# trans_fp = codecs.open(PROB_TRANS,'a', 'utf-8')
print ("len(word_set) = %s " % (len(word_set)))
for key in Pi_dic.keys():
'''
if Pi_dic[key] != 0:
Pi_dic[key] = -1*math.log(Pi_dic[key] * 1.0 / line_num)
else:
Pi_dic[key] = 0
'''
Pi_dic[key] = Pi_dic[key] * 1.0 / line_num
# print >>start_fp,Pi_dic
np.save(PROB_START, Pi_dic)
for key in trans_dic:
for key1 in trans_dic[key]:
'''
if A_dic[key][key1] != 0:
A_dic[key][key1] = -1*math.log(A_dic[key][key1] / Count_dic[key])
else:
A_dic[key][key1] = 0
'''
trans_dic[key][key1] = trans_dic[key][key1] / Count_dic[key]
# print >>trans_fp,A_dic
# for k, v in A_dic:
# trans_fp.write(k+' '+str(v)+'\n')
np.save(PROB_TRANS, trans_dic)
for key in emit_dic:
for word in emit_dic[key]:
'''
if B_dic[key][word] != 0:
B_dic[key][word] = -1*math.log(B_dic[key][word] / Count_dic[key])
else:
B_dic[key][word] = 0
'''
emit_dic[key][word] = emit_dic[key][word] / Count_dic[key]
# print >> emit_fp,B_dic
np.save(PROB_EMIT, emit_dic)
# for k, v in B_dic:
# emit_fp.write(k+' '+str(v)+'\n')
# start_fp.close()
# emit_fp.close()
# trans_fp.close()
def main():
# python HMM_train.py
# if len(sys.argv) != 2:
# print ("Usage [%s] [input_data] " % (sys.argv[0]))
# sys.exit(0)
ifp = codecs.open('RenMinData.txt_utf8', 'r', 'utf-8')
init()
global word_set
global line_num
for line in ifp.readlines():
line_num += 1
if line_num % 10000 == 0:
print (line_num)
line = line.strip()
if not line:continue
# line = line.decode("utf-8","ignore")
# 1986年 ,
# 十亿 中华 儿女 踏上 新 的 征 程 。
# 过去 的 一 年 ,
word_list = [] # [过,去,的,一,年,]一个一个的字
for i in range(len(line)):
if line[i] == " ":
continue
word_list.append(line[i])
word_set = word_set | set(word_list)
lineArr = line.split(" ") # 【过去,的,一,年,】
line_state = [] # BMMS, BMMS,S,BE
for item in lineArr:
line_state.extend(getList(item))
# pdb.set_trace()
if len(word_list) != len(line_state):
print (sys.stderr,"[line_num = %d][line = %s]" % (line_num, line))
else:
for i in range(len(line_state)):
if i == 0:
Pi_dic[line_state[0]] += 1
Count_dic[line_state[0]] += 1
else:
trans_dic[line_state[i-1]][line_state[i]] += 1
Count_dic[line_state[i]] += 1
# if not B_dic[line_state[i]].has_key(word_list[i]):
if word_list[i] not in emit_dic[line_state[i]].keys():
emit_dic[line_state[i]][word_list[i]] = 0.0
else:
emit_dic[line_state[i]][word_list[i]] += 1
Output()
ifp.close()
if __name__ == "__main__":
# main()
a = np.load(PROB_EMIT, allow_pickle=True).item()
print(a)
print(type(a))
b = np.load(PROB_START, allow_pickle=True).item()
print(b)
print(b['B'])
"""
{'B': {'B': 0.0, 'M': 0.1167175117318146, 'E': 0.8832824882681853, 'S': 0.0},
'M': {'B': 0.0, 'M': 0.2777743117140081, 'E': 0.0, 'S': 0.7222256882859919},
'E': {'B': 0.46951648068515556, 'M': 0.0, 'E': 0.0, 'S': 0.5304835193148444},
'S': {'B': 0.3607655156767958, 'M': 0.0, 'E': 0.0, 'S': 0.47108435638736734}}
"""
5.3.进行分词
#-*-coding:utf-8
import numpy as np
prob_start = np.load("prob_start.npy", allow_pickle=True).item()
prob_trans = np.load("prob_trans.npy", allow_pickle=True).item()
prob_emit = np.load("prob_emit.npy", allow_pickle=True).item()
def viterbi(obs, states, start_p, trans_p, emit_p):
V = [{}] #tabular
path = {}
for y in states: #init
V[0][y] = start_p[y] * emit_p[y].get(obs[0],0)
path[y] = [y]
for t in range(1,len(obs)):
V.append({})
newpath = {}
for y in states:
(prob,state ) = max([(V[t-1][y0] * trans_p[y0].get(y,0) * emit_p[y].get(obs[t],0) ,y0) for y0 in states if V[t-1][y0]>0])
V[t][y] =prob
newpath[y] = path[state] + [y]
path = newpath
(prob, state) = max([(V[len(obs) - 1][y], y) for y in states])
return (prob, path[state])
def cut(sentence):
#pdb.set_trace()
prob, pos_list = viterbi(sentence,('B','M','E','S'), prob_start, prob_trans, prob_emit)
return (prob,pos_list)
def pos2word(test_str, pos_list):
res = ''
for i in range(len(pos_list)):
if pos_list[i]=='B':
res = res+test_str[i]
# print(test_str[i], end='')
elif pos_list[i]=='E':
res = res + test_str[i]+'/'
# print(test_str[i],'/', end='')
elif pos_list[i]=='M':
res = res + test_str[i]
# print(test_str[i], end='')
else:
res = res + test_str[i] + '/'
# print(test_str[i], '/', end='')
print(res.strip('/'))
if __name__ == "__main__":
test_str = u"计算机和电脑"
prob,pos_list = cut(test_str)
print (test_str)
print (pos_list)
pos2word(test_str, pos_list)
test_str = u"他说的确实在理."
prob,pos_list = cut(test_str)
print (test_str)
print (pos_list)
pos2word(test_str, pos_list)
test_str = u"我有一台电脑。"
prob,pos_list = cut(test_str)
print (test_str)
print (pos_list)
pos2word(test_str, pos_list)
计算机和电脑
[‘B’, ‘M’, ‘E’, ‘S’, ‘B’, ‘E’]
计算机/和/电脑
他说的确实在理.
[‘S’, ‘S’, ‘S’, ‘B’, ‘E’, ‘S’, ‘S’, ‘S’]
他/说/的/确实/在/理/.
我有一台电脑。
[‘B’, ‘E’, ‘B’, ‘E’, ‘B’, ‘E’, ‘S’]
我有/一台/电脑/。
6.代码和数据
链接:https://pan.baidu.com/s/1QbeG8g_9JhVwJRhERViTMg
提取码:80su
github:
https://github.com/shelleyHLX/machine-learning
本文地址:https://blog.csdn.net/qq_27009517/article/details/107100833