欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

CART回归树的原理及python实现的demo

程序员文章站 2022-06-18 11:38:34
...

回归树其实就是把相邻的几个点(不纯度较小的点,一般用mse计算) 视为一个区间,所以参数里包括 最小不纯度 min_impurity_decrease, 例如以下的点, 只分成2 类, 大于6.5的就返回右边三个点的平均值8.913 ,小于6.5的返回6.237

CART回归树的原理及python实现的demo

决策树张这个样:

tree = {
     x : {'<=6.5': 6.236666666666667, '>6.5': 8.912500000000001}
     }

 

如果调整了min_impurity_decrease=0.1 ,则拟合程度就更高了

CART回归树的原理及python实现的demo

此时决策树张这个样:

tree = {
     x :{
         <=6.5 :{
             x : {'<=3.5': 5.723333333333334, '>3.5': 6.75}
             }
         >6.5 :  8.912500000000001
         }
     }

换另一个数据集

CART回归树的原理及python实现的demo

 

tree = {
     x : {'<=0.504087': -0.04465028571428573, '>0.504087': 1.018096767241379}
     }

CART回归树的原理及python实现的demo

 

tree = {
     x :{
         <=0.504087 :{
             x : {'<=0.0649725': 0.097105625, '>0.0649725': -0.05957196052631579}
             }
         >0.504087 :{
             x : {'<=0.6216415': 1.1081870645161291, '>0.6216415': 0.985240305882353}
             }
         }
     }

代码:

import argparse
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn import datasets
import matplotlib.pyplot as plt

def loadDataSet(fileName,split):
    data = pd.DataFrame()
    data['x'] = ''
    data['label'] = ''
    fr = open(fileName)
    i = 0
    for line in fr.readlines():
        curLine = line.strip().split(split)
        try:
            fltLine = [float(curLine[0].strip()), float(curLine[1].strip())]
            data.loc[i, 'x'] = fltLine[0]
            data.loc[i, 'label'] = fltLine[1]
            i += 1
        except:
            continue
    return data


def load_data_regression(num):
    if(num==1):
        x = np.array(list(range(1, 11))).flatten()
        y = np.array([5.56, 5.70, 5.91, 6.40, 6.80, 7.05, 8.90, 8.70, 9.00, 9.05]).flatten()
        train_data = pd.DataFrame()
        train_data['x'] = x
        train_data['label'] = y
        test_data = pd.DataFrame(np.array([3, 6, 9]), columns=['x'])
    elif(num==2):
        train_data= loadDataSet("ex00.txt",' ')
        test_data = pd.DataFrame(np.array([0.3, 0.6, 0.9]), columns=['x'])
    elif(num==3):
        train_data = loadDataSet("ex2.txt",'\t')
        test_data = pd.DataFrame(np.array([0.3,0.6,0.9]), columns=['x'])
    return train_data, test_data


class Node():
    def __init__(self):
        self.left_child = None
        self.right_child = None
        self.data = None
        self.is_leaf = False
        self.split_feature = None
        self.split_value = None
        self.samples = 0
        self.mse = 0.
        self.value = 0.
        self.depth = 0

class Tree():
    def __init__(self, criterion, max_depth, min_samples_split=2, min_samples_leaf=1, min_impurity_decrease=0.,
                 max_leaf_nodes=2):
        if (max_leaf_nodes == None):
            max_leaf_nodes = np.inf
        elif (max_leaf_nodes < 2):
            raise ValueError('max_leaf_nodes 至少是2')

        if (min_samples_split == None):
            min_samples_split = 2
        elif (min_samples_split < 2):
            raise ValueError('min_samples_split 至少是2')

        if (max_depth == None):
            max_depth = 2
        elif (max_depth < 1):
            raise ValueError('max_depth 至少是1')

        self.criterion = criterion
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.max_leaf_nodes = max_leaf_nodes
        self.min_impurity_decrease = min_impurity_decrease

    def fit(self, data):
        self.leaf_nodes_num = 0  # 用来记录已经产生多少个node
        self.root_node = self.__build_tree(data, depth=0)

    def predict(self, x_df):
        y_array = []
        for i in range(x_df.shape[0]):
            y_array.append(self.__predict(x_df, i, self.root_node))
        return np.array(y_array)

    def __predict(self, x_df, index, node):
        if (node.is_leaf):
            if (None != node.value):
                return node.value
            else:
                raise ValueError
        else:
            try:
                value = x_df.loc[index, node.split_feature]
                if (value <= node.split_value):
                    return self.__predict(x_df, index, node.left_child)
                else:
                    return self.__predict(x_df, index, node.right_child)
            except:
                raise TypeError

    def __build_tree(self, data, depth):
        # data.reset_index(drop=True,inplace=True)
        can_split = False
        # 检查是否可以继续分裂
        total_impurity = self.cal_se(np.array(data['label']))
        if(total_impurity==0 or total_impurity<self.min_impurity_decrease):
            #差异太小不用再分了
            can_split = False
        elif (depth == self.max_depth or data.shape[0] < self.min_samples_split or self.leaf_nodes_num > self.max_leaf_nodes - 1):
            can_split = False
        else:
            # 尝试分裂
            criteria_feature, criteria_value, final_left_data, final_right_data = self.__choose_best_split(
                data,total_impurity)
            if (criteria_feature != None):
                can_split = True

        node = Node()
        node.value = np.mean(data['label'])
        node.samples = data.shape[0]
        node.mse = total_impurity/node.samples
        node.depth = depth
        node.data = data
        if (can_split):
            if (self.leaf_nodes_num == 0):
                self.leaf_nodes_num += 2
            else:
                self.leaf_nodes_num += 1

            node.is_leaf = False
            node.split_feature = criteria_feature
            node.split_value = criteria_value
            node.left_child = self.__build_tree(final_left_data, depth + 1)
            node.right_child = self.__build_tree(final_right_data, depth + 1)
        else:
            node.is_leaf = True
        return node

    def __choose_best_split(self, data, total_impurity):
        print('total_impurity',total_impurity)
        se = None
        criteria_feature = None
        criteria_value = 0.

        features = np.array(data.columns).tolist()
        features.remove('label')

        for feature in features:
            for i in range(data.shape[0] - 1):
                first_value = data.loc[i, feature]
                second_value = data.loc[i + 1, feature]
                value = (first_value + second_value) / 2
                left_data = data[data[feature] <= value]
                right_data = data[data[feature] > value]

                if (left_data.shape[0] < self.min_samples_leaf or right_data.shape[0] < self.min_samples_leaf):
                    continue

                left_impurity = self.cal_se(np.array(left_data['label']))
                right_impurity = self.cal_se(np.array(right_data['label']))
                sumSe = left_impurity + right_impurity
                impurity_decrease = (total_impurity - sumSe)/data.shape[0]
                if (impurity_decrease<0 or impurity_decrease < self.min_impurity_decrease):
                    # 未满足最小不纯度减少值
                    continue
                if (se == None or sumSe < se):
                    se = sumSe
                    criteria_feature = feature
                    criteria_value = value
                    final_left_data = left_data
                    final_right_data = right_data

        if (se == None):
            return None, None, None, None
        else:
            final_left_data = final_left_data.dropna(axis=0, how="any")
            final_left_data = final_left_data.reset_index(drop=True)
            final_right_data = final_right_data.dropna(axis=0, how="any")
            final_right_data = final_right_data.reset_index(drop=True)
            print('最佳分割feature:{},value:{},减少mse:{:.4f}'.format(criteria_feature, criteria_value,impurity_decrease))
            return criteria_feature, criteria_value, final_left_data, final_right_data

    def cal_se(self, label):
        mean = label.mean()
        se = 0.
        for y in label:
            se += (y - mean) **2
        return se

    def plot_one_dimemtion(self, data):
        select_feature = data.columns[0]
        X, label = self.sort_x_y(data, select_feature)
        data = pd.DataFrame(X, columns=[select_feature])
        predict = self.predict(data)
        self.plot(X, label, predict)

    def sort_x_y(self, data, select_feature):
        donwSet = set()
        donwSet.add(-1)
        # select_feature='x'
        index = -1
        X = []
        label = []
        X_tmp = np.array(data[select_feature].copy(), dtype=float)
        for i in range(len(X_tmp)):
            while (index in donwSet):
                index = np.argmin(X_tmp)
            donwSet.add(index)
            X_tmp[index] = np.inf
            X.append(data.loc[index, select_feature])
            label.append(data.loc[index, 'label'])
        X = np.array(X)
        label = np.array(label)
        return X, label

    def plot(self, x, lebel, predict):
        plt.scatter(x, lebel, s=20)
        plt.plot(x, predict, color='green')
        plt.show()

    def tree_to_dict(self):
        return self.__get_node_dict(self.root_node)

    def __get_node_dict(self, node):
        if (node.is_leaf):
            return node.value
        else:
            left_value = self.__get_node_dict(node.left_child)
            right_value = self.__get_node_dict(node.right_child)
            left_key = '<=' + str(node.split_value)
            right_key = '>' + str(node.split_value)
            innerDic = {}
            innerDic[left_key] = left_value
            innerDic[right_key] = right_value
            dic={}
            dic[node.split_feature]=innerDic
            return dic

def get_args():
    parser = argparse.ArgumentParser(description='GBDT-Simple-Tutorial')
    parser.add_argument('-criterion', default='mse', type=str, choices=['gini', 'entropy', 'mse'], help='分裂节点时评价准则')
    parser.add_argument('-max_depth', default=None, type=int, help='树的深度,停止分裂条件')
    parser.add_argument('-min_samples_split', default=2, type=int, help='分裂一个内部节点需要的做少样本数,停止分裂条件')
    parser.add_argument('-min_samples_leaf', default=1, type=int, help='每个叶子节点需要的最少样本数,停止分裂条件')
    parser.add_argument('-max_leaf_nodes', default=None, type=int, help='叶子节点的最大数量,停止分裂条件')
    parser.add_argument('-min_impurity_decrease', default=0., type=float, help='最小不纯度的减少量,停止分裂条件')
    args = parser.parse_args()
    return args

def print_dict(src_dict, level=0, src_dict_namestr='tree'):

    if isinstance(src_dict, dict):
        tab_str = '\t'
        for i in range(level):
            tab_str += '\t'
        if 0 == level:
            print(src_dict_namestr, '= {')
        for key, value in src_dict.items():
            if isinstance(value, dict):
                has_dict = False
                for k, v in value.items():
                    if isinstance(v, dict):
                        has_dict = True
                if has_dict:
                    print(tab_str, key, ":{")
                    print_dict(value, level + 1)
                else:
                    print(tab_str, key, ':', value)
            else:
                print(tab_str, key, ': ', value, )
        print(tab_str, '}')


def run():
    train_data, test_data = load_data_regression(2)



    args = get_args()
    args.max_depth =None
    args.min_samples_split = 2
    args.min_samples_leaf = 1
    args.max_leaf_nodes = None
    args.min_impurity_decrease = 0.
    model = Tree(args.criterion, args.max_depth, args.min_samples_split, args.min_samples_leaf,
                 args.min_impurity_decrease, args.max_leaf_nodes)
    model.fit(train_data)
    dic=model.tree_to_dict()
    print_dict(dic)

    try:
        # 只有一维的数据才可以,否则会默认选择一维的feature,如果判断中用到其他feature会出错
        model.plot_one_dimemtion(train_data)
    except:
        pass

    result=model.predict(test_data)

    print("测试数据:",test_data['x'].values)
    print("预测值:",result)
    print('-------------------------------------------------------------------------------')


if __name__ == '__main__':
    import sys
    run()
    sys.exit()

 

 

 

 

 

相关标签: ai 机器学习