|
import itertools |
|
import os |
|
import timeit |
|
import statistics |
|
import tracemalloc |
|
from typing import NamedTuple |
|
|
|
import genericpath |
|
from ntpath import splitdrive |
|
|
|
|
|
def ntpath_commonpath_old(paths):
    """Return the longest common sub-path of a sequence of Windows paths.

    Baseline (eager) implementation: every path is converted with
    ``os.fspath`` and split up front, then the lexicographically smallest
    and largest component lists are compared.  Comparison is
    case-insensitive and treats ``/`` like ``\\``; the returned path keeps
    the spelling of the first path.

    Raises:
        ValueError: if *paths* is empty, mixes absolute and relative
            paths, or the paths are on different drives.
        TypeError: if str and bytes paths are mixed or an element is not
            path-like.
    """
    if not paths:
        raise ValueError('commonpath() arg is an empty sequence')

    paths = tuple(map(os.fspath, paths))
    if isinstance(paths[0], bytes):
        sep, altsep, curdir = b'\\', b'/', b'.'
    else:
        sep, altsep, curdir = '\\', '/', '.'

    def significant(tail):
        # Drop empty components (repeated separators) and '.' entries.
        return [part for part in tail.split(sep) if part and part != curdir]

    try:
        # (drive, tail) pairs of the normalised, lower-cased paths.
        folded = [splitdrive(p.replace(altsep, sep).lower()) for p in paths]

        abs_flags = {tail[:1] == sep for _, tail in folded}
        if len(abs_flags) != 1:
            raise ValueError("Can't mix absolute and relative paths")
        isabs = abs_flags.pop()

        # Check that all drive letters or UNC paths match. The check is made
        # only now, otherwise type errors for mixing strings and bytes would
        # not be caught.
        if len({drive for drive, _ in folded}) != 1:
            raise ValueError("Paths don't have the same drive")

        # The result is spelled like the first path (original case).
        drive, tail = splitdrive(paths[0].replace(altsep, sep))
        common = significant(tail)

        folded_parts = [significant(tail_lower) for _, tail_lower in folded]
        lowest = min(folded_parts)
        highest = max(folded_parts)

        # The prefix shared by the smallest and largest lists is shared by all.
        keep = len(lowest)
        for index, (lo, hi) in enumerate(zip(lowest, highest)):
            if lo != hi:
                keep = index
                break

        prefix = drive + sep if isabs else drive
        return prefix + sep.join(common[:keep])
    except (TypeError, AttributeError):
        genericpath._check_arg_types('commonpath', *paths)
        raise
|
|
|
|
|
def ntpath_commonpath_new(paths):
    """Return the longest common sub-path of an iterable of Windows paths.

    Streaming implementation: *paths* is consumed one element at a time and
    only the lexicographically smallest and largest lower-cased component
    lists seen so far are kept.  Comparison is case-insensitive and treats
    ``/`` like ``\\``; the returned path keeps the spelling of the first
    path.

    Raises:
        ValueError: if *paths* is not iterable, is empty, mixes absolute
            and relative paths, or the paths are on different drives.
        TypeError: if str and bytes paths are mixed or an element is not
            path-like.
    """
    try:
        paths = iter(paths)
    except TypeError:
        # TODO: should be a TypeError, but not backward compatible.
        raise ValueError('commonpath() arg is not iterable') from None

    try:
        paths_0 = next(paths)
    except StopIteration:
        raise ValueError('commonpath() arg is an empty sequence') from None

    paths_0 = os.fspath(paths_0)
    paths_rest = map(os.fspath, paths)

    if isinstance(paths_0, bytes):
        sep = b'\\'
        altsep = b'/'
        curdir = b'.'
    else:
        sep = '\\'
        altsep = '/'
        curdir = '.'

    common_drive, path = splitdrive(paths_0.replace(altsep, sep))
    # Original-case components of the first path; the result is cut from these.
    common_parts = [c for c in path.split(sep) if c and c != curdir]
    common_drive_lower = common_drive.lower()
    min_parts_lower = max_parts_lower = [c.lower() for c in common_parts]
    isabs = path[:1] == sep

    # Pre-bind so the except handler below can always name a second argument,
    # even when os.fspath() fails on the very first remaining element.
    paths_i = paths_0
    try:
        for paths_i in paths_rest:
            drive_lower, path_lower = splitdrive(paths_i.replace(altsep, sep).lower())
            # Use the current path's tail, not the first path's tail.
            parts_lower = [c for c in path_lower.split(sep) if c and c != curdir]

            if (path_lower[:1] == sep) != isabs:
                raise ValueError("Can't mix absolute and relative paths")
            if drive_lower != common_drive_lower:
                raise ValueError("Paths don't have the same drive")

            if parts_lower < min_parts_lower:
                min_parts_lower = parts_lower
            elif parts_lower > max_parts_lower:
                max_parts_lower = parts_lower

        # The prefix shared by the smallest and largest lists is shared by all.
        keep = len(min_parts_lower)
        for i, (lo, hi) in enumerate(zip(min_parts_lower, max_parts_lower)):
            if lo != hi:
                keep = i
                break

        prefix = (common_drive + sep) if isabs else common_drive
        return prefix + sep.join(common_parts[:keep])
    except (TypeError, AttributeError):
        genericpath._check_arg_types('commonpath', paths_0, paths_i)
        raise
|
|
|
|
|
def posixpath_commonpath_old(paths):
    """Return the longest common sub-path of a sequence of POSIX paths.

    Baseline (eager) implementation: every path is converted with
    ``os.fspath`` and split up front, then the lexicographically smallest
    and largest component lists are compared.

    Raises:
        ValueError: if *paths* is empty or mixes absolute and relative
            paths.
        TypeError: if str and bytes paths are mixed or an element is not
            path-like.
    """
    if not paths:
        raise ValueError('commonpath() arg is an empty sequence')

    paths = tuple(map(os.fspath, paths))
    sep, curdir = (b'/', b'.') if isinstance(paths[0], bytes) else ('/', '.')

    try:
        # Split first: this is what surfaces a TypeError for mixed str/bytes
        # before the absolute/relative check can misreport it.
        raw_parts = [p.split(sep) for p in paths]

        abs_flags = {p[:1] == sep for p in paths}
        if len(abs_flags) != 1:
            raise ValueError("Can't mix absolute and relative paths")
        (isabs,) = abs_flags

        # Drop empty components (repeated separators) and '.' entries.
        cleaned = [[c for c in parts if c and c != curdir] for parts in raw_parts]
        lowest = min(cleaned)
        highest = max(cleaned)

        # The prefix shared by the smallest and largest lists is shared by all.
        keep = len(lowest)
        for index, (lo, hi) in enumerate(zip(lowest, highest)):
            if lo != hi:
                keep = index
                break

        prefix = sep if isabs else sep[:0]
        return prefix + sep.join(lowest[:keep])
    except (TypeError, AttributeError):
        genericpath._check_arg_types('commonpath', *paths)
        raise
|
|
|
|
|
def posixpath_commonpath_new(paths):
    """Return the longest common sub-path of an iterable of POSIX paths.

    Streaming implementation: *paths* is consumed one element at a time and
    only the lexicographically smallest and largest component lists seen so
    far are kept.

    Raises:
        ValueError: if *paths* is not iterable, is empty, or mixes absolute
            and relative paths.
        TypeError: if str and bytes paths are mixed or an element is not
            path-like.
    """
    try:
        paths = iter(paths)
    except TypeError:
        # TODO: should be a TypeError, but not backward compatible.
        raise ValueError('commonpath() arg cannot be iterated') from None

    try:
        paths_0 = next(paths)
    except StopIteration:
        raise ValueError('commonpath() arg is an empty sequence') from None

    paths_0 = os.fspath(paths_0)
    paths_rest = map(os.fspath, paths)

    if isinstance(paths_0, bytes):
        sep = b'/'
        curdir = b'.'
    else:
        sep = '/'
        curdir = '.'

    isabs = paths_0[:1] == sep
    min_parts = max_parts = [c for c in paths_0.split(sep) if c and c != curdir]

    # Pre-bind so the except handler below can always name a second argument,
    # even when os.fspath() fails on the very first remaining element.
    paths_i = paths_0
    try:
        for paths_i in paths_rest:
            parts = [c for c in paths_i.split(sep) if c and c != curdir]

            if (paths_i[:1] == sep) != isabs:
                raise ValueError("Can't mix absolute and relative paths")

            if parts < min_parts:
                min_parts = parts
            elif parts > max_parts:
                max_parts = parts

        # The prefix shared by the smallest and largest lists is shared by all.
        keep = len(min_parts)
        for i, (lo, hi) in enumerate(zip(min_parts, max_parts)):
            if lo != hi:
                keep = i
                break

        prefix = sep if isabs else sep[:0]
        return prefix + sep.join(min_parts[:keep])
    except (TypeError, AttributeError):
        genericpath._check_arg_types('commonpath', paths_0, paths_i)
        raise
|
|
|
|
|
class BenchmarkCaseResult(NamedTuple):
    """Execution-time and peak-memory figures for one benchmark case.

    Benchmark.measure fills it with raw measurements; the aggregate_*
    methods reuse the same shape for new/old ratios and percentage changes.
    """

    # Time figure; in Benchmark.measure this is the best timeit run divided
    # by the number of calls per run.
    exec_time: float
    # Memory figure; in Benchmark.measure this is the peak value reported by
    # tracemalloc.get_traced_memory().
    mem_peak: float
|
|
|
|
|
class BenchmarkResult(NamedTuple):
    """Results for the two orderings of one batch that Benchmark.measure runs."""

    # Figures for the sorted batch.
    worst_case: BenchmarkCaseResult
    # Figures for the sorted batch with its second and last entries swapped.
    best_case: BenchmarkCaseResult
|
|
|
|
|
class Benchmark:
    """Compare two ``commonpath`` implementations on generated path batches.

    Batches are generated for every combination of part size, part count
    and number of leading path components shared by the whole batch.  Each
    implementation is timed with ``timeit`` and its peak memory is taken
    from ``tracemalloc``; results are reported as the percentage change of
    the new implementation relative to the old one.
    """

    def __init__(self, batch_size, part_size_range, parts_count_range, old_func, new_func):
        """Store the benchmark configuration.

        Args:
            batch_size: number of paths in every generated batch.
            part_size_range: iterable of path-component lengths to try.
            parts_count_range: iterable of component counts per path to try.
            old_func: baseline implementation, called with a list of paths.
            new_func: candidate implementation, called with a list of paths.
        """
        self.batch_size = batch_size
        self.part_size_range = part_size_range
        self.parts_count_range = parts_count_range
        self.old_func = old_func
        self.new_func = new_func

    def gen_test_batches(self, batch_size, part_size_range, parts_count_range):
        """Yield ``((part_size, parts_count, common_parts_count), batch)`` pairs.

        Each batch holds *batch_size* paths split into
        ``1 + parts_count - common_parts_count`` chunks of identical paths.
        Every chunk after the first bumps one more trailing component, so
        only the first *common_parts_count* components are shared by the
        whole batch.
        """
        for part_size, parts_count in itertools.product(part_size_range, parts_count_range):
            for common_parts_count in range(parts_count + 1):
                batch = []

                chunks_count = 1 + parts_count - common_parts_count
                # The first `remainder` chunks take one extra path so that
                # the chunk sizes add up to batch_size exactly.
                chunk_size, remainder = divmod(batch_size, chunks_count)
                template = [ord('a') + i for i in range(parts_count)]

                for i in range(chunks_count):
                    if i != 0:
                        # Change the i-th component from the end; it stays
                        # changed for all following chunks.
                        template[-i] += 1

                    path = '/'.join(chr(c) * part_size for c in template)
                    copies = chunk_size + 1 if i < remainder else chunk_size
                    # lower() is called per entry to enforce true copying:
                    # each entry must be its own string object (asserted below).
                    batch.extend([path.lower() for _ in range(copies)])

                assert len(batch) == batch_size
                assert len(batch) == len(set(map(id, batch)))

                yield (part_size, parts_count, common_parts_count), batch

    def measure(self, func, batch, repeat=3, number=8):
        """Measure *func* on two orderings of *batch*.

        Args:
            func: callable taking a list of paths.
            batch: list of paths; it is not modified.
            repeat: number of timeit repetitions; the fastest one is used.
            number: calls per timeit repetition.

        Returns:
            A BenchmarkResult with the per-call execution time and the
            tracemalloc peak for the worst-case and best-case orderings.
        """
        namespace = {'func': func}

        def measure_case(batch_case):
            namespace['case'] = batch_case
            timings = timeit.repeat('func(case)', repeat=repeat, number=number, globals=namespace)
            exec_time = min(timings) / number

            tracemalloc.start()
            try:
                tracemalloc.reset_peak()
                func(batch_case)
                mem_peak = float(tracemalloc.get_traced_memory()[1])
            finally:
                # Never leave tracing enabled, even if func raises.
                tracemalloc.stop()

            return BenchmarkCaseResult(exec_time, mem_peak)

        # common parts are not found until the last chunk
        batch_worst_case = sorted(batch)

        # common parts are found on the second iteration
        batch_best_case = list(batch_worst_case)
        if len(batch_best_case) > 1:  # nothing to swap in a one-element batch
            batch_best_case[1], batch_best_case[-1] = batch_best_case[-1], batch_best_case[1]

        return BenchmarkResult(
            worst_case=measure_case(batch_worst_case),
            best_case=measure_case(batch_best_case),
        )

    def aggregate_gains(self, old, new):
        """Return a BenchmarkResult of new/old ratios for every case and metric."""
        kwargs = {}

        for f in BenchmarkResult._fields:
            old_case_result = getattr(old, f)
            new_case_result = getattr(new, f)

            kwargs[f] = BenchmarkCaseResult(
                exec_time=new_case_result.exec_time / old_case_result.exec_time,
                mem_peak=new_case_result.mem_peak / old_case_result.mem_peak,
            )

        return BenchmarkResult(**kwargs)

    def aggregate_stats(self, gains):
        """Collapse a list of ratio results into percentage changes.

        For every case and metric the geometric mean of the ratios is
        converted to a percentage change, ``(mean - 1) * 100``, rounded to
        one decimal; negative values mean the new function measured lower.
        """
        def percent_change(ratios):
            return round((statistics.geometric_mean(ratios) - 1) * 100, ndigits=1)

        kwargs = {}

        for f in BenchmarkResult._fields:
            kwargs[f] = BenchmarkCaseResult(
                exec_time=percent_change(getattr(b, f).exec_time for b in gains),
                mem_peak=percent_change(getattr(b, f).mem_peak for b in gains),
            )

        return BenchmarkResult(**kwargs)

    def run(self):
        """Run the whole benchmark.

        Returns:
            A dict mapping ``common_parts_count`` to a BenchmarkResult of
            percentage changes, aggregated over all part sizes and part
            counts that produced that number of common parts.
        """
        old = {}
        new = {}
        for key, batch in self.gen_test_batches(self.batch_size, self.part_size_range, self.parts_count_range):
            old[key] = self.measure(self.old_func, batch)
            new[key] = self.measure(self.new_func, batch)

        # Group the per-batch ratios by the number of common parts (key[2]).
        gains = {}
        for key in old:
            common_parts_count = key[2]
            gains.setdefault(common_parts_count, []).append(self.aggregate_gains(old[key], new[key]))

        stats = {}
        for key in gains:
            stats[key] = self.aggregate_stats(gains[key])

        return stats
|
|
|
|
|
# Benchmark the ntpath implementations: batches of 1000 paths, component
# lengths 16..64 and 4..16 components per path.
bench = Benchmark(
    batch_size=1000,
    part_size_range=range(16, 65, 16),
    parts_count_range=range(4, 17, 4),
    old_func=ntpath_commonpath_old,
    new_func=ntpath_commonpath_new,
)

# Values are percentage changes of the new function relative to the old one.
print("\t \tEXEC\tPEAK MEM")
for common_parts_count, benchmark in bench.run().items():
    print(f"{common_parts_count} common parts:")
    for label, case in (("worst case", benchmark.worst_case), ("best case", benchmark.best_case)):
        print(f"\t{label}\t{case.exec_time:+}%\t{case.mem_peak:+}%")