Last active
January 31, 2024 22:31
-
-
Save robertnishihara/2b81595abd4f50a049767a040ce435ab to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
from multiprocessing import Pool, Process | |
import numpy as np | |
import psutil | |
import scipy.signal | |
import sys | |
import tensorflow as tf | |
import time | |
# Number of timed repetitions of each benchmark.
num_trials = 5
# Count the number of physical CPUs. psutil returns None when the physical
# core count cannot be determined, so fall back to the logical count and then
# to a single core; the code below multiplies by and iterates over num_cpus.
num_cpus = psutil.cpu_count(logical=False) or psutil.cpu_count() or 1
print('Using {} cores.'.format(num_cpus))
################################################ | |
###### Benchmark 1: numerical computation ###### | |
################################################ | |
def f(args):
    """Convolve an image with a filter and keep every fifth row and column.

    ``args`` is a single ``(image, random_filter)`` pair, which is the shape
    in which ``pool.map`` delivers the work items to this function.
    """
    image, random_filter = args
    # Do some image processing, then downsample the full convolution.
    convolved = scipy.signal.convolve2d(image, random_filter)
    return convolved[::5, ::5]
pool = Pool(num_cpus)
# One random 4x4 filter per core.
filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]


def run_benchmark():
    """Convolve one all-zero 3000x3000 image with every filter via the pool."""
    image = np.zeros((3000, 3000))
    pool.map(f, zip(num_cpus * [image], filters))


durations1 = []
for _ in range(num_trials):
    start_time = time.time()
    run_benchmark()
    duration1 = time.time() - start_time
    durations1.append(duration1)
    print('Numerical computation workload took {} seconds.'.format(duration1))

# Shut this pool down now that the benchmark is finished. The name ``pool`` is
# rebound to a new Pool later in the script, and without an explicit
# close/join these worker processes would stay alive until the script exits.
pool.close()
pool.join()
############################################### | |
###### Benchmark 2: stateful computation ###### | |
############################################### | |
def accumulate_prefixes(args):
    """Fold one document into a running prefix count.

    ``args`` is a ``(prefix_count, popular_prefixes, document)`` triple. Every
    proper prefix (lengths 1 to ``len(word) - 1``) of every word in the
    document is counted, and any prefix whose count exceeds 3 is added to the
    popular set. Both containers are updated in place and returned as a pair.
    """
    counts, popular, words = args
    for word in words:
        for end in range(1, len(word)):
            key = word[:end]
            counts[key] += 1
            seen = counts[key]
            if seen > 3:
                popular.add(key)
    return counts, popular
pool = Pool(num_cpus)
durations2 = []
for _ in range(num_trials):
    # One running state per core. These lists must have num_cpus entries:
    # they are zipped with the num_cpus documents below, and zip stops at the
    # shortest input, so a shorter state list would silently drop documents.
    running_prefix_counts = [defaultdict(int) for _ in range(num_cpus)]
    running_popular_prefixes = [set() for _ in range(num_cpus)]
    start_time = time.time()
    for _round in range(10):
        # Each document is 10000 random 20-byte words; one document per core.
        documents = [[np.random.bytes(20) for _ in range(10000)]
                     for _ in range(num_cpus)]
        # The state is passed to the workers and the updated state comes
        # back in the results, ready to be fed into the next round.
        results = pool.map(
            accumulate_prefixes,
            zip(running_prefix_counts, running_popular_prefixes, documents))
        running_prefix_counts = [result[0] for result in results]
        running_popular_prefixes = [result[1] for result in results]
    # Aggregate the per-core popular prefixes into a single set.
    popular_prefixes = set()
    for prefixes in running_popular_prefixes:
        popular_prefixes |= prefixes
    duration2 = time.time() - start_time
    durations2.append(duration2)
    print('Stateful computation workload took {} seconds.'.format(duration2))

# Release the worker processes; ``pool`` is rebound to a new Pool later on.
pool.close()
pool.join()
################################################### | |
###### Benchmark 3: expensive initialization ###### | |
################################################### | |
def save_model():
    """Train a small Keras model on MNIST for one epoch and save it to disk."""
    train_images, train_labels = tf.keras.datasets.mnist.load_data()[0]
    # Scale the inputs by 1/255 before training.
    train_images = train_images / 255.0

    layers = [
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(512, activation=tf.nn.relu),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation=tf.nn.softmax),
    ]
    classifier = tf.keras.models.Sequential(layers)
    classifier.compile(optimizer='adam',
                       loss='sparse_categorical_crossentropy',
                       metrics=['accuracy'])

    # Train the model.
    classifier.fit(train_images, train_labels, epochs=1)

    # Save the model to disk.
    classifier.save('/tmp/model')
# Train and save the model. This has to be done in a separate process because
# otherwise Python multiprocessing will hang when you try to run the code
# below.
p = Process(target=save_model)
p.start()
# Wait for training and saving to finish before anything tries to load it.
p.join()
# Same path that save_model writes to; evaluate_next_batch loads from it.
filename = '/tmp/model'
def evaluate_next_batch(i):
    """Load the saved model and return its predictions on the MNIST test inputs.

    ``i`` is the index of the CPU that the calling process is pinned to when
    running on Linux. The model and data are reloaded on every call.
    """
    on_linux = sys.platform == 'linux'
    if on_linux:
        # Pin the process to a specific core to prevent contention between
        # the different processes since TensorFlow uses multiple threads.
        psutil.Process().cpu_affinity([i])
    loaded_model = tf.keras.models.load_model(filename)
    test_split = tf.keras.datasets.mnist.load_data()[1]
    test_inputs = test_split[0] / 255.0
    return loaded_model.predict(test_inputs)
pool = Pool(num_cpus)
durations3 = []
for _ in range(num_trials):
    start_time = time.time()
    # Ten rounds per trial; each round runs one evaluation per core. A
    # distinct loop variable avoids clobbering the outer trial variable.
    for _round in range(10):
        pool.map(evaluate_next_batch, range(num_cpus))
    duration3 = time.time() - start_time
    durations3.append(duration3)
    print('Expensive initialization workload took {} seconds.'.format(duration3))

# Shut the workers down explicitly instead of relying on interpreter exit.
pool.close()
pool.join()

print('Used {} cores.'.format(num_cpus))
# Report the mean and standard deviation of each benchmark's trial times.
print("""
Results:
- Numerical computation: {} +/- {}
- Stateful computation: {} +/- {}
- Expensive initialization: {} +/- {}
""".format(np.mean(durations1), np.std(durations1),
           np.mean(durations2), np.std(durations2),
           np.mean(durations3), np.std(durations3)))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
import numpy as np | |
import psutil | |
import ray | |
import scipy.signal | |
import sys | |
import tensorflow as tf | |
import time | |
# Number of timed repetitions of each benchmark.
num_trials = 5
# Count the number of physical CPUs. psutil returns None when the physical
# core count cannot be determined, so fall back to the logical count and then
# to a single core; the code below iterates over and multiplies by num_cpus.
num_cpus = psutil.cpu_count(logical=False) or psutil.cpu_count() or 1
print('Using {} cores.'.format(num_cpus))
ray.init(num_cpus=num_cpus)
################################################ | |
###### Benchmark 1: numerical computation ###### | |
################################################ | |
@ray.remote
def f(image, random_filter):
    """Convolve an image with a filter and keep every fifth row and column."""
    # Do some image processing, then downsample the full convolution.
    convolved = scipy.signal.convolve2d(image, random_filter)
    return convolved[::5, ::5]
# One random 4x4 filter per core.
filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]


def run_benchmark():
    """Convolve one all-zero 3000x3000 image with every filter as Ray tasks."""
    image = np.zeros((3000, 3000))
    # Store the image with ray.put once and hand the resulting ID to every
    # task, rather than passing the array itself to each call.
    image_id = ray.put(image)
    ray.get([f.remote(image_id, random_filter) for random_filter in filters])


# Run it a couple times to warm up the Ray object store because the initial
# memory accesses are slower. A plain loop is used because the calls are made
# only for their side effect; there is no result list worth building.
for _ in range(5):
    run_benchmark()

durations1 = []
for _ in range(num_trials):
    start_time = time.time()
    run_benchmark()
    duration1 = time.time() - start_time
    durations1.append(duration1)
    print('Numerical computation workload took {} seconds.'.format(duration1))
############################################### | |
###### Benchmark 2: stateful computation ###### | |
############################################### | |
@ray.remote
class StreamingPrefixCount(object):
    """Ray actor holding a running prefix count and the popular prefixes."""

    def __init__(self):
        self.popular_prefixes = set()
        self.prefix_count = defaultdict(int)

    def add_document(self, document):
        """Count every proper prefix of every word in ``document``.

        A prefix is recorded as popular once its count exceeds 3.
        """
        counts = self.prefix_count
        popular = self.popular_prefixes
        for word in document:
            for end in range(1, len(word)):
                key = word[:end]
                counts[key] += 1
                if counts[key] > 3:
                    popular.add(key)

    def get_popular(self):
        """Return the set of popular prefixes collected so far."""
        return self.popular_prefixes
durations2 = []
for _ in range(num_trials):
    # Fresh actors, and therefore fresh counts, for every trial.
    streaming_actors = [StreamingPrefixCount.remote() for _ in range(num_cpus)]
    start_time = time.time()
    # Ten rounds; in each round every actor receives one new document of
    # 10000 random 20-byte words, in actor order.
    for _round in range(10):
        for target in streaming_actors:
            document = [np.random.bytes(20) for _ in range(10000)]
            target.add_document.remote(document)
    # Aggregate all of the results.
    pending = [actor.get_popular.remote() for actor in streaming_actors]
    results = ray.get(pending)
    popular_prefixes = set()
    for prefixes in results:
        popular_prefixes |= prefixes
    duration2 = time.time() - start_time
    durations2.append(duration2)
    print('Stateful computation workload took {} seconds.'.format(duration2))
################################################### | |
###### Benchmark 3: expensive initialization ###### | |
################################################### | |
# Load the MNIST data and scale the training inputs by 1/255.
mnist = tf.keras.datasets.mnist.load_data()
x_train, y_train = mnist[0]
x_train = x_train / 255.0

# Build the network layer by layer, then wrap the layers in a Sequential model.
layers = [
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(512, activation=tf.nn.relu),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(10, activation=tf.nn.softmax),
]
model = tf.keras.models.Sequential(layers)
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Train the model.
model.fit(x_train, y_train, epochs=1)

# Save the model to disk.
filename = '/tmp/model'
model.save(filename)
@ray.remote
class Model(object):
    """Ray actor that loads the saved model and test data once at start-up."""

    def __init__(self, i):
        on_linux = sys.platform == 'linux'
        if on_linux:
            # Pin the actor to a specific core to prevent contention between
            # the different actors since TensorFlow uses multiple threads.
            psutil.Process().cpu_affinity([i])
        # Load the model and some data.
        self.model = tf.keras.models.load_model(filename)
        test_split = tf.keras.datasets.mnist.load_data()[1]
        self.x_test = test_split[0] / 255.0

    def evaluate_next_batch(self):
        """Return the model's predictions on the stored test inputs."""
        # Note that we reuse the same data over and over, but in a
        # real application, the data would be different each time.
        predictions = self.model.predict(self.x_test)
        return predictions

    def ping(self):
        """Do nothing; callers wait on the result of this call."""
        return None
# One actor per core; the core index is passed to the actor's constructor.
actors = [Model.remote(core) for core in range(num_cpus)]
# Make sure the actors have started.
startup_pings = [actor.ping.remote() for actor in actors]
ray.get(startup_pings)

durations3 = []
for _ in range(num_trials):
    start_time = time.time()
    # Parallelize the evaluation of some test data.
    for j in range(10):
        pending = [actor.evaluate_next_batch.remote() for actor in actors]
        results = ray.get(pending)
    duration3 = time.time() - start_time
    durations3.append(duration3)
    print('Expensive initialization workload took {} seconds.'.format(duration3))

print('Used {} cores.'.format(num_cpus))

# Mean and standard deviation of each benchmark, in reporting order.
summary = []
for durations in (durations1, durations2, durations3):
    summary.append(np.mean(durations))
    summary.append(np.std(durations))
print("""
Results:
- Numerical computation: {} +/- {}
- Stateful computation: {} +/- {}
- Expensive initialization: {} +/- {}
""".format(*summary))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
import numpy as np | |
import psutil | |
import scipy.signal | |
import sys | |
import tensorflow as tf | |
import time | |
# Number of timed repetitions of each benchmark.
num_trials = 5
# Count the number of physical CPUs. psutil returns None when the physical
# core count cannot be determined, so fall back to the logical count and then
# to a single core; the code below iterates over and multiplies by num_cpus.
num_cpus = psutil.cpu_count(logical=False) or psutil.cpu_count() or 1
print('Using {} cores.'.format(num_cpus))
################################################ | |
###### Benchmark 1: numerical computation ###### | |
################################################ | |
def f(image, random_filter):
    """Convolve an image with a filter and keep every fifth row and column."""
    # Do some image processing, then downsample the full convolution.
    convolved = scipy.signal.convolve2d(image, random_filter)
    return convolved[::5, ::5]
# One random 4x4 filter per core.
filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]


def run_benchmark():
    """Convolve one all-zero 3000x3000 image with every filter, serially."""
    image = np.zeros((3000, 3000))
    # The results are not used, so iterate over the filters directly instead
    # of collecting every result in a throwaway list.
    for random_filter in filters:
        f(image, random_filter)


durations1 = []
for _ in range(num_trials):
    start_time = time.time()
    run_benchmark()
    duration1 = time.time() - start_time
    durations1.append(duration1)
    print('Numerical computation workload took {} seconds.'.format(duration1))
############################################### | |
###### Benchmark 2: stateful computation ###### | |
############################################### | |
class StreamingPrefixCount(object):
    """Running count of word prefixes plus the set of popular prefixes."""

    def __init__(self):
        self.popular_prefixes = set()
        self.prefix_count = defaultdict(int)

    def add_document(self, document):
        """Count every proper prefix of every word in ``document``.

        A prefix is recorded as popular once its count exceeds 3.
        """
        counts = self.prefix_count
        popular = self.popular_prefixes
        for word in document:
            for end in range(1, len(word)):
                key = word[:end]
                counts[key] += 1
                if counts[key] > 3:
                    popular.add(key)

    def get_popular(self):
        """Return the set of popular prefixes collected so far."""
        return self.popular_prefixes
durations2 = []
for _ in range(num_trials):
    # Fresh counters for every trial.
    streaming_actors = [StreamingPrefixCount() for _ in range(num_cpus)]
    start_time = time.time()
    # Ten rounds; in each round every counter receives one new document of
    # 10000 random 20-byte words, in counter order.
    for _round in range(10):
        for target in streaming_actors:
            document = [np.random.bytes(20) for _ in range(10000)]
            target.add_document(document)
    # Aggregate all of the results.
    results = [actor.get_popular() for actor in streaming_actors]
    popular_prefixes = set()
    for prefixes in results:
        popular_prefixes |= prefixes
    duration2 = time.time() - start_time
    durations2.append(duration2)
    print('Stateful computation workload took {} seconds.'.format(duration2))
################################################### | |
###### Benchmark 3: expensive initialization ###### | |
################################################### | |
# Load the MNIST data and scale the training inputs by 1/255.
mnist = tf.keras.datasets.mnist.load_data()
x_train, y_train = mnist[0]
x_train = x_train / 255.0

# Build the network layer by layer, then wrap the layers in a Sequential model.
layers = [
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(512, activation=tf.nn.relu),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(10, activation=tf.nn.softmax),
]
model = tf.keras.models.Sequential(layers)
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Train the model.
model.fit(x_train, y_train, epochs=1)

# Save the model to disk.
filename = '/tmp/model'
model.save(filename)
class Model(object):
    """Holds the saved model and the test inputs, loaded once at construction."""

    def __init__(self):
        # Load the model and some data.
        self.model = tf.keras.models.load_model(filename)
        test_split = tf.keras.datasets.mnist.load_data()[1]
        self.x_test = test_split[0] / 255.0

    def evaluate_next_batch(self):
        """Return the model's predictions on the stored test inputs."""
        # Note that we reuse the same data over and over, but in a
        # real application, the data would be different each time.
        predictions = self.model.predict(self.x_test)
        return predictions
# A single model instance is reused for every evaluation in the serial run.
actor = Model()

durations3 = []
for _ in range(num_trials):
    start_time = time.time()
    # Ten rounds per trial, with num_cpus evaluations in each round.
    for j in range(10):
        results = []
        for _call in range(num_cpus):
            results.append(actor.evaluate_next_batch())
    duration3 = time.time() - start_time
    durations3.append(duration3)
    print('Expensive initialization workload took {} seconds.'.format(duration3))

print('Used {} cores.'.format(num_cpus))

# Mean and standard deviation of each benchmark, in reporting order.
summary = []
for durations in (durations1, durations2, durations3):
    summary.append(np.mean(durations))
    summary.append(np.std(durations))
print("""
Results:
- Numerical computation: {} +/- {}
- Stateful computation: {} +/- {}
- Expensive initialization: {} +/- {}
""".format(*summary))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is an example configuration file for starting an m5.4xlarge
# instance on AWS, appropriately configured for these benchmarks.
# To launch instances of different sizes, you will need
# to modify the fields below. Documentation for the Ray autoscaler is | |
# available at https://ray.readthedocs.io/en/latest/autoscaling.html. | |
# A unique identifier for the head node and workers of this cluster.
cluster_name: benchmarkforblog_m54xlarge | |
# The minimum number of worker nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 0 | |
# The maximum number of worker nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 0 | |
# The autoscaler will scale up the cluster to this target fraction of resource | |
# usage. For example, if a cluster of 10 nodes is 100% busy and | |
# target_utilization is 0.8, it would resize the cluster to 13. This fraction | |
# can be decreased to increase the aggressiveness of upscaling. | |
# This value must be less than 1.0 for scaling to happen. | |
target_utilization_fraction: 0.8 | |
# If a node is idle for this many minutes, it will be removed. | |
idle_timeout_minutes: 5 | |
# Cloud-provider specific configuration. | |
provider: | |
type: aws | |
region: us-west-2 | |
availability_zone: us-west-2a | |
# How Ray will authenticate with newly launched nodes. | |
auth: | |
ssh_user: ubuntu | |
# By default Ray creates a new private keypair, but you can also use your own. | |
# If you do so, make sure to also set "KeyName" in the head and worker node | |
# configurations below. | |
# ssh_private_key: /path/to/your/key.pem | |
# Provider-specific config for the head node, e.g. instance type. By default | |
# Ray will auto-configure unspecified fields such as SubnetId and KeyName. | |
# For more documentation on available fields, see: | |
# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances | |
head_node: | |
InstanceType: m5.4xlarge | |
ImageId: ami-0def3275 # Default Ubuntu 16.04 AMI. | |
# Set primary volume to 50 GiB
BlockDeviceMappings: | |
- DeviceName: /dev/sda1 | |
Ebs: | |
VolumeSize: 50 | |
# Additional options in the boto docs. | |
# Provider-specific config for worker nodes, e.g. instance type. By default | |
# Ray will auto-configure unspecified fields such as SubnetId and KeyName. | |
# For more documentation on available fields, see: | |
# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances | |
worker_nodes: | |
InstanceType: m5.4xlarge | |
ImageId: ami-0def3275 # Default Ubuntu 16.04 AMI. | |
# Set primary volume to 50 GiB
BlockDeviceMappings: | |
- DeviceName: /dev/sda1 | |
Ebs: | |
VolumeSize: 50 | |
# Run workers on spot by default. Comment this out to use on-demand. | |
InstanceMarketOptions: | |
MarketType: spot | |
# Additional options can be found in the boto docs, e.g. | |
# SpotOptions: | |
# MaxPrice: MAX_HOURLY_PRICE | |
# Additional options in the boto docs. | |
# Files or directories to copy to the head and worker nodes. The format is a | |
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g. | |
file_mounts: { | |
# "/path1/on/remote/machine": "/path1/on/local/machine", | |
# "/path2/on/remote/machine": "/path2/on/local/machine", | |
} | |
# List of shell commands to run to set up nodes.
setup_commands:
    # Install basics.
    - sudo apt-get update
    - sudo apt-get install -y build-essential curl unzip valgrind tmux gdb emacs-nox
    # Install Anaconda. The "|| true" keeps setup going if either step fails,
    # for example when setup is re-run on a node that already has Anaconda.
    - wget https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh || true
    - bash Anaconda3-5.0.1-Linux-x86_64.sh -b -p $HOME/anaconda3 || true
    - echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.bashrc
    # Install Ray and other libraries (ray was previously listed twice).
    - pip install ray numpy psutil scipy tensorflow boto3==1.4.8
# Custom commands that will be run on the head node after common setup. | |
head_setup_commands: [] | |
# Custom commands that will be run on worker nodes after common setup. | |
worker_setup_commands: [] | |
# Command to start ray on the head node. You don't need to change this. | |
head_start_ray_commands: [] | |
# Command to start ray on worker nodes. You don't need to change this. | |
worker_start_ray_commands: [] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment