Skip to content

Instantly share code, notes, and snippets.

@robertnishihara
Last active January 31, 2024 22:31
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save robertnishihara/2b81595abd4f50a049767a040ce435ab to your computer and use it in GitHub Desktop.
Save robertnishihara/2b81595abd4f50a049767a040ce435ab to your computer and use it in GitHub Desktop.
from collections import defaultdict
from multiprocessing import Pool, Process
import numpy as np
import psutil
import scipy.signal
import sys
import tensorflow as tf
import time
# Number of timed repetitions of every benchmark.
num_trials = 5
# Count the number of physical CPUs. psutil returns None when the physical
# count cannot be determined, so fall back to the logical count (and finally
# to 1) instead of crashing later in range(num_cpus).
num_cpus = psutil.cpu_count(logical=False) or psutil.cpu_count() or 1
print('Using {} cores.'.format(num_cpus))
################################################
###### Benchmark 1: numerical computation ######
################################################
def f(args):
    """Convolve one image with one filter and downsample the result.

    ``args`` is a single ``(image, random_filter)`` tuple because
    ``Pool.map`` hands each task exactly one argument. The full 2-D
    convolution is computed and every 5th row and column is returned.
    """
    img, kernel = args
    convolved = scipy.signal.convolve2d(img, kernel)
    return convolved[::5, ::5]
# One worker process per physical core for the numerical benchmark.
pool = Pool(processes=num_cpus)
# One random 4x4 convolution kernel per worker.
filters = [np.random.normal(loc=0.0, scale=1.0, size=(4, 4))
           for _worker in range(num_cpus)]
def run_benchmark():
    """Run one round of benchmark 1 on the process pool.

    Pairs one all-zeros 3000x3000 image with each of the module-level
    ``filters`` and maps ``f`` over the pairs. Each task's arguments are
    pickled and sent to a worker separately. The results are discarded;
    only the elapsed time matters.
    """
    blank = np.zeros((3000, 3000))
    tasks = zip([blank] * num_cpus, filters)
    pool.map(f, tasks)
# Wall-clock seconds for each trial of benchmark 1.
durations1 = []
for trial in range(num_trials):
    t0 = time.time()
    run_benchmark()
    duration1 = time.time() - t0
    durations1.append(duration1)
    print('Numerical computation workload took {} seconds.'.format(duration1))
###############################################
###### Benchmark 2: stateful computation ######
###############################################
def accumulate_prefixes(args):
    """Fold one document into a worker's running prefix statistics.

    ``args`` is a ``(running_prefix_count, running_popular_prefixes,
    document)`` tuple, packed because ``Pool.map`` passes one argument.
    Every proper prefix of every word (lengths 1 .. len(word) - 1) is
    counted. Once a prefix's count exceeds 3, it is added to the popular set.
    Both containers are mutated in place and also returned, so the updated
    state can be sent back to the parent process.
    """
    counts, popular, document = args
    proper_prefixes = (word[:end]
                       for word in document
                       for end in range(1, len(word)))
    for prefix in proper_prefixes:
        counts[prefix] += 1
        if counts[prefix] > 3:
            popular.add(prefix)
    return counts, popular
# Shut down the benchmark-1 pool before replacing it; otherwise its worker
# processes would stay alive, idle, for the rest of the script.
pool.close()
pool.join()
# A fresh pool is created here, presumably because the previous workers were
# started before accumulate_prefixes was defined.
pool = Pool(num_cpus)
# Wall-clock seconds for each trial of benchmark 2.
durations2 = []
for _ in range(num_trials):
    # One running state per worker. Sizing these by num_cpus (not a fixed
    # constant) lets zip() below pair every document with a state slot.
    running_prefix_counts = [defaultdict(int) for _ in range(num_cpus)]
    running_popular_prefixes = [set() for _ in range(num_cpus)]
    start_time = time.time()
    for _round in range(10):
        documents = [[np.random.bytes(20) for _ in range(10000)]
                     for _ in range(num_cpus)]
        results = pool.map(
            accumulate_prefixes,
            zip(running_prefix_counts, running_popular_prefixes, documents))
        # The workers return their updated state; carry it into the next round.
        running_prefix_counts = [result[0] for result in results]
        running_popular_prefixes = [result[1] for result in results]
    popular_prefixes = set()
    for prefixes in running_popular_prefixes:
        popular_prefixes |= prefixes
    duration2 = time.time() - start_time
    durations2.append(duration2)
    print('Stateful computation workload took {} seconds.'.format(duration2))
###################################################
###### Benchmark 3: expensive initialization ######
###################################################
def save_model():
    """Train a small MNIST classifier for one epoch and save it to /tmp/model.

    Intended to run in a child process (it is the target of ``Process``
    below) so that TensorFlow is not used in the parent that forks the pool.
    """
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train / 255.0  # presumably uint8 pixels scaled into [0, 1]
    layers = [
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(512, activation=tf.nn.relu),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation=tf.nn.softmax),
    ]
    model = tf.keras.models.Sequential(layers)
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    model.fit(x_train, y_train, epochs=1)
    # NOTE(review): recent Keras versions require a '.keras' suffix here; this
    # extension-less path assumes the older TensorFlow SavedModel format.
    model.save('/tmp/model')
# Train and save the model. This has to be done in a separate process because
# otherwise Python multiprocessing will hang when you try to run the code
# below.
p = Process(target=save_model)
p.start()
p.join()
# Fail fast if training crashed. Otherwise every pool worker below would fail
# inside load_model with a much less obvious error.
if p.exitcode != 0:
    raise RuntimeError(
        'Model training subprocess failed with exit code {}.'.format(p.exitcode))
# Must match the path that save_model writes to.
filename = '/tmp/model'
def evaluate_next_batch(i):
    """Load the saved model in this worker and predict on the MNIST test set.

    ``i`` is the index of the core to pin this worker to (Linux only). The
    model and data are reloaded on every call; that reload is the expensive
    initialization this benchmark measures.
    """
    # TensorFlow uses several threads per process, so pin each worker to its
    # own core to avoid contention between the processes.
    on_linux = sys.platform == 'linux'
    if on_linux:
        psutil.Process().cpu_affinity([i])
    model = tf.keras.models.load_model(filename)
    _, (x_test, _) = tf.keras.datasets.mnist.load_data()
    return model.predict(x_test / 255.0)
# Release the previous pool's workers before starting the pool for benchmark 3.
pool.close()
pool.join()
pool = Pool(num_cpus)
# Wall-clock seconds for each trial of benchmark 3.
durations3 = []
for _ in range(num_trials):
    start_time = time.time()
    for _round in range(10):
        pool.map(evaluate_next_batch, range(num_cpus))
    duration3 = time.time() - start_time
    durations3.append(duration3)
    print('Expensive initialization workload took {} seconds.'.format(duration3))
# All benchmarks are done; shut the last pool down cleanly.
pool.close()
pool.join()
print('Used {} cores.'.format(num_cpus))
# Summarize each benchmark as "mean +/- standard deviation" over all trials.
summary_rows = [
    ('Numerical computation', durations1),
    ('Stateful computation', durations2),
    ('Expensive initialization', durations3),
]
summary_lines = ['- {}: {} +/- {}'.format(label, np.mean(times), np.std(times))
                 for label, times in summary_rows]
print('\nResults:\n' + '\n'.join(summary_lines) + '\n')
from collections import defaultdict
import numpy as np
import psutil
import ray
import scipy.signal
import sys
import tensorflow as tf
import time
# Number of timed repetitions of every benchmark.
num_trials = 5
# Count the number of physical CPUs. psutil returns None when the physical
# count cannot be determined, so fall back to the logical count (and finally
# to 1) instead of crashing later in range(num_cpus).
num_cpus = psutil.cpu_count(logical=False) or psutil.cpu_count() or 1
print('Using {} cores.'.format(num_cpus))
ray.init(num_cpus=num_cpus)
################################################
###### Benchmark 1: numerical computation ######
################################################
@ray.remote
def f(image, random_filter):
    """Ray task: full 2-D convolution of ``image`` with ``random_filter``,
    keeping every 5th row and column of the result."""
    step = 5
    convolved = scipy.signal.convolve2d(image, random_filter)
    return convolved[::step, ::step]
# One random 4x4 convolution kernel per task.
filters = [np.random.normal(loc=0.0, scale=1.0, size=(4, 4))
           for _worker in range(num_cpus)]
def run_benchmark():
    """Run one round of benchmark 1 as Ray tasks.

    The all-zeros 3000x3000 image is placed in the object store once and
    shared by every task; one task is launched per filter index and the
    driver blocks until all of them finish.
    """
    shared_image = ray.put(np.zeros((3000, 3000)))
    pending = []
    for idx in range(num_cpus):
        pending.append(f.remote(shared_image, filters[idx]))
    ray.get(pending)
# Run it a couple of times to warm up the Ray object store, because the
# initial memory accesses are slower. A plain loop is used because only the
# side effect matters; no result list needs to be built.
for _ in range(5):
    run_benchmark()
# Wall-clock seconds for each trial of benchmark 1.
durations1 = []
while len(durations1) < num_trials:
    start_time = time.time()
    run_benchmark()
    duration1 = time.time() - start_time
    durations1.append(duration1)
    print('Numerical computation workload took {} seconds.'.format(duration1))
###############################################
###### Benchmark 2: stateful computation ######
###############################################
@ray.remote
class StreamingPrefixCount(object):
    """Ray actor that keeps running prefix counts across many documents.

    Documents are streamed in with ``add_document``; the set of popular
    prefixes is fetched once at the end with ``get_popular``.
    """

    def __init__(self):
        self.prefix_count = defaultdict(int)  # prefix -> occurrences so far
        self.popular_prefixes = set()  # prefixes seen more than 3 times

    def add_document(self, document):
        """Count every proper prefix (length 1 .. len(word) - 1) of each word."""
        counts = self.prefix_count
        for word in document:
            for end in range(1, len(word)):
                prefix = word[:end]
                counts[prefix] += 1
                if counts[prefix] > 3:
                    self.popular_prefixes.add(prefix)

    def get_popular(self):
        """Return the set of prefixes counted more than 3 times so far."""
        return self.popular_prefixes
# Wall-clock seconds for each trial of benchmark 2.
durations2 = []
for _trial in range(num_trials):
    # Actors are created before the timer starts.
    streaming_actors = [StreamingPrefixCount.remote() for _ in range(num_cpus)]
    start_time = time.time()
    # Stream num_cpus * 10 random documents to the actors round-robin.
    for doc_index in range(num_cpus * 10):
        document = [np.random.bytes(20) for _ in range(10000)]
        target = streaming_actors[doc_index % num_cpus]
        target.add_document.remote(document)
    # Aggregate all of the results.
    popular_futures = [actor.get_popular.remote() for actor in streaming_actors]
    results = ray.get(popular_futures)
    popular_prefixes = set().union(*results)
    duration2 = time.time() - start_time
    durations2.append(duration2)
    print('Stateful computation workload took {} seconds.'.format(duration2))
###################################################
###### Benchmark 3: expensive initialization ######
###################################################
# Train a small MNIST classifier for one epoch in the driver and save it to
# disk, so each Model actor can load it.
mnist = tf.keras.datasets.mnist.load_data()
(x_train, y_train) = mnist[0]
x_train = x_train / 255.0
layer_stack = [
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(512, activation=tf.nn.relu),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(10, activation=tf.nn.softmax),
]
model = tf.keras.models.Sequential(layer_stack)
compile_kwargs = dict(optimizer='adam',
                      loss='sparse_categorical_crossentropy',
                      metrics=['accuracy'])
model.compile(**compile_kwargs)
model.fit(x_train, y_train, epochs=1)
# Path shared with the Model actors, which load from it.
filename = '/tmp/model'
model.save(filename)
@ray.remote
class Model(object):
    """Ray actor that keeps a loaded Keras model and the test images resident.

    The expensive loading happens once, in ``__init__``; each later
    ``evaluate_next_batch`` call only runs inference.
    """

    def __init__(self, i):
        # TensorFlow uses several threads per process, so on Linux pin this
        # actor to core ``i`` to keep the actors from contending.
        if sys.platform == 'linux':
            psutil.Process().cpu_affinity([i])
        # Load the model and some data.
        self.model = tf.keras.models.load_model(filename)
        _, (test_images, _) = tf.keras.datasets.mnist.load_data()
        self.x_test = test_images / 255.0

    def evaluate_next_batch(self):
        """Predict on the cached test images.

        The same data is reused on every call; in a real application the
        data would be different each time.
        """
        return self.model.predict(self.x_test)

    def ping(self):
        """No-op that the driver calls to wait until ``__init__`` has finished."""
        return None
# One Model actor per core; actor number k is pinned to core k on Linux.
actors = [Model.remote(core) for core in range(num_cpus)]
# Block until every actor has finished __init__ (model loading), so that
# loading happens before the timed loop starts.
ray.get([handle.ping.remote() for handle in actors])
# Wall-clock seconds for each trial of benchmark 3.
durations3 = []
for _trial in range(num_trials):
    start_time = time.time()
    # Parallelize the evaluation of some test data: one batch per actor,
    # ten rounds per trial.
    for _round in range(10):
        futures = [actor.evaluate_next_batch.remote() for actor in actors]
        results = ray.get(futures)
    duration3 = time.time() - start_time
    durations3.append(duration3)
    print('Expensive initialization workload took {} seconds.'.format(duration3))
print('Used {} cores.'.format(num_cpus))
# Summarize each benchmark as "mean +/- standard deviation" over all trials.
summary_rows = [
    ('Numerical computation', durations1),
    ('Stateful computation', durations2),
    ('Expensive initialization', durations3),
]
summary_lines = ['- {}: {} +/- {}'.format(label, np.mean(times), np.std(times))
                 for label, times in summary_rows]
print('\nResults:\n' + '\n'.join(summary_lines) + '\n')
from collections import defaultdict
import numpy as np
import psutil
import scipy.signal
import sys
import tensorflow as tf
import time
# Number of timed repetitions of every benchmark.
num_trials = 5
# Count the number of physical CPUs. psutil returns None when the physical
# count cannot be determined, so fall back to the logical count (and finally
# to 1) instead of crashing later in range(num_cpus).
num_cpus = psutil.cpu_count(logical=False) or psutil.cpu_count() or 1
print('Using {} cores.'.format(num_cpus))
################################################
###### Benchmark 1: numerical computation ######
################################################
def f(image, random_filter):
    """Serial baseline task: full 2-D convolution of ``image`` with
    ``random_filter``, keeping every 5th row and column of the result."""
    stride = 5
    convolved = scipy.signal.convolve2d(image, random_filter)
    return convolved[::stride, ::stride]
# One random 4x4 convolution kernel per (simulated) worker.
filters = [np.random.normal(loc=0.0, scale=1.0, size=(4, 4))
           for _worker in range(num_cpus)]
def run_benchmark():
    """Serial baseline for benchmark 1.

    Applies ``f`` to one all-zeros 3000x3000 image with each of the first
    ``num_cpus`` filters, one after another in this process. The outputs are
    collected and then discarded.
    """
    zeros_image = np.zeros((3000, 3000))
    outputs = []
    for k in range(num_cpus):
        outputs.append(f(zeros_image, filters[k]))
# Wall-clock seconds for each trial of benchmark 1.
durations1 = []
for trial_index in range(num_trials):
    began = time.time()
    run_benchmark()
    duration1 = time.time() - began
    durations1.append(duration1)
    print('Numerical computation workload took {} seconds.'.format(duration1))
###############################################
###### Benchmark 2: stateful computation ######
###############################################
class StreamingPrefixCount(object):
    """In-process counterpart of the streaming prefix counter.

    ``prefix_count`` maps each proper prefix to how often it has been seen.
    ``popular_prefixes`` collects every prefix whose count has exceeded 3.
    """

    def __init__(self):
        self.prefix_count = defaultdict(int)
        self.popular_prefixes = set()

    def add_document(self, document):
        """Count every proper prefix (length 1 .. len(word) - 1) of each word."""
        prefixes = (word[:n] for word in document for n in range(1, len(word)))
        for prefix in prefixes:
            self.prefix_count[prefix] += 1
            if self.prefix_count[prefix] > 3:
                self.popular_prefixes.add(prefix)

    def get_popular(self):
        """Return the (live, not copied) set of popular prefixes."""
        return self.popular_prefixes
# Wall-clock seconds for each trial of benchmark 2.
durations2 = []
for _trial in range(num_trials):
    # The counters are constructed before the timer starts.
    streaming_actors = [StreamingPrefixCount() for _ in range(num_cpus)]
    start_time = time.time()
    # Feed num_cpus * 10 random documents to the counters round-robin.
    for doc_index in range(num_cpus * 10):
        batch = [np.random.bytes(20) for _ in range(10000)]
        streaming_actors[doc_index % num_cpus].add_document(batch)
    # Aggregate all of the results.
    results = [actor.get_popular() for actor in streaming_actors]
    popular_prefixes = set()
    for prefixes in results:
        popular_prefixes.update(prefixes)
    duration2 = time.time() - start_time
    durations2.append(duration2)
    print('Stateful computation workload took {} seconds.'.format(duration2))
###################################################
###### Benchmark 3: expensive initialization ######
###################################################
# Train a small MNIST classifier for one epoch and save it to disk, so the
# Model wrapper below can load it.
mnist = tf.keras.datasets.mnist.load_data()
(x_train, y_train) = mnist[0]
x_train = x_train / 255.0
layer_stack = [
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(512, activation=tf.nn.relu),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(10, activation=tf.nn.softmax),
]
model = tf.keras.models.Sequential(layer_stack)
compile_kwargs = dict(optimizer='adam',
                      loss='sparse_categorical_crossentropy',
                      metrics=['accuracy'])
model.compile(**compile_kwargs)
model.fit(x_train, y_train, epochs=1)
# Path shared with Model.__init__, which loads from it.
filename = '/tmp/model'
model.save(filename)
class Model(object):
    """In-process model holder for the serial baseline of benchmark 3.

    The saved model and the test images are loaded once, in the constructor,
    so repeated ``evaluate_next_batch`` calls only pay for inference.
    """

    def __init__(self):
        # Load the model and some data.
        self.model = tf.keras.models.load_model(filename)
        _, (test_images, _) = tf.keras.datasets.mnist.load_data()
        self.x_test = test_images / 255.0

    def evaluate_next_batch(self):
        """Predict on the cached test images.

        The same data is reused on every call; in a real application the
        data would be different each time.
        """
        return self.model.predict(self.x_test)
# A single in-process model evaluates every batch sequentially.
actor = Model()
# Wall-clock seconds for each trial of benchmark 3.
durations3 = []
for _trial in range(num_trials):
    start_time = time.time()
    for _round in range(10):
        results = []
        for _ in range(num_cpus):
            results.append(actor.evaluate_next_batch())
    duration3 = time.time() - start_time
    durations3.append(duration3)
    print('Expensive initialization workload took {} seconds.'.format(duration3))
print('Used {} cores.'.format(num_cpus))
# Summarize each benchmark as "mean +/- standard deviation" over all trials.
summary_rows = [
    ('Numerical computation', durations1),
    ('Stateful computation', durations2),
    ('Expensive initialization', durations3),
]
summary_lines = ['- {}: {} +/- {}'.format(label, np.mean(times), np.std(times))
                 for label, times in summary_rows]
print('\nResults:\n' + '\n'.join(summary_lines) + '\n')
# This is an example configuration file for starting an m5.4xlarge instance
# on AWS, configured appropriately for these benchmarks. To launch instances
# of different sizes, you will need to modify the fields below.
# Documentation for the Ray autoscaler is available at
# https://ray.readthedocs.io/en/latest/autoscaling.html.

# A unique identifier for the head node and workers of this cluster.
cluster_name: benchmarkforblog_m54xlarge

# The minimum number of worker nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 0

# The maximum number of worker nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 0

# The autoscaler will scale up the cluster to this target fraction of resource
# usage. For example, if a cluster of 10 nodes is 100% busy and
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
# can be decreased to increase the aggressiveness of upscaling.
# This value must be less than 1.0 for scaling to happen.
target_utilization_fraction: 0.8

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5

# Cloud-provider specific configuration.
provider:
    type: aws
    region: us-west-2
    availability_zone: us-west-2a

# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: ubuntu
    # By default Ray creates a new private keypair, but you can also use your own.
    # If you do so, make sure to also set "KeyName" in the head and worker node
    # configurations below.
    # ssh_private_key: /path/to/your/key.pem

# Provider-specific config for the head node, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
# For more documentation on available fields, see:
# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
head_node:
    InstanceType: m5.4xlarge
    ImageId: ami-0def3275  # Default Ubuntu 16.04 AMI.
    # Set the primary volume to 50 GiB.
    BlockDeviceMappings:
        - DeviceName: /dev/sda1
          Ebs:
              VolumeSize: 50
    # Additional options in the boto docs.

# Provider-specific config for worker nodes, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
# For more documentation on available fields, see:
# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
worker_nodes:
    InstanceType: m5.4xlarge
    ImageId: ami-0def3275  # Default Ubuntu 16.04 AMI.
    # Set the primary volume to 50 GiB.
    BlockDeviceMappings:
        - DeviceName: /dev/sda1
          Ebs:
              VolumeSize: 50
    # Run workers on spot by default. Comment this out to use on-demand.
    InstanceMarketOptions:
        MarketType: spot
        # Additional options can be found in the boto docs, e.g.
        # SpotOptions:
        #     MaxPrice: MAX_HOURLY_PRICE
    # Additional options in the boto docs.

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
#    "/path1/on/remote/machine": "/path1/on/local/machine",
#    "/path2/on/remote/machine": "/path2/on/local/machine",
}

# List of shell commands to run to set up nodes.
setup_commands:
    # Install basics.
    - sudo apt-get update
    - sudo apt-get install -y build-essential curl unzip valgrind tmux gdb emacs-nox
    # Install Anaconda.
    - wget https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh || true
    - bash Anaconda3-5.0.1-Linux-x86_64.sh -b -p $HOME/anaconda3 || true
    - echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.bashrc
    # Install Ray and other libraries.
    - pip install ray numpy psutil scipy tensorflow boto3==1.4.8

# Custom commands that will be run on the head node after common setup.
head_setup_commands: []

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Commands to start Ray on the head node. Left empty: the Ray benchmark script
# calls ray.init() itself, so no Ray cluster is started from this config.
head_start_ray_commands: []

# Commands to start Ray on worker nodes (empty; max_workers is 0).
worker_start_ray_commands: []
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment