Skip to content

Instantly share code, notes, and snippets.

@mottosso
Last active January 9, 2020 06:40
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mottosso/dd59f815025e4d76f453f3555e39558e to your computer and use it in GitHub Desktop.
Save mottosso/dd59f815025e4d76f453f3555e39558e to your computer and use it in GitHub Desktop.
Subprocess profiling

Table of contents


Usage

In Python 2 or 3 on any platform

$ python -u subprocess_profile.py --bytes=100 --iterations=1000

Results on Python 2.7 on play-with-docker:

$ docker run -ti --rm ubuntu
(ubuntu) $ apt-get update && apt-get install python
(ubuntu) $ python -u subprocess_profile.py --bytes=1 --iterations=10000
...
Iterations: 10,000
Bytes/iteration: 76 b/i
Bytes/second: 1,982,766 b/s
Avarage roundtrip time: 0.016 ms
Min roundtrip time: 0.002 ms
Max roundtrip time: 0.202 ms
Delta roundtrip time: 0.200 ms
Total bytes: 760,000 b
Total time: 0.383 s

Aggregate results

The second snippet runs the first many times, aggregating their results.

$ python subprocess_aggregate.py
...

10 minutes later

Producing a plot like aggplot.svg

Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
"""Aggregate STDIO results"""
import os
import sys
import json
import pygal
import subprocess
from collections import defaultdict
dp0 = os.path.dirname(__file__)
fname = os.path.join(dp0, "subprocess_profile.py")
oname = os.path.join(dp0, "output.json")
pname = os.path.join(dp0, "aggplot.svg")
rname = os.path.join(dp0, "results.json")
def run(sizes):
data = dict()
for size in sizes:
if size != 1 and size % 100 != 0:
continue
print("bytes: %d" % size)
subprocess.call([
sys.executable,
"stdio_cli.py",
"--iterations", str(10000),
"--bytes", str(size),
"--output", oname,
])
with open(oname) as f:
results = json.load(f)
data[str(size)] = results
print("Writing data to '%s'" % rname)
with open(rname, "w") as f:
json.dump(data, f, indent=2)
if not os.path.exists(rname):
run(sizes=range(1, 10000 + 1))
with open(rname) as f:
data = json.load(f)
sizes = list()
times = defaultdict(list)
for size, result in data.items():
sizes.append(size)
for key, value in result.items():
if key in ("mintime", "avgtime"):
value *= 1000 # microseconds
key += " (mu)"
if key == "bps":
value /= 10 ** 6
key = "mbps"
times[key].append(value)
print("Plotting to %s.." % pname)
plot = pygal.Line(width=2000, height=500)
plot.title = "Time relative size"
for key, values in times.items():
if key in ("iterations", "totb", "bpi",
"maxtime", "deltime", "totdur"):
continue
plot.add(key, values, show_dots=False)
plot.x_labels = sizes
plot.render_to_file(pname)
"""Subprocess IPC communication test
Determine the throughput and bandwidth of standard Python
subprocess interprocess communication.
"""
import os
import sys
import time
import pstats
import logging
import cProfile
import argparse
import subprocess
logging.basicConfig(format="%(name)s: %(message)s")
parser = argparse.ArgumentParser()
parser.add_argument("-o", "--output")
parser.add_argument("-i", "--iterations", default=100, type=int)
parser.add_argument("-b", "--bytes", default=1000, type=int)
parser.add_argument("-s", "--stats", action="store_true",
help="Print additional stats")
parser.add_argument("-p", "--plot", action="store_true")
parser.add_argument("-q", "--quiet", action="store_true")
opt = parser.parse_args()
if not os.getenv("CHILD"):
# Parent Process
#
# This block boots up another Python interpreter
# so as to send messages back and forth.
# ______________
# | |
# | |
# | PARENT |
# | |
# |______________|
#
log = logging.getLogger("<-- PY")
log.setLevel(logging.INFO)
log.info("Booting child..")
popen = subprocess.Popen(
# Open yourself, but run the below block
[sys.executable, __file__],
env=dict(os.environ, **{"CHILD": "1"}),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
bufsize=-1,
universal_newlines=True,
)
# Wait for child to get ready..
response = popen.stdout.readline()
assert response == "Ready\n", "Was: '%s'" % response
# Data the size of opt.bytes
data = "0".zfill(opt.bytes)
bytes_ = sys.getsizeof(data)
log.info("Created data {:,} bytes (len {})".format(
bytes_, len(data)
))
write_per_iteration = list()
rtrip_per_iteration = list() # roundtrip
child_wait_times = list()
times = list()
if opt.stats:
profile = cProfile.Profile()
profile.enable()
tot0 = time.clock()
for i in range(opt.iterations):
# Write
write0 = time.clock()
popen.stdin.write(data)
popen.stdin.write("%d\n" % i)
popen.stdin.flush()
write1 = time.clock()
# Read
read0 = time.clock()
response = popen.stdout.readline()
read1 = time.clock()
write_per_iteration.append((write1 - write0) * 1000)
rtrip_per_iteration.append((read1 - read0) * 1000)
times.append(read1)
index, wait = response.rsplit("-", 1)
response = int(index)
wait = float(wait)
child_wait_times.append(wait)
assert response == i, "Was %d, expected %d" % (response, i)
if not opt.quiet and (opt.iterations > 1000 and i % 100 == 0):
sys.stderr.write("\r%d/%d.." % (i, opt.iterations))
if not opt.quiet:
sys.stderr.write("\r%d/%d..\n" % (opt.iterations, opt.iterations))
if opt.stats:
profile.disable()
stats = pstats.Stats(profile).sort_stats("time")
stats.print_stats(20)
tot1 = time.clock()
totdur = tot1 - tot0 # s
# Send kill signal
popen.kill()
popen.wait()
def plot():
import pygal
fname = os.path.join(os.getcwd(), "plot.svg")
log.info("Plotting to %s.." % fname)
assert rtrip_per_iteration
assert write_per_iteration
assert child_wait_times
plot = pygal.Line(width=2000, height=500)
plot.title = "Time per iteration"
plot.add("Roundtrip (ms)", rtrip_per_iteration, show_dots=False)
plot.add("Write to child (ms)", write_per_iteration, show_dots=False)
plot.add("Child wait (ms)", child_wait_times, show_dots=False)
# plot.x_labels = times
plot.render_to_file(fname)
if opt.plot:
try:
plot()
except ImportError:
log.info("Plotting skipped, could not find pygal")
iterations = opt.iterations
bpi = bytes_ * 2
totb = opt.iterations * bpi # in + out
bps = totb / (totdur)
avgtime = (sum(rtrip_per_iteration) / len(rtrip_per_iteration))
mintime = min(rtrip_per_iteration)
maxtime = max(rtrip_per_iteration)
deltime = maxtime - mintime
results = (
("iterations", opt.iterations),
("bpi", bytes_ * 2),
("bps", bps),
("avgtime", avgtime),
("mintime", mintime),
("maxtime", maxtime),
("deltime", deltime),
("totb", totb),
("totdur", totdur),
)
if opt.output:
import json
with open(opt.output, "w") as f:
log.info("Writing results to '%s'" % opt.output)
json.dump(dict(results), f, indent=2, sort_keys=True)
print("""\
Iterations: {iterations:,}
Bytes/iteration: {bpi:,} b/i
Bytes/second: {bps:,.0f} b/s
Avarage roundtrip time: {avgtime:.3f} ms
Min roundtrip time: {mintime:.3f} ms
Max roundtrip time: {maxtime:.3f} ms
Delta roundtrip time: {deltime:.3f} ms
Total bytes: {totb:,} b
Total time: {totdur:.3f} s
""".format(**locals()))
else:
# Child Process
#
# This block represents the additional Python interpreter which
# is receiving messages via sys.stdin from the above parent.
# _____________
# | |
# | |
# | CHILD |
# | |
# |_____________|
#
log = logging.getLogger("--> MO")
log.setLevel(logging.INFO)
log.info("Ready for action..")
sys.stdout.write("Ready\n")
sys.stdout.flush()
t0 = None
while True:
line = sys.stdin.readline().rstrip()
# Wait time
t1 = time.clock()
duration = "-%.6f\n" % ((t1 - (t0 or t1)) * 1000)
sys.stdout.write(line)
sys.stdout.write(duration)
sys.stdout.flush()
t0 = time.clock()
log.info("Dying..")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment