Skip to content

Instantly share code, notes, and snippets.

@bbengfort

bbengfort/mpmprof.py

Created Mar 20, 2017
Embed
What would you like to do?
Fork of memory_profiler that performs per-child memory profiling for multiprocessing applications.
#!/usr/bin/env python3
"""
Multiprocessing version of memory profiling of Python programs.
"""
import os
import re
import time
import glob
import argparse
import subprocess
import memory_profiler as mp
from collections import defaultdict
try:
import numpy as np
import matplotlib.pyplot as plt
except ImportError:
plt = None
np = None
# Text shown by the argument parser (description above, epilog below the help)
DESCRIPTION = "Multiprocessing memory profiling over time."
EPILOG = "If there are any bugs or concerns, submit an issue on Github"

# Version banner; reuses the version string of the memory_profiler package
VERSION = "mpmprof v" + str(mp.__version__)

# strftime pattern used to timestamp the default profile file name
FILETIME = "%Y%m%d%H%M%S"

# Characters whose presence causes a command argument to be quoted when the
# command line is written back out as a single string
BLANKS = {' ', '\t'}
def run_action(args):
    """
    Run the given program and profile its memory usage.

    Launches ``args.command`` as a subprocess and, until that subprocess
    exits, samples it every ``args.interval`` seconds. Each sample appends
    one ``MEM`` line for the subprocess itself and one ``CHLD<idx>`` line
    per child reported by memory_profiler to the file at ``args.output``.

    Parameters
    ----------
    args : argparse.Namespace
        Must provide ``command`` (list of str), ``output`` (str or None),
        ``python`` (bool), ``nopython`` (bool) and ``interval`` (float).
        ``output``, ``python`` and ``command`` may be modified in place.

    Returns
    -------
    str
        A message naming the file the profile was written to.

    Raises
    ------
    ValueError
        If no command was supplied.
    """
    # Fail early with a readable message; indexing command[0] below would
    # otherwise surface as an opaque IndexError.
    if not args.command:
        raise ValueError("no command was given to run and profile")

    # Determine where to write the output to
    if args.output is None:
        args.output = "mprofile_{}.dat".format(
            time.strftime(FILETIME, time.localtime())
        )

    # Determine if the command is a Python command
    if args.command[0].endswith('.py') and not args.nopython:
        args.python = True

    # Run the executable with the extra features
    if args.python:
        print("running as a Python program ...")
        if not args.command[0].startswith('python'):
            args.command.insert(0, 'python')

    # Inform the user we're sampling
    print("mpmprof: Sampling memory every {} seconds".format(args.interval))

    # Put the command back together from the argument parsing; arguments
    # containing a space or tab are wrapped in single quotes for display.
    command = " ".join([
        c if BLANKS.isdisjoint(c) else "'{}'".format(c) for c in args.command
    ])

    # Open a subprocess to the given command (argument list, no shell)
    proc = subprocess.Popen(args.command)

    # This is where a call to mp.memory_usage should go.
    # Instead we're adding the custom code for sampling spawned memory
    with open(args.output, "a") as f:
        # Write the command to the data file
        f.write("CMDLINE {}\n".format(command))

        # Continue sampling until the subprocess is over, counting lines
        lines = 0
        while proc.poll() is None:
            # Collect memory usage of master program and write to profile
            mem = mp._get_memory(proc.pid)
            f.write("MEM {0:.6f} {1:.4f}\n".format(mem, time.time()))
            lines += 1

            # Collect memory usage of spawned children and write to profile
            for idx, mem in enumerate(mp._get_child_memory(proc.pid)):
                f.write("CHLD{0} {1:.6f} {2:.4f}\n".format(idx, mem, time.time()))
                lines += 1

            # Flush once more than 50 lines have accumulated
            if lines > 50:
                lines = 0
                f.flush()

            # Sleep for the given interval
            time.sleep(args.interval)

    # Return the results of the run action
    return "memory profile written to {}".format(args.output)
def plot_action(args):
    """
    Use matplotlib to draw the memory usage of a mprofile .dat file.

    Plots every file in ``args.profile``. When none is given, the most
    recent ``mprofile_<timestamp>.dat`` file in the current directory is
    used. The result is written to ``args.output`` if set, otherwise it is
    shown interactively.

    Parameters
    ----------
    args : argparse.Namespace
        Must provide ``profile`` (list of paths, may be empty), ``title``
        (str or None) and ``output`` (str or None). ``profile`` is
        replaced with the list of files actually plotted.

    Returns
    -------
    str
        A message stating how many profiles were plotted.

    Raises
    ------
    ImportError
        If matplotlib (or numpy) could not be imported.
    ValueError
        If no profile file can be found, or a profile has no ``MEM`` lines.
    """
    if plt is None:
        raise ImportError(
            "matplotlib is needed for plotting."
        )

    def read_mprofile_file(path):
        """
        Reads the specialized version of the mprofile for multiprocessing.

        Returns a ``(command, series)`` tuple. ``command`` is the text of
        the last CMDLINE line (or None). ``series`` maps ``'main'`` and
        ``'child <idx>'`` to lists of ``(mem, timestamp)`` float tuples.
        """
        # Regular expression line parsers for parsing data
        cmdre = re.compile(r"^CMDLINE\s+(.+)$")
        memre = re.compile(r"^MEM\s+([\d\.e]+)\s+([\d\.e]+)$")
        cldre = re.compile(r"^CHLD(\d+)\s+([\d\.e]+)\s+([\d\.e]+)$")

        # Data structure returned is a series of names (mem, ts) tuples.
        series = defaultdict(list)
        command = None

        with open(path, 'r') as f:
            for line in f:
                # Match children memory usage lines
                match = cldre.match(line)
                if match:
                    idx, mem, ts = match.groups()
                    series["child " + idx].append((float(mem), float(ts)))
                    continue

                # Match main process memory usage lines
                match = memre.match(line)
                if match:
                    series['main'].append(tuple(map(float, match.groups())))
                    continue

                # Match command line(s)
                # NOTE: mprofile files are opened for appending, so there
                # could be multiple; the last one wins.
                match = cmdre.match(line)
                if match:
                    command = match.groups()[0]

        return command, series

    def plot_mprofile_file(path, title=None):
        """
        Plots an mprofile file that contains specialized child process data.

        Returns the axes the series were drawn on. Raises ValueError when
        the file contains no main-process samples.
        """
        # Parse the mprofile file to get the data
        command, series = read_mprofile_file(path)
        title = title or command

        # Without main-process samples there is no start time to plot
        # against. ``.get`` avoids creating an empty 'main' entry.
        if not series.get('main'):
            raise ValueError(
                "No main process samples found in {}".format(path)
            )

        # Create and configure the figure
        fig = plt.figure(figsize=(14, 6), dpi=90)
        axe = fig.add_axes([0.1, 0.1, 0.6, 0.75])
        axe.set_xlabel("time (in seconds)")
        axe.set_ylabel("memory used (in MiB)")
        axe.set_title(title)

        # Find the start timestamp for the process and track the maximal
        # memory point as (relative time, memory).
        # This currently assumes that the series were written in order
        start = series['main'][0][1]
        peak_ts, peak_mem = 0.0, 0.0

        # Plot all of the series, the main process and the child.
        for proc, data in series.items():
            # Create the numpy arrays from the series data
            ts = np.asarray([item[1] for item in data]) - start
            mem = np.asarray([item[0] for item in data])

            # Plot the line to the figure
            plt.plot(ts, mem, "+-", label=proc)

            # Detect the maximal memory point. Record the timestamp itself
            # rather than an index, because an index is only meaningful
            # for the series it came from.
            peak_idx = mem.argmax()
            if mem[peak_idx] > peak_mem:
                peak_ts, peak_mem = ts[peak_idx], mem[peak_idx]

        # Add the marker lines for the maximal memory usage
        plt.hlines(peak_mem, plt.xlim()[0]+0.001, plt.xlim()[1] - 0.001, 'r', '--')
        plt.vlines(peak_ts, plt.ylim()[0]+0.001, plt.ylim()[1] - 0.001, 'r', '--')

        # Add the legend
        legend = axe.legend(loc='center left', bbox_to_anchor=(1, 0.5))
        legend.get_frame().set_alpha(0.5)
        axe.grid()

        return axe

    # Get the latest profile if no profile files were passed in.
    if not args.profile:
        # Glob profiles of our format.
        profiles = glob.glob("mprofile_??????????????.dat")

        if not profiles:
            raise ValueError((
                "No input file found.\nThis program looks for mprofile_*.dat "
                "files generated by the `mpmprof run` command."
            ))

        # The timestamp in the name sorts lexicographically in time order,
        # so the greatest name is the latest profile.
        args.profile = [max(profiles)]

    # Filter out any files that do not exist
    args.profile = list(filter(os.path.exists, args.profile))
    if not args.profile:
        raise ValueError("No input files found!")

    # For each passed in file, create a figure from the mprofile.
    for path in args.profile:
        plot_mprofile_file(path, args.title)

    if args.output:
        # NOTE(review): savefig writes the current figure only, so with
        # several profiles presumably just the last one is saved - confirm.
        plt.savefig(args.output)
    else:
        plt.show()

    return "{} memory profiles plotted.".format(len(args.profile))
if __name__ == '__main__':
    # Command-line entry point: builds one sub-command per entry in
    # ``commands``, parses the arguments and dispatches to the selected
    # action. The action's returned message is printed on success; any
    # exception it raises is reported through the parser's error handling.

    # Create the argument parser and subparsers for each command
    parser = argparse.ArgumentParser(description=DESCRIPTION, epilog=EPILOG)
    subparsers = parser.add_subparsers(title='commands')

    # Add the version command
    parser.add_argument('-v', '--version', action='version', version=VERSION)

    # Commands defined in a dictionary for easy adding. Each 'args' key is
    # either a single flag/positional name or a tuple of flag aliases, and
    # each value holds the keyword arguments for ``add_argument``.
    commands = (
        # Run command definition
        {
            'name': 'run',
            'action': run_action,
            'help': 'monitor the memory usage of a command',
            'args': {
                '--python': {
                    'default': False,
                    'action': 'store_true',
                    'help': 'activates extra features for Python programs',
                },
                '--nopython': {
                    'default': False,
                    'action': 'store_true',
                    'help': 'disables extra features for Python programs',
                },
                ('-T', '--interval'): {
                    'type': float,
                    'default': 0.1,
                    'metavar': 'S',
                    'help': 'sampling period (in seconds), defaults to 0.1',
                },
                ('-o', '--output'): {
                    'type': str,
                    'default': None,
                    'metavar': 'PATH',
                    'help': 'location to write the memory profiler output to',
                },
                'command': {
                    'nargs': argparse.REMAINDER,
                    'help': 'command to run and profile memory usage',
                }
            }
        },

        # Plot command definition
        {
            'name': 'plot',
            'action': plot_action,
            'help': 'plot the memory usage of a mprofile data file',
            'args': {
                ('-t', '--title'): {
                    'type': str,
                    'default': None,
                    'metavar': 'S',
                    'help': 'set the title of the figure',
                },
                ('-o', '--output'): {
                    'type': str,
                    'default': None,
                    'metavar': 'PATH',
                    'help': 'write the figure as a png to disk'
                },
                'profile': {
                    'nargs': '*',
                    'help': 'profile to plot, omit to use the latest',
                }
            }
        }
    )

    # Add the commands and their arguments.
    for cmd in commands:
        # Create the command subparser and add the action
        cmd_parser = subparsers.add_parser(cmd['name'], help=cmd['help'])
        cmd_parser.set_defaults(func=cmd['action'])

        # Add the arguments; a bare string is a single name, a tuple holds
        # several aliases for the same option.
        for flags, options in cmd['args'].items():
            if isinstance(flags, str):
                flags = (flags,)
            cmd_parser.add_argument(*flags, **options)

    # Handle input from the command line
    args = parser.parse_args()

    # No sub-command leaves ``func`` unset; report that explicitly instead
    # of surfacing an AttributeError message to the user.
    if not hasattr(args, 'func'):
        parser.error("a command is required, choose from: {}".format(
            ", ".join(cmd['name'] for cmd in commands)
        ))

    try:
        msg = args.func(args)             # Call the selected action
        parser.exit(0, msg+"\n")          # Exit cleanly with message
    except Exception as e:
        # parser.exit raises SystemExit, which is not an Exception
        # subclass, so the clean exit above is not intercepted here.
        parser.error(str(e))              # Exit with error
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment