Skip to content

Instantly share code, notes, and snippets.

@utaal
Last active June 26, 2018 08:53
Show Gist options
  • Save utaal/a919d86929b7955b6479a4788151931a to your computer and use it in GitHub Desktop.
Save utaal/a919d86929b7955b6479a4788151931a to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import experiments
def experiment_setup(experiment_name, n, w, **config):
    """Prepare the results directory and build the output path for one run.

    Calls ``experiments.ensuredir`` for *experiment_name*, then returns
    ``<experdir>/<experiment_name>_n=<n>_w=<w>_<k1>=<v1>_<k2>=<v2>...``
    where the ``k=v`` pairs come from *config* in keyword-argument order.
    """
    experiments.ensuredir(experiment_name)
    base_dir = experiments.experdir(experiment_name)
    # One "key=value" fragment per config entry, joined with underscores.
    config_suffix = "_".join(
        "{}={}".format(key, str(value)) for key, value in config.items()
    )
    return "{}/{}_n={}_w={}_{}".format(
        base_dir, experiment_name, n, w, config_suffix
    )
def arrange_closed_loop():
    """Run the closed-loop sweep ten times, one experiment directory per run.

    For each run, iterates worker counts from 32 down to 1 and the
    computations "arrange", "maintain" and "count". Every combination
    launches ``./target/release/arrange`` through ``experiments.run_cmd``
    (``node=4``) and waits for it to finish before starting the next one.
    """
    # Parameters that the original sweep held at a single value.
    keys = 10000000
    recs = 32000000
    rate = 10000
    work = 4
    mode = "closedloop"
    dmode = "seconds"
    dparam = "60"
    n = 1

    worker_counts = [32, 16, 8, 4, 2, 1]
    # Disabled in this sweep: "exchange", "selfjoin", "nothing".
    computations = ["arrange", "maintain", "count"]

    for run in range(0, 10):
        experiment_name = "arrange-closed-loop-{}".format(run)
        experiments.eprint("### {} ###".format(experiment_name))
        experiments.eprint(experiments.experdir(experiment_name))

        for w in worker_counts:
            for comp in computations:
                config = {
                    "keys": keys,
                    "recs": recs,
                    "rate": rate,
                    "work": work,
                    "comp": comp,
                    "mode": mode,
                    "dmode": dmode,
                    "dparam": dparam,
                }
                filename = experiment_setup(experiment_name, n, w, **config)
                experiments.eprint("RUNNING {}".format(filename))

                # Positional arguments of the benchmark binary, in order.
                positional = " ".join(
                    str(x)
                    for x in [keys, recs, rate, work, comp, mode, dmode, dparam]
                )
                commands = []
                for p in range(0, n):
                    cmdline = "./target/release/arrange {} -h hostfile.txt -n {} -p {} -w {}".format(
                        positional, n, p, w)
                    commands.append((cmdline, p))
                experiments.eprint("commands: {}".format(commands))

                processes = [
                    experiments.run_cmd(cmdline, filename, True, node=4)
                    for cmdline, _ in commands
                ]
                experiments.waitall(processes)
def arrange_open_loop_load_varies():
    """Run the open-loop "maintain" sweep while scaling the load down.

    With one process and one worker, each factor in 1, 2, 4, ..., 32
    divides both the base rate (1000000) and the base key count (10000000).
    Every configuration launches ``./target/release/arrange`` through
    ``experiments.run_cmd`` (``node=4``) and is waited on before the next.
    """
    experiment_name = "arrange-open-loop-load-varies"
    experiments.eprint("### {} ###".format(experiment_name))
    experiments.eprint(experiments.experdir(experiment_name))

    # Parameters that the original sweep held at a single value.
    n = 1
    w = 1
    recs = 32000000
    work = 4
    # Disabled in this sweep: "exchange", "arrange", "selfjoin", "count", "nothing".
    comp = "maintain"
    mode = "openloop"
    dmode = "overwrite"
    dparam = "30"

    for factor in [1, 2, 4, 8, 16, 32]:
        rate = int(1000000 / factor)
        keys = int(10000000 / factor)

        config = {
            "keys": keys,
            "recs": recs,
            "rate": rate,
            "work": work,
            "comp": comp,
            "mode": mode,
            "dmode": dmode,
            "dparam": dparam,
        }
        filename = experiment_setup(experiment_name, n, w, **config)
        experiments.eprint("RUNNING {}".format(filename))

        # Positional arguments of the benchmark binary, in order.
        positional = " ".join(
            str(x) for x in [keys, recs, rate, work, comp, mode, dmode, dparam]
        )
        commands = []
        for p in range(0, n):
            cmdline = "./target/release/arrange {} -h hostfile.txt -n {} -p {} -w {}".format(
                positional, n, p, w)
            commands.append((cmdline, p))
        experiments.eprint("commands: {}".format(commands))

        processes = [
            experiments.run_cmd(cmdline, filename, True, node=4)
            for cmdline, _ in commands
        ]
        experiments.waitall(processes)
def arrange_open_loop_strong_scaling():
    """Run the open-loop "maintain" sweep with a fixed problem size.

    Keys and records stay constant while the worker count goes through
    16, 8, 4, 2, 1; for each worker count three rates and three ``work``
    settings (1, 4, "max") are tried. Every configuration launches
    ``./target/release/arrange`` through ``experiments.run_cmd``
    (``node=3``) and is waited on before the next.
    """
    experiment_name = "arrange-open-loop-strong-scaling"
    experiments.eprint("### {} ###".format(experiment_name))
    experiments.eprint(experiments.experdir(experiment_name))

    # Parameters that the original sweep held at a single value.
    n = 1
    keys = 10000000
    recs = 32000000
    # Disabled in this sweep: "exchange", "arrange", "selfjoin", "count", "nothing".
    comp = "maintain"
    mode = "openloop"
    dmode = "seconds"
    dparam = "300"

    for w in [16, 8, 4, 2, 1]:  # 32 was left out of the original sweep
        for rate in [750000, 1000000, 1250000]:
            for work in [1, 4, "max"]:
                config = {
                    "keys": keys,
                    "recs": recs,
                    "rate": rate,
                    "work": work,
                    "comp": comp,
                    "mode": mode,
                    "dmode": dmode,
                    "dparam": dparam,
                }
                filename = experiment_setup(experiment_name, n, w, **config)
                experiments.eprint("RUNNING {}".format(filename))

                # Positional arguments of the benchmark binary, in order.
                positional = " ".join(
                    str(x)
                    for x in [keys, recs, rate, work, comp, mode, dmode, dparam]
                )
                commands = []
                for p in range(0, n):
                    cmdline = "./target/release/arrange {} -h hostfile.txt -n {} -p {} -w {}".format(
                        positional, n, p, w)
                    commands.append((cmdline, p))
                experiments.eprint("commands: {}".format(commands))

                processes = [
                    experiments.run_cmd(cmdline, filename, True, node=3)
                    for cmdline, _ in commands
                ]
                experiments.waitall(processes)
def arrange_open_loop_weak_scaling():
    """Run the open-loop sweep with the problem size growing with workers.

    For each worker count in 16, 8, 4, 2, 1 the key and record counts are
    multiplied by ``n * w`` and the two base rates by ``w``. Each of those
    is combined with three ``work`` settings (1, 4, "max") and the
    computations "arrange", "maintain" and "count". Every configuration
    launches ``./target/release/arrange`` through ``experiments.run_cmd``
    (``node=3``) and is waited on before the next.
    """
    experiment_name = "arrange-open-loop-weak-scaling"
    experiments.eprint("### {} ###".format(experiment_name))
    experiments.eprint(experiments.experdir(experiment_name))

    # Parameters that the original sweep held at a single value.
    n = 1
    mode = "openloop"
    dmode = "overwrite"
    dparam = "10"
    # Disabled in this sweep: "exchange", "selfjoin", "nothing".
    computations = ["arrange", "maintain", "count"]

    for w in [16, 8, 4, 2, 1]:
        total_workers = n * w
        keys = 10000000 * total_workers
        recs = 32000000 * total_workers
        # NOTE(review): rate scales with w, not total_workers; the two are
        # equal while n == 1 -- confirm which is intended before raising n.
        for rate in [750000 * w, 1000000 * w]:
            for work in [1, 4, "max"]:
                for comp in computations:
                    config = {
                        "keys": keys,
                        "recs": recs,
                        "rate": rate,
                        "work": work,
                        "comp": comp,
                        "mode": mode,
                        "dmode": dmode,
                        "dparam": dparam,
                    }
                    filename = experiment_setup(experiment_name, n, w, **config)
                    experiments.eprint("RUNNING {}".format(filename))

                    # Positional arguments of the benchmark binary, in order.
                    positional = " ".join(
                        str(x)
                        for x in [keys, recs, rate, work, comp, mode, dmode, dparam]
                    )
                    commands = []
                    for p in range(0, n):
                        cmdline = "./target/release/arrange {} -h hostfile.txt -n {} -p {} -w {}".format(
                            positional, n, p, w)
                        commands.append((cmdline, p))
                    experiments.eprint("commands: {}".format(commands))

                    processes = [
                        experiments.run_cmd(cmdline, filename, True, node=3)
                        for cmdline, _ in commands
                    ]
                    experiments.waitall(processes)
#!/usr/bin/env python3
import sys, os
from executor import execute
# Check whether the tracked experiment sources differ from HEAD. The result of
# `execute(..., check=False)` is used as a truthy/falsy "clean" flag.
is_worktree_clean = execute("cd `git rev-parse --show-toplevel`; git diff-index --quiet HEAD -- src/ Cargo.toml experiments/src/ experiments/Cargo.toml", check=False)
if not is_worktree_clean:
    shall = input("Work directory dirty. Continue? (y/N) ").strip().lower() == 'y'
    if not shall:
        # The prompt defaults to "No". Previously the answer was read but
        # never acted upon, so experiments ran from a dirty tree regardless.
        sys.exit("aborted: work directory is dirty")
# Results are grouped under this label. Deriving it from the git commit is
# currently disabled in favour of a fixed name:
# current_commit = ("dirty-" if not is_worktree_clean else "") + execute("git rev-parse HEAD", capture=True)
current_commit = "may-1"
def eprint(*args):
    """Print *args* to standard error, space-separated and newline-terminated."""
    stream = sys.stderr
    print(*args, file=stream)
def experdir(name):
    """Return the results directory for experiment *name*.

    The path has the form ``results/<current_commit>/<name>``.
    """
    template = "results/{}/{}"
    return template.format(current_commit, name)
def ensuredir(name):
    """Create the results directory for experiment *name* if it is missing.

    Parent directories are created as needed and an already existing
    directory is not an error (same contract as ``mkdir -p``).
    """
    path = experdir(name)
    eprint("making directory: {}".format(path))
    # Use the standard library instead of shelling out to `mkdir -p {}`: the
    # unquoted path would break on names containing spaces or shell
    # metacharacters.
    os.makedirs(path, exist_ok=True)
def waitall(processes):
    """Call ``wait()`` on every item of *processes*, in iteration order."""
    for proc in processes:
        proc.wait()
# Location of the experiment code on the cluster machine(s); run_cmd changes
# into this directory before running a command.
cluster_src_path = "/home/andreal/Src/differential-dataflow/experiments"
# ssh destination prefix; run_cmd appends the node suffix to it.
cluster_server = "andreal@fdr"
# Log which results label this session writes under.
commit_banner = "commit: {}".format(current_commit)
eprint(commit_banner)
def run_cmd(cmd, redirect=None, background=False, node=""):
    """Run *cmd* on a cluster node over ssh.

    Args:
        cmd: Shell command line; it is run after changing into
            ``cluster_src_path`` on the remote host.
        redirect: Optional local path that receives the ssh command's
            standard output. If this file already exists the remote command
            is skipped and only a "skipping" message is echoed.
        background: Forwarded to ``execute`` as its ``async`` option.
        node: Suffix appended to ``cluster_server`` to form the ssh host.

    Returns:
        Whatever ``execute`` returns for the command that was started.
    """
    full_cmd = "cd {}; {}".format(cluster_src_path, cmd)
    eprint("running on {}{}: {}".format(cluster_server, node, full_cmd))
    # `async` is a reserved keyword since Python 3.7, so spelling the option
    # as `async=background` is a SyntaxError there. Passing it through a dict
    # keeps the option name that the pinned executor release (19.1) expects.
    options = {"async": background}
    if redirect is not None and os.path.exists(redirect):
        # Output from an earlier run is present: do not repeat the experiment.
        return execute("echo \"skipping {}\"".format(redirect), **options)
    local_cmd = "ssh -t {}{} \"{}\"".format(cluster_server, node, full_cmd)
    if redirect:
        local_cmd += " > {}".format(redirect)
    return execute(local_cmd, **options)
coloredlogs==9.0
executor==19.1
fasteners==0.14.1
humanfriendly==4.10
monotonic==1.4
property-manager==2.2
six==1.11.0
verboselogs==1.7
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment