The DBSCAN Algorithm
Density-based clustering algorithms are a widely used family of data-mining methods. Their core idea is to describe how the data points are distributed in the data space by means of density neighborhoods and a core-point threshold. Such algorithms can find clusters of arbitrary (oddly shaped) form and do not require the number of clusters to be specified in advance.
DBSCAN, short for Density-Based Spatial Clustering of Applications with Noise, is a classic density-based clustering method.
1. Basic Concepts
The DBSCAN algorithm has two key parameters:
Eps: the neighborhood radius used when defining density, denoted e.
MinPts: the threshold used when defining core points (the minimum number of points required to form a cluster), denoted M.
Let the dataset be X = {x1, x2, ..., xn}. We introduce the following concepts and notation:
1. The e-neighborhood and the density ρ:
Definition: Ne(xi) = {y ∈ X : d(y, xi) ≤ e}
Definition: ρ(x) = |Ne(x)|
In effect, e defines a disk of radius e centered at x; Ne(xi) is the set of points falling inside that disk, and ρ(x) is the density, i.e. the number of points inside it.
2. Core points and non-core points:
Given a threshold M on ρ(x), a point xi is a core point if ρ(xi) ≥ M. The set of core points is Xc = {xi ∈ X : ρ(xi) ≥ M}, and the set of non-core points is Xnc = X \ Xc.
3. Border points:
A data point x is called a border point if:
① x ∈ Xnc, and
② there is a core point y in Ne(x) (i.e., y ∈ Xc).
Informally: fewer than M points fall inside the disk around x, but some point y inside that disk has at least M points inside its own disk, so x can only sit at the edge of a cluster.
The set of all border points is denoted Xbd.
4. Noise points:
Let Xnoi = X \ (Xc ∪ Xbd). A data point x is called a noise point if x ∈ Xnoi.
5. Directly density-reachable, density-reachable, and density-connected:
For two data points x1, x2, if x1 ∈ Xc and x2 ∈ Ne(x1), then x2 is said to be directly density-reachable from x1. In essence: a border point or a core point is directly density-reachable from a core point.
If there is a chain x1, x2, ..., xj in which x1, ..., x(j-1) are core points and each x(i+1) is directly density-reachable from xi, then xj is said to be density-reachable from x1.
For x1, x2, x3 ∈ X, if x2 and x3 are both density-reachable from x1, then x2 and x3 are said to be density-connected.
6. Clusters:
A non-empty set C ⊆ X is called a cluster of X if, for x1, x2 ∈ X, it satisfies:
① (Maximality) if x1 ∈ C and x2 is density-reachable from x1, then x2 ∈ C;
② (Connectivity) if x1 ∈ C and x2 ∈ C, then x1 and x2 are density-connected.
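To make these definitions concrete, here is a minimal sketch that classifies the points of a tiny 2-D dataset into core, border, and noise points exactly as defined above. The dataset and the values e = 0.6, M = 3 are illustrative assumptions chosen for this sketch, not taken from the text.

import numpy as np

# Illustrative toy data; e and M are assumed values chosen for this sketch.
X = np.array([[1.0, 1.0], [1.2, 1.1], [0.9, 1.3], [1.1, 0.8], [1.6, 1.5],
              [5.0, 5.0], [5.1, 5.2], [9.0, 9.0]])
e, M = 0.6, 3

# Ne(xi): indices of points within distance e of xi (including xi itself).
dists = np.linalg.norm(X[:, None, :] - X[None, :, :], axis=2)
Ne = [set(np.where(dists[i] <= e)[0]) for i in range(len(X))]

# rho(xi) = |Ne(xi)|; core points satisfy rho >= M.
rho = np.array([len(nb) for nb in Ne])
core = set(np.where(rho >= M)[0])

# Border points: non-core points with at least one core point in their neighborhood.
border = {i for i in range(len(X)) if i not in core and Ne[i] & core}

# Noise points: everything else, i.e. X \ (Xc ∪ Xbd).
noise = set(range(len(X))) - core - border

print("core:", sorted(core), "border:", sorted(border), "noise:", sorted(noise))
# Expected: points 0-3 are core, point 4 is a border point, points 5-7 are noise.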
2. Algorithm Flow
The core idea of DBSCAN:
Starting from a chosen core point, keep expanding into density-reachable regions, thereby obtaining a maximal region containing core points and border points in which any two points are density-connected.
Let the dataset be X = {x1, x2, ..., xn}.
Every data point is given a label mi, where mi = j if xi belongs to the j-th cluster and mi = -1 if xi ∈ Xnoi.
Algorithm steps:
Step 1: Initialization
1. Choose the parameters e and M.
2. Compute Ne(i) for i = 1, 2, ..., N.
3. Set k = 1 and mi = 0 for i = 1, 2, ..., N.
4. Set I = {1, 2, ..., N}.
Step 2: Generate the cluster label array
while I ≠ ∅:
{
    Pick any element i from I and set I := I \ {i}.
    if (mi = 0)  // node i has not been processed yet
    {
        1. Initialize T = Ne(i).
        2. If |T| < M, set mi = -1 (tentatively mark node i as a noise point).
        3. If |T| ≥ M (i.e., i is a core point), then:
            Set mi = k (assign node i to the k-th cluster).
            while T ≠ ∅:
            {
                1. Pick any element j from T and set T := T \ {j}.
                2. If mj = 0 or mj = -1 (j is not yet in any cluster), set mj = k.
                3. If j was relabeled in step 2 and |Ne(j)| ≥ M (i.e., j is a core point), set T := T ∪ Ne(j), so each point's neighborhood is added at most once.
            }
            Set k = k + 1 (the k-th cluster is finished; start the next one).
    }
}
Remark: this sketch treats border points only implicitly; a point first marked -1 is relabeled to the current cluster in step 2 of the inner loop, but border points are never singled out as a separate category, which is arguably not entirely satisfactory.
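As a cross-check, the following sketch implements Step 1 and Step 2 directly with a label array m (0 = unprocessed, -1 = noise, k > 0 = cluster id). The function name, the helper structure, and the toy call at the bottom are my own illustrative choices, not from the original text; note that step 2 of the inner loop is where points previously marked -1 (border points) are absorbed into a cluster.

import numpy as np

def dbscan_labels(X, e, M):
    """Label-array DBSCAN following Step 1 / Step 2 above.

    Returns an array m with m[i] = k (> 0) giving the cluster of point i and m[i] = -1 marking noise.
    """
    N = len(X)
    # Step 1: precompute the e-neighborhoods Ne(i).
    dists = np.linalg.norm(X[:, None, :] - X[None, :, :], axis=2)
    Ne = [set(np.where(dists[i] <= e)[0]) for i in range(N)]

    m = np.zeros(N, dtype=int)           # 0 = not processed yet
    k = 1
    I = set(range(N))
    # Step 2: while I is not empty, grow one cluster per unprocessed core point.
    while I:
        i = I.pop()
        if m[i] != 0:                    # already assigned or already marked as noise
            continue
        T = set(Ne[i])
        if len(T) < M:
            m[i] = -1                    # tentatively mark i as noise
            continue
        m[i] = k                         # i is a core point: start cluster k
        while T:
            j = T.pop()
            if m[j] == 0 or m[j] == -1:  # newly reached point (possibly a former "noise" point)
                m[j] = k
                if len(Ne[j]) >= M:      # j is itself a core point: expand the frontier
                    T |= Ne[j]
        k += 1                           # cluster k finished, move on to the next one
    return m

# Toy usage with assumed data and parameters.
X = np.array([[1.0, 1.0], [1.2, 1.1], [0.9, 1.3], [1.1, 0.8],
              [5.0, 5.0], [5.1, 5.2], [9.0, 9.0]])
print(dbscan_labels(X, e=0.6, M=3))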
3. Implementing DBSCAN in Python
import numpy as np
from collections import defaultdict
from matplotlib.pyplot import plot, title, axis, show


class DBSCAN(object):
    def __init__(self, data, E, minPts):
        """Three parameters: the data, the neighborhood radius E, and the core-point threshold minPts."""
        self.E = E
        self.minPts = minPts
        self.data = data

    def distance(self, list1, list2):
        """Euclidean distance between two data points (only the first two coordinates; the third slot holds the cluster label)."""
        return np.sqrt(sum(map(lambda x, y: (x - y) ** 2, list1[:2], list2[:2])))

    def getCBPoints(self):
        """Find the core points and the border points."""
        self.core_points = []        # core points
        self.other_points = []       # non-core points
        self.cb_points = []          # core points plus border points
        for index1 in self.data:
            index1.append(0)         # third slot: cluster label, 0 = unassigned
            total = 0
            for index2 in self.data:
                L = self.distance(index1, index2)
                if L <= self.E:
                    total += 1
            if total > self.minPts:  # note: strictly more than minPts neighbors
                self.core_points.append(index1)
                self.cb_points.append(index1)
            else:
                self.other_points.append(index1)
        self.border_points = []      # border points: non-core points near a core point
        for core in self.core_points:
            for other in self.other_points:
                if self.distance(core, other) <= self.E:
                    self.border_points.append(other)
                    self.cb_points.append(other)

    def getNoisePoints(self):
        """Noise points: neither core points nor border points."""
        self.noise_points = []
        for point in self.data:
            if point not in self.core_points and point not in self.border_points:
                self.noise_points.append(point)
        return self.noise_points

    def cluster(self):
        """Run the clustering: each unlabeled core point starts a new cluster and pulls in nearby core/border points."""
        self.cluster_label = 0
        for point in self.core_points:
            if point[2] == 0:
                self.cluster_label += 1
                point[2] = self.cluster_label
            for point2 in self.cb_points:
                L = self.distance(point2, point)
                if point2[2] == 0 and L <= self.E:
                    point2[2] = point[2]
        # Group the labeled points by cluster id for plotting.
        self.cluster_list = defaultdict(lambda: [[], []])
        for point in self.cb_points:
            self.cluster_list[point[2]][0].append(point[0])
            self.cluster_list[point[2]][1].append(point[1])

    def plot(self):
        """Plot each cluster with its own marker; noise points are drawn as 'x'."""
        markers = ['o', '+', '*', '.', 'd', '^', 'v', '>', '<', 'p',
                   'H', 'D', 'p', '1', '2', '3', '4']
        i = 0
        for value in self.cluster_list:
            cluster = self.cluster_list[value]
            plot(cluster[0], cluster[1], markers[i])
            i = i % 10 + 1
        noisex, noisey = [], []
        for point in self.noise_points:
            noisex.append(point[0])
            noisey.append(point[1])
        plot(noisex, noisey, 'x')
        title(str(len(self.cluster_list)) + " clusters created with E=" + str(self.E)
              + " MinPts=" + str(self.minPts) + " total points=" + str(len(self.data))
              + " noise points=" + str(len(self.noise_points)))
        axis((0, 60, 0, 60))
        show()


if __name__ == "__main__":
    def testpoints():
        """Generate up to 100 distinct random integer points in [1, 50) x [1, 50)."""
        all_points = []
        for i in range(100):
            randCoord = [np.random.randint(1, 50), np.random.randint(1, 50)]
            if randCoord not in all_points:
                all_points.append(randCoord)
        return all_points

    data = testpoints()
    dbsc = DBSCAN(data, 8, 8)
    dbsc.getCBPoints()
    dbsc.getNoisePoints()
    dbsc.cluster()
    dbsc.plot()
4. DBSCAN with scikit-learn
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN
from sklearn import metrics
from sklearn.datasets import make_blobs
from sklearn.preprocessing import StandardScaler

# Generate three Gaussian blobs and standardize the features.
centers = [[1, 1], [-1, -1], [1, -1]]
X, labels_true = make_blobs(n_samples=750, centers=centers, cluster_std=0.4, random_state=0)
X = StandardScaler().fit_transform(X)

# Fit DBSCAN with eps=0.3 (the radius e) and min_samples=10 (the threshold M).
db = DBSCAN(eps=0.3, min_samples=10).fit(X)
core_samples_mask = np.zeros_like(db.labels_, dtype=bool)
core_samples_mask[db.core_sample_indices_] = True
labels = db.labels_

# Number of clusters found, ignoring the noise label -1 if present.
n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)

print('Estimated number of clusters: %d' % n_clusters_)
print("Homogeneity: %0.3f" % metrics.homogeneity_score(labels_true, labels))
print("Completeness: %0.3f" % metrics.completeness_score(labels_true, labels))
print("V-measure: %0.3f" % metrics.v_measure_score(labels_true, labels))
print("Adjusted Rand Index: %0.3f" % metrics.adjusted_rand_score(labels_true, labels))
print("Adjusted Mutual Information: %0.3f" % metrics.adjusted_mutual_info_score(labels_true, labels))
print("Silhouette Coefficient: %0.3f" % metrics.silhouette_score(X, labels))

# Plot the clusters; black is used for noise.
unique_labels = set(labels)
colors = [plt.cm.Spectral(each) for each in np.linspace(0, 1, len(unique_labels))]
for k, col in zip(unique_labels, colors):
    if k == -1:
        col = [0, 0, 0, 1]  # black for noise
    class_member_mask = (labels == k)

    # Core samples of this cluster: large markers.
    xy = X[class_member_mask & core_samples_mask]
    plt.plot(xy[:, 0], xy[:, 1], 'o', markerfacecolor=tuple(col),
             markeredgecolor='k', markersize=14)

    # Non-core samples (border points, or noise when k == -1): small markers.
    xy = X[class_member_mask & ~core_samples_mask]
    plt.plot(xy[:, 0], xy[:, 1], 'o', markerfacecolor=tuple(col),
             markeredgecolor='k', markersize=6)

plt.title('Estimated number of clusters: %d' % n_clusters_)
plt.show()
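Tying the sklearn output back to the definitions in section 1: the short snippet below is a continuation of the script above (it reuses the labels and core_samples_mask variables defined there) and recovers the three point categories from the fitted model.

# Continuation of the script above: recover core / border / noise points.
noise_mask = (labels == -1)                 # noise points are labeled -1 by sklearn
core_mask = core_samples_mask               # core samples reported by the model
border_mask = ~core_mask & ~noise_mask      # assigned to a cluster but not core => border points

print("core:", core_mask.sum(), "border:", border_mask.sum(), "noise:", noise_mask.sum())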