Created by @mellorjc, September 1, 2017, 10:19
Small example for check_criterion_jacobian failure
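The gist contains two files: example.py, which defines the TestLoss criterion below, and a test script (starting after it) that registers TestLoss with a harness apparently adapted from PyTorch's own test_nn.py / common.py test code and runs check_criterion_jacobian on it.

# ---- example.py ----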
from torch.autograd import Variable
import torch
from torch.nn import functional as F


def astype(val, typelike):
    # Cast `val` to the same tensor type as `typelike` (expected to be a Variable).
    return val.type('torch.' + type(typelike.data).__name__)


class TestLoss(torch.nn.Module):
    def __init__(self, rg, w):
        torch.nn.Module.__init__(self)
        self.rg = rg        # requires_grad flag for the Variable created inside forward()
        self.width = w      # number of columns (classes) the input is expanded to

    def forward(self, input, target):
        x = input.expand(input.size(0), self.width)
        j = torch.arange(0, self.width)
        j = astype(j, input)
        j = j.expand_as(x)
        j = Variable(j, requires_grad=self.rg)
        fx = j * torch.log(torch.exp(x) + 1.)
        return F.cross_entropy(fx, target)
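# Usage sketch (illustrative only, not part of the gist), using the same shapes as the
# test definitions at the bottom of the test script:
#
#     import torch
#     from torch.autograd import Variable
#     from example import TestLoss
#
#     criterion = TestLoss(True, 5)   # rg=True, width=5
#     input = Variable(torch.randn(15, 1).double(), requires_grad=True)
#     target = Variable(torch.Tensor(15).uniform_().mul(5).floor().long())
#     loss = criterion(input, target)

# ---- test script ----
# Harness apparently adapted from PyTorch's test_nn.py / common.py test code.
# Helpers referenced below but not defined in the gist (to_gpu, freeze_rng_state,
# iter_indices, is_iterable) are assumed to come from PyTorch's test utilities.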
from torch.autograd import Variable, gradcheck
from torch.autograd.gradcheck import gradgradcheck, get_numerical_jacobian, iter_tensors, contiguous

import argparse
import contextlib
import sys
import tempfile
import unittest
from copy import deepcopy
from functools import wraps, reduce
from itertools import repeat, product
from operator import mul

import torch
import torch.cuda
import torch.backends.cudnn
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as init
import torch.nn.parallel as dp
import torch.nn.utils.rnn as rnn_utils
import torch.legacy.nn as legacy

import example
TEST_CUDA = torch.cuda.is_available()
TEST_MULTIGPU = TEST_CUDA and torch.cuda.device_count() >= 2
TEST_CUDNN = TEST_CUDA and torch.backends.cudnn.is_acceptable(torch.cuda.FloatTensor(1))
TEST_CUDNN_VERSION = TEST_CUDNN and torch.backends.cudnn.version()
PRECISION = 1e-5
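# PRECISION is the largest elementwise gap tolerated between the analytical and
# numerical jacobians computed in NNTestCase below.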
@contextlib.contextmanager
def use_cudnn(should_use):
    orig = torch.backends.cudnn.enabled
    torch.backends.cudnn.enabled = should_use
    try:
        yield
    finally:
        torch.backends.cudnn.enabled = orig
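# Usage sketch (illustrative only): temporarily disable cuDNN around a forward pass.
#
#     with use_cudnn(False):
#         output = module(input)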
def default_tensor_type(type):
    type_str = torch.typename(type)

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            old_type = torch.typename(torch.Tensor())
            torch.set_default_tensor_type(type_str)
            try:
                return fn(*args, **kwargs)
            finally:
                torch.set_default_tensor_type(old_type)
        return wrapper
    return decorator
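# Usage sketch (illustrative only): run one test with a different default tensor type.
#
#     @default_tensor_type(torch.FloatTensor)
#     def test_in_float(self):
#         ...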
class TestBase(object):

    def __init__(self, constructor, constructor_args=tuple(), input_size=None,
                 input=None, desc='', reference_fn=None, fullname=None, **kwargs):
        if input_size is None and input is None:
            raise RuntimeError("Specify either an input tensor, or its size!")
        self.constructor = constructor
        self.constructor_args = constructor_args
        self.input = input
        self.input_size = input_size
        self.desc = desc
        self.fullname = fullname
        self.reference_fn = reference_fn

    def get_name(self):
        if self.fullname is not None:
            return 'test_' + self.fullname

        test_name = 'test_' + self.constructor.__name__
        if self.desc:
            test_name += '_' + self.desc
        return test_name

    def _unpack_input(self, input):
        if isinstance(input, Variable):
            return input.data
        elif torch.is_tensor(input):
            return input
        else:
            return type(input)(self._unpack_input(i) for i in input)

    def _get_input(self):
        if self.input is not None:
            return self.input

        def map_input_sizes(sizes):
            if isinstance(sizes, list):
                return [map_input_sizes(s) for s in sizes]
            elif torch.is_tensor(sizes):
                return sizes.double()
            else:
                return torch.randn(*sizes)

        assert self.input_size is not None
        return map_input_sizes(self.input_size)

    def __call__(self, test_case):
        raise NotImplementedError
class CriterionTest(TestBase):

    def __init__(self, *args, **kwargs):
        super(CriterionTest, self).__init__(*args, **kwargs)
        self.target = self._get_target(kwargs['target'])
        self.should_test_cuda = kwargs.get('test_cuda', True)

    def _get_target(self, target):
        return target

    def __call__(self, test_case):
        module = self.constructor(*self.constructor_args)
        input = self._get_input()

        # Check that these methods don't raise errors
        module.__repr__()
        str(module)

        if self.reference_fn is not None:
            out = test_case._forward_criterion(module, input, self.target)
            target = self.target
            if isinstance(target, Variable):
                target = target.data
            expected_out = self.reference_fn(deepcopy(self._unpack_input(input)),
                                             deepcopy(target), module)
            test_case.assertEqual(out, expected_out)

        test_case.check_criterion_jacobian(module, input, self.target)
        self._do_extra_tests(test_case, module, input, self.target)

    def test_cuda(self, test_case):
        if not TEST_CUDA or not self.should_test_cuda:
            raise unittest.SkipTest('Excluded from CUDA tests')
        try:
            cpu_input = self._get_input()
            type_map = {
                torch.DoubleTensor: torch.cuda.FloatTensor,
            }
            gpu_input = to_gpu(cpu_input, type_map=type_map)

            cpu_target = self.target
            gpu_target = to_gpu(self.target, type_map=type_map)

            cpu_module = self.constructor(*self.constructor_args)
            gpu_module = self.constructor(*self.constructor_args).float().cuda()

            cpu_output = test_case._forward_criterion(cpu_module, cpu_input, cpu_target)
            gpu_output = test_case._forward_criterion(gpu_module, gpu_input, gpu_target)
            test_case.assertEqual(cpu_output, gpu_output, 4e-4)

            cpu_gradInput = test_case._backward_criterion(cpu_module, cpu_input, cpu_target)
            gpu_gradInput = test_case._backward_criterion(gpu_module, gpu_input, gpu_target)
            test_case.assertEqual(cpu_gradInput, gpu_gradInput, 4e-4)
        except NotImplementedError:
            pass

    def _do_extra_tests(self, test_case, module, input, target):
        pass
class InputVariableMixin(object):

    def _get_input(self):
        input = TestBase._get_input(self)

        def map_variables(i):
            if isinstance(i, Variable):
                return i
            elif torch.is_tensor(i):
                return Variable(i, requires_grad=True)
            else:
                return type(i)(map_variables(elem) for elem in i)

        return map_variables(input)
class NewCriterionTest(InputVariableMixin, CriterionTest):
    # TODO: check that criterions don't ignore grad_output

    def __init__(self, *args, **kwargs):
        super(NewCriterionTest, self).__init__(*args, **kwargs)
        self.check_gradgrad = kwargs.get('check_gradgrad', True)

    def _do_extra_tests(self, test_case, module, input, target):
        if self.check_gradgrad:
            params = tuple(x for x in module.parameters())
            if not isinstance(input, tuple):
                _assertGradAndGradgradChecks(test_case, lambda x, y, *args, **kw: module(x, y),
                                             (input, target) + params)
            else:
                _assertGradAndGradgradChecks(test_case, lambda x, y, z, *args, **kw: module(x, y, z),
                                             input + (target,) + params)

    def _get_target(self, target):
        return Variable(target, requires_grad=False)
class TestCase(unittest.TestCase):
    precision = 1e-5

    def setUp(self):
        torch.manual_seed(SEED)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(SEED)

    def assertTensorsSlowEqual(self, x, y, prec=None, message=''):
        max_err = 0
        self.assertEqual(x.size(), y.size())
        for index in iter_indices(x):
            max_err = max(max_err, abs(x[index] - y[index]))
        self.assertLessEqual(max_err, prec, message)

    def safeCoalesce(self, t):
        tc = t.coalesce()

        value_map = {}
        for idx, val in zip(t._indices().t(), t._values()):
            idx_tup = tuple(idx)
            if idx_tup in value_map:
                value_map[idx_tup] += val
            else:
                value_map[idx_tup] = val.clone() if torch.is_tensor(val) else val

        new_indices = sorted(list(value_map.keys()))
        new_values = [value_map[idx] for idx in new_indices]
        if t._values().ndimension() < 2:
            new_values = t._values().new(new_values)
        else:
            new_values = torch.stack(new_values)

        new_indices = t._indices().new(new_indices).t()
        tg = t.new(new_indices, new_values, t.size())

        self.assertEqual(tc._indices(), tg._indices())
        self.assertEqual(tc._values(), tg._values())
        return tg

    def unwrapVariables(self, x, y):
        if isinstance(x, Variable) and isinstance(y, Variable):
            return x.data, y.data
        elif isinstance(x, Variable) or isinstance(y, Variable):
            raise AssertionError("cannot compare {} and {}".format(type(x), type(y)))
        return x, y

    def assertEqual(self, x, y, prec=None, message=''):
        if prec is None:
            prec = self.precision

        x, y = self.unwrapVariables(x, y)

        if torch.is_tensor(x) and torch.is_tensor(y):
            def assertTensorsEqual(a, b):
                super(TestCase, self).assertEqual(a.size(), b.size())
                if a.numel() > 0:
                    b = b.type_as(a)
                    b = b.cuda(device=a.get_device()) if a.is_cuda else b.cpu()
                    # check that NaNs are in the same locations
                    nan_mask = a != a
                    self.assertTrue(torch.equal(nan_mask, b != b))
                    diff = a - b
                    diff[nan_mask] = 0
                    if diff.is_signed():
                        diff = diff.abs()
                    max_err = diff.max()
                    self.assertLessEqual(max_err, prec, message)

            self.assertEqual(x.is_sparse, y.is_sparse, message)
            if x.is_sparse:
                x = self.safeCoalesce(x)
                y = self.safeCoalesce(y)
                assertTensorsEqual(x._indices(), y._indices())
                assertTensorsEqual(x._values(), y._values())
            else:
                assertTensorsEqual(x, y)
        elif type(x) == str and type(y) == str:
            super(TestCase, self).assertEqual(x, y)
        elif type(x) == set and type(y) == set:
            super(TestCase, self).assertEqual(x, y)
        elif is_iterable(x) and is_iterable(y):
            super(TestCase, self).assertEqual(len(x), len(y))
            for x_, y_ in zip(x, y):
                self.assertEqual(x_, y_, prec, message)
        else:
            try:
                self.assertLessEqual(abs(x - y), prec, message)
                return
            except:
                pass
            super(TestCase, self).assertEqual(x, y, message)

    def assertNotEqual(self, x, y, prec=None, message=''):
        if prec is None:
            prec = self.precision

        x, y = self.unwrapVariables(x, y)

        if torch.is_tensor(x) and torch.is_tensor(y):
            if x.size() != y.size():
                super(TestCase, self).assertNotEqual(x.size(), y.size())
            self.assertGreater(x.numel(), 0)
            y = y.type_as(x)
            y = y.cuda(device=x.get_device()) if x.is_cuda else y.cpu()
            nan_mask = x != x
            if torch.equal(nan_mask, y != y):
                diff = x - y
                if diff.is_signed():
                    diff = diff.abs()
                diff[nan_mask] = 0
                max_err = diff.max()
                self.assertGreaterEqual(max_err, prec, message)
        elif type(x) == str and type(y) == str:
            super(TestCase, self).assertNotEqual(x, y)
        elif is_iterable(x) and is_iterable(y):
            super(TestCase, self).assertNotEqual(x, y)
        else:
            try:
                self.assertGreaterEqual(abs(x - y), prec, message)
                return
            except:
                pass
            super(TestCase, self).assertNotEqual(x, y, message)

    def assertObjectIn(self, obj, iterable):
        for elem in iterable:
            if id(obj) == id(elem):
                return
        raise AssertionError("object not found in iterable")

    if sys.version_info < (3, 2):
        # assertRaisesRegexp renamed assertRaisesRegex in 3.2
        assertRaisesRegex = unittest.TestCase.assertRaisesRegexp
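# NNTestCase adds analytical-vs-numerical jacobian checks on top of the tolerance-aware
# assertEqual/assertNotEqual defined in TestCase above.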
class NNTestCase(TestCase):

    def _jacobian(self, input, num_out):
        if isinstance(input, tuple):
            return tuple(self._jacobian(elem, num_out) for elem in input)
        elif isinstance(input, list):
            return [self._jacobian(elem, num_out) for elem in input]
        else:
            return torch.zeros(input.nelement(), num_out)

    def _flatten_tensors(self, x):
        if torch.is_tensor(x):
            if x.is_sparse:
                return x.to_dense().view(-1)
            else:
                return x.view(-1)
        elif isinstance(x, Variable):
            return self._flatten_tensors(x.data)
        else:
            return tuple(self._flatten_tensors(a) for a in x)

    def _zero_grad_input(self, input):
        if isinstance(input, Variable):
            if input.requires_grad and input.grad is not None:
                input.grad.data.zero_()
                input.grad.detach_()
        elif torch.is_tensor(input):
            return
        else:
            for i in input:
                self._zero_grad_input(i)

    def _analytical_jacobian(self, module, input, jacobian_input=True, jacobian_parameters=True):
        output = self._forward(module, input)
        output_t = output.data if isinstance(output, Variable) else output
        d_out = output_t.new().resize_(output_t.size())
        flat_d_out = d_out.view(-1)

        if jacobian_input:
            jacobian_inp = self._jacobian(input, d_out.nelement())
            flat_jacobian_input = list(iter_tensors(jacobian_inp))

        if jacobian_parameters:
            param, d_param = self._get_parameters(module)
            num_param = sum(p.numel() for p in param)
            jacobian_param = torch.zeros(num_param, d_out.nelement())

        for i in range(flat_d_out.nelement()):
            d_out.zero_()
            flat_d_out[i] = 1

            if jacobian_parameters:
                self._zero_grad_parameters(module)
            # Variables will accumulate gradient from multiple steps
            if jacobian_input:
                self._zero_grad_input(input)
            d_input = self._backward(module, input, output, d_out)

            if jacobian_input:
                for jacobian_x, d_x in zip(flat_jacobian_input, iter_tensors(d_input)):
                    jacobian_x[:, i] = d_x
            if jacobian_parameters:
                jacobian_param[:, i] = torch.cat(self._flatten_tensors(d_param), 0)

        res = tuple()
        if jacobian_input:
            res += jacobian_inp,
        if jacobian_parameters:
            res += jacobian_param,

        return res

    def _numerical_jacobian(self, module, input, jacobian_input=True, jacobian_parameters=True):
        output = self._forward(module, input)
        output_size = output.nelement()

        if jacobian_parameters:
            param, d_param = self._get_parameters(module)

        def fw(input):
            out = self._forward(module, input)
            if isinstance(out, Variable):
                return out.data
            return out

        res = tuple()
        input = contiguous(input)
        if jacobian_input:
            res += get_numerical_jacobian(fw, input, input, eps=1e-6),
        if jacobian_parameters:
            res += torch.cat(list(get_numerical_jacobian(fw, input, p, eps=1e-6) for p in param), 0),
        return res

    def check_jacobian(self, module, input, jacobian_input=True):
        jacobian_parameters = bool(self._get_parameters(module)[0])
        analytical = self._analytical_jacobian(module, input, jacobian_input, jacobian_parameters)
        numerical = self._numerical_jacobian(module, input, jacobian_input, jacobian_parameters)
        analytical_t = iter_tensors(analytical)
        numerical_t = iter_tensors(numerical)
        # TODO: compare structure
        self.assertLessEqual(
            max(a.add(-1, n).abs().max() for a, n in zip(analytical_t, numerical_t)),
            PRECISION
        )
    def check_criterion_jacobian(self, criterion, input, target):
        eps = 1e-6
        self._forward_criterion(criterion, input, target)
        analytical_d_x = self._backward_criterion(criterion, input, target)
        numerical_d_x = deepcopy(analytical_d_x)

        # Estimate d(loss)/d(input[i]) by central differences and compare it
        # against the gradient returned by backward().
        input_t = iter_tensors(input)
        numerical_t = iter_tensors(numerical_d_x)
        for x, d_x in zip(input_t, numerical_t):
            x = x.view(-1)
            d_x = d_x.view(-1)
            for i in range(x.nelement()):
                original = x[i]
                x[i] = original + eps
                fx1 = self._forward_criterion(criterion, input, target)
                x[i] = original - eps
                fx2 = self._forward_criterion(criterion, input, target)
                deriv = (fx1 - fx2) / (2. * eps)
                d_x[i] = deriv
                x[i] = original

        # TODO: check structure
        analytical_t = iter_tensors(analytical_d_x)
        numerical_t = iter_tensors(numerical_d_x)
        self.assertLessEqual(
            max(a.add(-1, n).abs().max() for a, n in zip(analytical_t, numerical_t)),
            PRECISION
        )
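# TestNN supplies the concrete _forward/_backward and _forward_criterion/_backward_criterion
# hooks that the jacobian checks above call into.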
class TestNN(NNTestCase):

    def _forward(self, module, input):
        with freeze_rng_state():
            return module(input)

    def _backward(self, module, input, output, grad_output):
        output.backward(grad_output, retain_graph=True)
        if input.grad is None:
            return None
        return input.grad.data

    def _forward_criterion(self, criterion, input, target):
        if isinstance(input, tuple):
            args = input + (target,)
            output = criterion(*args)
        else:
            output = criterion(input, target)
        return output.data[0]

    def _backward_criterion(self, criterion, input, target):
        input_tuple = input if isinstance(input, tuple) else (input,)
        for i in input_tuple:
            if i.grad is not None:
                i.grad.data.zero_()
        args = input_tuple + (target,)
        criterion(*args).backward()
        if isinstance(input, tuple):
            return tuple(map(lambda i: i.grad.data, input))
        else:
            return input.grad.data

    def _zero_grad_parameters(self, module):
        if hasattr(module, 'weight') and module.weight is not None:
            if module.weight.grad is not None:
                module.weight.grad.data.zero_()
                module.weight.grad.detach_()
        if hasattr(module, 'bias') and module.bias is not None:
            if module.bias.grad is not None:
                module.bias.grad.data.zero_()
                module.bias.grad.detach_()

    def _get_parameters(self, module):
        params = []
        d_params = []
        for p in module.parameters():
            if p.grad is None:
                p._grad = Variable(p.data.clone().zero_(), volatile=True)
            params.append(p.data)
            d_params.append(p.grad.data)
        return params, d_params
def add_test(test):
    test_name = test.get_name()
    cuda_test_name = test_name + '_cuda'
    if hasattr(TestNN, test_name):
        raise RuntimeError('Found two tests with the same name: ' + test_name)
    if hasattr(TestNN, cuda_test_name):
        raise RuntimeError('Found two tests with the same name: ' + cuda_test_name)
    setattr(TestNN, test_name, lambda self, test=test: test(self))
    setattr(TestNN, cuda_test_name, lambda self, test=test: test.test_cuda(self))
new_criterion_tests = [
    dict(
        module_name='TestLoss',
        constructor_args=(True, 5,),
        input=torch.randn(15, 1),
        target=torch.Tensor(15).uniform_().mul(5).floor().long(),
    ),
    dict(
        module_name='TestLoss',
        constructor_args=(False, 5,),
        input=torch.randn(15, 1),
        target=torch.Tensor(15).uniform_().mul(5).floor().long(),
        desc='false_requires_grad',
    ),
]
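# Each dict above is expanded by the loop below into TestNN methods named
# test_TestLoss / test_TestLoss_false_requires_grad plus their *_cuda counterparts.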
for test_params in new_criterion_tests:
    name = test_params.pop('module_name')
    test_params['constructor'] = getattr(example, name)
    test = NewCriterionTest(**test_params)
    add_test(test)
    if 'check_no_size_average' in test_params:
        desc = test_params.get('desc', None)
        test_params['desc'] = 'no_size_average' if desc is None else desc + '_no_size_average'

        def gen_no_size_average_constructor(constructor):
            def no_size_average_constructor(*args, **kwargs):
                cons = constructor(*args, size_average=False, **kwargs)
                return cons
            no_size_average_constructor.__name__ = constructor.__name__
            return no_size_average_constructor

        test_params['constructor'] = gen_no_size_average_constructor(test_params['constructor'])
        test = NewCriterionTest(**test_params)
        add_test(test)
def run_tests():
    remaining = parse_set_seed_once()
    unittest.main(argv=remaining)


torch.set_default_tensor_type('torch.DoubleTensor')
SEED = 0
SEED_SET = 0
def _assertGradAndGradgradChecks(test_case, apply_fn, inputs):
    # call assert function rather than returning a bool since it's nicer
    # if we get whether this failed on the gradcheck or the gradgradcheck.
    test_case.assertTrue(gradcheck(apply_fn, inputs))
    dummy_out = apply_fn(*inputs)

    def randn_match_cpu_gpu(x):
        a = torch.randn(x.size())
        if x.is_cuda:
            a = a.cuda(x.get_device())
        return a

    if isinstance(dummy_out, tuple):
        grad_y = tuple(Variable(randn_match_cpu_gpu(x), requires_grad=x.requires_grad)
                       for x in dummy_out if isinstance(x, Variable))
    else:
        grad_y = (Variable(randn_match_cpu_gpu(dummy_out), requires_grad=dummy_out.requires_grad),)

    inputs = tuple([inputs[0].double(), inputs[1]])
    test_case.assertTrue(gradgradcheck(apply_fn, inputs, grad_y,))
def parse_set_seed_once():
    global SEED
    global SEED_SET
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('--seed', type=int, default=123)
    args, remaining = parser.parse_known_args()
    if SEED_SET == 0:
        torch.manual_seed(args.seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(args.seed)
        SEED = args.seed
        SEED_SET = 1
    remaining = [sys.argv[0]] + remaining
    return remaining
run_tests()
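# Assumed invocation (the gist does not name this file): save it next to example.py
# and run it directly, optionally passing --seed N (consumed by parse_set_seed_once above).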