Skip to content

Instantly share code, notes, and snippets.

@Kentzo
Last active December 7, 2022 08:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Kentzo/a36decb53cedbe9da0646a395a1accbb to your computer and use it in GitHub Desktop.
Save Kentzo/a36decb53cedbe9da0646a395a1accbb to your computer and use it in GitHub Desktop.

The benchmark measures and compares execution time and peak memory consumption of the current and new commonpath methods.

Measurement is performed using batches of 1000 paths. Each batch contains paths generated with the following variables:

  1. Length of each path part (range(16, 65, 16))
  2. Number of parts (range(4, 17, 4))
  3. Number of common parts (range(parts_count + 1))

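The batch is then split into chunks of (nearly) equal size such that the paths in each successive chunk reduce the number of common parts by 1, until the selected number [3] is reached. For example, with 3 parts, 1 selected common part and a batch of 7 paths, the batch would be "a/b/c", "a/b/c", "a/b/c", "a/b/d", "a/b/d", "a/c/d", "a/c/d" (when the batch size is not divisible by the number of chunks, the earliest chunks each receive one extra path).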

Each batch is tested in two permutations:

  1. Worst case where the common parts are not found until the last chunk
  2. Best case where the common parts are found by the 2nd path

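The measurements are then grouped by the number of common parts. For each group, the geometric mean of the new/old ratios is computed and reported as a percentage change (a negative value means the new implementation is faster or uses less memory).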

import itertools
import os
import timeit
import statistics
import tracemalloc
from typing import NamedTuple
import genericpath
from ntpath import splitdrive
def ntpath_commonpath_old(paths):
    """Given a sequence of path names, returns the longest common sub-path.

    This is the "old" Windows-style implementation that the benchmark in
    this file uses as its baseline.  Paths are compared case-insensitively
    with '/' treated like '\\'; the returned path keeps the spelling of the
    first path in *paths*.

    Raises ValueError if *paths* is empty, if absolute and relative paths
    are mixed, or if the paths do not share the same drive.  A TypeError or
    AttributeError raised while processing is re-raised after
    genericpath._check_arg_types() has had the chance to raise a more
    specific error.
    """
    if not paths:
        raise ValueError('commonpath() arg is an empty sequence')

    # Materialize every path up front; the whole input is held in memory.
    paths = tuple(map(os.fspath, paths))
    # Pick str or bytes constants according to the type of the first path.
    if isinstance(paths[0], bytes):
        sep = b'\\'
        altsep = b'/'
        curdir = b'.'
    else:
        sep = '\\'
        altsep = '/'
        curdir = '.'

    try:
        # (drive, tail) pairs of the normalized, lower-cased paths.
        drivesplits = [splitdrive(p.replace(altsep, sep).lower()) for p in paths]
        split_paths = [p.split(sep) for d, p in drivesplits]

        try:
            # The one-element unpacking fails with ValueError when the set
            # contains both True and False.
            isabs, = set(p[:1] == sep for d, p in drivesplits)
        except ValueError:
            raise ValueError("Can't mix absolute and relative paths") from None

        # Check that all drive letters or UNC paths match. The check is made only
        # now otherwise type errors for mixing strings and bytes would not be
        # caught.
        if len(set(d for d, p in drivesplits)) != 1:
            raise ValueError("Paths don't have the same drive")

        # The result is built from the first path in its original case.
        drive, path = splitdrive(paths[0].replace(altsep, sep))
        common = path.split(sep)
        common = [c for c in common if c and c != curdir]

        # Drop empty and '.' components, then compare only the
        # lexicographically smallest and largest component lists: their
        # shared prefix is shared by every list in between.
        split_paths = [[c for c in s if c and c != curdir] for s in split_paths]
        s1 = min(split_paths)
        s2 = max(split_paths)
        for i, c in enumerate(s1):
            if c != s2[i]:
                common = common[:i]
                break
        else:
            # s1 is entirely a prefix of s2.
            common = common[:len(s1)]

        prefix = drive + sep if isabs else drive
        return prefix + sep.join(common)
    except (TypeError, AttributeError):
        genericpath._check_arg_types('commonpath', *paths)
        raise
def ntpath_commonpath_new(paths):
    """Given an iterable of path names, returns the longest common sub-path.

    Windows-style variant that consumes *paths* lazily: only the first
    path and the lexicographically smallest and largest component lists
    seen so far are kept, instead of materializing every path.

    Paths are compared case-insensitively with '/' treated like '\\'; the
    returned path keeps the spelling of the first path.

    Args:
        paths: Any iterable of str, bytes or os.PathLike objects.

    Returns:
        The longest common sub-path, of the same type (str or bytes) as
        the first path.

    Raises:
        ValueError: If *paths* is not iterable or is empty, if absolute
            and relative paths are mixed, or if the drives differ.
        TypeError: If an element is not path-like, or str and bytes
            paths are mixed.
    """
    try:
        paths = iter(paths)
    except TypeError:
        # TODO: should be a TypeError, but not backward compatible.
        raise ValueError('commonpath() arg is not iterable') from None

    try:
        paths_0 = next(paths)
    except StopIteration:
        raise ValueError('commonpath() arg is an empty sequence') from None

    paths_0 = os.fspath(paths_0)
    paths_rest = map(os.fspath, paths)

    if isinstance(paths_0, bytes):
        sep = b'\\'
        altsep = b'/'
        curdir = b'.'
    else:
        sep = '\\'
        altsep = '/'
        curdir = '.'

    # The first path, in its original case, is what the result is cut from.
    common_drive, path = splitdrive(paths_0.replace(altsep, sep))
    common_parts = [c for c in path.split(sep) if c and c != curdir]
    common_drive_lower = common_drive.lower()
    common_parts_lower = [c.lower() for c in common_parts]
    min_parts_lower = max_parts_lower = common_parts_lower
    isabs = path[:1] == sep

    # Bind paths_i before the loop so the except handler below can always
    # reference it, even when os.fspath() fails on the very first element
    # of paths_rest.
    paths_i = paths_0
    try:
        for paths_i in paths_rest:
            drive_lower, path_lower = splitdrive(paths_i.replace(altsep, sep).lower())
            # Use the tail of the *current* path (path_lower), not the tail
            # of the first path, for both the components and the
            # absolute/relative check.
            parts_lower = [c for c in path_lower.split(sep) if c and c != curdir]

            if (path_lower[:1] == sep) != isabs:
                raise ValueError("Can't mix absolute and relative paths")
            elif drive_lower != common_drive_lower:
                raise ValueError("Paths don't have the same drive")

            # Track only the lexicographic extremes; their shared prefix is
            # shared by every component list in between.
            if parts_lower < min_parts_lower:
                min_parts_lower = parts_lower
            elif parts_lower > max_parts_lower:
                max_parts_lower = parts_lower

        for i, c in enumerate(min_parts_lower):
            if c != max_parts_lower[i]:
                del common_parts[i:]
                break
        else:
            # The smallest list is entirely a prefix of the largest.
            del common_parts[len(min_parts_lower):]

        prefix = (common_drive + sep) if isabs else common_drive
        return prefix + sep.join(common_parts)
    except (TypeError, AttributeError):
        genericpath._check_arg_types('commonpath', paths_0, paths_i)
        raise
def posixpath_commonpath_old(paths):
    """Given a sequence of path names, returns the longest common sub-path.

    This is the "old" POSIX-style implementation kept alongside the new
    one for comparison.  Empty components and '.' components are ignored.

    Raises ValueError if *paths* is empty or mixes absolute and relative
    paths.  A TypeError or AttributeError raised while processing is
    re-raised after genericpath._check_arg_types() has had the chance to
    raise a more specific error.
    """
    if not paths:
        raise ValueError('commonpath() arg is an empty sequence')

    # Materialize every path up front; the whole input is held in memory.
    paths = tuple(map(os.fspath, paths))
    # Pick str or bytes constants according to the type of the first path.
    if isinstance(paths[0], bytes):
        sep = b'/'
        curdir = b'.'
    else:
        sep = '/'
        curdir = '.'

    try:
        split_paths = [path.split(sep) for path in paths]

        try:
            # The one-element unpacking fails with ValueError when the set
            # contains both True and False.
            isabs, = set(p[:1] == sep for p in paths)
        except ValueError:
            raise ValueError("Can't mix absolute and relative paths") from None

        # Drop empty and '.' components, then compare only the
        # lexicographically smallest and largest component lists: their
        # shared prefix is shared by every list in between.
        split_paths = [[c for c in s if c and c != curdir] for s in split_paths]
        s1 = min(split_paths)
        s2 = max(split_paths)
        common = s1
        for i, c in enumerate(s1):
            if c != s2[i]:
                common = s1[:i]
                break

        # sep[:0] is an empty str or bytes matching the type of sep.
        prefix = sep if isabs else sep[:0]
        return prefix + sep.join(common)
    except (TypeError, AttributeError):
        genericpath._check_arg_types('commonpath', *paths)
        raise
def posixpath_commonpath_new(paths):
    """Given an iterable of path names, returns the longest common sub-path.

    POSIX-style variant that consumes *paths* lazily: only the
    lexicographically smallest and largest component lists seen so far are
    kept, instead of materializing every path.  Empty components and '.'
    components are ignored.

    Args:
        paths: Any iterable of str, bytes or os.PathLike objects.

    Returns:
        The longest common sub-path, of the same type (str or bytes) as
        the first path.

    Raises:
        ValueError: If *paths* is not iterable or is empty, or if absolute
            and relative paths are mixed.
        TypeError: If an element is not path-like, or str and bytes
            paths are mixed.
    """
    try:
        paths = iter(paths)
    except TypeError:
        # TODO: should be a TypeError, but not backward compatible.
        raise ValueError('commonpath() arg cannot be iterated') from None

    try:
        paths_0 = next(paths)
    except StopIteration:
        raise ValueError('commonpath() arg is an empty sequence') from None

    paths_0 = os.fspath(paths_0)
    paths_rest = map(os.fspath, paths)

    if isinstance(paths_0, bytes):
        sep = b'/'
        curdir = b'.'
    else:
        sep = '/'
        curdir = '.'

    isabs = paths_0[:1] == sep
    min_parts = max_parts = [c for c in paths_0.split(sep) if c and c != curdir]

    # Bind paths_i before the loop so the except handler below can always
    # reference it, even when os.fspath() fails on the very first element
    # of paths_rest (otherwise an UnboundLocalError would mask the
    # TypeError).
    paths_i = paths_0
    try:
        for paths_i in paths_rest:
            parts = [c for c in paths_i.split(sep) if c and c != curdir]

            if (paths_i[:1] == sep) != isabs:
                raise ValueError("Can't mix absolute and relative paths")

            # Track only the lexicographic extremes; their shared prefix is
            # shared by every component list in between.
            if parts < min_parts:
                min_parts = parts
            elif parts > max_parts:
                max_parts = parts

        # min_parts is a local list, so it is truncated in place.
        common_parts = min_parts
        for i, c in enumerate(common_parts):
            if c != max_parts[i]:
                del common_parts[i:]
                break

        # sep[:0] is an empty str or bytes matching the type of sep.
        prefix = sep if isabs else sep[:0]
        return prefix + sep.join(common_parts)
    except (TypeError, AttributeError):
        genericpath._check_arg_types('commonpath', paths_0, paths_i)
        raise
class BenchmarkCaseResult(NamedTuple):
    """Execution-time and peak-memory figures for one benchmark case.

    The same shape is reused at every stage of the benchmark: raw
    measurements from Benchmark.measure(), new/old ratios from
    Benchmark.aggregate_gains(), and percentage changes from
    Benchmark.aggregate_stats().
    """
    # Raw stage: best timeit result divided by the number of calls.
    exec_time: float
    # Raw stage: peak value reported by tracemalloc.get_traced_memory().
    mem_peak: float
class BenchmarkResult(NamedTuple):
    """Results for the two orderings in which each batch is tested."""
    # Sorted batch: the common parts are not found until the last chunk.
    worst_case: BenchmarkCaseResult
    # Sorted batch with its 2nd and last paths swapped: the common parts
    # are found on the second iteration.
    best_case: BenchmarkCaseResult
class Benchmark:
    """Compare an old and a new ``commonpath`` implementation.

    Batches of synthetic paths are generated for every combination of part
    size, number of parts and number of common parts.  Both functions are
    timed and memory-profiled on each batch, and the new/old ratios are
    aggregated per number of common parts.
    """

    def __init__(self, batch_size, part_size_range, parts_count_range, old_func, new_func):
        """Store the batch parameters and the two functions to compare.

        Args:
            batch_size: Number of paths in each generated batch.
            part_size_range: Iterable of path-part lengths to test.
            parts_count_range: Iterable of numbers of parts per path.
            old_func: Baseline callable taking a list of paths.
            new_func: Candidate callable taking a list of paths.
        """
        self.batch_size = batch_size
        self.part_size_range = part_size_range
        self.parts_count_range = parts_count_range
        self.old_func = old_func
        self.new_func = new_func

    def gen_test_batches(self, batch_size, part_size_range, parts_count_range):
        """Yield ``((part_size, parts_count, common_parts_count), batch)``.

        Each batch is a list of *batch_size* '/'-separated paths split into
        ``1 + parts_count - common_parts_count`` chunks.  Every chunk after
        the first changes one more trailing part, so the last chunk shares
        exactly *common_parts_count* leading parts with the first.
        """
        for part_size, parts_count in itertools.product(part_size_range, parts_count_range):
            for common_parts_count in range(parts_count + 1):
                batch = []
                chunks_count = 1 + parts_count - common_parts_count
                # The remainder is spread over the earliest chunks below.
                chunk_size, remainder = divmod(batch_size, chunks_count)
                template = [ord('a') + i for i in range(parts_count)]

                for i in range(chunks_count):
                    if i != 0:
                        # Cumulative: chunk i differs from chunk 0 in its
                        # last i parts.
                        template[-i] += 1

                    path = '/'.join(chr(c) * part_size for c in template)
                    copies = chunk_size + 1 if i < remainder else chunk_size
                    # lower() is called per element to enforce true copying:
                    # every path must be a distinct string object.
                    batch.extend([path.lower() for _ in range(copies)])

                assert len(batch) == batch_size
                assert len(batch) == len(set(map(id, batch)))
                yield (part_size, parts_count, common_parts_count), batch

    @staticmethod
    def _make_cases(batch):
        """Return the ``(worst_case, best_case)`` orderings of *batch*."""
        # Worst case: common parts are not found until the last chunk.
        worst_case = sorted(batch)
        # Best case: common parts are found on the second iteration.
        best_case = list(worst_case)
        if len(best_case) > 1:  # nothing to swap in a one-element batch
            best_case[1], best_case[-1] = best_case[-1], best_case[1]
        return worst_case, best_case

    def measure(self, func, batch, repeat=3, number=8):
        """Measure *func* on the worst- and best-case orderings of *batch*.

        Args:
            func: Callable taking a list of paths.
            batch: List of paths to feed to *func*.
            repeat: Number of timeit repetitions; the fastest one is kept.
            number: Number of calls per repetition.

        Returns:
            A BenchmarkResult holding, per case, the per-call execution
            time and the peak traced memory of a single call.
        """
        namespace = {'func': func}

        def measure_case(batch_case):
            namespace['case'] = batch_case
            timings = timeit.repeat('func(case)', repeat=repeat, number=number, globals=namespace)
            exec_time = min(timings) / number

            tracemalloc.start()
            try:
                tracemalloc.reset_peak()
                func(batch_case)
                mem_peak = float(tracemalloc.get_traced_memory()[1])
            finally:
                # Never leave tracing enabled, even if func raises.
                tracemalloc.stop()

            return BenchmarkCaseResult(exec_time, mem_peak)

        batch_worst_case, batch_best_case = self._make_cases(batch)
        return BenchmarkResult(
            worst_case=measure_case(batch_worst_case),
            best_case=measure_case(batch_best_case),
        )

    def aggregate_gains(self, old, new):
        """Return a BenchmarkResult of new/old ratios for each case."""
        kwargs = {}
        for f in BenchmarkResult._fields:
            old_case_result = getattr(old, f)
            new_case_result = getattr(new, f)
            kwargs[f] = BenchmarkCaseResult(
                exec_time=new_case_result.exec_time / old_case_result.exec_time,
                mem_peak=new_case_result.mem_peak / old_case_result.mem_peak,
            )
        return BenchmarkResult(**kwargs)

    def aggregate_stats(self, gains):
        """Reduce a list of ratio results to percentage changes.

        For each case the geometric mean of the ratios is converted to a
        percentage change relative to 1 and rounded to one decimal place.
        """
        kwargs = {}
        for f in BenchmarkResult._fields:
            exec_time_mean = statistics.geometric_mean(getattr(b, f).exec_time for b in gains)
            mem_peak_mean = statistics.geometric_mean(getattr(b, f).mem_peak for b in gains)
            kwargs[f] = BenchmarkCaseResult(
                exec_time=round((exec_time_mean - 1) * 100, ndigits=1),
                mem_peak=round((mem_peak_mean - 1) * 100, ndigits=1),
            )
        return BenchmarkResult(**kwargs)

    def run(self):
        """Run the whole benchmark.

        Returns:
            A dict mapping each number of common parts to a BenchmarkResult
            of percentage changes (new relative to old).
        """
        old = {}
        new = {}
        for key, batch in self.gen_test_batches(self.batch_size, self.part_size_range, self.parts_count_range):
            old[key] = self.measure(self.old_func, batch)
            new[key] = self.measure(self.new_func, batch)

        # Group the per-batch ratios by the number of common parts (key[2]).
        gains = {}
        for key in old:
            common_parts_count = key[2]
            gains.setdefault(common_parts_count, []).append(self.aggregate_gains(old[key], new[key]))

        return {key: self.aggregate_stats(group) for key, group in gains.items()}
# Benchmark the Windows-style implementations on batches of 1000 paths and
# print the percentage change per number of common parts.
bench = Benchmark(
    1000,
    range(16, 65, 16),
    range(4, 17, 4),
    ntpath_commonpath_old,
    ntpath_commonpath_new,
)
print("\t \tEXEC\tPEAK MEM")
for n_common, stats in bench.run().items():
    print(f"{n_common} common parts:")
    for label, case in (("worst case", stats.worst_case), ("best case", stats.best_case)):
        print(f"\t{label}\t{case.exec_time:+}%\t{case.mem_peak:+}%")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment