Skip to content

Instantly share code, notes, and snippets.

@AngelBerihuete
Created June 15, 2020 07:51
Show Gist options
  • Save AngelBerihuete/772cb5dfe02e27ff2c5c3b48651c0846 to your computer and use it in GitHub Desktop.
Save AngelBerihuete/772cb5dfe02e27ff2c5c3b48651c0846 to your computer and use it in GitHub Desktop.
test_cross_gpu
#!/bin/bash
#------- Descripción del trabajo -------
#SBATCH --job-name='test_cross_gpu'
#SBATCH --qos=debug
#------- Avisos -------
#SBATCH --mail-user=angel.berihuete@uca.es
#SBATCH --mail-type=END,FAIL,TIME_LIMIT_80
#------- Parametrización -------
#
# In order to use GPUs devices on BSC it
# is mandatory to allocate 40 CPUs per GPU
# requested.
# You specify the gres configuration PER-NODE
# for a job with the --gres flag and a number
# of GPUs. For instance, --gres=gpu:4, we will
# request four GPUs per node (maximun GPUs per
# node in this machine (CTE-POWER at BSC).
# Then, if also you set --node=2, you'll request
# 2*4 GPUs, and --ntasks=2*4*40=320
#
# Remember the term "task" in this context can
# be thought of as a "process". In Slurm, tasks
# are requested with the --ntasks flag. CPUs,
# for the multithreaded programs, are requested
# with the --cpus-per-task flag.
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=160
#SBATCH --gres=gpu:4
#SBATCH --time=00:15:00
#------- Entrada/Salida -------
#SBATCH -D .
#SBATCH --output=test_cross_gpu_%j.out
#SBATCH --error=test_cross_gpu_%j.err
#------- Carga de módulos -------
module purge
module load gcc/8.3.0 cuda/10.1 cudnn/7.6.4 nccl/2.4.8 tensorrt/6.0.1 openmpi/4.0.1 atlas/3.10.3 scalapack/2.0.2 fftw/3.3.8 szip/2.1.1 ffmpeg/4.2.1 opencv/4.1.1 python/3.7.4_ML
echo "== Starting run at $(date)"
echo "== Job ID: ${SLURM_JOBID}"
echo "== Job NPROCS: ${SLURM_NPROCS}"
echo "== Job NNODES: ${SLURM_NNODES}"
echo "== Node list: ${SLURM_NODELIST}"
echo "== Submit dir. : ${SLURM_SUBMIT_DIR}"
#------- Comando ------
srun test_cross_gpu_logprob.py
#!/apps/PYTHON/3.7.4_ML/bin/python
from __future__ import absolute_import, division, print_function, unicode_literals
import sys
import os
# os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"
# os.environ['CUDA_VISIBLE_DEVICES'] = "0,1,2,3"
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
tfb, tfd = tfp.bijectors, tfp.distributions
print("TF version: ", tf.__version__)
print("TFP: ", tfp.__version__)
resolver = tf.distribute.cluster_resolver.SlurmClusterResolver()
st = tf.distribute.experimental.MultiWorkerMirroredStrategy(cluster_resolver=resolver)
# Sanity check to observe which device has calculations
# tf.debugging.set_log_device_placement(True)
# ------------------------
# Setting up physical GPUs
# ------------------------
#
# Nothing to do with previos os.eviron['CUDA...] the system recognize them.
# physical_gpus = tf.config.list_physical_devices('GPU')
# print(len(physical_gpus), "Physical GPUs")
# gpus = tf.config.experimental.list_physical_devices('GPU')
# if gpus:
# # Restrict TensorFlow to only use the first GPU
# try:
# # Currently, memory growth needs to be the same across GPUs
# for gpu in gpus:
# tf.config.experimental.set_memory_growth(gpu, True)
# logical_gpus = tf.config.experimental.list_logical_devices('GPU')
# print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU")
# except RuntimeError as e:
# # Visible devices must be set before GPUs have been
# # initialized
# print(e)
# st = tf.distribute.MirroredStrategy()
# strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
# -----------------------
# Setting up logical GPUs
# -----------------------
#
# GPU_SIZE = 6 # (G) MemorySIZE per GPU
# for gpu in physical_gpus:
# tf.config.experimental.set_virtual_device_configuration(
# gpu,
# [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=GPU_SIZE*1024)]*2)
# logical_gpus = tf.config.list_logical_devices('GPU')
# print(len(logical_gpus), "Logical GPUs")
# st = tf.distribute.MirroredStrategy(
# devices=tf.config.list_logical_devices('GPU'))
# st = tf.distribute.MirroredStrategy()
# Sanity check
print ('Number of devices to do the strategy: {}'.format(st.num_replicas_in_sync))
print('Woker devices: {}', st.extended.worker_devices)
# sys.exit()
# -----------------
# Model and dataset
# -----------------
#
# Draw samples from an MVN, then sort them. This way we can easily visually
# verify the correct partition ends up on the correct GPUs.
ndim = 3
def model():
Root = tfd.JointDistributionCoroutine.Root
loc = yield Root(tfb.Shift(.5)(tfd.MultivariateNormalDiag(loc=tf.zeros([ndim]))))
scale_tril = yield Root(tfb.FillScaleTriL()(tfd.MultivariateNormalDiag(loc=tf.zeros([ndim * (ndim + 1) // 2]))))
yield tfd.MultivariateNormalTriL(loc=loc, scale_tril=scale_tril)
dist = tfd.JointDistributionCoroutine(model)
tf.random.set_seed(1)
loc, scale_tril, _ = dist.sample(seed=2)
samples = dist.sample(value=([loc] * 1024, scale_tril, None), seed=3)[2]
samples = tf.round(samples * 1000) / 1000
for dim in reversed(range(ndim)):
samples = tf.gather(samples, tf.argsort(samples[:,dim]))
# print(len(samples))
# print(loc)
# print(scale_tril)
# print(tf.reduce_mean(samples, 0))
# sys.exit()
def dataset_fn(ctx):
batch_size = ctx.get_per_replica_batch_size(len(samples))
d = tf.data.Dataset.from_tensor_slices(samples).batch(batch_size)
return d.shard(ctx.num_input_pipelines, ctx.input_pipeline_id)
ds = st.experimental_distribute_datasets_from_function(dataset_fn)
# Sanity check playing with dataset from function
# Toy 1
def replica_fn(arg):
return tf.reduce_mean(arg, 0)
for batch in ds:
replica1 = st.run(replica_fn, args=(batch,))
print(replica1)
# Toy 2
# iterator = iter(ds)
# @tf.function(input_signature=[iterator.element_spec])
# def replica_fn2(arg):
# return tf.reduce_mean(arg, 0)
# replica2 = st.run(replica_fn2, args=(next(iterator),))
# print(replica2)
# replicas_sum = st.reduce(tf.distribute.ReduceOp.SUM, replica1)
# print(replicas_sum)
# sys.exit()
observations = next(iter(ds))
# print(observations)
# @tf.function(autograph=False)
def log_prob_and_grad(loc, scale_tril, observations):
ctx = tf.distribute.get_replica_context()
with tf.GradientTape() as tape:
tape.watch((loc, scale_tril))
lp = tf.reduce_sum(dist.log_prob(loc, scale_tril, observations)) / len(samples)
grad = tape.gradient(lp, (loc, scale_tril))
return ctx.all_reduce('sum', lp), [ctx.all_reduce('sum', g) for g in grad]
@tf.function(autograph=False)
@tf.custom_gradient
def target_log_prob(loc, scale_tril):
lp, grads = st.run(log_prob_and_grad, (loc, scale_tril, observations))
return lp.values[0], lambda grad_lp: [grad_lp * g.values[0] for g in grads]
singleton_vals = tfp.math.value_and_gradient(target_log_prob, (loc, scale_tril))
print(singleton_vals)
print("*"*50)
sys.exit()
kernel = tfp.mcmc.HamiltonianMonteCarlo(target_log_prob, step_size=.35, num_leapfrog_steps=2)
kernel = tfp.mcmc.TransformedTransitionKernel(kernel, bijector=[tfb.Identity(), tfb.FillScaleTriL()])
@tf.function(autograph=False)
def sample_chain():
return tfp.mcmc.sample_chain(
num_results=200, num_burnin_steps=100,
current_state=[tf.ones_like(loc), tf.linalg.eye(scale_tril.shape[-1])],
kernel=kernel, trace_fn=lambda _, kr: kr.inner_results.is_accepted)
samps, is_accepted = sample_chain()
print(f'accept rate: {np.mean(is_accepted)}')
print(f'ess: {tfp.mcmc.effective_sample_size(samps)}')
print(tf.reduce_mean(samps[0], axis=0))
print(tf.reduce_mean(samps[1], axis=0))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment