决策树CART分类算法
程序员文章站
2024-02-15 15:11:29
...
决策树算法可以用作分类回归,两者差异在于构造树时节点分裂规则的不同。分类算法用Gini系数分裂,回归则是计算方差。
决策树分类:
假设存在数据集(X, Y),Y包含0和1两中标签
Num |
X0 |
X1 |
… |
Y |
0 |
X01 (1) |
X02 |
… |
Y0 |
1 |
X11 (0) |
X12 |
… |
Y1 |
2 |
X21 (2) |
X22 |
… |
Y2 |
第一步,根据标签Y计算Gini系数:
Gini = 1 – (Count(0)^2 + Count(1)^2) / (Count(0) + Count(1))^2
第二步,取出一列如X0,统计该列所有取值的集合SX,取SX中一个元素X0,大于等于X0的行用作右树生成,如索引为2的行,称之为右子数据集;小于 X0的行用作左树生成,如索引为1的行,称之为左子数据集。分别计算左子数据集和右子数据集的Gini系数,写作Left_Gini和Right_Gini
第三步,计算Gini增益,计算 Left_Gini和Right_Gini后,再计算父数据集由X0分类后的Gini系数:
Gini_Split = (Len(左数据集) * Left_Gini + Len(右数据集) * Right_Gini) / Len(父数据集)
Gini增益为:
Gain = Gini – Gini_Split
若Gain大于0则记录此分裂点,再将Gini置为Gini_Split,重复搜索,一直到每一列的SX遍历完成。
代码如下:
import numpy as np
from sklearn.datasets import make_blobs
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.model_selection import train_test_split
from collections import Counter
from sklearn.tree import DecisionTreeClassifier
import time
import asyncio
class Node:
def __init__(self, feature, value, left, right):
self.feature, self.value = feature, value
self.left = left
self.right = right
class DTree:
def __init__(self):
self.tree = None
def fit(self, x, y):
self.tree = self.build_tree(x, y, len(x))
@classmethod
def get_class_count(cls, _count):
class_count = np.zeros(4)
for k in _count.keys():
class_count[int(k)] = _count[k]
return class_count
def build_tree(self, x, y, num_sample):
best_gain, best_split = 0.0, None # 划分点
count = np.bincount(y)
gn = self._compute_gn(count) # 计算初始gini系数
for feature in range(x.shape[1]):
value = np.array(list(set(x[:, feature])), dtype="float")
value = np.sort(value)
value = value[:-1] + (value[1:] - value[:-1]) / 2
for feature_value in value:
ind = x[:, feature] >= feature_value
count = np.bincount(ind * 2 + y)
left = np.sum(count[:2])
if left * (len(x) - left) == 0:
continue
current_gn = (left * self._compute_gn(count[:2]) + \
(len(x) - left) * self._compute_gn(count[2:])) / len(x)
gain = gn - current_gn # 计算增益
if gain > best_gain and left * (len(x) - left) > 0: # 如果增益比之前的增益大,则重新设置增益
best_gain = gain
best_split = ind
if best_gain > 0: # 如果增益大于0则继续划分
left_x, left_y = x[np.logical_not(best_split)], y[np.logical_not(best_split)]
right_x, right_y = x[best_split], y[best_split]
_left = self.build_tree(left_x, left_y, num_sample)
_right = self.build_tree(right_x, right_y, num_sample)
return Node(feature, feature_value, _left, _right)
return y[0]
@classmethod
def _compute_gn(cls, count):
gn = 1 - np.sum(np.square(count)) / (np.square(np.sum(count)))
return gn
def predict(self, x):
y = np.zeros(len(x))
for num, sample in enumerate(x):
y[num] = self._predict_core(sample, self.tree)
return y
def _predict_core(self, sample, tree):
if not isinstance(tree, Node):
return tree
brand = tree.right if sample[tree.feature] >= tree.value else tree.left
return self._predict_core(sample, brand)
用协程实现的版本:
class AsyncDTree(DTree):
def __init__(self):
super().__init__()
def fit(self, x, y):
loop = asyncio.get_event_loop()
task = loop.run_until_complete(self.build_tree(x, y, len(x)))
self.tree = task
async def get_best_feature(self, x, y, feature, feature_value, gn):
ind = x[:, feature] >= feature_value
count = Counter(ind * 2 + y)
class_count = self.get_class_count(count)
left = np.sum(class_count[:2])
if left * (len(x) - left) == 0:
return None
current_gn = (left * self._compute_gn(class_count[:2]) + \
(len(x) - left) * self._compute_gn(class_count[2:])) / len(x)
gain = gn - current_gn
return gain, ind
async def build_tree(self, x, y, num_sample):
best_gain, best_split = 0.0, None
count = Counter(y)
count = self.get_class_count(count)
gn = self._compute_gn(count)
for feature in range(x.shape[1]):
value = np.array(list(set(x[:, feature])), dtype="float")
value = np.sort(value)
value = value[:-1] + (value[1:] - value[:-1]) / 2
for feature_value in value:
_ = await self.get_best_feature(x, y, feature, feature_value, gn)
if _ is None:
continue
gain, ind = _
if gain > best_gain:
best_gain = gain
best_split = ind
if best_gain > 0:
left_x, left_y = x[np.logical_not(best_split)], y[np.logical_not(best_split)]
right_x, right_y = x[best_split], y[best_split]
_left = await self.build_tree(left_x, left_y, num_sample)
_right = await self.build_tree(right_x, right_y, num_sample)
return Node(feature, feature_value, _left, _right)
return y[0]
结果如下:
x, y = make_blobs(n_samples=1000, centers=2, random_state=1, cluster_std=2)
train_x, test_x, train_y, test_y = train_test_split(x, y, test_size=0.2)
dt = DTree()
dt.fit(train_x, train_y)
pred_y = dt.predict(test_x)
# print(pred_y, test_y)
plt.scatter(test_x[:, 0], test_x[:, 1], c=pred_y)
plt.show()
上一篇: 一文深刻理解决策树(系列三)