Skip to content

Instantly share code, notes, and snippets.

@Erotemic
Last active August 2, 2016 15:49
Show Gist options
  • Save Erotemic/b694158a7637de42208d5b86852b4f9e to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
import utool as ut
import numpy as np
import sklearn
import sklearn.datasets
import sklearn.svm
import sklearn.metrics
import sklearn.model_selection
from sklearn import preprocessing
(print, rrr, profile) = ut.inject2(__name__, '[classify_shark]')
def get_sharks_dataset(target_type=None):
    """
    Load whale-shark annotations from the 'WS_ALL' ibeis database and encode
    their injury tags as classification targets.

    Args:
        target_type (str): target encoding scheme; one of 'binary',
            'multiclass1', 'multiclass2', or '_experimental_multilabel'.
            Defaults to 'binary' when None.

    Returns:
        sklearn.datasets.base.Bunch: dataset with `data` (flattened HOG
            features), `target` (integer labels), `target_names`,
            `target_labels`, the fitted LabelEncoder `enc`, and ibeis
            bookkeeping fields (`ibs`, `aids`, `config`).

    Raises:
        ValueError: if `target_type` is not one of the known schemes.

    Example:
        >>> from ibeis.scripts.classify_shark import *  # NOQA
    """
    import ibeis
    ibs = ibeis.opendb('WS_ALL')
    config = {
        'dim_size': (256, 256),
        'resize_dim': 'wh'
    }
    all_annots = ibs.annots(config=config)

    TARGET_TYPE = 'binary'
    #TARGET_TYPE = 'multiclass1'
    if target_type is None:
        target_type = TARGET_TYPE

    orig_case_tags = all_annots.case_tags
    tag_vocab = ut.flat_unique(*orig_case_tags)
    print('Original tags')
    print(ut.repr3(ut.dict_hist(ut.flatten(orig_case_tags))))

    def cleanup_tags(orig_case_tags, tag_vocab):
        # Canonicalize the raw tag vocabulary. An alias of None removes
        # the tag entirely (e.g. 'pose:novel' is not an injury label).
        regex_map = [
            ('injur-trunc', 'injur-trunc'),
            ('trunc', 'injur-trunc'),
            ('healthy', 'healthy'),
            (['injur-unknown', 'other_injury'], 'injur-other'),
            ('nicks', 'injur-nicks'),
            ('scar', 'injur-scar'),
            ('bite', 'injur-bite'),
            ('pose:novel', None),
        ]
        alias_map = ut.build_alias_map(regex_map, tag_vocab)
        unmapped = list(set(tag_vocab) - set(alias_map.keys()))
        case_tags = ut.alias_tags(orig_case_tags, alias_map)
        print('unmapped = %r' % (unmapped,))
        return case_tags

    case_tags = cleanup_tags(orig_case_tags, tag_vocab)
    print('Cleaned tags')
    print(ut.repr3(ut.dict_hist(ut.flatten(case_tags))))

    # Map the cleaned tags onto the requested target scheme.
    if target_type == 'binary':
        # Collapse every injury subtype into a single 'injured' class.
        regex_map = [
            ('injur-.*', 'injured'),
            ('healthy', 'healthy'),
        ]
        tag_vocab = ut.flat_unique(*case_tags)
        alias_map = ut.build_alias_map(regex_map, tag_vocab)
        case_tags2 = ut.alias_tags(case_tags, alias_map)
    elif target_type == 'multiclass1':
        # Keep truncation separate; lump remaining injuries together.
        regex_map = [
            ('injur-trunc', 'injur-trunc'),
            ('healthy', 'healthy'),
            ('injur-.*', 'injur-other'),
        ]
        tag_vocab = ut.flat_unique(*case_tags)
        alias_map = ut.build_alias_map(regex_map, tag_vocab)
        unmapped = list(set(tag_vocab) - set(alias_map.keys()))
        print('unmapped = %r' % (unmapped,))
        case_tags2 = ut.alias_tags(case_tags, alias_map)
    elif target_type == 'multiclass2':
        # Keep truncation vs healthy only; drop every other injury tag.
        regex_map = [
            ('injur-trunc', 'injur-trunc'),
            ('healthy', 'healthy'),
            ('injur-.*', None),
        ]
        tag_vocab = ut.flat_unique(*case_tags)
        alias_map = ut.build_alias_map(regex_map, tag_vocab)
        unmapped = list(set(tag_vocab) - set(alias_map.keys()))
        print('unmapped = %r' % (unmapped,))
        case_tags2 = ut.alias_tags(case_tags, alias_map)
    elif target_type == '_experimental_multilabel':
        pass
        # Binarize into multi-class labels
        # http://stackoverflow.com/questions/10526579/use-scikit-learn-to-classify-into-multiple-categories
        #menc = preprocessing.MultiLabelBinarizer()
        #menc.fit(annot_tags)
        #target = menc.transform(annot_tags)
        #enc = menc
    else:
        raise ValueError('Unknown target_type=%r' % (target_type,))
    # henc = preprocessing.OneHotEncoder()
    # henc.fit(menc.transform(annot_tags))
    # target = henc.transform(menc.transform(annot_tags))
    # target = np.array([int('healthy' not in tags) for tags in annots.case_tags])

    # Keep only annots with exactly one tag; 0 tags = unlabeled,
    # >1 tags = inconsistent labeling.
    ntags_list = np.array(ut.lmap(len, case_tags2))
    is_no_tag = ntags_list == 0
    is_single_tag = ntags_list == 1
    is_multi_tag = ntags_list > 1
    print('Multi Tags: %s' % (ut.repr2(ut.compress(case_tags2, is_multi_tag), nl=1),))
    multi_annots = all_annots.compress(is_multi_tag)  # NOQA
    #ibs.set_image_imagesettext(multi_annots.gids, ['MultiTaged'] * is_multi_tag.sum())
    print('can\'t use %r annots due to no labels' % (is_no_tag.sum(),))
    print('can\'t use %r annots due to inconsistent labels' % (is_multi_tag.sum(),))
    print('will use %r annots with consistent labels' % (is_single_tag.sum(),))

    annot_tags = ut.compress(case_tags2, is_single_tag)
    annots = all_annots.compress(is_single_tag)
    annot_tag_hist = ut.dict_hist(ut.flatten(annot_tags))
    print('Final Annot Tags')
    print(ut.repr3(annot_tag_hist))

    # target_names = ['healthy', 'injured']
    enc = preprocessing.LabelEncoder()
    enc.fit(ut.unique(ut.flatten(annot_tags)))
    target = enc.transform(ut.flatten(annot_tags))
    target_names = enc.classes_

    # Flattened HOG features are the data matrix.
    data = np.array([h.ravel() for h in annots.hog_hog])

    # Build scipy / scikit data standards
    ds = sklearn.datasets.base.Bunch(
        ibs=ibs,
        aids=annots.aids,
        name='sharks',
        DESCR='injured-vs-healthy whale sharks',
        data=data,
        target=target,
        target_names=target_names,
        target_labels=enc.transform(target_names),
        enc=enc,
        config=config
    )
    return ds
#@ut.reloadable_class
#class ClfMultiResult(object):
# def __init__(multi_result, result_list):
# multi_result.result_list = result_list
# def compile_results(multi_result):
# import pandas as pd
# result_list = multi_result.result_list
# multi_result.df = reduce(ut.partial(pd.DataFrame.add, fill_value=0), [result.df for result in result_list])
# #hardness = 1 / multi_result.df['decision'].abs()
# def get_hardest_fail_idxs(multi_result):
# df = multi_result.df
# sortx = multi_result.hardness.argsort()[::-1]
# # Order by hardness
# df = multi_result.df.take(sortx)
# failed = multi_result.df['is_fp'] + multi_result.df['is_fn']
# # Grab only failures
# hard_fail_idxs = failed[failed > 0].index.values
# return hard_fail_idxs
@ut.reloadable_class
class ClfProblem(object):
    """
    Harness for researching a classification problem.

    Wraps a sklearn-style dataset Bunch (`ds`) and provides methods to
    sample, fit, and evaluate an SVM classifier on it.
    """
    def __init__(problem, ds):
        # ds: Bunch with data, target, target_labels, enc, name (see
        # get_sharks_dataset).
        problem.ds = ds

    def print_support_info(problem):
        # Print the per-class support (sample count) histogram.
        enc = problem.ds.enc
        target_labels = enc.inverse_transform(problem.ds.target)
        label_hist = ut.dict_hist(target_labels)
        print('support hist' + ut.repr3(label_hist))

    def fit_new_classifier(problem, train_idx):
        """
        Fit a fresh linear SVC (balanced class weights, OVR decision
        function) on the rows of ds.data selected by `train_idx`.

        Scratch notes exploring decision functions (not a doctest):
            x_train2 = np.random.rand(100, 2)
            y_train2 = np.random.randint(0, 2, size=100)
            x_train3 = np.random.rand(100, 2)
            y_train3 = np.random.randint(0, 3, size=100)
            x_test = np.random.rand(10, 2)
            X = clf._validate_for_predict(x_test)
            X = clf._compute_kernel(X)
            clf3 = sklearn.svm.SVC(kernel='linear', C=1, class_weight='balanced',
                                   decision_function_shape='ovr')
            clf3.fit(x_train3, y_train3)
            clf2 = sklearn.svm.SVC(kernel='linear', C=1, class_weight='balanced',
                                   decision_function_shape='ovr')
            clf2.fit(x_train2, y_train2)
            y_pred2 = clf2.predict(x_test)
            y_pred3 = clf3.predict(x_test)
            clf2.decision_function(x_test)
            clf3.decision_function(x_test)
            dec2 = clf2._dense_decision_function(X)
            dec3 = clf3._dense_decision_function(X)
            predictions = dec3 < 0
            confidences = dec3
            n_classes = len(clf3.classes_)
            _ovr_decision_function(predictions, confidences, n_classes)
        """
        print('[problem] train classifier on %d data points' % (len(train_idx)))
        data = problem.ds.data
        target = problem.ds.target
        x_train = data.take(train_idx, axis=0)
        y_train = target.take(train_idx, axis=0)
        clf = sklearn.svm.SVC(kernel='linear', C=1, class_weight='balanced',
                              decision_function_shape='ovr')
        clf.fit(x_train, y_train)
        return clf

    def test_classifier(problem, clf, test_idx):
        """
        Evaluate `clf` on the rows selected by `test_idx` and return a
        ClfSingleResult. Predictions are taken as the argmax of a
        per-class confidence matrix `y_conf`.
        """
        print('[problem] test classifier on %d data points' % (len(test_idx),))
        data = problem.ds.data
        target = problem.ds.target
        x_test = data.take(test_idx, axis=0)
        y_true = target.take(test_idx, axis=0)
        if len(clf.classes_) == 2:
            # Adapt _ovr_decision_function for 2-class case
            # This is simply a linear scaling into a probability based on the
            # other members of this query.
            # NOTE(review): uses private sklearn SVC internals
            # (_validate_for_predict / _compute_kernel /
            # _dense_decision_function) — fragile across sklearn versions.
            X = clf._validate_for_predict(x_test)
            X = clf._compute_kernel(X)
            _dec2 = clf._dense_decision_function(X)
            dec2 = -_dec2
            n_samples = dec2.shape[0]
            n_classes = len(clf.classes_)
            final = np.zeros((n_samples, n_classes))
            confidence_max = max(np.abs(dec2.max()), np.abs(dec2.min()))
            norm_conf = ((dec2.T[0] / confidence_max) + 1) / 2
            final.T[0] = 1 - norm_conf
            final.T[1] = norm_conf
            # output comparable to multiclass version
            y_conf = final
        else:
            # Get notion of confidence / probability of decision
            y_conf = clf.decision_function(x_test)
        y_pred = y_conf.argmax(axis=1)
        #if False:
        #    real_pred = clf.predict(x_test)
        #    real_conf = clf.decision_function(x_test)
        #    np.all(y_pred == real_pred)
        #    np.all((real_conf > 0) == real_pred)
        #    np.all((norm_conf > 0) == real_pred)
        #    assert np.all(dec2.ravel() == real_conf)
        result = ClfSingleResult(problem.ds, test_idx, y_true, y_pred, y_conf)
        return result

    def stratified_2sample_idxs(problem, frac=.2, split_frac=.75):
        """
        Draw a stratified random subsample of fraction `frac` per class and
        split it into (train_idx, test_idx) arrays. `split_frac` is the
        fraction of each class sample routed to the test side.
        """
        target = problem.ds.target
        target_labels = problem.ds.target_labels
        # BUGFIX: original literal was `043` — a Python-2 octal literal that
        # is a SyntaxError in Python 3. `0o43` preserves the value (35).
        rng = np.random.RandomState(0o43)
        train_sample = []
        test_sample = []
        for label in target_labels:
            target_idxs = np.where(target == label)[0]
            subset_size = int(len(target_idxs) * frac)
            rand_idx = ut.random_indexes(len(target_idxs), subset_size, rng=rng)
            sample_idx = ut.take(target_idxs, rand_idx)
            split = int(len(sample_idx) * split_frac)
            train_sample.append(sample_idx[split:])
            test_sample.append(sample_idx[:split])
        train_idx = np.array(sorted(ut.flatten(train_sample)))
        test_idx = np.array(sorted(ut.flatten(test_sample)))
        return train_idx, test_idx

    def gen_crossval_idxs(problem, n_folds=2):
        """Yield (train_idx, test_idx) pairs from stratified k-fold CV."""
        xvalkw = dict(n_folds=n_folds, shuffle=True, random_state=43432)
        target = problem.ds.target
        # NOTE(review): sklearn.cross_validation is the legacy (pre-0.18)
        # API; the model_selection equivalent is left commented out.
        #skf = sklearn.model_selection.StratifiedKFold(**xvalkw)
        import sklearn.cross_validation
        skf = sklearn.cross_validation.StratifiedKFold(target, **xvalkw)
        _iter = skf
        msg = 'cross-val test on %s' % (problem.ds.name)
        for count, (train_idx, test_idx) in enumerate(ut.ProgIter(_iter, lbl=msg)):
            yield train_idx, test_idx
@ut.reloadable_class
class ClfSingleResult(object):
    r"""
    Reports the results of a classification problem

    Holds the raw predictions from one cross-validation fold and compiles
    them into a pandas DataFrame indexed by test index.

    Example:
        >>> result = ClfSingleResult()
    """
    def __init__(result, ds=None, test_idx=None, y_true=None, y_pred=None, y_conf=None):
        result.ds = ds            # dataset Bunch the predictions refer to
        result.test_idx = test_idx  # indices into ds.data used for testing
        result.y_true = y_true    # ground-truth integer labels
        result.y_pred = y_pred    # predicted integer labels
        result.y_conf = y_conf    # per-class confidence matrix

    def compile_results(result):
        """
        Build `result.decision` (per-class confidences, scaled) and
        `result.df` with columns ['pred', 'easiness'], where easiness is
        the (scaled) confidence assigned to the true class.
        """
        import pandas as pd
        y_true = result.y_true
        y_pred = result.y_pred
        y_conf = result.y_conf
        test_idx = result.test_idx
        # passed = y_pred == y_true
        # failed = y_pred != y_true
        #confusion = sklearn.metrics.confusion_matrix(y_true, y_pred)
        # is_tn = np.logical_and(passed, y_true == 0)
        # is_fp = np.logical_and(failed, y_true == 0)
        # is_fn = np.logical_and(failed, y_true == 1)
        # is_tp = np.logical_and(passed, y_true == 1)
        # columns = ['tn', 'fp', 'fn', 'tp', 'decision', 'pred']
        # column_data = [is_tn, is_fp, is_fn, is_tp, y_conf, y_pred]
        index = pd.Series(test_idx, name='test_idx')
        decision = pd.DataFrame(y_conf, index=index, columns=result.ds.target_names)
        # NOTE(review): the /3 scaling is unexplained in the original —
        # confirm whether it is intentional before relying on magnitudes.
        result.decision = decision / 3
        easiness = np.array(ut.ziptake(result.decision.values, y_true))
        columns = ['pred', 'easiness']
        column_data = [y_pred, easiness]
        data = dict(zip(columns, column_data))
        result.df = pd.DataFrame(data, index, columns)
        # (Removed two no-op bare-expression statements from the original:
        # a dead `if len(target_names) == 1: y_conf` and a trailing `y_true`.)
        #result.decision = pd.Series(y_conf, index, name='decision', dtype=np.float)
        #result._compiled['confusion'] = confusion
        #score = (1 - (sum(passed) / len(passed)))
        #result._compiled['score'] = score

    def print_report(result):
        """Print sklearn's per-class precision/recall/f1 report."""
        report = sklearn.metrics.classification_report(
            result.y_true, result.y_pred,
            target_names=result.ds.target_names)
        print(report)
def stratified_sample_idxs_balanced(target, frac=.2, balanced=True):
    """
    Draw a stratified random sample of indices: for each unique label in
    `target`, take `frac` of its indices (fixed RNG seed 43), and return
    the union sorted ascending.

    Args:
        target (ndarray): integer label per sample.
        frac (float): fraction of each class to sample.
        balanced (bool): unused — kept for interface compatibility.
            NOTE(review): consider wiring this up or removing it.

    Returns:
        ndarray: sorted sampled indices into `target`.
    """
    rng = np.random.RandomState(43)
    sample = []
    for label in np.unique(target):
        target_idxs = np.where(target == label)[0]
        subset_size = int(len(target_idxs) * frac)
        rand_idx = ut.random_indexes(len(target_idxs), subset_size, rng=rng)
        sample_idx = ut.take(target_idxs, rand_idx)
        sample.append(sample_idx)
    sample_idx = np.array(sorted(ut.flatten(sample)))
    return sample_idx
def stratified_sample_idxs_unbalanced(target, size=1000):
    """
    Draw up to `size` indices per unique label in `target` (fixed RNG
    seed 43) and return the union sorted ascending. Classes smaller than
    `size` contribute what they have (behavior of ut.random_indexes —
    presumably it caps at the available count; verify).

    Args:
        target (ndarray): integer label per sample.
        size (int): requested sample count per class.

    Returns:
        ndarray: sorted sampled indices into `target`.
    """
    rng = np.random.RandomState(43)
    sample = []
    for label in np.unique(target):
        target_idxs = np.where(target == label)[0]
        subset_size = size
        rand_idx = ut.random_indexes(len(target_idxs), subset_size, rng=rng)
        sample_idx = ut.take(target_idxs, rand_idx)
        sample.append(sample_idx)
    sample_idx = np.array(sorted(ut.flatten(sample)))
    return sample_idx
def learn_injured_sharks():
    r"""
    Train and evaluate an injured-vs-healthy shark classifier via
    cross-validation, then visualize the easiest/hardest examples.

    References:
        http://scikit-learn.org/stable/model_selection.html

    TODO:
        * Change unreviewed healthy tags to healthy-likely

    Example:
        >>> from ibeis.scripts.classify_shark import *  # NOQA
    """
    from ibeis.scripts import classify_shark
    import plottool as pt
    import pandas as pd
    pt.qt4ensure()

    # Only the last assignment takes effect; earlier ones are kept as a
    # record of configurations that were tried.
    target_type = 'binary'
    target_type = 'multiclass1'
    target_type = 'multiclass2'
    ds = classify_shark.get_sharks_dataset(target_type)

    # Sample the dataset
    #idxs = stratified_sample_idxs_balanced(ds.target, .5)
    idxs = stratified_sample_idxs_unbalanced(ds.target, 1000)
    ds.target = ds.target.take(idxs, axis=0)
    ds.data = ds.data.take(idxs, axis=0)
    ds.aids = ut.take(ds.aids, idxs)

    problem = classify_shark.ClfProblem(ds)
    problem.print_support_info()

    # Fit and test one classifier per cross-validation fold.
    result_list = []
    #train_idx, test_idx = problem.stratified_2sample_idxs()
    n_folds = 2
    for train_idx, test_idx in problem.gen_crossval_idxs(n_folds):
        clf = problem.fit_new_classifier(train_idx)
        result = problem.test_classifier(clf, test_idx)
        result_list.append(result)
    for result in result_list:
        result.compile_results()
    for result in result_list:
        result.print_report()

    # Sanity check: cross-validation test sets must be pairwise disjoint.
    isect_sets = [set(s1).intersection(set(s2)) for s1, s2 in
                  ut.combinations([result.df.index for result in result_list], 2)]
    assert all([len(s) == 0 for s in isect_sets]), ('cv sets should not intersect')

    pd.set_option("display.max_rows", 20)

    # Combine information from results
    df = pd.concat([result.df for result in result_list])
    df['hardness'] = 1 / df['easiness']
    df['aid'] = ut.take(ds.aids, df.index)
    df['target'] = ut.take(ds.target, df.index)
    df['failed'] = df['pred'] != df['target']

    report = sklearn.metrics.classification_report(
        y_true=df['target'], y_pred=df['pred'],
        target_names=result.ds.target_names)
    print(report)

    confusion = sklearn.metrics.confusion_matrix(df['target'], df['pred'])
    print('Confusion Matrix:')
    print(pd.DataFrame(confusion, columns=result.ds.target_names,
                       index=result.ds.target_names))

    #def confusion_by_label():
    # Print Confusion by label
    #for target in [0, 1]:
    #    df_target = df[df['target'] == target]
    #    df_err = df_target[['tp', 'fp', 'fn', 'tn']]
    #    print('target = %r' % (ds.target_names[target]))
    #    print('df_err.sum() =%s' % (ut.repr3(df_err.sum().astype(np.int32).to_dict()),))
    #for true_target in [0, 1]:
    #    for pred_target in [0, 1]:
    #        df_pred_target = df[df['target'] == pred_target]
    #        df_err = df_target[['tp', 'fp', 'fn', 'tn']]
    #        print('target = %r' % (ds.target_names[target]))
    #        print('df_err.sum() =%s' % (ut.repr3(df_err.sum().astype(np.int32).to_dict()),))

    def snapped_slice(size, frac, n):
        # Return a slice of about n items centered at fractional position
        # `frac` of a length-`size` sequence, shifted to stay in bounds.
        # NOTE(review): `stop` is exclusive, so the high-end clamp drops
        # the final element — confirm whether that is intended.
        start = int(size * frac - np.ceil(n / 2))
        stop = int(size * frac + np.floor(n / 2))
        if stop >= size:
            buf = (stop - size + 1)
            start -= buf
            stop -= buf
        if start < 0:
            buf = 0 - start
            stop += buf
            start += buf
        assert stop < size, 'out of bounds'
        sl = slice(start, stop)
        return sl

    def grab_subchunk(place, n, target):
        # Take ~n rows of df (sorted by hardness) from the start, middle,
        # or end, optionally restricted to one target class.
        df_chunk = df.take(df['hardness'].argsort())
        if target is not None:
            df_chunk = df_chunk[df_chunk['target'] == target]
        #df_chunk = df_chunk[df_chunk[err] > 0]
        frac = {'start': 0.0, 'middle': 0.5, 'end': 1.0}[place]
        sl = snapped_slice(len(df_chunk), frac, n)
        idx = df_chunk.index[sl]
        df_chunk = df_chunk.loc[idx]
        place_name = 'hardness=%.2f' % (frac,)
        if target is not None:
            df_chunk.nice = place_name + ' ' + ds.target_names[target]
        else:
            df_chunk.nice = place_name
        return df_chunk

    n = 4
    places = ['start', 'middle', 'end']
    df_list = [grab_subchunk(place, n, target)
               for place in places for target in ds.target_labels]

    # Render each chunk as a grid of HOG images + chips.
    from ibeis_cnn import draw_results
    ibs = ds.ibs
    config = ds.config
    fnum = 1
    pnum_ = pt.make_pnum_nextgen(nRows=len(places), nSubplots=len(df_list))
    for df_chunk in df_list:
        if len(df_chunk) == 0:
            import vtool as vt
            img = vt.get_no_symbol(size=(n * 100, 200))
            #size=(200, 100))
            #img = np.zeros((10, 10), dtype=np.uint8)
        else:
            annots_chunk = ibs.annots(df_chunk['aid'].values, config=config)
            data_lists = [(np.array(annots_chunk.hog_img) * 255).astype(np.uint8),
                          annots_chunk.chips]
            label_list = (1 - df_chunk['failed']).values
            flat_metadata = df_chunk.to_dict(orient='list')
            flat_metadata['tags'] = annots_chunk.case_tags
            tup = draw_results.get_patch_chunk(data_lists, label_list, flat_metadata,
                                               draw_meta=['decision', 'tags'],
                                               vert=False, fontScale=4.0)
            img, offset_list, sf_list, stacked_orig_sizes = tup
        fig, ax = pt.imshow(img, fnum=fnum, pnum=pnum_())
        ax.set_title(df_chunk.nice)
    pt.adjust_subplots2(top=.95, left=0, right=1, bottom=.00, hspace=.1, wspace=0)

    if False:
        # Interactive inspection of the last chunk, one annot at a time.
        pt.qt4ensure()
        subset_df = df_chunk
        for idx in ut.InteractiveIter(subset_df.index.values):
            dfrow = subset_df.loc[idx]
            assert dfrow['aid'] == ds.aids[idx]
            annot = ibs.annots([dfrow['aid']], config=config)
            hogimg = annot.hog_img[0]
            chip = annot.chips[0]
            pt.clf()
            pt.imshow(hogimg, pnum=(1, 2, 1))
            pt.imshow(chip, pnum=(1, 2, 2))
            pt.set_xlabel(str(annot.case_tags[0]))
            fig = pt.gcf()
            print(dfrow)
            fig.show()
            fig.canvas.draw()
if __name__ == '__main__':
    r"""
    CommandLine:
        python -m ibeis.scripts.classify_shark
        python -m ibeis.scripts.classify_shark --allexamples
    """
    import multiprocessing
    multiprocessing.freeze_support()  # for win32
    import utool as ut  # NOQA
    ut.doctest_funcs()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment