Helper scripts to benchmark and check ignite's engine on the MNIST and CIFAR10 tasks
#!/bin/bash
# Tests configuration:
if [ -z "$version" ]; then
export version="v0.2.1"
# export version="master"
# export version="engine_refactor"
echo "Setup version: $version"
fi
if [ -z "$remote" ]; then
export remote="pytorch"
# export remote="vfdev-5"
echo "Setup remote: $remote"
fi
if [ -z "$bench_task" ]; then
export bench_task="mnist"
echo "Setup benchmark task: $bench_task"
fi
if [ "$bench_task" == "mnist" ]; then
export bench_prep_pycmd="from torchvision.datasets import MNIST; MNIST(download=True, root='.')"
export bench_cmd="python mnist.py --epochs 5"
elif [ "$bench_task" == "contrib/cifar10" ]; then
export bench_prep_pycmd="from torchvision import datasets; datasets.CIFAR10(download=True, root='.')"
export bench_cmd='python main.py --params=num_epochs=5'
else
echo "Unknown benchmark task: $bench_task"
exit 1
fi
export env_name="benchmark_engine__${remote}_${version}"
echo "Check existing conda env: $env_name"
conda env list | grep $env_name
ret=$?
set -e
ignite_path="/tmp/ignite_${remote}_${version}"
if [ "$ret" == "1" ]; then
echo "Setup conda env: $env_name"
conda config --set always_yes yes --set changeps1 no
conda create -q -n $env_name pytorch torchvision python=3.7 -c pytorch
source activate $env_name
pip install -q tqdm tensorboardX
# git clone repository and install ignite
rm -rf ${ignite_path}
git clone https://github.com/${remote}/ignite.git -b ${version} ${ignite_path}
cd ${ignite_path} && pip install --upgrade -e .
else
source activate $env_name
fi
echo "Check installed ignite: "
pip list | grep ignite
cd ${ignite_path}/examples/${bench_task}
echo "Download dataset"
python -c "${bench_prep_pycmd}"
echo "Run $bench_task"
time ${bench_cmd}
source deactivate
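Usage sketch for the benchmark script above (the filename benchmark_engine.sh is a hypothetical choice): the version, remote and bench_task environment variables select what gets benchmarked, for example:
# benchmark the default released version on the MNIST example
bash benchmark_engine.sh
# benchmark a development branch of a fork on the contrib CIFAR10 example
version="engine_refactor" remote="vfdev-5" bench_task="contrib/cifar10" bash benchmark_engine.sh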
#!/bin/bash
set -e
echo -e "\n--- Check save/resume training on the CIFAR10 dataset ---\n"
if [ ! -f "main.py" ]; then
echo "This check should be run from the examples/contrib/cifar10 folder"
exit 1
fi
data_path="/data_deep/CLASSIC_DATASETS/cifar10/"
base_output_path="/tmp/cifar10-output"
with_debug_opts=1
if [ ${with_debug_opts} -eq 0 ]; then
exec_script="main.py"
no_augs_vals="False"
else
exec_script="main_debug.py"
no_augs_vals="False True"
base_output_path=${base_output_path}-debug
fi
for crash_iter in 510 820 1000; do
for n_proc in 1 2; do
for no_augs in ${no_augs_vals}; do
output_path="$base_output_path/$n_proc-no_augs_${no_augs}-crash_iter_${crash_iter}"
echo -e "\nRUN training $exec_script with a crash at iteration $crash_iter, n_proc=$n_proc, no_augs=${no_augs}\n"
if [ ${with_debug_opts} -eq 0 ]; then
# no debug options
python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';crash_iteration=$crash_iter;output_path=$output_path"
else
# debugging
python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';crash_iteration=$crash_iter;output_path=$output_path;debug__no_augs=${no_augs};debug__display_data_stats=True;debug__crash_iteration=$crash_iter;debug__display_grads=True;debug__display_model_weights=True"
# python -u ${exec_script} --params="data_path=$data_path;batch_size=512;crash_iteration=$crash_iter;output_path=$output_path;debug__no_augs=${no_augs};debug__display_data_stats=True;debug__crash_iteration=$crash_iter;debug__display_grads=True;debug__display_model_weights=True"
fi
chkpt=$(find $output_path -name 'training_checkpoint_*.pth')
echo -e "\nRESUME training $exec_script from checkpoint ${chkpt}, n_proc=$n_proc, no_augs=${no_augs}\n"
if [ ${with_debug_opts} -eq 0 ]; then
# no debug options
python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';output_path=$output_path;resume_from=$chkpt"
else
# debugging
python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';output_path=$output_path;resume_from=$chkpt;debug__no_augs=${no_augs};debug__display_data_stats=True;debug__crash_iteration=$crash_iter;debug__display_grads=True;debug__display_model_weights=True"
# python -u ${exec_script} --params="data_path=$data_path;batch_size=512;output_path=$output_path;resume_from=$chkpt;debug__no_augs=${no_augs};debug__display_data_stats=True;debug__crash_iteration=$crash_iter;debug__display_grads=True;debug__display_model_weights=True"
fi
if [ ${with_debug_opts} -eq 0 ]; then
echo -e "\nRUN training $exec_script without a crash, n_proc=$n_proc, no_augs=${no_augs}\n"
# no debug options
python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';output_path=$output_path"
else
exit 0
fi
done
done
done
# TODO:
# - Problem: a resumed run does not see the same data after the checkpoint iteration
#   => the loss curve is discontinuous
#   => the train/test accuracy curves are discontinuous
# - This is caused by DistributedSampler
# - If no DistributedSampler is used, the dataflow is the same but the loss curve is still discontinuous
#   <= this is due to the unrestored running average
# 1) Improve the engine to work with DistributedSampler => DONE
# 2) Test with data augmentations => data indices are not the same
# 3) Test with non-deterministic cudnn whether there is a gap between the train/test accuracy curves
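For reference, a minimal sketch of the DistributedSampler handling mentioned in TODO item 1 (an assumed wiring, not necessarily the exact ignite implementation): re-seeding the sampler at the start of every epoch makes its shuffling a deterministic function of the epoch number, so a resumed run iterates the data in the same order as an uninterrupted one.

from ignite.engine import Events

def setup_sampler_epoch(trainer, train_sampler):
    # train_sampler is assumed to be a torch.utils.data.distributed.DistributedSampler;
    # it shuffles from an internal epoch counter, so it must be told the current epoch
    # before each epoch starts (ignite epochs are 1-based, set_epoch is conventionally 0-based).
    @trainer.on(Events.EPOCH_STARTED)
    def _set_epoch(engine):
        train_sampler.set_epoch(engine.state.epoch - 1)

The common.setup_common_training_handlers(trainer, train_sampler=...) call in the training script below is expected to take care of this when a sampler is passed.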
import argparse
from pathlib import Path
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.parallel
import torch.distributed as dist
import ignite
from ignite.engine import Events, Engine, create_supervised_evaluator
from ignite.metrics import Accuracy, Loss
from ignite.handlers import Checkpoint, global_step_from_engine
from ignite.utils import convert_tensor
from ignite.contrib.engines import common
from ignite.contrib.handlers import TensorboardLogger, ProgressBar
from ignite.contrib.handlers.tensorboard_logger import OutputHandler, OptimizerParamsHandler, GradsHistHandler
from ignite.contrib.handlers import PiecewiseLinear
from utils import set_seed, get_train_test_loaders, get_model
def run(output_path, config):
device = "cuda"
local_rank = config['local_rank']
    distributed = config['dist_backend'] is not None
if distributed:
torch.cuda.set_device(local_rank)
device = "cuda"
rank = dist.get_rank() if distributed else 0
torch.manual_seed(config['seed'] + rank)
# Rescale batch_size and num_workers
ngpus_per_node = torch.cuda.device_count()
ngpus = dist.get_world_size() if distributed else 1
batch_size = config['batch_size'] // ngpus
num_workers = int((config['num_workers'] + ngpus_per_node - 1) / ngpus_per_node)
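    # Hypothetical example: with the default batch_size=512 and num_workers=10 on a single
    # node with 2 GPUs (ngpus=2, ngpus_per_node=2), each process ends up with batch_size=256
    # and num_workers=5, keeping the effective global batch size and worker count constant.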
train_loader, test_loader = get_train_test_loaders(
path=config['data_path'],
batch_size=batch_size,
distributed=distributed,
num_workers=num_workers,
no_augs=config['debug__no_augs'],
)
model = get_model(config['model'])
model = model.to(device)
if distributed:
model = torch.nn.parallel.DistributedDataParallel(model,
device_ids=[local_rank, ],
output_device=local_rank)
optimizer = optim.SGD(model.parameters(), lr=config['learning_rate'],
momentum=config['momentum'],
weight_decay=config['weight_decay'],
nesterov=True)
criterion = nn.CrossEntropyLoss().to(device)
le = len(train_loader)
milestones_values = [
(0, 0.0),
(le * config['num_warmup_epochs'], config['learning_rate']),
(le * config['num_epochs'], 0.0)
]
lr_scheduler = PiecewiseLinear(optimizer, param_name="lr",
milestones_values=milestones_values)
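    # Illustration (assuming, e.g., le=98 iterations per epoch): with the default
    # learning_rate=0.04, num_warmup_epochs=4 and num_epochs=24, the learning rate ramps
    # linearly from 0.0 to 0.04 over the first 4*98=392 iterations, then decays linearly
    # back to 0.0 by iteration 24*98=2352.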
def _prepare_batch(batch, device, non_blocking):
x, y = batch
return (convert_tensor(x, device=device, non_blocking=non_blocking),
convert_tensor(y, device=device, non_blocking=non_blocking))
def process_function(engine, batch):
x, y = _prepare_batch(batch, device=device, non_blocking=True)
model.train()
# Supervised part
y_pred = model(x)
loss = criterion(y_pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
return {
'batch loss': loss.item(),
}
trainer = Engine(process_function)
train_sampler = train_loader.sampler if distributed else None
to_save = {'trainer': trainer, 'model': model, 'optimizer': optimizer, 'lr_scheduler': lr_scheduler}
metric_names = ['batch loss', ]
common.setup_common_training_handlers(trainer, train_sampler=train_sampler,
to_save=to_save, save_every_iters=config['checkpoint_every'],
output_path=output_path, lr_scheduler=lr_scheduler,
output_names=metric_names, with_pbar_on_iters=config['display_iters'],
log_every_iters=10)
if rank == 0:
tb_logger = TensorboardLogger(log_dir=output_path)
tb_logger.attach(trainer,
log_handler=OutputHandler(tag="train",
metric_names=metric_names),
event_name=Events.ITERATION_COMPLETED)
tb_logger.attach(trainer,
log_handler=OptimizerParamsHandler(optimizer, param_name="lr"),
event_name=Events.ITERATION_STARTED)
metrics = {
"accuracy": Accuracy(device=device if distributed else None),
"loss": Loss(criterion, device=device if distributed else None)
}
evaluator = create_supervised_evaluator(model, metrics=metrics, device=device, non_blocking=True)
train_evaluator = create_supervised_evaluator(model, metrics=metrics, device=device, non_blocking=True)
def run_validation(engine):
torch.cuda.synchronize()
train_evaluator.run(train_loader)
evaluator.run(test_loader)
trainer.add_event_handler(Events.EPOCH_STARTED(every=3), run_validation)
trainer.add_event_handler(Events.COMPLETED, run_validation)
if rank == 0:
if config['display_iters']:
ProgressBar(persist=False, desc="Train evaluation").attach(train_evaluator)
ProgressBar(persist=False, desc="Test evaluation").attach(evaluator)
tb_logger.attach(train_evaluator,
log_handler=OutputHandler(tag="train",
metric_names=list(metrics.keys()),
global_step_transform=global_step_from_engine(trainer)),
event_name=Events.COMPLETED)
tb_logger.attach(evaluator,
log_handler=OutputHandler(tag="test",
metric_names=list(metrics.keys()),
global_step_transform=global_step_from_engine(trainer)),
event_name=Events.COMPLETED)
# Store the best model by validation accuracy:
common.save_best_model_by_val_score(output_path, evaluator, model=model, metric_name='accuracy', n_saved=3,
trainer=trainer, tag="test")
if config['log_model_grads_every'] is not None:
tb_logger.attach(trainer,
log_handler=GradsHistHandler(model, tag=model.__class__.__name__),
event_name=Events.ITERATION_COMPLETED(every=config['log_model_grads_every']))
if config['crash_iteration'] is not None:
@trainer.on(Events.ITERATION_STARTED(once=config['crash_iteration']))
def _(engine):
raise Exception("STOP at iteration: {}".format(engine.state.iteration))
def ef(_, event):
if event in [1, 2, 3]:
return True
elif event % config['checkpoint_every'] in [0, 1, 2]:
return True
return False
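    # With the default checkpoint_every=200, `ef` lets the debug handlers below fire on
    # iterations 1, 2, 3 and then around every checkpoint: 200, 201, 202, 400, 401, 402, ...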
if config['debug__display_data_stats']:
trainer.add_event_handler(Events.ITERATION_COMPLETED(event_filter=ef), debug_print_data_stats)
if config['debug__display_grads']:
trainer.add_event_handler(Events.ITERATION_COMPLETED(event_filter=ef), debug_print_model_grads, model)
if config['debug__display_model_weights']:
trainer.add_event_handler(Events.STARTED, debug_print_model_weights, model)
trainer.add_event_handler(Events.ITERATION_COMPLETED(event_filter=ef), debug_print_model_weights, model)
trainer.add_event_handler(Events.ITERATION_COMPLETED(event_filter=ef), debug_print_batch_loss)
# if config['debug__crash_iteration'] is not None:
# crash_iter = config['debug__crash_iteration']
#
# def near_after_crash_iter(_, event):
# if crash_iter - 15 <= event <= crash_iter + 15:
# return True
# return False
#
# if config['debug__display_data_stats']:
# trainer.add_event_handler(Events.ITERATION_COMPLETED(event_filter=near_after_crash_iter),
# debug_print_data_stats)
#
# if config['debug__display_grads']:
# trainer.add_event_handler(Events.ITERATION_COMPLETED(event_filter=near_after_crash_iter),
# debug_print_model_grads, model)
resume_from = config['resume_from']
if resume_from is not None:
checkpoint_fp = Path(resume_from)
assert checkpoint_fp.exists(), "Checkpoint '{}' is not found".format(checkpoint_fp.as_posix())
print("Resume from a checkpoint: {}".format(checkpoint_fp.as_posix()))
checkpoint = torch.load(checkpoint_fp.as_posix())
Checkpoint.load_objects(to_load=to_save, checkpoint=checkpoint)
try:
trainer.run(train_loader, max_epochs=config['num_epochs'], seed=config['seed'])
except Exception as e:
import traceback
print(traceback.format_exc())
if rank == 0:
tb_logger.close()
def debug_print_model_weights(engine, model):
output = {'total': 0.0}
max_counter = 5
for name, p in model.named_parameters():
name = name.replace('.', '/')
n = torch.norm(p)
if max_counter > 0:
output[name] = n
output['total'] += n
max_counter -= 1
print("{} | {}: {}".format(
engine.state.epoch,
engine.state.iteration,
" - ".join(["{}:{:.4f}".format(m, v) for m, v in output.items()]))
)
def debug_print_model_grads(engine, model):
output = {'grads/total': 0.0}
max_counter = 5
for name, p in model.named_parameters():
if p.grad is None:
continue
name = name.replace('.', '/')
n = torch.norm(p.grad)
if max_counter > 0:
output['grads/{}'.format(name)] = n
output['grads/total'] += n
max_counter -= 1
print("{} | {}: {}".format(
engine.state.epoch,
engine.state.iteration,
" - ".join(["{}:{:.4f}".format(m, v) for m, v in output.items()]))
)
def debug_print_data_stats(engine):
x, y = engine.state.batch
output = {
'batch xmean': x.mean().item(),
'batch xstd': x.std().item(),
'batch ymedian': y.median().item(),
}
print("{} | {}: {}".format(
engine.state.epoch,
engine.state.iteration,
" - ".join(["{}:{:.7f}".format(m, v) for m, v in output.items()]))
)
def debug_print_batch_loss(engine):
output = engine.state.output
print("{} | {}: {}".format(
engine.state.epoch,
engine.state.iteration,
" - ".join(["{}:{:.5f}".format(m, v) for m, v in output.items()]))
)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Training a CNN on the CIFAR10 dataset")
parser.add_argument('--network', type=str, default="fastresnet", help="Network to train")
parser.add_argument('--params', type=str,
help='Override default configuration with parameters: '
'data_path=/path/to/dataset;batch_size=64;num_workers=12 ...')
parser.add_argument('--local_rank', type=int, help='Local process rank in distributed computation')
args = parser.parse_args()
network_name = args.network
assert torch.cuda.is_available()
# torch.backends.cudnn.benchmark = True
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
batch_size = 512
num_epochs = 24
config = {
"seed": 12,
"data_path": "/tmp/cifar10",
"output_path": "/tmp/cifar10-output",
"model": network_name,
"momentum": 0.9,
"weight_decay": 1e-4,
"batch_size": batch_size,
"num_workers": 10,
"num_epochs": num_epochs,
"learning_rate": 0.04,
"num_warmup_epochs": 4,
# distributed settings
"dist_url": "env://",
"dist_backend": None, # if None distributed option is disabled, set to "nccl" to enable
# Logging:
"display_iters": True,
"log_model_grads_every": None,
"checkpoint_every": 200,
# Crash/Resume training:
"resume_from": None, # Path to checkpoint file .pth
"crash_iteration": None,
"debug__no_augs": False, # for debugging near and after crash_iteration
"debug__crash_iteration": None, # for debugging near and after crash_iteration
"debug__display_data_stats": False, # for debugging near and after crash_iteration
"debug__display_grads": False, # for debugging near and after crash_iteration
"debug__display_model_weights": False, # for debugging model weights
}
    # Local process rank: set by torch.distributed.launch in distributed mode, 0 otherwise
if args.local_rank is not None:
config['local_rank'] = args.local_rank
else:
config['local_rank'] = 0
# Override config:
if args.params is not None:
for param in args.params.split(";"):
key, value = param.split("=")
if "/" not in value:
value = eval(value)
config[key] = value
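    # For example, --params="data_path=/tmp/cifar10;batch_size=256;dist_backend='nccl'"
    # keeps values containing '/' as strings and eval()'s the rest, so data_path stays the
    # string "/tmp/cifar10", batch_size becomes the int 256 and dist_backend the str 'nccl'.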
backend = config['dist_backend']
distributed = backend is not None
if distributed:
dist.init_process_group(backend, init_method=config['dist_url'])
# let each node print the info
if config['local_rank'] == 0:
print("\nDistributed setting:")
print("\tbackend: {}".format(dist.get_backend()))
print("\tworld size: {}".format(dist.get_world_size()))
print("\trank: {}".format(dist.get_rank()))
print("\n")
output_path = None
# let each node print the info
if config['local_rank'] == 0:
print("Train {} on CIFAR10".format(network_name))
print("- PyTorch version: {}".format(torch.__version__))
print("- Ignite version: {}".format(ignite.__version__))
print("- CUDA version: {}".format(torch.version.cuda))
print("\n")
print("Configuration:")
for key, value in config.items():
print("\t{}: {}".format(key, value))
print("\n")
# create log directory only by 1 node
if (not distributed) or (dist.get_rank() == 0):
from datetime import datetime
now = datetime.now().strftime("%Y%m%d-%H%M%S")
gpu_conf = "-single-gpu"
if distributed:
ngpus_per_node = torch.cuda.device_count()
nnodes = dist.get_world_size() // ngpus_per_node
gpu_conf = "-distributed-{}nodes-{}gpus".format(nnodes, ngpus_per_node)
output_path = Path(config['output_path']) / "{}{}".format(now, gpu_conf)
if not output_path.exists():
output_path.mkdir(parents=True)
output_path = output_path.as_posix()
print("Output path: {}".format(output_path))
try:
run(output_path, config)
except KeyboardInterrupt:
        print("Caught KeyboardInterrupt -> exit")
except Exception as e:
if distributed:
dist.destroy_process_group()
raise e
if distributed:
dist.destroy_process_group()
#!/bin/bash
set -e
echo -e "\n--- Check save/resume training on the CIFAR10 dataset ---\n"
if [ ! -f "main.py" ]; then
echo "This check should be run from the examples/contrib/cifar10 folder"
exit 1
fi
data_path="/data_deep/CLASSIC_DATASETS/cifar10/"
base_output_path="/tmp/cifar10-output-1000"
exec_script="main.py"
no_augs_vals="False"
for crash_iter in 1000; do
for n_proc in 2; do
output_path="$base_output_path/$n_proc-crash_iter_${crash_iter}"
echo -e "\nRUN training $exec_script with a crash at iteration $crash_iter, n_proc=$n_proc\n"
python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';crash_iteration=$crash_iter;output_path=$output_path;validate_every=1"
chkpt=$(find $output_path -name 'training_checkpoint_*.pth')
echo -e "\nRESUME training $exec_script from checkpoint ${chkpt}, n_proc=$n_proc\n"
python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';output_path=$output_path;resume_from=$chkpt;validate_every=1"
echo -e "\nRUN training $exec_script without a crash, n_proc=$n_proc\n"
# no debug options
python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';output_path=$output_path;validate_every=1"
done
done