欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

三维点云学习(3)8- 实现Spectral

程序员文章站 2022-05-19 11:23:40
三维点云学习(3)8- 实现Spectral谱聚类谱聚类代码参考效果图原图效果图步骤:全部代码Spectral.py# 文件功能:实现 Spectral 谱聚类 算法import numpy as npfrom numpy import *import scipyimport pylabimport random, mathimport matplotlib.pyplot as pltfrom matplotlib.patches import Ellipsefr...

三维点云学习(3)8- 实现Spectral谱聚类

谱聚类代码参考
课堂谱聚类理论笔记

效果图

原图
三维点云学习(3)8-  实现Spectral

效果图
三维点云学习(3)8-  实现Spectral
前三种为自写的聚类算法,分别是 KMeans、GMM、Spectral,后面为sklearn自带的聚类算法库
三维点云学习(3)8-  实现Spectral

步骤:
三维点云学习(3)8-  实现Spectral

step1 先计算每个点的距离矩阵,再使用Kd-tree建立近邻矩阵

如下分别为两种建立关系矩阵的办法

        #step1 先计算每个点的距离矩阵,再使用KNN建立近邻矩阵
        self.W = kdtree_contribute_Matrix(data,8)
       #self.W = myKNN(my_distance_Marix(data), 5)

方法一:

#使用二范数先计算点的距离,建立距离矩阵
def my_distance_Marix(data):
    S = np.zeros((len(data), len(data)))  # 初始化 关系矩阵 w 为 n*n的矩阵
    # step1 建立关系矩阵, 每个节点都有连线,权重为距离的倒数
    for i in range(len(data)):  # i:行
        for j in range(len(data)):  # j:列
            if i == j:
                S[i][j] = 0
            else:
                S[i][j] = np.linalg.norm(data[i] - data[j])  # 二范数计算两个点直接的距离,两个点之间的权重为之间距离的倒数
    return S
#使用一维KNN进行最临近选择
def myKNN(S, k):
    N = len(S)
    A = np.zeros((N,N))

    for i in range(N):
        dist_with_index = zip(S[i], range(N))
        dist_with_index = sorted(dist_with_index, key=lambda x:x[0])
        neighbours_id = [dist_with_index[m][1] for m in range(k+1)] # xi's k nearest neighbours

        for j in neighbours_id: # xj is xi's neighbour
            A[i][j] = 1
            A[j][i] = A[i][j] # mutually
    return A

方法二:

#通过kdtree进行临近矩阵的构建
def kdtree_contribute_Matrix(S,K):
    N = len(S)
    A = np.zeros((N, N))
    leaf_size = 4
    root = kdtree.kdtree_construction(S, leaf_size=leaf_size)
    for i in range(N):
        query = S[i]
        result_set = KNNResultSet(capacity=K)
        kdtree.kdtree_knn_search(root, S, result_set, query)
        index = result_set.knn_output_index()
        for j in index:
            A[i][j] = 1  #
            A[j][i] = A[i][j]
            if i == j:
                A[i][j] = 0
    return A

step2 换算Laplacian L 拉普拉斯矩阵

        #step2 换算Laplacian L 拉普拉斯矩阵
        ##换算D矩阵
        self.D = np.diag(np.sum(self.W,axis=1))     #列相加,并转化为对角线矩阵
        self.L = self.D - self.W                 #拉普拉斯矩阵 L = D - W

step3 算拉普拉斯L矩阵最小的K个特征向量记为V

        #step3 算拉普拉斯L矩阵最小的K个特征向量记为V
        ###法一
        # eigval, eigvec = np.linalg.eigh(L)
        # features = np.asarray([eigvec[:,i] for i in range(self.__K)]).T
        ###法二
        _, self.Y = scipy.linalg.eigh(self.L, eigvals=(0, 2))  # 特征值分解

step4 把 N*k维 向量 进行K-means聚类

        #step4 把 N*k维 向量 进行K-means聚类
        # k_means = KMeans.K_Means(n_clusters=self.k)       #初始化kmeans
        # k_means.fit(self.Y)
        # result = k_means.predict(self.Y)
        sp_kmeans = KMeans(n_clusters=self.k).fit(self.Y)
        self.label = sp_kmeans.labels_
        return  sp_kmeans.labels_

个人小结:

1.进行KNN或者radiusNN选取最临近点构建关系矩阵都要注意好 数值 radius 和 k的选择
2.进行特征值、特征向量求解的两个方便函数

#要注意默认输出的特征向量为列向量形式,后续需要进行转置换算,
#np.linalg.eigh()返回的是升序排列的特征值及其对应向量
#np.linalg.eig()返回的是乱序排列的特征值及其对应向量
eigval, eigvec = np.linalg.eigh(L)
推介:
#eigvals 属性值的设置可以规定输出的矩阵形式,同样eigh为升序输出,
#例如eigvals=(0, 2),结果输出特征值最小的三个的特征向量,并且每个特征向量以列向量形式表示
scipy.linalg.eigh(self.L, eigvals=(0, 2))  # 特征值分解

全部代码

Spectral.py

# 文件功能:实现 Spectral 谱聚类 算法

import numpy as np
from numpy import *
import scipy
import pylab
import random, math

import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
from scipy.stats import multivariate_normal
from result_set import KNNResultSet, RadiusNNResultSet
from sklearn.cluster import KMeans
import  kdtree as kdtree

plt.style.use('seaborn')

def my_distance_Marix(data):
    S = np.zeros((len(data), len(data)))  # 初始化 关系矩阵 w 为 n*n的矩阵
    # step1 建立关系矩阵, 每个节点都有连线,权重为距离的倒数
    for i in range(len(data)):  # i:行
        for j in range(len(data)):  # j:列
            if i == j:
                S[i][j] = 0
            else:
                S[i][j] = np.linalg.norm(data[i] - data[j])  # 二范数计算两个点直接的距离,两个点之间的权重为之间距离的倒数
    return S

def euclidDistance(x1, x2, sqrt_flag=False):
    res = np.sum((x1-x2)**2)
    if sqrt_flag:
        res = np.sqrt(res)
    return res

def calEuclidDistanceMatrix(X):
    X = np.array(X)
    S = np.zeros((len(X), len(X)))
    for i in range(len(X)):
        for j in range(i+1, len(X)):
            S[i][j] = 1.0 * euclidDistance(X[i], X[j])
            S[j][i] = S[i][j]
    return S

def kdtree_contribute_Matrix(S,K):
    N = len(S)
    A = np.zeros((N, N))
    leaf_size = 4
    root = kdtree.kdtree_construction(S, leaf_size=leaf_size)
    for i in range(N):
        query = S[i]
        result_set = KNNResultSet(capacity=K)
        kdtree.kdtree_knn_search(root, S, result_set, query)
        index = result_set.knn_output_index()
        for j in index:
            A[i][j] = 1  #
            A[j][i] = A[i][j]
            if i == j:
                A[i][j] = 0
    return A

def myKNN(S, k):
    N = len(S)
    A = np.zeros((N,N))

    for i in range(N):
        dist_with_index = zip(S[i], range(N))
        dist_with_index = sorted(dist_with_index, key=lambda x:x[0])
        neighbours_id = [dist_with_index[m][1] for m in range(k+1)] # xi's k nearest neighbours

        for j in neighbours_id: # xj is xi's neighbour
            A[i][j] = 1
            A[j][i] = A[i][j] # mutually
    return A


# 二维点云显示函数
def Point_Show(point,color):
    x = []
    y = []
    point = np.asarray(point)
    for i in range(len(point)):
        x.append(point[i][0])
        y.append(point[i][1])
    plt.scatter(x, y,color=color)


class Spectral(object):
    def __init__(self, n_clusters):
        self.n_clusters = n_clusters
        self.k = n_clusters

    def fit(self,data):
        self.W = np.zeros((len(data),len(data)))       #初始化 关系矩阵 w 为 n*n的矩阵
        #step1 先计算每个点的距离矩阵,再使用KNN建立近邻矩阵
        self.W = kdtree_contribute_Matrix(data,8)
        #self.W = myKNN(my_distance_Marix(data), 5)
        #step2 换算Laplacian L 拉普拉斯矩阵
        ##换算D矩阵
        self.D = np.diag(np.sum(self.W,axis=1))     #列相加,并转化为对角线矩阵
        self.L = self.D - self.W                 #拉普拉斯矩阵 L = D - W
        #step3 算拉普拉斯L矩阵最小的K个特征向量记为V
        ###法一
        # eigval, eigvec = np.linalg.eigh(L)
        # features = np.asarray([eigvec[:,i] for i in range(self.__K)]).T
        ###法二
        _, self.Y = scipy.linalg.eigh(self.L, eigvals=(0, 2))  # 特征值分解
        #step4 把 N*k维 向量 进行K-means聚类
        # k_means = KMeans.K_Means(n_clusters=self.k)       #初始化kmeans
        # k_means.fit(self.Y)
        # result = k_means.predict(self.Y)
        sp_kmeans = KMeans(n_clusters=self.k).fit(self.Y)
        self.label = sp_kmeans.labels_
        return  sp_kmeans.labels_

    def predict(self, data):
        """
        Get cluster labels
        Parameters
        ----------
        data: numpy.ndarray
            Testing set as N-by-D numpy.ndarray
        Returns
        ----------
        result: numpy.ndarray
            data labels as (N, ) numpy.ndarray
        """
        return np.copy(self.label)



# 生成仿真数据
def generate_X(true_Mu, true_Var):
    # 第一簇的数据
    num1, mu1, var1 = 400, true_Mu[0], true_Var[0]
    X1 = np.random.multivariate_normal(mu1, np.diag(var1), num1)
    # 第二簇的数据
    num2, mu2, var2 = 600, true_Mu[1], true_Var[1]
    X2 = np.random.multivariate_normal(mu2, np.diag(var2), num2)
    # 第三簇的数据
    num3, mu3, var3 = 1000, true_Mu[2], true_Var[2]
    X3 = np.random.multivariate_normal(mu3, np.diag(var3), num3)
    # 合并在一起
    X = np.vstack((X1, X2, X3))
    # 显示数据
    plt.figure(figsize=(10, 8))
    plt.axis([-10, 15, -5, 15])
    plt.scatter(X1[:, 0], X1[:, 1], s=5)
    plt.scatter(X2[:, 0], X2[:, 1], s=5)
    plt.scatter(X3[:, 0], X3[:, 1], s=5)
    plt.show()
    return X


if __name__ == '__main__':
    # 生成数据
    true_Mu = [[0.5, 0.5], [5.5, 2.5], [1, 7]]
    true_Var = [[1, 3], [2, 2], [6, 2]]
    X = generate_X(true_Mu, true_Var)
    #X = np.array([[1, 2], [2, 3], [5, 8], [8, 8], [1, 6], [9, 11]])

    spectral = Spectral(n_clusters=3)
    K = 3
    spectral.fit(X)
    cat = spectral.predict(X)
    print(cat)
    cluster =[[] for i in range(K)]
    for i in range(len(X)):
        if cat[i] == 0:
            cluster[0].append(X[i])
        elif cat[i] == 1:
            cluster[1].append(X[i])
        elif cat[i] == 2:
            cluster[2].append(X[i])
    Point_Show(cluster[0],color="red")
    Point_Show(cluster[1], color="orange")
    Point_Show(cluster[2],color="blue")
    plt.show()

result.py

import copy


class DistIndex:
    def __init__(self, distance, index):
        self.distance = distance
        self.index = index

    def __lt__(self, other):
        return self.distance < other.distance


class KNNResultSet:
    def __init__(self, capacity):
        self.capacity = capacity
        self.count = 0
        self.worst_dist = 1e10
        self.dist_index_list = []
        self.output_index = []
        for i in range(capacity):
            self.dist_index_list.append(DistIndex(self.worst_dist, 0))

        self.comparison_counter = 0

    def size(self):
        return self.count

    def full(self):
        return self.count == self.capacity

    def worstDist(self):
        return self.worst_dist

    def add_point(self, dist, index):
        self.comparison_counter += 1
        if dist > self.worst_dist:
            return

        if self.count < self.capacity:
            self.count += 1

        i = self.count - 1
        while i > 0:
            if self.dist_index_list[i - 1].distance > dist:
                self.dist_index_list[i] = copy.deepcopy(self.dist_index_list[i - 1])
                i -= 1
            else:
                break

        self.dist_index_list[i].distance = dist
        self.dist_index_list[i].index = index
        self.worst_dist = self.dist_index_list[self.capacity - 1].distance

    def __str__(self):
        output = ''
        for i, dist_index in enumerate(self.dist_index_list):
            output += '%d - %.2f\n' % (dist_index.index, dist_index.distance)
        output += 'In total %d comparison operations.' % self.comparison_counter
        return output

    def knn_output_index(self):
        output = ''
        for i, dist_index in enumerate(self.dist_index_list):
            output += '%d - %.2f\n' % (dist_index.index, dist_index.distance)
            self.output_index.append(dist_index.index)
        output += 'In total %d comparison operations.' % self.comparison_counter
        return  self.output_index


class RadiusNNResultSet:
    def __init__(self, radius):
        self.radius = radius
        self.count = 0
        self.worst_dist = radius
        self.dist_index_list = []

        self.comparison_counter = 0

    def size(self):
        return self.count

    def worstDist(self):
        return self.radius

    def add_point(self, dist, index):
        self.comparison_counter += 1
        if dist > self.radius:
            return

        self.count += 1
        self.dist_index_list.append(DistIndex(dist, index))

    def __str__(self):
        self.dist_index_list.sort()
        output = ''
        for i, dist_index in enumerate(self.dist_index_list):
            output += '%d - %.2f\n' % (dist_index.index, dist_index.distance)
        output += 'In total %d neighbors within %f.\nThere are %d comparison operations.' \
                  % (self.count, self.radius, self.comparison_counter)
        return output

KMeans.py

# 文件功能: 实现 K-Means 算法

import numpy as np
import random
import matplotlib.pyplot as plt
from result_set import KNNResultSet, RadiusNNResultSet
import  kdtree as kdtree

# 二维点云显示函数
def Point_Show(point,color):
    x = []
    y = []
    point = np.asarray(point)
    for i in range(len(point)):
        x.append(point[i][0])
        y.append(point[i][1])
    plt.scatter(x, y,color=color)
    plt.show()


class K_Means(object):
    # k是分组数;tolerance‘中心点误差’;max_iter是迭代次数
    def __init__(self, n_clusters=2, tolerance=0.0001, max_iter=300):
        self.k_ = n_clusters
        self.tolerance_ = tolerance
        self.max_iter_ = max_iter

# 进行中心点的确定
    def fit(self, data):
        #
        # 屏蔽开始
        #step1 随机选取 K个数据点 作为聚类的中心
        self.centers_ = data[random.sample(range(data.shape[0]),self.k_)]    #random.sample(list,num)
        old_centers = np.copy(self.centers_)                                          #存储old_centers
        #step2 E-Step(expectation):N个点、K个中心,求N个点到K个中心的nearest-neighbor
        #kd-tree config
        leaf_size = 1
        k = 1  # 结果每个点选取属于自己的类中心
        for _ in range(self.max_iter_):
            labels = [[] for i in range(self.k_)]        #用于分类所有数据点
            root = kdtree.kdtree_construction(self.centers_ , leaf_size=leaf_size)    #对中心点进行构建kd-tree
            for i in range(data.shape[0]):       #对每一个点在4个中心点中进行 1-NN的搜索
                result_set = KNNResultSet(capacity=k)
                query =  data[i]
                kdtree.kdtree_knn_search(root, self.centers_, result_set, query)     #返回对应中心点的索引
                # labels[result_set.output_index].append(data[i])
                #print(result_set)
                output_index = result_set.knn_output_index()[0]                 #获取最邻近点的索引
                labels[output_index].append(data[i])             #将点放入类中
            #step3 M-Step(maximization):更新中心点的位置,把属于同一个类的数据点求一个均值,作为这个类的中心值
            for i in range(self.k_):     #求K类里,每个类的的中心点
                points = np.array(labels[i])
                self.centers_[i] = points.mean(axis=0)       #取点的均值,作为新的聚类中心
                # print(points)
                # print(self.centers_[i])
            if np.sum(np.abs(self.centers_ - old_centers)) < self.tolerance_ * self.k_:  # 如果前后聚类中心的距离相差小于self.tolerance_ * self.k_ 输出
                break
            old_centers = np.copy(self.centers_)     #保存旧中心点
        self.fitted = True

        # 屏蔽结
    #计算每个点的类别
    def predict(self, p_datas):
        result = []
        # 作业2
        # 屏蔽开始
        if not self.fitted:
            print('Unfitter. ')
            return result
        for point in p_datas:
            diff = np.linalg.norm(self.centers_ - point, axis=1)     #使用二范数求解每个点对新的聚类中心的距离
            result.append(np.argmin(diff))                           #返回离该点距离最小的聚类中心,标记rnk = 1
        # 屏蔽结束
        return result

if __name__ == '__main__':
    db_size = 10
    dim = 2
    #x = np.random.rand(db_size,dim)
    # x = np.array([[1, 2], [1.5, 1.8], [5, 8], [8, 8], [1, 0.6], [9, 11]])
    x = np.genfromtxt(r"point.txt",delimiter="").reshape((-1,2))
    Point_Show(x,color="blue")
    k_means = K_Means(n_clusters=2)    #计算迭代后的中心点
    k_means.fit(x)                     #计算每个点属于哪个类
    cat = k_means.predict(x)
    print(cat)
    cluster = [[] for i in range(2)]        #用于分类所有数据点
    for i in range(len(x)):
        if cat[i] == 0:
            cluster[0].append(x[i])
        else:cluster[1].append(x[i])
    Point_Show(cluster[0],"red")
    Point_Show(cluster[1], "yellow")

kd_tree.py

import random
import math
import numpy as np

from result_set import KNNResultSet, RadiusNNResultSet


class Node:
    def __init__(self, axis, value, left, right, point_indices):
        self.axis = axis
        self.value = value
        self.left = left
        self.right = right
        self.point_indices = point_indices

    def is_leaf(self):
        if self.value is None:
            return True
        else:
            return False

    def __str__(self):
        output = ''
        output += 'axis %d, ' % self.axis
        if self.value is None:
            output += 'split value: leaf, '
        else:
            output += 'split value: %.2f, ' % self.value
        output += 'point_indices: '
        output += str(self.point_indices.tolist())
        return output


def sort_key_by_vale(key, value):
    assert key.shape == value.shape       #assert 断言操作,用于判断一个表达式,在表达式条件为false的时候触发异常
    assert len(key.shape) == 1            #numpy是多维数组
    sorted_idx = np.argsort(value)        #对value值进行排序
    key_sorted = key[sorted_idx]
    value_sorted = value[sorted_idx]      #进行升序排序
    return key_sorted, value_sorted


def axis_round_robin(axis, dim):         #用于轴的轮换
    if axis == dim-1:
        return 0
    else:
        return axis + 1


def kdtree_recursive_build(root, db, point_indices, axis, leaf_size):    #kd树的建立
    """
    :param root:
    :param db: NxD
    :param db_sorted_idx_inv: NxD
    :param point_idx: M
    :param axis: scalar
    :param leaf_size: scalar
    :return:
    """
    if root is None:
        root = Node(axis, None, None, None, point_indices)           #实例化Node

    # determine whether to split into left and right
    if len(point_indices) > leaf_size:                              #判断是否需要进行分割
        # --- get the split position ---
        point_indices_sorted, _ = sort_key_by_vale(point_indices, db[point_indices, axis])  #对点进行排列,dp存储信息
        middle_left_idx = math.ceil(point_indices_sorted.shape[0] / 2) - 1     #分一半
        middle_left_point_idx = point_indices_sorted[middle_left_idx]          #左边界点
        middle_left_point_value = db[middle_left_point_idx, axis]

        middle_right_idx = middle_left_idx + 1
        middle_right_point_idx = point_indices_sorted[middle_right_idx]
        middle_right_point_value = db[middle_right_point_idx, axis]           #右边界点

        root.value = (middle_left_point_value + middle_right_point_value) * 0.5    #取中值为节点值
        # === get the split position ===
        root.left = kdtree_recursive_build(root.left,
                                           db,
                                           point_indices_sorted[0:middle_right_idx],
                                           axis_round_robin(axis, dim=db.shape[1]),
                                           leaf_size)                                  #对对应的轴值进行排序
        root.right = kdtree_recursive_build(root.right,
                                           db,
                                           point_indices_sorted[middle_right_idx:],
                                           axis_round_robin(axis, dim=db.shape[1]),
                                           leaf_size)                                  #对对应的轴值进行排序
    return root


def traverse_kdtree(root: Node, depth, max_depth):      #计算kdtree的深度
    depth[0] += 1
    if max_depth[0] < depth[0]:
        max_depth[0] = depth[0]

    if root.is_leaf():                                 #打印最后的叶子节点
        print(root)
    else:
        traverse_kdtree(root.left, depth, max_depth)    #累加计算深度
        traverse_kdtree(root.right, depth, max_depth)

    depth[0] -= 1


def kdtree_construction(db_np, leaf_size):
    N, dim = db_np.shape[0], db_np.shape[1]

    # build kd_tree recursively
    root = None
    root = kdtree_recursive_build(root,
                                  db_np,
                                  np.arange(N),
                                  axis=0,
                                  leaf_size=leaf_size)
    return root


def kdtree_knn_search(root: Node, db: np.ndarray, result_set: KNNResultSet, query: np.ndarray):   #KNNResultSet 继承二叉树的结果集
    if root is None:
        return False

    if root.is_leaf():
        # compare the contents of a leaf
        leaf_points = db[root.point_indices, :]
        diff = np.linalg.norm(np.expand_dims(query, 0) - leaf_points, axis=1)
        for i in range(diff.shape[0]):
            result_set.add_point(diff[i], root.point_indices[i])
        return False

    if query[root.axis] <= root.value:          #如果 q[axis] inside the partition   如果查询点在根节点的左边,一定要查找左边
        kdtree_knn_search(root.left, db, result_set, query)
        if math.fabs(query[root.axis] - root.value) < result_set.worstDist():   #如果目标点离轴虚线的距离小于worst_dist 继续搜寻节点的右边
            kdtree_knn_search(root.right, db, result_set, query)
    else:
        kdtree_knn_search(root.right, db, result_set, query)
        if math.fabs(query[root.axis] - root.value) < result_set.worstDist():
            kdtree_knn_search(root.left, db, result_set, query)

    return False


def kdtree_radius_search(root: Node, db: np.ndarray, result_set: RadiusNNResultSet, query: np.ndarray):
    if root is None:
        return False

    if root.is_leaf():
        # compare the contents of a leaf
        leaf_points = db[root.point_indices, :]
        diff = np.linalg.norm(np.expand_dims(query, 0) - leaf_points, axis=1)             #取行的差值,暴力搜索
        for i in range(diff.shape[0]):
            result_set.add_point(diff[i], root.point_indices[i])
        return False

    if query[root.axis] <= root.value:
        kdtree_radius_search(root.left, db, result_set, query)
        if math.fabs(query[root.axis] - root.value) < result_set.worstDist():
            kdtree_radius_search(root.right, db, result_set, query)
    else:
        kdtree_radius_search(root.right, db, result_set, query)
        if math.fabs(query[root.axis] - root.value) < result_set.worstDist():
            kdtree_radius_search(root.left, db, result_set, query)

    return False



def main():
    # configuration
    db_size = 64
    dim = 3                #三维
    leaf_size = 4
    k = 1                  #一个点

    db_np = np.random.rand(db_size, dim)

    root = kdtree_construction(db_np, leaf_size=leaf_size)

    depth = [0]
    max_depth = [0]
    traverse_kdtree(root, depth, max_depth)
    print("tree max depth: %d" % max_depth[0])
    query = np.asarray([0, 0, 0])
    result_set = KNNResultSet(capacity=k)
    kdtree_knn_search(root, db_np, result_set, query)

    print(result_set)
    print(db_np)
    #
    # diff = np.linalg.norm(np.expand_dims(query, 0) - db_np, axis=1)
    # nn_idx = np.argsort(diff)
    # nn_dist = diff[nn_idx]
    # print(nn_idx[0:k])
    # print(nn_dist[0:k])
    #
    #
    # print("Radius search:")
    # query = np.asarray([0, 0, 0])
    # result_set = RadiusNNResultSet(radius = 0.5)
    # radius_search(root, db_np, result_set, query)
    # print(result_set)


if __name__ == '__main__':
    main()

本文地址:https://blog.csdn.net/weixin_41281151/article/details/107449482

相关标签: 3D点云学习