CART回归树的原理及python实现的demo
回归树其实就是把相邻的几个点(不纯度较小的点,一般用mse计算) 视为一个区间,所以参数里包括 最小不纯度 min_impurity_decrease, 例如以下的点, 只分成2 类, 大于6.5的就返回右边三个点的平均值8.913 ,小于6.5的返回6.237
决策树张这个样:
tree = {
x : {'<=6.5': 6.236666666666667, '>6.5': 8.912500000000001}
}
如果调整了min_impurity_decrease=0.1 ,则拟合程度就更高了
此时决策树张这个样:
tree = {
x :{
<=6.5 :{
x : {'<=3.5': 5.723333333333334, '>3.5': 6.75}
}
>6.5 : 8.912500000000001
}
}
换另一个数据集
tree = {
x : {'<=0.504087': -0.04465028571428573, '>0.504087': 1.018096767241379}
}
tree = {
x :{
<=0.504087 :{
x : {'<=0.0649725': 0.097105625, '>0.0649725': -0.05957196052631579}
}
>0.504087 :{
x : {'<=0.6216415': 1.1081870645161291, '>0.6216415': 0.985240305882353}
}
}
}
代码:
import argparse
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn import datasets
import matplotlib.pyplot as plt
def loadDataSet(fileName,split):
data = pd.DataFrame()
data['x'] = ''
data['label'] = ''
fr = open(fileName)
i = 0
for line in fr.readlines():
curLine = line.strip().split(split)
try:
fltLine = [float(curLine[0].strip()), float(curLine[1].strip())]
data.loc[i, 'x'] = fltLine[0]
data.loc[i, 'label'] = fltLine[1]
i += 1
except:
continue
return data
def load_data_regression(num):
if(num==1):
x = np.array(list(range(1, 11))).flatten()
y = np.array([5.56, 5.70, 5.91, 6.40, 6.80, 7.05, 8.90, 8.70, 9.00, 9.05]).flatten()
train_data = pd.DataFrame()
train_data['x'] = x
train_data['label'] = y
test_data = pd.DataFrame(np.array([3, 6, 9]), columns=['x'])
elif(num==2):
train_data= loadDataSet("ex00.txt",' ')
test_data = pd.DataFrame(np.array([0.3, 0.6, 0.9]), columns=['x'])
elif(num==3):
train_data = loadDataSet("ex2.txt",'\t')
test_data = pd.DataFrame(np.array([0.3,0.6,0.9]), columns=['x'])
return train_data, test_data
class Node():
def __init__(self):
self.left_child = None
self.right_child = None
self.data = None
self.is_leaf = False
self.split_feature = None
self.split_value = None
self.samples = 0
self.mse = 0.
self.value = 0.
self.depth = 0
class Tree():
def __init__(self, criterion, max_depth, min_samples_split=2, min_samples_leaf=1, min_impurity_decrease=0.,
max_leaf_nodes=2):
if (max_leaf_nodes == None):
max_leaf_nodes = np.inf
elif (max_leaf_nodes < 2):
raise ValueError('max_leaf_nodes 至少是2')
if (min_samples_split == None):
min_samples_split = 2
elif (min_samples_split < 2):
raise ValueError('min_samples_split 至少是2')
if (max_depth == None):
max_depth = 2
elif (max_depth < 1):
raise ValueError('max_depth 至少是1')
self.criterion = criterion
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.min_samples_leaf = min_samples_leaf
self.max_leaf_nodes = max_leaf_nodes
self.min_impurity_decrease = min_impurity_decrease
def fit(self, data):
self.leaf_nodes_num = 0 # 用来记录已经产生多少个node
self.root_node = self.__build_tree(data, depth=0)
def predict(self, x_df):
y_array = []
for i in range(x_df.shape[0]):
y_array.append(self.__predict(x_df, i, self.root_node))
return np.array(y_array)
def __predict(self, x_df, index, node):
if (node.is_leaf):
if (None != node.value):
return node.value
else:
raise ValueError
else:
try:
value = x_df.loc[index, node.split_feature]
if (value <= node.split_value):
return self.__predict(x_df, index, node.left_child)
else:
return self.__predict(x_df, index, node.right_child)
except:
raise TypeError
def __build_tree(self, data, depth):
# data.reset_index(drop=True,inplace=True)
can_split = False
# 检查是否可以继续分裂
total_impurity = self.cal_se(np.array(data['label']))
if(total_impurity==0 or total_impurity<self.min_impurity_decrease):
#差异太小不用再分了
can_split = False
elif (depth == self.max_depth or data.shape[0] < self.min_samples_split or self.leaf_nodes_num > self.max_leaf_nodes - 1):
can_split = False
else:
# 尝试分裂
criteria_feature, criteria_value, final_left_data, final_right_data = self.__choose_best_split(
data,total_impurity)
if (criteria_feature != None):
can_split = True
node = Node()
node.value = np.mean(data['label'])
node.samples = data.shape[0]
node.mse = total_impurity/node.samples
node.depth = depth
node.data = data
if (can_split):
if (self.leaf_nodes_num == 0):
self.leaf_nodes_num += 2
else:
self.leaf_nodes_num += 1
node.is_leaf = False
node.split_feature = criteria_feature
node.split_value = criteria_value
node.left_child = self.__build_tree(final_left_data, depth + 1)
node.right_child = self.__build_tree(final_right_data, depth + 1)
else:
node.is_leaf = True
return node
def __choose_best_split(self, data, total_impurity):
print('total_impurity',total_impurity)
se = None
criteria_feature = None
criteria_value = 0.
features = np.array(data.columns).tolist()
features.remove('label')
for feature in features:
for i in range(data.shape[0] - 1):
first_value = data.loc[i, feature]
second_value = data.loc[i + 1, feature]
value = (first_value + second_value) / 2
left_data = data[data[feature] <= value]
right_data = data[data[feature] > value]
if (left_data.shape[0] < self.min_samples_leaf or right_data.shape[0] < self.min_samples_leaf):
continue
left_impurity = self.cal_se(np.array(left_data['label']))
right_impurity = self.cal_se(np.array(right_data['label']))
sumSe = left_impurity + right_impurity
impurity_decrease = (total_impurity - sumSe)/data.shape[0]
if (impurity_decrease<0 or impurity_decrease < self.min_impurity_decrease):
# 未满足最小不纯度减少值
continue
if (se == None or sumSe < se):
se = sumSe
criteria_feature = feature
criteria_value = value
final_left_data = left_data
final_right_data = right_data
if (se == None):
return None, None, None, None
else:
final_left_data = final_left_data.dropna(axis=0, how="any")
final_left_data = final_left_data.reset_index(drop=True)
final_right_data = final_right_data.dropna(axis=0, how="any")
final_right_data = final_right_data.reset_index(drop=True)
print('最佳分割feature:{},value:{},减少mse:{:.4f}'.format(criteria_feature, criteria_value,impurity_decrease))
return criteria_feature, criteria_value, final_left_data, final_right_data
def cal_se(self, label):
mean = label.mean()
se = 0.
for y in label:
se += (y - mean) **2
return se
def plot_one_dimemtion(self, data):
select_feature = data.columns[0]
X, label = self.sort_x_y(data, select_feature)
data = pd.DataFrame(X, columns=[select_feature])
predict = self.predict(data)
self.plot(X, label, predict)
def sort_x_y(self, data, select_feature):
donwSet = set()
donwSet.add(-1)
# select_feature='x'
index = -1
X = []
label = []
X_tmp = np.array(data[select_feature].copy(), dtype=float)
for i in range(len(X_tmp)):
while (index in donwSet):
index = np.argmin(X_tmp)
donwSet.add(index)
X_tmp[index] = np.inf
X.append(data.loc[index, select_feature])
label.append(data.loc[index, 'label'])
X = np.array(X)
label = np.array(label)
return X, label
def plot(self, x, lebel, predict):
plt.scatter(x, lebel, s=20)
plt.plot(x, predict, color='green')
plt.show()
def tree_to_dict(self):
return self.__get_node_dict(self.root_node)
def __get_node_dict(self, node):
if (node.is_leaf):
return node.value
else:
left_value = self.__get_node_dict(node.left_child)
right_value = self.__get_node_dict(node.right_child)
left_key = '<=' + str(node.split_value)
right_key = '>' + str(node.split_value)
innerDic = {}
innerDic[left_key] = left_value
innerDic[right_key] = right_value
dic={}
dic[node.split_feature]=innerDic
return dic
def get_args():
parser = argparse.ArgumentParser(description='GBDT-Simple-Tutorial')
parser.add_argument('-criterion', default='mse', type=str, choices=['gini', 'entropy', 'mse'], help='分裂节点时评价准则')
parser.add_argument('-max_depth', default=None, type=int, help='树的深度,停止分裂条件')
parser.add_argument('-min_samples_split', default=2, type=int, help='分裂一个内部节点需要的做少样本数,停止分裂条件')
parser.add_argument('-min_samples_leaf', default=1, type=int, help='每个叶子节点需要的最少样本数,停止分裂条件')
parser.add_argument('-max_leaf_nodes', default=None, type=int, help='叶子节点的最大数量,停止分裂条件')
parser.add_argument('-min_impurity_decrease', default=0., type=float, help='最小不纯度的减少量,停止分裂条件')
args = parser.parse_args()
return args
def print_dict(src_dict, level=0, src_dict_namestr='tree'):
if isinstance(src_dict, dict):
tab_str = '\t'
for i in range(level):
tab_str += '\t'
if 0 == level:
print(src_dict_namestr, '= {')
for key, value in src_dict.items():
if isinstance(value, dict):
has_dict = False
for k, v in value.items():
if isinstance(v, dict):
has_dict = True
if has_dict:
print(tab_str, key, ":{")
print_dict(value, level + 1)
else:
print(tab_str, key, ':', value)
else:
print(tab_str, key, ': ', value, )
print(tab_str, '}')
def run():
train_data, test_data = load_data_regression(2)
args = get_args()
args.max_depth =None
args.min_samples_split = 2
args.min_samples_leaf = 1
args.max_leaf_nodes = None
args.min_impurity_decrease = 0.
model = Tree(args.criterion, args.max_depth, args.min_samples_split, args.min_samples_leaf,
args.min_impurity_decrease, args.max_leaf_nodes)
model.fit(train_data)
dic=model.tree_to_dict()
print_dict(dic)
try:
# 只有一维的数据才可以,否则会默认选择一维的feature,如果判断中用到其他feature会出错
model.plot_one_dimemtion(train_data)
except:
pass
result=model.predict(test_data)
print("测试数据:",test_data['x'].values)
print("预测值:",result)
print('-------------------------------------------------------------------------------')
if __name__ == '__main__':
import sys
run()
sys.exit()
下一篇: 小米便签怎么画画?小米便签涂鸦画画方法