Skip to content

Instantly share code, notes, and snippets.

@nknytk
Last active June 23, 2017 09:39
Show Gist options
  • Save nknytk/32ae3e07bf7479c9d2ec1fbd0d16b46f to your computer and use it in GitHub Desktop.
Save nknytk/32ae3e07bf7479c9d2ec1fbd0d16b46f to your computer and use it in GitHub Desktop.
リッチな決定木で精度向上を目指す

きっかけ

この2ヶ月ほど業務でGradient Boosting Decision Tree(GBDT)を使用しており、性能の高さに感心し、決定木のアルゴリズムに興味を持ちました。決定木の分岐条件の作り方について調べた時に、性能向上に繋がりそうな改 善策を思いついたので、実装して本当に性能が向上するか試してみてみたくなりました。

考えていること

決定木を作るためのアルゴリズムはID3, ID4.5, CARTなどいくつか存在します。私が調べた範囲では、どのアルゴリズムも

  • 特徴量は全て独立変数であると考える
  • 分岐条件を作る際は、データを最もよく分離できる特徴量を1つだけ選ぶ

点が共通していました。

個人的には、Neural Networkは人間の脳の内部構造を真似ることで人間の判断を再現しようとする方法、決定木は人間の判断順序を真似ることで人間の判断を再現しようとする方法であると解釈しています。ただ、人間が条 件分岐のような判断を行う際は、特徴の1箇所に注目するだけでなく、同じデータ間の複数の特徴を比較することも多いように思います。

極端な例を挙げると、四角形が横長か否かを判断する場合、人間は

のような判断をするでしょう。
他にも、去年の身長よりも今年の身長の方が大きかったら背が伸びている、あるトラックの最大積載量よりも実際の積載量が大きかったら過積載など、データ内の別の特徴間を比較して得られる情報は判断に有用であるように感じられます。

ということは、決定木の条件分岐に同データ内の特徴間の比較を加えると、より人間に近い判断ができるようになるのではないでしょうか。 実験で確かめてみます。

実験準備

実験のため、まずCARTの決定木を実装しました。実装にあたってはこちらの実装を参考に、実験の都合でいくつか変更を加えました。

  • 実装を簡単にするため
    • 分岐の評価はジニ係数に固定
    • 決定領域の可視化を諦め、可視化に使う変数を削除
  • 実験を高速にするため
    • 使いまわせる計算結果は使い回す
    • max_depthの違いによる性能比較を決定木を作り直さずに行えるよう、後からmax_depthを増やして末端ノードだけ分割するメソッドを追加
  • 同データ内の別の特徴量間の比較機能を追加
    • 全特徴量ではなく、ペアを作って比較を行う特徴のindexをコンストラクタに指定する実装

実験に使った決定木のコードはdecision_tree.pyです。

実験1: 四角形が横長かどうかを判定する

(横幅, 縦幅)のペアを1万組作り、9割を学習、1割を評価データとして、横幅のほうが大きいかどうか判定する決定木を学習させて精度を測りました。 コードはclassify_rect.pyです。実行すると、標準出力にタブ区切りで
max_depth sklearnの決定木の精度 実装した決定木の精度(データ内特徴量間比較なし) 実装した決定木の精度(データ内特徴量間比較あり)
が出力されます。

結果は以下のようになりました。

1       0.735   0.735   1.0
2       0.875   0.875   1.0
3       0.915   0.915   1.0
4       0.951   0.951   1.0
5       0.961   0.961   1.0
6       0.979   0.979   1.0
7       0.989   0.989   1.0
8       0.995   0.994   1.0
9       0.994   0.994   1.0
10      0.993   0.994   1.0
  • sklearnと実装した決定木(特徴量比較なし)の結果がほぼ揃っていること
  • 特徴量間の比較がない場合、木を深くしても完全には正しい分類をできないこと
  • データ内の特徴量間の比較を追加すれば深さ1で100%正解できること

いずれも期待通りです。
問題設定の非現実性・この問題を決定木で解くことの是非はさておき、実装したコードは正しく動作しています。

実験2: MNIST

もっと一般的なデータで効果を確認するため、MNISTで実験をしました。
全ての特徴が画素値で同質であり、データ内の特徴間比較が意味を持つ可能性があると考えたからです。

データセットをランダムに分割して9割を学習、1割を評価データとし、sklearnの決定木と、今回実装した決定木(データ内特徴量間比較あり)の評価精度を比較しました。 全画素を互いに比較することは現実的ではないため、28x28の中心の14x14の領域に絞り、行か列が同じ画素だけを比較する設定としました。 max_detphを1~30まで変えながら精度を測り、各モデルが最も高い精度を出した時の値を調べました。
実験に使ったコードはmnist_sklearn.py, mnist_mytree.pyです。

モデル 精度 max_depth
sklearn.tree.DecisionTreeClassifier 1回目 0.8775714285714286 14
sklearn.tree.DecisionTreeClassifier 2回目 0.8772857142857143 21,27
sklearn.tree.DecisionTreeClassifier 3回目 0.8788571428571429 16
実装した決定木(データ内特徴間比較なし) 0.8728571428571429 14
実装した決定木(データ内特徴間比較あり 中央14x14 同行のみ比較) 0.8782857142857143 18
実装した決定木(データ内特徴間比較あり 中央14x14 同行・同列のみ比較) 0.881 15

画像中央の14x14の領域で、縦横に画素感の比較を行った場合が最も良い精度となりました。ただし違いは小さく、誤差の可能性があります。誤差ではなかったとしても、改善幅は小さいのは確実です。
何度か精度を測って統計的に有意性を確認したいところですが、時間がかかるためまだ実施していません。

sklearnは1回の実験に20分程度、自前実装は3時間と、sklearnと自分で実装した決定木では(データ内の特徴間比較を行わなかった場合でも)学習速度に大きな差がありました。しかも、sklearnはmax_depthを変える度に1か>ら木を作り直しているのに対し、自前実装は1本の木の末端ノードだけを分割して深さを増やしながら精度を測っており、1本の木を学習させるためにかかる時間は3桁近い速度差がありました。

考察・所感

sklearnの速さに驚きました。分岐条件の探索を素直に実装すれば、計算量は データ数 * 特徴量の次元 * 特徴内のユニークな値の数 になりますが、sklearnは何かショートカットが入っているのでしょうか。ソースコー ドを流し読みしただけでは分からなかったので、今度時間を取って調べてみようと思います。

MNISTの結果には期待していたので、明確な精度向上がなかったのは残念でした。9割を超えるような精度を期待していたのですが。。
時系列に同じ意味の特徴が入っているような場合には、今回の方法で精度向上の可能性があると今でも考えています。効果がありそうなデータセットを見つけたら再度試してみる予定です。

最終的にはRandom ForestやGBDTの中の決定木の条件分岐を少しリッチにして実課題の解決に繋げたいのですが、先は長そうです。

# coding: utf-8
import math
import random
from time import time
import numpy
from sklearn.tree import DecisionTreeClassifier
from sklearn.datasets import fetch_mldata
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from decision_tree import FeatureComparisonTree
def main(min_depth=1, max_depth=10):
x_all = numpy.array([(random.random(), random.random()) for i in range(10000)])
y_all = numpy.array([int(x[0] > x[1]) for x in x_all])
x_train, x_test, y_train, y_test = train_test_split(x_all, y_all, test_size=0.1)
my_model1 = FeatureComparisonTree(x_train, y_train, max_depth=0)
my_model2 = FeatureComparisonTree(x_train, y_train, max_depth=0, groups=[(0, 1)])
my_model1.split_node()
my_model2.split_node()
for d in range(min_depth, max_depth + 1):
sk_model = DecisionTreeClassifier(max_depth=d)
sk_model.fit(x_train, y_train)
sk_prediction = sk_model.predict(x_test)
sk_accuracy = accuracy_score(y_test, sk_prediction)
my_model1.add_depth()
my_model2.add_depth()
my_prediction1 = my_model1.predict(x_test)
my_prediction2 = my_model2.predict(x_test)
my_accuracy1 = accuracy_score(y_test, my_prediction1)
my_accuracy2 = accuracy_score(y_test, my_prediction2)
print('{}\t{}\t{}\t{}'.format(d, sk_accuracy, my_accuracy1, my_accuracy2))
if __name__ == '__main__':
main()
# coding: utf-8
from itertools import combinations
import numpy
class FeatureComparisonTree:
def __init__(self, samples, targets, impurity=None, max_depth=None, groups=[]):
self.samples = samples
self.targets = targets
self.classes = numpy.unique(targets)
self.impurity = impurity if impurity else self.gini_impurity(targets)
self.max_depth = max_depth
self.groups = groups
self.feature = None
self.feature_pair = None
self.label = None
def split_node(self):
if self.classes.shape[0] == 1:
self.label = self.classes[0]
return
if self.max_depth == 0:
self.label_by_vote()
return
single_split = self.single_feature_split_point()
pair_split = self.inter_feature_comparison_split()
if single_split['info_gain'] == pair_split['info_gain'] == 0:
self.label_by_vote()
return
if single_split['info_gain'] < pair_split['info_gain']:
self.feature_pair = pair_split['feature_pair']
samples_l = self.samples[self.samples[:, self.feature_pair[0]] <= self.samples[:, self.feature_pair[1]]]
samples_r = self.samples[self.samples[:, self.feature_pair[0]] > self.samples[:, self.feature_pair[1]]]
split_info = pair_split
else:
self.feature = single_split['feature']
self.threshold = single_split['threshold']
samples_l = self.samples[self.samples[:, self.feature] <= self.threshold]
samples_r = self.samples[self.samples[:, self.feature] > self.threshold]
split_info = single_split
self.left = FeatureComparisonTree(samples_l, split_info['targets_l'], split_info['impurity_l'], self.max_depth - 1, self.groups)
self.right = FeatureComparisonTree(samples_r, split_info['targets_r'], split_info['impurity_r'], self.max_depth - 1, self.groups)
self.left.split_node()
self.right.split_node()
# メモリ節約のため、最下層のノードが持つデータ以外は捨てる
self.samples = None
self.targets = None
self.classes = None
def add_depth(self):
# 深さを増やす
self.max_depth += 1
if self.label is not None:
self.label = None
self.split_node()
else:
self.left.add_depth()
self.right.add_depth()
def gini_impurity(self, targets):
n_samples = targets.shape[0]
classes = numpy.unique(targets)
impurity = 1
n_samples_in_last_class = n_samples
for i in range(len(classes) - 1):
n_samples_in_class = targets[targets == classes[i]].shape[0]
impurity -= (n_samples_in_class / n_samples) ** 2
n_samples_in_last_class -= n_samples_in_class
impurity -= (n_samples_in_last_class / n_samples) ** 2
return impurity
def single_feature_split_point(self):
best_split_point = {
'info_gain': 0,
'feature': None,
'threshold': None,
'targets_l': None,
'targets_r': None,
'impurity_l': None,
'impurity_r': None
}
n_samples = float(self.samples.shape[0])
for f_idx in range(self.samples.shape[1]):
uniq_feature = numpy.unique(self.samples[:, f_idx])
if uniq_feature.shape[0] == 1:
continue
split_points = (uniq_feature[:-1] + uniq_feature[1:]) / 2.0
for th_candidate in split_points:
targets_l = self.targets[self.samples[:, f_idx] <= th_candidate]
targets_r = self.targets[self.samples[:, f_idx] > th_candidate]
impurity_l = self.gini_impurity(targets_l)
impurity_r = self.gini_impurity(targets_r)
info_gain = self.impurity \
- impurity_l * targets_l.shape[0]/n_samples \
- impurity_r * targets_r.shape[0]/n_samples
if info_gain > best_split_point['info_gain']:
best_split_point = {
'info_gain': info_gain,
'feature': f_idx,
'threshold': th_candidate,
'targets_l': targets_l,
'targets_r': targets_r,
'impurity_l': impurity_l,
'impurity_r': impurity_r
}
return best_split_point
def inter_feature_comparison_split(self):
best_split_point = {
'info_gain': 0,
'feature_pair': None,
'targets_l': None,
'targets_r': None,
'impurity_l': None,
'impurity_r':None
}
n_samples = float(self.samples.shape[0])
for group in self.groups:
for feature1, feature2 in combinations(group, 2):
targets_l = self.targets[self.samples[:, feature1] <= self.samples[:, feature2]]
if targets_l.shape[0] in (self.targets.shape[0], 0):
continue
targets_r = self.targets[self.samples[:, feature1] > self.samples[:, feature2]]
impurity_l = self.gini_impurity(targets_l)
impurity_r = self.gini_impurity(targets_r)
info_gain = self.impurity \
- impurity_l * targets_l.shape[0]/n_samples \
- impurity_r * targets_r.shape[0]/n_samples
if info_gain > best_split_point['info_gain']:
best_split_point = {
'info_gain': info_gain,
'feature_pair': (feature1, feature2),
'targets_l': targets_l,
'targets_r': targets_r,
'impurity_l': impurity_l,
'impurity_r': impurity_r
}
return best_split_point
def label_by_vote(self):
largest_class = 0
largest_count = 0
for c in self.classes:
nc = self.targets[self.targets == c].shape[0]
if nc > largest_count:
largest_class = c
largest_count = nc
self.label = largest_class
def predict(self, samples):
predictions = []
for sample in samples:
predictions.append(self.predict_one(sample))
return numpy.array(predictions)
def predict_one(self, sample):
if self.label is not None:
return self.label
if self.feature is not None:
if sample[self.feature] <= self.threshold:
return self.left.predict_one(sample)
else:
return self.right.predict_one(sample)
elif self.feature_pair is not None:
if sample[self.feature_pair[0]] <= sample[self.feature_pair[1]]:
return self.left.predict_one(sample)
else:
return self.right.predict_one(sample)
else:
raise Exception('なんかバグってる')
# coding: utf-8
import math
import random
import numpy
from time import time
from sklearn.tree import DecisionTreeClassifier
from sklearn.datasets import fetch_mldata
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from decision_tree import FeatureComparisonTree
mnist = fetch_mldata('MNIST', data_home='.')
def main(min_depth=1, max_depth=30):
X = mnist.data.toarray()
Y = mnist.target
x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.1)
# 中央の14x14の画素を横向きにグループ化
groups = [range(uo * 28 + 7, uo * 28 + 21) for uo in range(7, 21)]
# 中央の14x14の画素を縦向きにグループ化
groups += [[j * 28 + i for j in range(7, 21)] for i in range(7, 21)]
model = FeatureComparisonTree(x_train, y_train, max_depth=min_depth-1, groups=groups)
model.split_node()
for d in range(min_depth, max_depth + 1):
t0 = time()
model.add_depth()
t1 = time()
prediction = model.predict(x_test)
t2 = time()
accuracy = accuracy_score(y_test, prediction)
print('{}\t{}\t{}\t{}'.format(d, t1 - t0, t2 - t1, accuracy))
if __name__ == '__main__':
main()
# coding: utf-8
import math
import random
from time import time
from sklearn.tree import DecisionTreeClassifier
from sklearn.datasets import fetch_mldata
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
mnist = fetch_mldata('MNIST', data_home='.')
def main(min_depth=1, max_depth=30):
X = mnist.data
Y = mnist.target
x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.1)
for d in range(min_depth, max_depth + 1):
model = DecisionTreeClassifier(max_depth=d)
t0 = time()
model.fit(x_train, y_train)
t1 = time()
prediction = model.predict(x_test)
t2 = time()
accuracy = accuracy_score(y_test, prediction)
print('{}\t{}\t{}\t{}'.format(d, t1 - t0, t2 - t1, accuracy))
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment