|
"""Compare defined functions to lambdas.""" |
|
from functools import partial |
|
from operator import add |
|
from random import choice |
|
import importlib |
|
import logging |
|
import multiprocessing |
|
import sys |
|
import time |
|
import timeit |
|
|
|
# Single-character string constants shared by the helpers below.
SPACE = ' '
EMPTYSTRING = ''
DOUBLEQUOTE = '"'


def join(delimiter, *args):
    """Return the strings in ``args`` glued together with ``delimiter``."""
    return delimiter.join(args)


delimiters = (EMPTYSTRING, SPACE)

# Pre-bound joiners: the first concatenates its arguments directly, the
# second puts a single space between them.
empty_join = partial(join, delimiters[0])
space_join = partial(join, delimiters[1])
|
|
|
|
|
def echo(message):
    """Log ``message`` at INFO level through the module-level ``logger``.

    ``logger`` is looked up when the function is called, not when it is
    defined, so it only has to exist by the time of the first call.
    """
    logger.info(message)
|
|
|
# IPython has already imported (and presumably configured) ``logging`` by
# the time this file runs, so reload the module to start from a clean state
# before configuring it.
importlib.reload(logging)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Smoke test: proves that ``echo`` reaches the freshly configured logger.
echo('Logger is working.')
|
|
|
# Defaults shared by the generators below and by ``test_sorted_list``.
LENGTH_LIMIT = 50   # number of items every generated list must contain
MAX_NUMBER = 1000   # largest value a generated number may take (inclusive)
|
|
|
|
|
def imperative_random_numbers_with_list_check(
        length_limit=LENGTH_LIMIT,
        maximum_value=MAX_NUMBER
):
    """Return a sorted list of distinct pseudorandom integers.

    Numbers are drawn one at a time from ``0..maximum_value`` (inclusive);
    a draw that is already in the list is thrown away and repeated, so the
    loop does not finish if fewer than ``length_limit`` distinct values
    exist.

    :length_limit: number of items in the returned list
    :maximum_value: largest value a number in the list may take
    """
    picked = []
    for _ in range(length_limit):
        # Redraw until the candidate has not been picked before.
        while True:
            candidate = choice(range(maximum_value + 1))
            if candidate not in picked:
                break
        picked.append(candidate)
    picked.sort()
    return picked
|
|
|
|
|
def imperative_random_numbers_without_list_check(
        length_limit=LENGTH_LIMIT,
        maximum_value=MAX_NUMBER
):
    """Create a sorted list of unique pseudorandom numbers.

    Instead of checking each number against the ones already chosen, a whole
    batch of ``length_limit`` numbers is drawn at once and duplicates are
    collapsed through a set.  The batch is redrawn until it contains exactly
    ``length_limit`` distinct values.

    :length_limit: number of items in the returned list
    :maximum_value: largest value a number in the list may take
    :raises ValueError: when ``length_limit`` is negative or exceeds the
        ``maximum_value + 1`` distinct values available, because no batch
        could ever satisfy the request
    """
    if not 0 <= length_limit <= maximum_value + 1:
        raise ValueError(
            'cannot draw {} unique numbers from 0..{}'.format(
                length_limit, maximum_value))

    def get_numbers():
        # A set drops duplicates, so its size reveals whether the batch
        # happened to be unique.
        return {choice(range(maximum_value + 1))
                for _ in range(length_limit)}

    numbers = get_numbers()
    while len(numbers) != length_limit:
        numbers = get_numbers()
    return sorted(numbers)
|
|
|
|
|
def broken_sorted_random_numbers(
        length_limit=LENGTH_LIMIT,
        maximum_value=MAX_NUMBER
):
    """Return a deliberately invalid list, used to exercise the checker.

    Ten distinct numbers are drawn from ``maximum_value..2*maximum_value-1``,
    sorted, and the sorted run is then repeated, so the result contains
    duplicates.

    :length_limit: accepted for signature parity with the other generators;
        not used, ten numbers are always drawn
    :maximum_value: offset and span of the range the numbers are drawn from
    """
    seen = []
    for _ in range(10):
        # Redraw until the candidate has not been seen before.
        while True:
            candidate = choice(range(maximum_value)) + maximum_value
            if candidate not in seen:
                break
        seen.append(candidate)
    ordered = sorted(seen)
    return ordered * 2  # break unique test
|
|
|
|
|
def test_sorted_list(f, sample_size=10000):
    """Check that the lists produced by ``f`` meet the expected criteria.

    ``f`` is called ``sample_size`` times and every returned list is checked
    for three properties:

    * length test -- it holds exactly ``LENGTH_LIMIT`` items
    * unique test -- it is a sorted list without duplicates
    * max test -- no item is greater than ``MAX_NUMBER``

    :f: A function that returns a random list of sorted numbers.
    :sample_size: How many times ``f`` is called.
    :returns: ``True`` when every sample passes every check; otherwise the
        set of distinct ``((label, passed), ...)`` outcomes observed across
        the samples, which shows which checks are failing.
    """
    samples = tuple(f() for _ in range(sample_size))

    def correct_length(item):
        return len(item) == LENGTH_LIMIT

    def unique(item):
        # Sorting the de-duplicated values reproduces the item only when it
        # is an already sorted list without repeats.
        return sorted(set(item)) == item

    def correct_max_value(item):
        return all(number <= MAX_NUMBER for number in item)

    # A plain conditional instead of ``assert``: assertions are removed
    # under ``python -O``, which would make this function always succeed.
    if all(correct_length(item)
           and unique(item)
           and correct_max_value(item)
           for item in samples):
        return True

    # Label every check so the caller can see which ones are failing.
    return {
        (
            ('length test', correct_length(item)),
            ('unique test', unique(item)),
            ('max test', correct_max_value(item)),
        )
        for item in samples
    }
|
|
|
# Log one sample list from each imperative implementation.
echo(space_join(
    'imperative_random_numbers_with_list_check',
    repr(imperative_random_numbers_with_list_check()),
))
echo(space_join(
    'imperative_random_numbers_without_list_check',
    repr(imperative_random_numbers_without_list_check()),
))

# The list-checking implementation has to pass every check.
assert test_sorted_list(imperative_random_numbers_with_list_check)

# The deliberately broken generator has to fail all three checks, and it
# has to fail them the same way on every sample.
broken_result = test_sorted_list(broken_sorted_random_numbers)
assert broken_result == {
    (('length test', False), ('unique test', False), ('max test', False)),
}
|
|
|
# Begin functional style using lambdas.
# (The lambdas are bound to names on purpose: this module compares them with
# the ``def`` implementations above.)

# True when ``list_`` holds exactly ``length_limit`` items.
equals_limit = lambda list_, length_limit=LENGTH_LIMIT: (
    len(list_) == length_limit)

# One pseudorandom integer in 0..max_number (inclusive).
random_number = lambda max_number=MAX_NUMBER: (
    choice(range(max_number + 1)))

# ``length_limit`` draws with duplicates collapsed, so the result can be
# shorter than ``length_limit``.
random_list = lambda length_limit=LENGTH_LIMIT: (
    list({random_number() for _ in range(length_limit)}))

# Sort ``list_`` if it has the required length, otherwise retry with a fresh
# batch.  Deliberately NOT named ``get_list``: that name is rebound further
# down to an unrelated helper, and because a lambda resolves globals when it
# is called, sharing the name made every later call (including the timed
# ones) skip both the length check and the sort.
get_sorted_list = lambda list_: (
    sorted(list_)
    if equals_limit(list_)
    else get_sorted_list(random_list()))

lambda_random_numbers_sans_list_check = lambda: get_sorted_list(random_list())


echo(space_join(
    'lambda_random_numbers_sans_list_check',
    repr(lambda_random_numbers_sans_list_check())
))
assert test_sorted_list(lambda_random_numbers_sans_list_check)
|
|
|
# ## Functional version rewritten to carry the list state along.

# Turn a missing (or empty) list into a list.
get_list = lambda list_=None: list_ if list_ else []

# Keep redrawing until ``number`` is absent from ``list_``; hand back both
# as a ``(list_, number)`` pair.
get_number = lambda list_, number: (
    (list_, number)
    if number not in list_
    else get_number(get_list(list_), random_number()))

# Non-mutating append: builds a new list with ``number`` at the end.
append = lambda list_, number: add(list_, [number])

# Extend ``list_`` (or an empty list) by one number it does not contain yet.
number_list = lambda list_: append(
    *get_number(get_list(list_), random_number()))
|
|
|
def random_numbers(list_=None):
    """Return ``list_`` grown, one new number at a time, to the length limit.

    Each call adds a single number through ``number_list`` and recurses with
    the longer list until ``equals_limit`` accepts it.

    :list_: numbers collected so far; ``None`` starts from an empty list
    """
    numbers = number_list(list_)
    if equals_limit(numbers):
        return numbers
    return random_numbers(numbers)
|
|
|
# Functional counterpart of ``imperative_random_numbers_with_list_check``:
# ``random_numbers`` builds the list and it is sorted once at the end.
lambda_random_numbers_with_list_check = lambda: sorted(random_numbers())

# Log one sample, then verify the implementation against the checker.
echo(space_join(
    'lambda_random_numbers_with_list_check',
    repr(lambda_random_numbers_with_list_check()),
))
assert test_sorted_list(lambda_random_numbers_with_list_check)
|
|
|
|
|
# Names of the implementations to benchmark.
function_names = (
    'imperative_random_numbers_with_list_check',
    'imperative_random_numbers_without_list_check',
    'lambda_random_numbers_sans_list_check',
    'lambda_random_numbers_with_list_check',
)

# One ``timeit`` setup statement per function, in the same order as the
# names above.
template = 'from __main__ import {}'
setups = tuple(map(template.format, function_names))
|
|
|
# Intended to keep concurrent pool workers from mixing their output.
GLOBALLOCK = multiprocessing.Lock()


def worker_func(args):
    """Time one function with ``timeit`` and log its best result.

    :args: a ``(label, function_name, setup)`` tuple; ``label`` prefixes the
        logged line, ``function_name`` is the callable to time and ``setup``
        is the ``timeit`` setup statement that brings it into scope
    """
    label, function_name, setup = args
    timer = timeit.Timer('{}()'.format(function_name), setup=setup)
    # Best of 10 repetitions of 10000 calls each.
    result = min(timer.repeat(10, 10000))

    # ``with`` releases the lock even when logging raises.
    with GLOBALLOCK:
        echo(space_join(label, str(result)))
|
|
|
|
|
def time_them_async(argp=None):
    """Benchmark every function in ``function_names`` in a process pool.

    Each function is handed to ``worker_func`` as a
    ``(label, function_name, setup)`` tuple, with the function name doubling
    as the label.  The pool is always shut down before returning: closed
    after a normal run, terminated after Ctrl-C or any other failure.

    :argp: unused; kept so existing callers keep working
    """
    pool = multiprocessing.Pool()
    func_args = tuple(zip(function_names, function_names, setups))
    try:
        # NOTE(review): the explicit timeout looks like the usual workaround
        # for letting ^C interrupt a blocking ``get`` -- confirm before
        # removing it.
        pool.map_async(worker_func, func_args).get(9999999)
    except KeyboardInterrupt:
        # Allow ^C to interrupt from any thread.
        sys.stdout.write('\033[0m')
        sys.stdout.write('User Interupt\n')
        # Stop the workers right away instead of leaving them running.
        pool.terminate()
    except BaseException:
        pool.terminate()
        raise
    else:
        pool.close()
    finally:
        # The pool is closed or terminated on every path above, which is
        # what ``join`` requires.
        pool.join()
|
|
|
if __name__ == '__main__':
    # ``perf_counter`` is monotonic, so the elapsed time cannot be skewed by
    # wall-clock adjustments while the benchmarks run.
    start = time.perf_counter()
    time_them_async()
    total_elapsed_time = time.perf_counter() - start
    echo(space_join('total_elapsed_time:', str(total_elapsed_time)))