欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python聚类分析手写版(拒sklearn和scipy、numpy)

程序员文章站 2022-03-11 16:16:43
for 系统聚类分析(python手写代码,matrix有疑问的请见前一条博客)from matrix import copyfrom matrix import calfrom matrix import matrix as mimport xlrdimport matplotlib.pyplot as pltimport openpyxlimport warningsdef read_data(path): wk=xlrd.open_workbook(path) sh=w...

for 系统聚类分析(python手写代码,matrix有疑问的请见前一条博客)

from matrix import copy from matrix import cal from matrix import matrix as m import xlrd import matplotlib.pyplot as plt import openpyxl import warnings def read_data(path): wk=xlrd.open_workbook(path) sh=wk.sheets()[0] row=sh.nrows
    col=sh.ncols
    mat=m.zero(row-1,col-1) for each in range(1,row): for i in range(1,col): mat[each-1][i-1]=sh.row_values(each)[i] namelist=sh.col_values(0,start_rowx=1) col_name=sh.row_values(0,start_colx=1) return mat,namelist,col_name def stand_trans(mat): row=len(mat) col=len(mat[0]) average=cal([sum([mat[each][i] for each in range(row)])/row for i in range(col)]) error=cal([(sum([(mat[each][i]-average[i])**2 for each in range(row)])/(row-1))**0.5 for i in range(col)]) mata=cal([[(mat[each][i]-average[i])/error[i] for i in range(col)] for each in range(row)]) return mata def ming_distance(mat,q=2): target=m.zero(len(mat),len(mat)) for each in range(len(mat)): for i in range(each): data1=mat[each] data2=mat[i] if q!='inf': add_distance=sum([abs(data1[y]-data2[y])**q for y in range(len(data1))])**(1/q) else: add_distance=max([abs(data1[x]-data2[x]) for x in range(len(data1))]) target[each][i]=add_distance
            target[i][each]=add_distance return target def ma_distance(mat): row=len(mat) col=len(mat[0]) if row<=col: raise TypeError('矩阵的指标个数必须小于观测量') average=cal([[sum([mat[each][i] for each in range(row)])/row for i in range(col)]]*row) mat1=mat-average
    coefficient=(mat1.t*mat1)/(row-1) target=coefficient.ni
    result=m.zero(row,row) for each in range(row): for i in range(each): x=cal([mat[each]]) y=cal([mat[i]]) final=((x-y)*target*(x-y).t)**0.5 result[each][i]=final
            result[i][each]=final return result def correlation(mat):#变量相关聚类 stand=stand_trans(mat) corr=(stand.t*stand)/(len(mat)-1) for each in range(len(corr)): for i in range(each+1,len(corr)): corr[each][i]=corr[i][each] corr[each][each]=1 return m.one(len(corr),len(corr))-corr def cos(mat):#变量夹角余弦聚类 target=m.zero(len(mat),len(mat)) mata=mat.t for each in range(len(mata)): for i in range(each): x=mata[each] y=mata[i] multiple=sum([x[j]*y[j] for j in range(len(x))]) add=(sum([x[j]**2 for j in range(len(x))])*sum([y[j]**2 for j in range(len(x))]))**0.5 target[each][i]=multiple/add
            target[i][each]=multiple/add return m.one(len(target),len(target))-target def distance(mat,method='euler',p=None): if method=='euler': return ming_distance(mat) elif method=='absolute': return ming_distance(mat,q=1) elif method=='ming': if not p: raise TypeError('p值未输入') else: return ming_distance(mat,q=p) elif method=='mahalanobis': return ma_distance(mat) elif method=='chebyshev': return ming_distance(mat,q='inf') elif method=='correlation': return correlation(mat) elif method=='cos': return cos(mat) else: raise TypeError('无效命令') def group_dist(group1,group2,method='average',dist_mat=None,base_mat=None): if method=='nearest': if not dist_mat: raise TypeError('未输入距离矩阵') dist=[dist_mat[each][i] for each in group1 for i in group2] return min(dist) elif method=='farthest': if not dist_mat: raise TypeError('未输入距离矩阵') dist=[dist_mat[each][i] for each in group1 for i in group2] return max(dist) elif method=='average': if not dist_mat: raise TypeError('未输入距离矩阵') dist=sum([dist_mat[each][i] for each in group1 for i in group2]) dist/=(len(group1)*len(group2)) return dist elif method=='centroid': if not base_mat: raise TypeError('原始矩阵未输入') else: matchrow1=[base_mat[each] for each in group1] matchrow2=[base_mat[each] for each in group2] row1=len(matchrow1) row2=len(matchrow2) col=len(matchrow1[0]) result1=[sum([matchrow1[each][i] for each in range(row1)])/row1 for i in range(col)] result2=[sum([matchrow2[each][i] for each in range(row2)])/row2 for i in range(col)] result=sum([(result2[each]-result1[each])**2 for each in range(col)])**0.5 return result elif method=='square':#数模书上的离差平方和ward法 if not base_mat: raise TypeError('原始矩阵未输入') else: matchrow1=[base_mat[each] for each in group1] matchrow2=[base_mat[each] for each in group2] matchrow=matchrow1+matchrow2
            row1=len(matchrow1) row2=len(matchrow2) col=len(matchrow1[0]) average1=cal([[sum([matchrow1[each][i] for each in range(row1)])/row1 for i in range(col)]]) average2=cal([[sum([matchrow2[each][i] for each in range(row2)])/row2 for i in range(col)]]) average=cal([[sum([matchrow[each][i] for each in range(row1+row2)])/(row1+row2) for i in range(col)]]) D1=sum([(cal([matchrow1[each]])-average1)*(cal([matchrow1[each]])-average1).t for each in range(row1)]) D2=sum([(cal([matchrow2[each]])-average2)*(cal([matchrow2[each]])-average2).t for each in range(row2)]) D12=sum([(cal([matchrow[each]])-average)*(cal([matchrow[each]])-average).t for each in range(row1+row2)]) return D12-D1-D2 elif method=='ward':#matlab的ward法 if not base_mat: raise TypeError('原始矩阵未输入') else: init=group_dist(group1,group2,method='centroid',dist_mat=dist_mat,base_mat=base_mat) len1=len(group1) len2=len(group2) return ((2*len1*len2)/(len1+len2))**0.5*init else: raise TypeError('无效命令') def linkage(base_mat1,simple_method='euler',group_method='average',ming_p=None): if (simple_method=='cos' or simple_method=='correlation') and (group_method=='centroid' or group_method=='ward'): warnings.warn('重心法或离差平方和法对于R型聚类不太适用,请谨慎使用') base_mat1=cal(base_mat1) dist_mat1=distance(base_mat1,method=simple_method,p=ming_p) row=len(dist_mat1) change_mat=copy.deepcopy(dist_mat1) accompany=list(range(row)) adddict={} result_mat=cal([]) accumulate={} accumulate[0]=[[each] for each in range(row)] margin={} i=1 while True: maxnum=max(accompany) if len(change_mat)==1: break else: accumulate[i]=copy.deepcopy(accumulate[i-1]) next1=maxnum+1 compare=change_mat[1][0] min_key=(1,0) for each in range(len(change_mat)): for j in range(each): if change_mat[each][j]<compare: min_key=(each,j) compare=change_mat[each][j] else: pass minvalue=compare
            real_key=accompany[min_key[0]],accompany[min_key[1]] result_mat.append([real_key[0],real_key[1],minvalue]) if real_key[0] not in adddict and real_key[1] not in adddict: adddict[maxnum+1]=[real_key[0],real_key[1]] accumulate[i].remove([real_key[0]]) accumulate[i].remove([real_key[1]]) accumulate[i].append([real_key[0],real_key[1]]) margin[real_key]=minvalue elif real_key[0] not in adddict: adddict[maxnum+1]=adddict[real_key[1]]+[real_key[0]] accumulate[i].remove(adddict[real_key[1]]) accumulate[i].remove([real_key[0]]) accumulate[i].append(adddict[real_key[1]]+[real_key[0]]) margin[(adddict[real_key[1]][-1],real_key[0])]=minvalue elif real_key[1] not in adddict: adddict[maxnum+1]=adddict[real_key[0]]+[real_key[1]] accumulate[i].remove(adddict[real_key[0]]) accumulate[i].remove([real_key[1]]) accumulate[i].append(adddict[real_key[0]]+[real_key[1]]) margin[(adddict[real_key[0]][-1],real_key[1])]=minvalue else: adddict[maxnum+1]=adddict[real_key[0]]+adddict[real_key[1]] accumulate[i].remove(adddict[real_key[0]]) accumulate[i].remove(adddict[real_key[1]]) accumulate[i].append(adddict[real_key[0]]+adddict[real_key[1]]) margin[(adddict[real_key[0]][-1],adddict[real_key[1]][0])]=minvalue
            accompany.pop(min_key[0]) accompany.pop(min_key[1]) accompany.append(maxnum+1) change_mat.pop(min_key[0]) change_mat.pop(min_key[1]) for each in change_mat: each.pop(min_key[0]) each.pop(min_key[1]) each.append(0) addlist=[] for each in accompany: if each not in adddict: addlist.append(group_dist([each],adddict[maxnum+1],method=group_method,dist_mat=dist_mat1,base_mat=base_mat1)) else: addlist.append(group_dist(adddict[each],adddict[maxnum+1],method=group_method,dist_mat=dist_mat1,base_mat=base_mat1)) change_mat.append(addlist) change_mat=m.symmetric(change_mat) i+=1 return result_mat,accumulate,adddict,margin def cluster(result_tuple,max_clust,namelist=None): result=result_tuple[1] extract=result[len(result)-max_clust] totalsort={} for each in range(len(extract)): if len(extract[each])==1: print('第%s类:'%(each+1),end='') if namelist: print(namelist[extract[each][0]]) totalsort[each+1]=namelist[extract[each][0]] else: print(extract[each][0]) totalsort[each+1]=str(extract[each][0]) else: print('第%s类:'%(each+1),end='') if namelist: name=[namelist[i] for i in extract[each]] totalstr='、'.join(name) print(totalstr) totalsort[each+1]=name else: transform=[str(i) for i in extract[each]] totalstr='、'.join(transform) print(totalstr) totalsort[each+1]=transform return totalsort def write_excel(path,extract,namelist1): wk=openpyxl.load_workbook(path) sh=wk.create_sheet('系统聚类结果') sh.cell(1,1).value='序列' sh.cell(1,2).value='分类结果' totalrow=2 for each in extract: if type(extract[each])!=list: sh.cell(totalrow,1).value=extract[each] sh.cell(totalrow,2).value=each
            totalrow+=1 else: for i in extract[each]: sh.cell(totalrow,1).value=i
                sh.cell(totalrow,2).value=each
                totalrow+=1 wk.save(path) def plot(result_tuple,namelist1): margin=result_tuple[3] sequence=list(result_tuple[2].values())[-1] y_distance=[margin[(sequence[each],sequence[each+1])] for each in range(len(sequence)-1)] name_seq=[namelist1[each] for each in sequence] real_seq=[] addition=[] for each in range(len(name_seq)): real_seq.append(name_seq[each]) if each!=len(name_seq)-1: real_seq.append(str(each)) addition.append(str(each)) real_y=[] for each in range(len(real_seq)): try: h=int(real_seq[each]) real_y.append(y_distance[int((each-1)/2)]) except: real_y.append(0) plt.rcParams['font.sans-serif']=['STSONG'] plt.barh(real_seq,real_y,height=2) ax=plt.gca() ax.set_yticks(name_seq) ax.set_yticks(addition,minor=True) plt.grid(True) plt.ylabel('聚类类别') plt.xlabel('聚类距离') plt.show() path=input('请输入路径:') datamat,namelist,col_name=read_data(path) stand=input('请选择是否标准化') choice_type=input('请选择是Q型还是R型聚类') if stand: datamat=stand_trans(datamat) if choice_type=='Q': link=linkage(datamat) max_clust=int(input('请输入最大聚类数:')) clust=cluster(link,max_clust,namelist=namelist) plot(link,namelist) judge=input('请输入是否写入excel:') if judge: write_excel(path,clust,namelist) else: pass else: link=linkage(datamat,simple_method='correlation') max_clust=int(input('请输入最大聚类数:')) clust=cluster(link,max_clust,namelist=col_name) plot(link,col_name) 

利用前期国家统计局爬得的31个省份250余个经济指标(人均)进行系统聚类(欧式距离,组间平均连接),结果如下
Python聚类分析手写版(拒sklearn和scipy、numpy)

Python聚类分析手写版(拒sklearn和scipy、numpy)
Python聚类分析手写版(拒sklearn和scipy、numpy)

图是柱形图,跟树状图类似,每一条柱形的含义即在该距离下柱形底部的两个省份聚成一类,当然这两个“省份”并不是说一定是单独的两个省份,可能这两个省份之前已经和其他省份聚类而变成了两个“省份群”(具体情况可以通过画一条y=0的线,然后通过x轴平移得到不同聚类距离下的聚类情况),仍有不懂请百度“怎么看聚类图(或SPSS冰状图)“或在评论区回复

本文地址:https://blog.csdn.net/python_solver/article/details/108877155