Small example for check_criterion_jacobian failure

The gist contains two files: example.py, which defines a custom criterion (TestLoss), and a test driver that appears to be cut down from PyTorch's test_nn.py / common_nn.py harness and registers TestLoss with the NN test machinery so that check_criterion_jacobian runs against it.

example.py:
from torch.autograd import Variable
import torch
from torch.nn import functional as F


def astype(val, typelike):
    # Cast val to the tensor type of typelike (a Variable), e.g. torch.DoubleTensor.
    return val.type('torch.' + type(typelike.data).__name__)


class TestLoss(torch.nn.Module):
    def __init__(self, rg, w):
        torch.nn.Module.__init__(self)
        self.rg = rg      # whether the constant j is wrapped with requires_grad
        self.width = w    # number of classes the input column is expanded to

    def forward(self, input, target):
        x = input.expand(input.size(0), self.width)
        j = torch.arange(0, self.width)
        j = astype(j, input)
        j = j.expand_as(x)
        j = Variable(j, requires_grad=self.rg)
        fx = j * torch.log(torch.exp(x) + 1.)  # j * softplus(x)
        return F.cross_entropy(fx, target)
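
For orientation, a minimal sketch of how this criterion is exercised on its own (not part of the gist; the shapes and the .double() cast mirror the test configuration at the bottom of the test driver):

import torch
from torch.autograd import Variable
from example import TestLoss

# A (15, 1) input column expanded to width 5, and integer class targets
# in [0, 5), as in the new_criterion_tests entries below.
criterion = TestLoss(True, 5)
input = Variable(torch.randn(15, 1).double(), requires_grad=True)
target = Variable(torch.Tensor(15).uniform_().mul(5).floor().long())
loss = criterion(input, target)  # scalar Variable
loss.backward()                  # populates input.grad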
The test driver (its filename is not given in the gist; it imports the module above as example):
from torch.autograd import Variable, gradcheck
from torch.autograd.gradcheck import gradgradcheck
from torch.autograd.gradcheck import get_numerical_jacobian, iter_tensors, contiguous
import argparse
import contextlib
import sys
import tempfile
import unittest
from copy import deepcopy
from itertools import repeat, product
from functools import wraps, reduce
from operator import mul

import torch
import torch.cuda
import torch.backends.cudnn
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.parallel as dp
import torch.nn.init as init
import torch.nn.utils.rnn as rnn_utils
import torch.legacy.nn as legacy

import example  # the TestLoss criterion defined above

# NOTE: helpers used below but not defined here (to_gpu, freeze_rng_state,
# iter_indices, is_iterable) appear to come from PyTorch's test utilities
# of the same era (test/common.py and test/common_nn.py); run this script
# from the pytorch test/ directory or copy those helpers in.

TEST_CUDA = torch.cuda.is_available()
TEST_MULTIGPU = TEST_CUDA and torch.cuda.device_count() >= 2
TEST_CUDNN = TEST_CUDA and torch.backends.cudnn.is_acceptable(torch.cuda.FloatTensor(1))
TEST_CUDNN_VERSION = TEST_CUDNN and torch.backends.cudnn.version()
PRECISION = 1e-5
@contextlib.contextmanager
def use_cudnn(should_use):
    orig = torch.backends.cudnn.enabled
    torch.backends.cudnn.enabled = should_use
    try:
        yield
    finally:
        torch.backends.cudnn.enabled = orig


def default_tensor_type(type):
    type_str = torch.typename(type)

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            old_type = torch.typename(torch.Tensor())
            torch.set_default_tensor_type(type_str)
            try:
                return fn(*args, **kwargs)
            finally:
                torch.set_default_tensor_type(old_type)
        return wrapper
    return decorator
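
# Usage sketch for default_tensor_type (a hypothetical test, not part of
# this gist): the decorator swaps the default tensor type for the duration
# of one call and restores the old type afterwards.
#
#     @default_tensor_type(torch.FloatTensor)
#     def test_something(self):
#         assert torch.typename(torch.Tensor()) == 'torch.FloatTensor'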
class TestBase(object):

    def __init__(self, constructor, constructor_args=tuple(), input_size=None,
                 input=None, desc='', reference_fn=None, fullname=None, **kwargs):
        if input_size is None and input is None:
            raise RuntimeError("Specify either an input tensor, or its size!")
        self.constructor = constructor
        self.constructor_args = constructor_args
        self.input = input
        self.input_size = input_size
        self.desc = desc
        self.fullname = fullname
        self.reference_fn = reference_fn

    def get_name(self):
        if self.fullname is not None:
            return 'test_' + self.fullname
        test_name = 'test_' + self.constructor.__name__
        if self.desc:
            test_name += '_' + self.desc
        return test_name

    def _unpack_input(self, input):
        if isinstance(input, Variable):
            return input.data
        elif torch.is_tensor(input):
            return input
        else:
            return type(input)(self._unpack_input(i) for i in input)

    def _get_input(self):
        if self.input is not None:
            return self.input

        def map_input_sizes(sizes):
            if isinstance(sizes, list):
                return [map_input_sizes(s) for s in sizes]
            elif torch.is_tensor(sizes):
                return sizes.double()
            else:
                return torch.randn(*sizes)

        assert self.input_size is not None
        return map_input_sizes(self.input_size)

    def __call__(self, test_case):
        raise NotImplementedError
class CriterionTest(TestBase):

    def __init__(self, *args, **kwargs):
        super(CriterionTest, self).__init__(*args, **kwargs)
        self.target = self._get_target(kwargs['target'])
        self.should_test_cuda = kwargs.get('test_cuda', True)

    def _get_target(self, target):
        return target

    def __call__(self, test_case):
        module = self.constructor(*self.constructor_args)
        input = self._get_input()
        # Check that these methods don't raise errors
        module.__repr__()
        str(module)
        if self.reference_fn is not None:
            out = test_case._forward_criterion(module, input, self.target)
            target = self.target
            if isinstance(target, Variable):
                target = target.data
            expected_out = self.reference_fn(deepcopy(self._unpack_input(input)),
                                             deepcopy(target), module)
            test_case.assertEqual(out, expected_out)
        # check_criterion_jacobian is the check this gist demonstrates
        # failing (defined in NNTestCase below).
        test_case.check_criterion_jacobian(module, input, self.target)
        self._do_extra_tests(test_case, module, input, self.target)

    def test_cuda(self, test_case):
        if not TEST_CUDA or not self.should_test_cuda:
            raise unittest.SkipTest('Excluded from CUDA tests')
        try:
            cpu_input = self._get_input()
            type_map = {
                torch.DoubleTensor: torch.cuda.FloatTensor,
            }
            gpu_input = to_gpu(cpu_input, type_map=type_map)
            cpu_target = self.target
            gpu_target = to_gpu(self.target, type_map=type_map)
            cpu_module = self.constructor(*self.constructor_args)
            gpu_module = self.constructor(*self.constructor_args).float().cuda()
            cpu_output = test_case._forward_criterion(cpu_module, cpu_input, cpu_target)
            gpu_output = test_case._forward_criterion(gpu_module, gpu_input, gpu_target)
            test_case.assertEqual(cpu_output, gpu_output, 4e-4)
            cpu_gradInput = test_case._backward_criterion(cpu_module, cpu_input, cpu_target)
            gpu_gradInput = test_case._backward_criterion(gpu_module, gpu_input, gpu_target)
            test_case.assertEqual(cpu_gradInput, gpu_gradInput, 4e-4)
        except NotImplementedError:
            pass

    def _do_extra_tests(self, test_case, module, input, target):
        pass
class InputVariableMixin(object):

    def _get_input(self):
        input = TestBase._get_input(self)

        def map_variables(i):
            if isinstance(i, Variable):
                return i
            elif torch.is_tensor(i):
                return Variable(i, requires_grad=True)
            else:
                return type(i)(map_variables(elem) for elem in i)

        return map_variables(input)


class NewCriterionTest(InputVariableMixin, CriterionTest):
    # TODO: check that criterions don't ignore grad_output

    def __init__(self, *args, **kwargs):
        super(NewCriterionTest, self).__init__(*args, **kwargs)
        self.check_gradgrad = kwargs.get('check_gradgrad', True)

    def _do_extra_tests(self, test_case, module, input, target):
        if self.check_gradgrad:
            params = tuple(x for x in module.parameters())
            if not isinstance(input, tuple):
                _assertGradAndGradgradChecks(test_case, lambda x, y, *args, **kw: module(x, y),
                                             (input, target) + params)
            else:
                _assertGradAndGradgradChecks(test_case, lambda x, y, z, *args, **kw: module(x, y, z),
                                             input + (target,) + params)

    def _get_target(self, target):
        return Variable(target, requires_grad=False)
class TestCase(unittest.TestCase):
    precision = 1e-5

    def setUp(self):
        torch.manual_seed(SEED)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(SEED)

    def assertTensorsSlowEqual(self, x, y, prec=None, message=''):
        max_err = 0
        self.assertEqual(x.size(), y.size())
        for index in iter_indices(x):
            max_err = max(max_err, abs(x[index] - y[index]))
        self.assertLessEqual(max_err, prec, message)

    def safeCoalesce(self, t):
        tc = t.coalesce()
        value_map = {}
        for idx, val in zip(t._indices().t(), t._values()):
            idx_tup = tuple(idx)
            if idx_tup in value_map:
                value_map[idx_tup] += val
            else:
                value_map[idx_tup] = val.clone() if torch.is_tensor(val) else val
        new_indices = sorted(list(value_map.keys()))
        new_values = [value_map[idx] for idx in new_indices]
        if t._values().ndimension() < 2:
            new_values = t._values().new(new_values)
        else:
            new_values = torch.stack(new_values)
        new_indices = t._indices().new(new_indices).t()
        tg = t.new(new_indices, new_values, t.size())
        self.assertEqual(tc._indices(), tg._indices())
        self.assertEqual(tc._values(), tg._values())
        return tg

    def unwrapVariables(self, x, y):
        if isinstance(x, Variable) and isinstance(y, Variable):
            return x.data, y.data
        elif isinstance(x, Variable) or isinstance(y, Variable):
            raise AssertionError("cannot compare {} and {}".format(type(x), type(y)))
        return x, y

    def assertEqual(self, x, y, prec=None, message=''):
        if prec is None:
            prec = self.precision
        x, y = self.unwrapVariables(x, y)
        if torch.is_tensor(x) and torch.is_tensor(y):
            def assertTensorsEqual(a, b):
                super(TestCase, self).assertEqual(a.size(), b.size())
                if a.numel() > 0:
                    b = b.type_as(a)
                    b = b.cuda(device=a.get_device()) if a.is_cuda else b.cpu()
                    # check that NaNs are in the same locations
                    nan_mask = a != a
                    self.assertTrue(torch.equal(nan_mask, b != b))
                    diff = a - b
                    diff[nan_mask] = 0
                    if diff.is_signed():
                        diff = diff.abs()
                    max_err = diff.max()
                    self.assertLessEqual(max_err, prec, message)
            self.assertEqual(x.is_sparse, y.is_sparse, message)
            if x.is_sparse:
                x = self.safeCoalesce(x)
                y = self.safeCoalesce(y)
                assertTensorsEqual(x._indices(), y._indices())
                assertTensorsEqual(x._values(), y._values())
            else:
                assertTensorsEqual(x, y)
        elif type(x) == str and type(y) == str:
            super(TestCase, self).assertEqual(x, y)
        elif type(x) == set and type(y) == set:
            super(TestCase, self).assertEqual(x, y)
        elif is_iterable(x) and is_iterable(y):
            super(TestCase, self).assertEqual(len(x), len(y))
            for x_, y_ in zip(x, y):
                self.assertEqual(x_, y_, prec, message)
        else:
            try:
                self.assertLessEqual(abs(x - y), prec, message)
                return
            except:
                pass
            super(TestCase, self).assertEqual(x, y, message)

    def assertNotEqual(self, x, y, prec=None, message=''):
        if prec is None:
            prec = self.precision
        x, y = self.unwrapVariables(x, y)
        if torch.is_tensor(x) and torch.is_tensor(y):
            if x.size() != y.size():
                super(TestCase, self).assertNotEqual(x.size(), y.size())
            self.assertGreater(x.numel(), 0)
            y = y.type_as(x)
            y = y.cuda(device=x.get_device()) if x.is_cuda else y.cpu()
            nan_mask = x != x
            if torch.equal(nan_mask, y != y):
                diff = x - y
                if diff.is_signed():
                    diff = diff.abs()
                diff[nan_mask] = 0
                max_err = diff.max()
                self.assertGreaterEqual(max_err, prec, message)
        elif type(x) == str and type(y) == str:
            super(TestCase, self).assertNotEqual(x, y)
        elif is_iterable(x) and is_iterable(y):
            super(TestCase, self).assertNotEqual(x, y)
        else:
            try:
                self.assertGreaterEqual(abs(x - y), prec, message)
                return
            except:
                pass
            super(TestCase, self).assertNotEqual(x, y, message)

    def assertObjectIn(self, obj, iterable):
        for elem in iterable:
            if id(obj) == id(elem):
                return
        raise AssertionError("object not found in iterable")

    if sys.version_info < (3, 2):
        # assertRaisesRegexp renamed assertRaisesRegex in 3.2
        assertRaisesRegex = unittest.TestCase.assertRaisesRegexp
class NNTestCase(TestCase):

    def _jacobian(self, input, num_out):
        if isinstance(input, tuple):
            return tuple(self._jacobian(elem, num_out) for elem in input)
        elif isinstance(input, list):
            return [self._jacobian(elem, num_out) for elem in input]
        else:
            return torch.zeros(input.nelement(), num_out)

    def _flatten_tensors(self, x):
        if torch.is_tensor(x):
            if x.is_sparse:
                return x.to_dense().view(-1)
            else:
                return x.view(-1)
        elif isinstance(x, Variable):
            return self._flatten_tensors(x.data)
        else:
            return tuple(self._flatten_tensors(a) for a in x)

    def _zero_grad_input(self, input):
        if isinstance(input, Variable):
            if input.requires_grad and input.grad is not None:
                input.grad.data.zero_()
                input.grad.detach_()
        elif torch.is_tensor(input):
            return
        else:
            for i in input:
                self._zero_grad_input(i)

    def _analytical_jacobian(self, module, input, jacobian_input=True, jacobian_parameters=True):
        output = self._forward(module, input)
        output_t = output.data if isinstance(output, Variable) else output
        d_out = output_t.new().resize_(output_t.size())
        flat_d_out = d_out.view(-1)

        if jacobian_input:
            jacobian_inp = self._jacobian(input, d_out.nelement())
            flat_jacobian_input = list(iter_tensors(jacobian_inp))

        if jacobian_parameters:
            param, d_param = self._get_parameters(module)
            num_param = sum(p.numel() for p in param)
            jacobian_param = torch.zeros(num_param, d_out.nelement())

        for i in range(flat_d_out.nelement()):
            d_out.zero_()
            flat_d_out[i] = 1
            if jacobian_parameters:
                self._zero_grad_parameters(module)
            # Variables will accumulate gradient from multiple steps
            if jacobian_input:
                self._zero_grad_input(input)
            d_input = self._backward(module, input, output, d_out)

            if jacobian_input:
                for jacobian_x, d_x in zip(flat_jacobian_input, iter_tensors(d_input)):
                    jacobian_x[:, i] = d_x
            if jacobian_parameters:
                jacobian_param[:, i] = torch.cat(self._flatten_tensors(d_param), 0)

        res = tuple()
        if jacobian_input:
            res += jacobian_inp,
        if jacobian_parameters:
            res += jacobian_param,
        return res

    def _numerical_jacobian(self, module, input, jacobian_input=True, jacobian_parameters=True):
        output = self._forward(module, input)
        output_size = output.nelement()

        if jacobian_parameters:
            param, d_param = self._get_parameters(module)

        def fw(input):
            out = self._forward(module, input)
            if isinstance(out, Variable):
                return out.data
            return out

        res = tuple()
        input = contiguous(input)
        if jacobian_input:
            res += get_numerical_jacobian(fw, input, input, eps=1e-6),
        if jacobian_parameters:
            res += torch.cat(list(get_numerical_jacobian(fw, input, p, eps=1e-6) for p in param), 0),
        return res

    def check_jacobian(self, module, input, jacobian_input=True):
        jacobian_parameters = bool(self._get_parameters(module)[0])
        analytical = self._analytical_jacobian(module, input, jacobian_input, jacobian_parameters)
        numerical = self._numerical_jacobian(module, input, jacobian_input, jacobian_parameters)
        analytical_t = iter_tensors(analytical)
        numerical_t = iter_tensors(numerical)
        # TODO: compare structure
        self.assertLessEqual(
            max(a.add(-1, n).abs().max() for a, n in zip(analytical_t, numerical_t)),
            PRECISION
        )

    def check_criterion_jacobian(self, criterion, input, target):
        eps = 1e-6
        self._forward_criterion(criterion, input, target)
        analytical_d_x = self._backward_criterion(criterion, input, target)
        numerical_d_x = deepcopy(analytical_d_x)

        input_t = iter_tensors(input)
        numerical_t = iter_tensors(numerical_d_x)
        for x, d_x in zip(input_t, numerical_t):
            x = x.view(-1)
            d_x = d_x.view(-1)
            for i in range(x.nelement()):
                original = x[i]
                x[i] = original + eps
                fx1 = self._forward_criterion(criterion, input, target)
                x[i] = original - eps
                fx2 = self._forward_criterion(criterion, input, target)
                deriv = (fx1 - fx2) / (2. * eps)
                d_x[i] = deriv
                x[i] = original

        # TODO: check structure
        analytical_t = iter_tensors(analytical_d_x)
        numerical_t = iter_tensors(numerical_d_x)
        self.assertLessEqual(
            max(a.add(-1, n).abs().max() for a, n in zip(analytical_t, numerical_t)),
            PRECISION
        )
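
    # How check_criterion_jacobian works: the numerical gradient is a
    # central difference, d_x[i] ~ (f(x + eps) - f(x - eps)) / (2 * eps)
    # with eps = 1e-6, compared elementwise against the analytical
    # gradient returned by _backward_criterion. The assertion requires
    # max |analytical - numerical| <= PRECISION (1e-5); this is the bound
    # the TestLoss criterion in example.py fails.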
class TestNN(NNTestCase):

    def _forward(self, module, input):
        with freeze_rng_state():
            return module(input)

    def _backward(self, module, input, output, grad_output):
        output.backward(grad_output, retain_graph=True)
        if input.grad is None:
            return None
        return input.grad.data

    def _forward_criterion(self, criterion, input, target):
        if isinstance(input, tuple):
            args = input + (target,)
            output = criterion(*args)
        else:
            output = criterion(input, target)
        return output.data[0]

    def _backward_criterion(self, criterion, input, target):
        input_tuple = input if isinstance(input, tuple) else (input,)
        for i in input_tuple:
            if i.grad is not None:
                i.grad.data.zero_()
        args = input_tuple + (target,)
        criterion(*args).backward()
        if isinstance(input, tuple):
            return tuple(map(lambda i: i.grad.data, input))
        else:
            return input.grad.data

    def _zero_grad_parameters(self, module):
        if hasattr(module, 'weight') and module.weight is not None:
            if module.weight.grad is not None:
                module.weight.grad.data.zero_()
                module.weight.grad.detach_()
        if hasattr(module, 'bias') and module.bias is not None:
            if module.bias.grad is not None:
                module.bias.grad.data.zero_()
                module.bias.grad.detach_()

    def _get_parameters(self, module):
        params = []
        d_params = []
        for p in module.parameters():
            if p.grad is None:
                p._grad = Variable(p.data.clone().zero_(), volatile=True)
            params.append(p.data)
            d_params.append(p.grad.data)
        return params, d_params
def add_test(test):
    test_name = test.get_name()
    cuda_test_name = test_name + '_cuda'
    if hasattr(TestNN, test_name):
        raise RuntimeError('Found two tests with the same name: ' + test_name)
    if hasattr(TestNN, cuda_test_name):
        raise RuntimeError('Found two tests with the same name: ' + cuda_test_name)
    setattr(TestNN, test_name, lambda self, test=test: test(self))
    setattr(TestNN, cuda_test_name, lambda self, test=test: test.test_cuda(self))


new_criterion_tests = [
    dict(
        module_name='TestLoss',
        constructor_args=(True, 5,),
        input=torch.randn(15, 1),
        target=torch.Tensor(15).uniform_().mul(5).floor().long(),
    ),
    dict(
        module_name='TestLoss',
        constructor_args=(False, 5,),
        input=torch.randn(15, 1),
        target=torch.Tensor(15).uniform_().mul(5).floor().long(),
        desc='false_requires_grad',
    ),
]

for test_params in new_criterion_tests:
    name = test_params.pop('module_name')
    test_params['constructor'] = getattr(example, name)
    test = NewCriterionTest(**test_params)
    add_test(test)
    if 'check_no_size_average' in test_params:
        desc = test_params.get('desc', None)
        test_params['desc'] = 'no_size_average' if desc is None else desc + '_no_size_average'

        def gen_no_size_average_constructor(constructor):
            def no_size_average_constructor(*args, **kwargs):
                cons = constructor(*args, size_average=False, **kwargs)
                return cons
            no_size_average_constructor.__name__ = constructor.__name__
            return no_size_average_constructor

        test_params['constructor'] = gen_no_size_average_constructor(test_params['constructor'])
        test = NewCriterionTest(**test_params)
        add_test(test)
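
# With the two configurations above, this loop generates four methods on
# TestNN: test_TestLoss, test_TestLoss_false_requires_grad, and their
# _cuda counterparts (skipped when CUDA is unavailable). The
# check_no_size_average branch is inert here, since neither dict sets it.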
def run_tests():
    remaining = parse_set_seed_once()
    unittest.main(argv=remaining)


torch.set_default_tensor_type('torch.DoubleTensor')
SEED = 0
SEED_SET = 0


def _assertGradAndGradgradChecks(test_case, apply_fn, inputs):
    # call assert function rather than returning a bool since it's nicer
    # if we get whether this failed on the gradcheck or the gradgradcheck.
    test_case.assertTrue(gradcheck(apply_fn, inputs))
    dummy_out = apply_fn(*inputs)

    def randn_match_cpu_gpu(x):
        a = torch.randn(x.size())
        if x.is_cuda:
            a = a.cuda(x.get_device())
        return a

    if isinstance(dummy_out, tuple):
        grad_y = tuple(Variable(randn_match_cpu_gpu(x), requires_grad=x.requires_grad)
                       for x in dummy_out if isinstance(x, Variable))
    else:
        grad_y = (Variable(randn_match_cpu_gpu(dummy_out), requires_grad=dummy_out.requires_grad),)
    inputs = tuple([inputs[0].double(), inputs[1]])
    test_case.assertTrue(gradgradcheck(apply_fn, inputs, grad_y))


def parse_set_seed_once():
    global SEED
    global SEED_SET
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('--seed', type=int, default=123)
    args, remaining = parser.parse_known_args()
    if SEED_SET == 0:
        torch.manual_seed(args.seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(args.seed)
        SEED = args.seed
        SEED_SET = 1
    remaining = [sys.argv[0]] + remaining
    return remaining


run_tests()
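
To reproduce the failure, save the first file as example.py (the name the driver imports) and the second file next to it under any name, then run it as a script, e.g.

python test_check_criterion.py --seed 123

(test_check_criterion.py is an assumed filename; the gist does not give one.) The --seed flag is consumed by parse_set_seed_once and defaults to 123; remaining arguments are passed through to unittest.main. Note that the helpers to_gpu, freeze_rng_state, iter_indices and is_iterable must be importable or defined, as flagged at the top of the driver.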