A skeletal implementation of adaptive value of information, which attempts to use features intelligently during inference.
# -*- coding: utf-8 -*-
'''
| Filename : adaptive_inference.py
| Description : Implementation of core concepts from "Learning Adaptive Value of Information for Structured Prediction"
| Author : Pushpendre Rastogi
| Created : Wed Sep 16 12:29:09 2015 (-0400)
| Last-Updated: Thu Sep 17 03:13:58 2015 (-0400)
| By: Pushpendre Rastogi
| Update #: 73
'''
import numpy as np
import random
import functools
import sklearn.linear_model
from contextlib import contextmanager
import time
DTYPE = 'float32'
X_LEN = 13
Y_SIZE = 3
Z_SIZE = 13
HEADER = ['Alcohol', 'Malic-acid', 'Ash', 'Alcalinity-of-ash', 'Magnesium',
          'Total-phenols', 'Flavanoids', 'Nonflavanoid-phenols',
          'Proanthocyanins', 'Color-intensity', 'Hue',
          'OD280/OD315-of-diluted-wines', 'Proline-Amino-Acid']
COST_MAP = {
    'Alcohol': 0,
    'Malic-acid': 1,
    'Ash': 2,
    'Alcalinity-of-ash': 3,
    'Magnesium': 4,
    'Total-phenols': 5,
    'Flavanoids': 6,
    'Nonflavanoid-phenols': 7,
    'Proanthocyanins': 8,
    'Color-intensity': 9,
    'Hue': 10,
    'OD280/OD315-of-diluted-wines': 11,
    'Proline-Amino-Acid': 12}
COST = np.array([COST_MAP[e] for e in HEADER], dtype=DTYPE)
@contextmanager
def tictoc(msg):
    t = time.time()
    print "Started", msg
    yield
    print "Completed", msg, "in %ds" % (time.time() - t)
def empty_z_constructor():
    return np.array([True] + [False] * (Z_SIZE - 1))
def data_fnc(shuffle=True):
    data_file = open("wine.data", "rb")
    header = data_file.next().strip().split()[1:]
    data = [[float(_) for _ in e.strip().split()]
            for e in data_file]
    if shuffle:
        random.shuffle(data)
    data = np.array(data, dtype=DTYPE)
    X = data[:, 1:]
    Y = data[:, 0] - 1
    return X, Y, header
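# A sketch of the whitespace-separated "wine.data" layout that data_fnc()
# assumes (the header token for the label column and the example values below
# are placeholders; only the column positions matter): one header row, then
# one row per example with the class label (1, 2 or 3) first and the 13
# attribute values after it.
#
#   Label Alcohol Malic-acid ... Proline-Amino-Acid
#   1 14.2 1.7 ... 1065.0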
def time_taken_to_compute(z):
    assert z.dtype == bool, \
        "z should be a binary vector indicating which features have been extracted."
    assert z.ndim == 1
    return np.dot(COST, z)
def feature_dim(x_len=X_LEN, y_size=Y_SIZE, z_size=Z_SIZE):
    return z_size * y_size * x_len
def meta_feature_dim():
    return Y_SIZE * Z_SIZE
def feature_function(x, y, z, x_len=X_LEN, y_size=Y_SIZE, z_size=Z_SIZE):
    '''
    Params
    ------
    z : The bit mask that tells us which feature templates were computed.
        We might look at z globally or locally, and the feature function
        might be computed as a decision tree based on the values of z.
        For example, we might create a feature function that looks at how
        many features were computed and assign that its own corner of the
        space. This feature function uses a run-length style encoding:
        look at the highest-indexed feature that has been extracted and
        learn a separate weight vector for that situation.
        Note that since we have just a one-variable factor graph, there is no
        difference between "example-level" and "feature-level" feature
        extraction.
        Also, when the paper says
        "CREATE A FEATURE VECTOR AS THE COMPOSITE OF DIFFERENT EXTRACTED"
        it means
        "CREATE A FEATURE VECTOR AS THE CONCATENATION OF DIFFERENT EXTRACTED"
        features. The resulting feature vector is extremely sparse and
        high-dimensional.
    '''
    f_total_size = feature_dim(x_len=x_len, y_size=y_size, z_size=z_size)
    f = np.zeros((f_total_size,), dtype=DTYPE)
    # The index of the highest feature that has been extracted.
    z_max = max([i for i, e in enumerate(list(z)) if e])
    # Each (z_max, y) pair gets its own block of length x_len.
    offset = x_len * (y_size * z_max + int(y))
    f[offset:offset + x_len] = x
    return f
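# Worked example of the feature layout above (a sketch; the numbers follow
# from the defaults X_LEN = Z_SIZE = 13, Y_SIZE = 3):
# feature_dim() = 13 * 3 * 13 = 507.  With z = [True] + [False] * 12 the
# highest extracted feature is z_max = 0, so for y = 0 the vector x lands in
# positions 0..12; for y = 2 and z_max = 4 the block starts at
# offset = 13 * (3 * 4 + 2) = 182, i.e. positions 182..194.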
def batch_learning(X, Y,
                   warm_start=None, T=1000, learning_rate=1e-2,
                   verbose=False):
    '''
    The goal is to minimize the objective:
    w* = argmin_w λ ||w||² + 1/n ∑_{j=1}ⁿ E_z [l(yʲ, h(xʲ, z))]
    This is a regularized loss that can be minimized with SGD.
    See PEGASOS, and specifically pages 157, 161, 163, 177, 198 of
    "Understanding Machine Learning" by Shalev-Shwartz and Ben-David.
    Params
    ------
    warm_start : (default None)
    '''
    w = (np.zeros((feature_dim(),))
         if warm_start is None
         else warm_start)
    w_sum = np.array(w)
    batch_size = X.shape[0]

    def sample():
        sample_idx = np.random.randint(batch_size)
        x_sample = X[sample_idx]
        y_sample = Y[sample_idx]
        z_sample = np.random.randint(2, size=Z_SIZE)
        # The zero-cost first feature is always extracted (mirrors
        # empty_z_constructor), so z_sample is never all zeros.
        z_sample[0] = 1
        return x_sample, y_sample, z_sample

    def zero_one_loss(y_hat, y):
        return (1.0 if y_hat != y else 0.0)

    t_idx = 0
    for t_idx in xrange(T):
        x_sample, y_sample, z_sample = sample()
        # y_hat = argmax_{y' in Y} (Δ(y', y) + ⟨ w⁽ᵗ⁾, f(x, y') - f(x, y) ⟩)
        # This is where you would usually do complicated dynamic programming.
        # Of course in our simple case with just 3 classes, we can enumerate
        # all the classes and pick the best.
        scores = []
        feature_diffs = []
        for y_hat in range(Y_SIZE):
            feature_diff = (feature_function(x_sample, y_hat, z_sample)
                            - feature_function(x_sample, y_sample, z_sample))
            score = zero_one_loss(y_hat, y_sample) + np.dot(w, feature_diff)
            feature_diffs.append(feature_diff)
            scores.append(score)
        max_violating_y_hat_feature_diff = feature_diffs[np.argmax(scores)]
        # / (t_idx + 1)
        update = (- learning_rate * max_violating_y_hat_feature_diff)
        w_norm_before = np.linalg.norm(w)
        w += update
        w_norm_after = np.linalg.norm(w)
        if verbose:
            print w_norm_before, w_norm_after
            print "Update_norm", np.linalg.norm(update), "W_norm", np.linalg.norm(w)
        w_sum += w
    return w_sum / (t_idx + 1)
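# A sketch of the single SGD step performed above, in symbols:
#   ŷ  = argmax_{y'} [ Δ(y', y) + ⟨w, f(x, y', z) - f(x, y, z)⟩ ]
#   w ← w - η · (f(x, ŷ, z) - f(x, y, z))
# When ŷ == y the feature difference is zero and the step is a no-op, so only
# margin-violating labels move the weights.  (The λ||w||² term from the
# objective is not applied as a shrinkage step in this skeletal version.)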
def h_x_z(w, x, z):
    scores = [np.dot(w, feature_function(x, y_hat, z))
              for y_hat in range(Y_SIZE)]
    return np.argmax(scores)
def convert_to_one_hot(e, L):
    a = np.zeros((L,))
    a[e] = 1
    return a
def meta_feature_func(x, z, action, h_x_z_learnt):
    y_hat = h_x_z_learnt(x, z)
    feat = np.zeros((meta_feature_dim(),))
    feat[action * Y_SIZE + y_hat] = 1
    return feat
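# Example of the one-hot meta feature (a sketch using the defaults above):
# meta_feature_dim() = 3 * 13 = 39, and for action = 5 with predicted label
# y_hat = 2 the single non-zero entry sits at index 5 * 3 + 2 = 17.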
def use_policy_to_predict_action(policy_parameters, x, z, allowable_actions, h_x_z_learnt):
    actions = list(allowable_actions)
    action_scores = [np.dot(policy_parameters,
                            meta_feature_func(x, z, action, h_x_z_learnt))
                     for action in actions]
    # np.argmax gives a position in the list, so index the list of action
    # keys rather than the {action: cost} dict itself.
    return actions[np.argmax(action_scores)]
def inference_under_budget(x, h_x_z_learnt, policy_parameters, budget=10):
    z = empty_z_constructor()
    h_z = functools.partial(h_x_z_learnt, x)
    # The estimate of y_hat based on DIRT-CHEAP-FEATURES!!
    y_hat = h_z(z)
    budget_spent = 0
    # Space of allowable actions.
    A = dict(zip(range(Z_SIZE), COST))
    while budget_spent < budget and len(A) > 0:
        a_hat = use_policy_to_predict_action(
            policy_parameters, x * z, z, A, h_x_z_learnt)
        a_hat_cost = A[a_hat]
        del A[a_hat]
        if a_hat_cost < (budget - budget_spent):
            z[a_hat] = True
            # NOTE: The inference cost should also be low for good performance
            # with this strategy.
            y_hat = h_z(z)
            budget_spent += a_hat_cost
    return y_hat
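# Hypothetical trace of the greedy loop above (the chosen actions are made up;
# the costs are COST[i] = i from the map at the top): start from
# z = [1, 0, ..., 0] with budget 10; if the policy proposes feature 4
# (cost 4 < 10) it is extracted and budget_spent becomes 4; if it next
# proposes feature 9 (cost 9 > 6 remaining) the action is discarded without
# extraction; the loop ends once the budget is exhausted or A is empty.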
def batch_one_step_off_policy_Q_learning_with_least_squares_of_policy_parameters(
        h_x_z_learnt,
        X,
        Y,
        warm_start=None,
        gamma=1.0,
        budget=100,
        lambda_ridge=1,
        fit_intercept_ridge=False):
    assert not fit_intercept_ridge
    # This initialization should be done smartly by estimating the expected
    # reward function.
    policy_parameter_beta = (np.zeros((meta_feature_dim(),))
                             if warm_start is None
                             else warm_start)

    def eta(y, x, z):
        return (1.0 if y != h_x_z_learnt(x, z) else 0.0)

    def R(y, x, z, z_next):
        return eta(y, x, z) - eta(y, x, z_next)

    def Q_pi_star(x, y, gamma,
                  Q_collector,
                  feature_collector,
                  policy_parameter_beta,
                  budget,
                  z=None,
                  A=None):
        z = (empty_z_constructor()
             if z is None
             else z)
        A = (dict(zip(range(Z_SIZE), COST))
             if A is None
             else A)
        # Figure out the action that the policy would take.
        a_hat = use_policy_to_predict_action(
            policy_parameter_beta,
            x * z,
            z,
            A,
            h_x_z_learnt)
        # Find the meta feature of the action that the policy would take.
        # φ(s, π(s))
        meta_feature = meta_feature_func(x, z, a_hat, h_x_z_learnt)
        # Do some book keeping.
        # 1. Update the space of actions that the policy can take.
        del A[a_hat]
        # 2. Create the z_next where we would land after taking this action.
        z_next = np.array(z)
        z_next[a_hat] = True
        # Test whether taking this action would exceed the budget.
        if np.dot(COST, z_next) <= budget:
            # We remain within budget, so calculate the reward for this step
            # and recurse while actions remain.
            r = R(y, x, z, z_next)
            future = (max(0, Q_pi_star(x, y, gamma, Q_collector,
                                       feature_collector,
                                       policy_parameter_beta, budget,
                                       z=z_next, A=A))
                      if len(A) > 0
                      else 0.0)
            Q = r + gamma * future
        else:
            r = 0.0
            Q = r
        # Collect the features and Q values.
        feature_collector.append(meta_feature)
        Q_collector.append(Q)
        return Q

    Q_collector = []
    feature_collector = []
    for x, y in zip(X, Y):
        Q_pi_star(x, y, gamma,
                  Q_collector,
                  feature_collector,
                  policy_parameter_beta,
                  budget)
    ridge_model = sklearn.linear_model.Ridge(
        alpha=lambda_ridge,
        fit_intercept=fit_intercept_ridge,
        normalize=False).fit(
            feature_collector,
            Q_collector)
    return ridge_model.coef_
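# A sketch of what the routine above fits, in symbols: for every state visited
# while rolling out the (fixed, zero-initialised) behaviour policy it records
#   target  Q(s, a) = R(s, a) + γ · max(0, Q(s', π(s')))
#   feature φ(s, a) = meta_feature_func(x, z, a, h_x_z_learnt)
# and then solves the ridge regression  min_β Σ (Q - ⟨β, φ⟩)² + λ||β||²,
# returning β as the policy parameters used by use_policy_to_predict_action.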
def main():
    X, Y, header = data_fnc(shuffle=False)
    F = X.shape[1]
    #-----------------#
    # Example feature #
    #-----------------#
    ex_f = feature_function(
        x=np.array([1.0] + [0.0] * 12),
        y=0,
        z=np.array([True] + [False] * 12))
    # ex_f has x copied into positions 0..12 and zeros everywhere else.
    #----------------------------#
    # Learn parameters using SGD #
    #----------------------------#
    w_learnt = batch_learning(X, Y)
    h_x_z_learnt = functools.partial(h_x_z, w_learnt)
    #------------------------------------------------------------#
    # After learning the parameters of a 'z' sensitive predictor #
    # How can we minimize the cost of prediction at test time? #
    # Learn a policy that dictates the features to be extracted. #
    #------------------------------------------------------------#
    # Equation 2: Suppose we have learnt w_learnt. Now find the binary
    # vector `z` that minimizes the expected loss, subject to the total
    # cost of the selected features remaining less than B.
    # At its core this is yet another structured prediction problem,
    # and policy learning amounts to learning the heuristic for local search.
    # The basic idea is that a policy is a function that can look at
    # 1. the currently extracted features, z, and
    # 2. the prediction that h_x_z would make, i.e. our current estimate of y.
    # NOTE: While learning this policy we also have access to the
    # true value of y. These values are used to set the parameters of the
    # policy.
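    # In symbols (a sketch of the budgeted problem described above):
    #   z* = argmin_z E[l(y, h(x, z))]  s.t.  Σ_i COST[i] · z_i ≤ B
    # The learnt policy below is a greedy, feature-by-feature approximation
    # to this combinatorial search.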
    policy_parameters = batch_one_step_off_policy_Q_learning_with_least_squares_of_policy_parameters(
        h_x_z_learnt,
        X,
        Y)
    # This is how we would finally use the learnt policy to perform inference.
    with tictoc("Adaptive Inference"):
        for x, y in zip(X, Y):
            y_hat = inference_under_budget(
                x, h_x_z_learnt, policy_parameters, budget=10)
            print y, y_hat
if __name__ == '__main__':
    main()