
Decision Tree: The CART Classification Algorithm


A decision tree can be used for both classification and regression; the two differ only in the rule used to split nodes while the tree is built. Classification splits on the Gini index, while regression splits on the variance of the target values.
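To make the difference concrete, here is a minimal sketch of the two impurity measures, computed with NumPy on made-up labels and targets:

import numpy as np

# classification impurity: Gini index over the class counts
y_cls = np.array([0, 0, 1, 1, 1])                   # made-up labels
count = np.bincount(y_cls)                          # [2, 3]
gini = 1 - np.sum(count ** 2) / np.sum(count) ** 2  # 0.48

# regression impurity: variance of the target values
y_reg = np.array([1.0, 1.5, 3.0, 3.2])              # made-up targets
variance = np.var(y_reg)

print(gini, variance)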

Decision tree classification:

Suppose we have a dataset (X, Y), where Y contains two label values, 0 and 1:

Num | X0      | X1  | Y
----+---------+-----+----
0   | X01 (1) | X02 | Y0
1   | X11 (0) | X12 | Y1
2   | X21 (2) | X22 | Y2

(The numbers in parentheses are example values of the X0 column, referenced in Step 2 below.)

Step 1: compute the Gini index of the parent node from the labels Y:

Gini = 1 - (Count(0)^2 + Count(1)^2) / (Count(0) + Count(1))^2
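For example, with two samples labeled 0 and three labeled 1 (made-up counts, just to illustrate the formula):

Gini = 1 - (2^2 + 3^2) / (2 + 3)^2 = 1 - 13/25 = 0.48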

Step 2: take a column, say X0, and collect the set SX of its distinct values. Pick an element v from SX. Rows whose X0 value is greater than or equal to v (e.g., the row with index 2) are used to grow the right subtree and form the right child dataset; rows whose X0 value is less than v (e.g., the row with index 1) are used to grow the left subtree and form the left child dataset. Compute the Gini index of each child dataset, written Left_Gini and Right_Gini.

Step 3: compute the Gini gain. Having obtained Left_Gini and Right_Gini, compute the Gini index of the parent dataset after splitting on v:
Gini_Split = (Len(left dataset) * Left_Gini + Len(right dataset) * Right_Gini) / Len(parent dataset)

The Gini gain is then:

Gain = Gini - Gini_Split

If Gain is greater than 0, record this split point and set Gini to Gini_Split, then repeat the search until SX has been traversed for every column.
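Continuing the made-up example above, suppose a threshold sends the two 0-labeled samples left and the three 1-labeled samples right, so both children are pure:

Left_Gini  = 1 - (2^2 + 0^2) / 2^2 = 0
Right_Gini = 1 - (0^2 + 3^2) / 3^2 = 0
Gini_Split = (2 * 0 + 3 * 0) / 5 = 0
Gain       = 0.48 - 0 = 0.48

The gain is positive, so this split point would be recorded.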

The code is as follows:

import asyncio
from collections import Counter

import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs
from sklearn.model_selection import train_test_split


class Node:
    """An internal node: the split feature index, the threshold value, and the two subtrees."""
    def __init__(self, feature, value, left, right):
        self.feature, self.value = feature, value
        self.left = left
        self.right = right


class DTree:
    def __init__(self):
        self.tree = None

    def fit(self, x, y):
        self.tree = self.build_tree(x, y, len(x))

    @classmethod
    def get_class_count(cls, _count):
        # Map a Counter over the encoded values ind * 2 + y into a fixed 4-slot
        # array: slots 0-1 hold the left child's label counts, slots 2-3 the right's.
        class_count = np.zeros(4)
        for k in _count.keys():
            class_count[int(k)] = _count[k]
        return class_count

    def build_tree(self, x, y, num_sample):

        best_gain, best_split = 0.0, None        # best split found so far
        best_feature, best_value = None, None    # feature index and threshold of that split
        count = np.bincount(y)
        gn = self._compute_gn(count)             # Gini index of the parent node
        for feature in range(x.shape[1]):
            value = np.array(list(set(x[:, feature])), dtype="float")
            value = np.sort(value)
            # candidate thresholds: midpoints between consecutive distinct values
            value = value[:-1] + (value[1:] - value[:-1]) / 2
            for feature_value in value:
                ind = x[:, feature] >= feature_value
                # encode (side, label) as ind * 2 + y: bins 0-1 count the left
                # child's labels, bins 2-3 the right child's
                count = np.bincount(ind * 2 + y, minlength=4)
                left = np.sum(count[:2])
                if left * (len(x) - left) == 0:  # skip splits that leave one side empty
                    continue

                current_gn = (left * self._compute_gn(count[:2]) +
                              (len(x) - left) * self._compute_gn(count[2:])) / len(x)
                gain = gn - current_gn           # Gini gain of this split

                if gain > best_gain:             # keep the split with the largest gain
                    best_gain = gain
                    best_split = ind
                    best_feature, best_value = feature, feature_value

        if best_gain > 0:   # a useful split exists: recurse into the two children
            left_x, left_y = x[np.logical_not(best_split)], y[np.logical_not(best_split)]
            right_x, right_y = x[best_split], y[best_split]
            _left = self.build_tree(left_x, left_y, num_sample)
            _right = self.build_tree(right_x, right_y, num_sample)

            return Node(best_feature, best_value, _left, _right)
        return np.bincount(y).argmax()  # leaf: predict the majority label

    @classmethod
    def _compute_gn(cls, count):
        # Gini index: 1 - sum(count_k^2) / (sum(count_k))^2
        gn = 1 - np.sum(np.square(count)) / (np.square(np.sum(count)))
        return gn

    def predict(self, x):
        y = np.zeros(len(x))
        for num, sample in enumerate(x):
            y[num] = self._predict_core(sample, self.tree)
        return y

    def _predict_core(self, sample, tree):
        if not isinstance(tree, Node):  # reached a leaf: return its label
            return tree
        branch = tree.right if sample[tree.feature] >= tree.value else tree.left
        return self._predict_core(sample, branch)

A version implemented with coroutines (note that each candidate split is awaited sequentially, so this demonstrates the asyncio API rather than yielding a real speedup):

class AsyncDTree(DTree):
    def __init__(self):
        super().__init__()

    def fit(self, x, y):
        # asyncio.run creates the event loop, runs the coroutine, and closes the loop
        self.tree = asyncio.run(self.build_tree(x, y, len(x)))

    async def get_best_feature(self, x, y, feature, feature_value, gn):
        # evaluate one candidate split: returns (gain, mask), or None if one side is empty
        ind = x[:, feature] >= feature_value
        count = Counter(ind * 2 + y)
        class_count = self.get_class_count(count)
        left = np.sum(class_count[:2])
        if left * (len(x) - left) == 0:
            return None

        current_gn = (left * self._compute_gn(class_count[:2]) + \
                     (len(x) - left) * self._compute_gn(class_count[2:])) / len(x)

        gain = gn - current_gn
        return gain, ind

    async def build_tree(self, x, y, num_sample):
        best_gain, best_split = 0.0, None
        best_feature, best_value = None, None
        count = Counter(y)
        count = self.get_class_count(count)
        gn = self._compute_gn(count)
        for feature in range(x.shape[1]):
            value = np.array(list(set(x[:, feature])), dtype="float")
            value = np.sort(value)
            value = value[:-1] + (value[1:] - value[:-1]) / 2
            for feature_value in value:
                # each candidate split is evaluated by a coroutine, awaited sequentially
                result = await self.get_best_feature(x, y, feature, feature_value, gn)
                if result is None:
                    continue
                gain, ind = result
                if gain > best_gain:
                    best_gain = gain
                    best_split = ind
                    best_feature, best_value = feature, feature_value
        if best_gain > 0:
            left_x, left_y = x[np.logical_not(best_split)], y[np.logical_not(best_split)]
            right_x, right_y = x[best_split], y[best_split]
            _left = await self.build_tree(left_x, left_y, num_sample)
            _right = await self.build_tree(right_x, right_y, num_sample)

            return Node(best_feature, best_value, _left, _right)
        return np.bincount(y).argmax()  # leaf: predict the majority label

The results are as follows:

if __name__ == "__main__":
    x, y = make_blobs(n_samples=1000, centers=2, random_state=1, cluster_std=2)

    train_x, test_x, train_y, test_y = train_test_split(x, y, test_size=0.2)
    dt = DTree()

    dt.fit(train_x, train_y)
    pred_y = dt.predict(test_x)
    # print(pred_y, test_y)
    plt.scatter(test_x[:, 0], test_x[:, 1], c=pred_y)

    plt.show()
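As a sanity check, the predictions can be compared against sklearn's DecisionTreeClassifier, which also implements CART. A minimal sketch, reusing the DTree class and the same data-generation call from above:

from sklearn.datasets import make_blobs
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
import numpy as np

x, y = make_blobs(n_samples=1000, centers=2, random_state=1, cluster_std=2)
train_x, test_x, train_y, test_y = train_test_split(x, y, test_size=0.2)

dt = DTree()                      # the implementation above
dt.fit(train_x, train_y)
ours = dt.predict(test_x)

clf = DecisionTreeClassifier()    # sklearn's CART as a baseline (gini criterion by default)
clf.fit(train_x, train_y)
baseline = clf.predict(test_x)

print("our accuracy:    ", np.mean(ours == test_y))
print("sklearn accuracy:", np.mean(baseline == test_y))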
