A Queryable List for Python 3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import reduce | |
from itertools import islice, takewhile, dropwhile | |
from types import FunctionType, BuiltinFunctionType, BuiltinMethodType | |
class QueryableList(list): | |
""" | |
Lists made awesome! | |
""" | |
def __init__(self, iterable): | |
# Call the base list class __init__() | |
super().__init__(iterable) | |
# List of supported function types and attributes | |
self._function_types = [FunctionType, BuiltinFunctionType, BuiltinMethodType] | |
def _is_function(self, function): | |
""" | |
Helper method that checks if a given function is of the supported function types. | |
:param function: The function to check. | |
:return: Boolean. | |
""" | |
for _type in self._function_types: | |
if not isinstance(function, _type) and not hasattr(function, "__call__"): | |
return False | |
return True | |
def first_or_default(self, predicate, default=None): | |
""" | |
Returns the first match of a predicate or "default" if no match was found. | |
:param predicate: A function that accepts one value and returns a boolean. | |
:param default: Default value to return if no match was found. | |
:return: Either the firstly matched value or default. | |
""" | |
# Normalize the boolean finding method | |
match = (lambda x: predicate(x)) if self._is_function(predicate) else (lambda y: y == predicate) | |
# Process depending on type of predicate | |
return next(filter(match, iter(self)), default) | |
def last_or_default(self, predicate, default=None): | |
""" | |
Returns the lastly matched of a predicate or "default" if no match was found. | |
:param predicate: A function that accepts one value and returns a boolean. | |
:param default: Default value to return if no match was found. | |
:return: Either the lastly matched value or default. | |
""" | |
# Normalize the boolean finding method | |
match = (lambda x: predicate(x)) if self._is_function(predicate) else (lambda y: y == predicate) | |
# Process | |
last_item = default | |
for it in iter(self): | |
last_item = it if match(it) else last_item | |
return last_item | |
def filter(self, predicate): | |
""" | |
Processes all items in the collection and takes only the ones that match the predicate. | |
:param predicate: A function that accepts one value and returns a boolean. | |
:return: A new QueryableList. | |
""" | |
# Validate predicate | |
if not self._is_function(predicate): | |
raise ValueError("Predicate must be a function.") | |
# Process | |
return QueryableList(filter(predicate, iter(self))) | |
def filter_out(self, predicate): | |
""" | |
Processes all items in the collection and takes only the ones that do not match the predicate, leaving the | |
ones that match the predicate out. | |
:param predicate: A function that accepts one value and returns a boolean. | |
:return: A new QueryableList. | |
""" | |
# Validate predicate | |
if not self._is_function(predicate): | |
raise ValueError("Predicate must be a function.") | |
# Process | |
# filter out matches method | |
is_not_a_match = lambda item: not predicate(item) | |
return QueryableList(filter(is_not_a_match, iter(self))) | |
def filter_not_none(self, predicate=None): | |
""" | |
Processes all items in the collection and takes only the ones that are both not None and match the predicate. | |
If the predicate was not given, this method filters out the None values from the collection. | |
:param predicate: A function that accepts one value and returns a boolean. | |
:return: A new QueryableList. | |
""" | |
# Validate input | |
if predicate and not self._is_function(predicate): | |
raise ValueError("Predicate can only be either None or a function.") | |
# Process | |
if not predicate: | |
return QueryableList(filter(lambda item: item is not None, iter(self))) | |
else: | |
return QueryableList(filter(lambda item: item is not None and predicate(item), iter(self))) | |
def filter_by_type(self, instance_type): | |
""" | |
Processes all items in the collection and takes only the ones that are instances of a given class or type. | |
:param instance_type: The class or type to filter for. | |
:return: A new QueryableList. | |
""" | |
# Validate instance_type | |
if instance_type is None: | |
raise ValueError("Instance Type cannot be None.") | |
# Process | |
is_instance_of = lambda item: isinstance(item, instance_type) | |
try: | |
return QueryableList(self.filter(is_instance_of)) | |
except TypeError: | |
raise ValueError("Instance Type is neither a class nor a type.") | |
def filter_out_by_type(self, instance_type): | |
""" | |
Processes all items in the collection and takes only the ones that are not instances of a given class or type. | |
:param instance_type: The class or type to filter against. | |
:return: A new QueryableList. | |
""" | |
# Validate instance_type | |
if instance_type is None: | |
raise ValueError("Instance Type cannot be None.") | |
# Process | |
is_not_instance_of = lambda item: not isinstance(item, instance_type) | |
try: | |
return QueryableList(self.filter(is_not_instance_of)) | |
except TypeError: | |
raise ValueError("Instance Type is neither a class nor a type.") | |
def map(self, function): | |
""" | |
Processes all items in the collection, applying the given function to every element. | |
:param function: A function that maps a single value to another one. | |
:return: A new QueryableList. | |
""" | |
# Validate function | |
if not self._is_function(function): | |
raise ValueError("Function must be a function.") | |
# Process | |
return QueryableList(map(function, iter(self))) | |
def map_not_none(self, function): | |
""" | |
Processes all items in the collection that are not None, applying the given function to every element. | |
:param function: A function that maps a single value to another one. | |
:return: A new QueryableList. | |
""" | |
# Validate function | |
if not self._is_function(function): | |
raise ValueError("Function must be a function.") | |
# Process | |
return self.filter_not_none().map(function) | |
def reduce(self, function, initial_value=None): | |
""" | |
Reduces the collection to a single value using a given function and an initial value. If the initial value is | |
None, the first item in the collection will be considered the initial value. | |
:param function: A function that reduces two values to a single value. | |
:param initial_value: The initial_value to use with function. | |
:return: A single value. The result of reducing the collection with "function" and "identity". | |
""" | |
# Validate function | |
if not self._is_function(function): | |
raise ValueError("Function must be a function.") | |
# Process | |
if initial_value is None: | |
return reduce(function, iter(self)) | |
else: | |
return reduce(function, iter(self), initial_value) | |
def take(self, number): | |
""" | |
Returns a specific number of elements from the head of the collection (starting at the beginning). | |
:param number: A positive integer denoting the number of elements to return. | |
:return: A new QueryableList. | |
""" | |
# Validate input | |
if not number or not isinstance(number, int) or number <= 0: | |
raise ValueError("Number must be a positive integer.") | |
# Process | |
return QueryableList(islice(iter(self), number)) | |
def take_last(self, number): | |
""" | |
Returns a specific number of elements from the tail of the collection. | |
:param number: A positive integer denoting the number of elements to return. | |
:return: A new QueryableList. | |
""" | |
# Validate input | |
if not number or not isinstance(number, int) or number <= 0: | |
raise ValueError("Number must be a positive integer.") | |
def helper(): | |
return list(iter(self))[-abs(number):] | |
# Process | |
return QueryableList(iter(helper())) | |
def skip(self, number): | |
""" | |
Drops the first number of elements from the collection, starting at the beginning. | |
:param number: A positive integer denoting the number of elements to drop. | |
:return: A new QueryableList. | |
""" | |
# Validate input | |
if not number or not isinstance(number, int) or number <= 0: | |
raise ValueError("Number must be a positive integer.") | |
# Process | |
return QueryableList(islice(iter(self), number, None)) | |
def take_while(self, predicate): | |
""" | |
Takes all elements from the collection that match the criterion, the moment it finds the first value that does | |
not match the criterion it will stop. | |
:param predicate: A function that accepts one value and returns a boolean | |
:return: A new QueryableList. | |
""" | |
# Validate input | |
if not predicate or not isinstance(predicate, FunctionType): | |
raise ValueError("Predicate must be a function.") | |
# Process | |
return QueryableList(takewhile(predicate, iter(self))) | |
def drop_while(self, predicate): | |
""" | |
Drops elements from the collection that match the criterion, the moment it finds an an element that does not | |
match the criterion it will stop. | |
:param predicate: A function that accepts one value and returns a boolean | |
:return: A new QueryableList. | |
""" | |
# Validate input | |
if not predicate or not isinstance(predicate, FunctionType): | |
raise ValueError("Predicate must be a function.") | |
# Process | |
return QueryableList(dropwhile(predicate, iter(self))) | |
def distinct(self): | |
""" | |
Returns the unique elements from the collection. | |
:return: A new QueryableList. | |
""" | |
def helper(): | |
seen = set() | |
for item in iter(self): | |
if item not in seen: | |
seen.add(item) | |
yield item | |
del seen | |
# Process | |
return QueryableList(helper()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import operator | |
from queryable_collections.queryable_list import QueryableList | |
### | |
# Predicates | |
less_than_0 = lambda x: x < 0 | |
less_than_10 = lambda x: x < 10 | |
multiples_of_2 = lambda x: x % 2 == 0 | |
multiples_of_3 = lambda x: x % 3 == 0 | |
values_i_dont_like = lambda x: multiples_of_2(x) or multiples_of_3(x) | |
merge_tuples = lambda x, y: x + y | |
add_one = lambda x: x + 1 | |
to_square = lambda x: x * x | |
divide_by_5 = lambda x: float(x / 5.0) | |
divide_by_10 = lambda x: float(x / 10.0) | |
to_int = lambda string: dict(one=1, two=2, three=3, four=4, five=5, six=6).get(string, None) | |
to_str = lambda integer: {1: 'one', 2: 'two', 3: 'three', 4: 'four', 5: 'five', 6: 'six'}.get(integer, None) | |
### | |
# Collection #1, this one contains duplicates | |
# | |
collection1 = QueryableList(list(range(-1000, 1000)) + list(range(-1000, 1000))) | |
query1 = collection1.distinct().map(divide_by_10).filter(multiples_of_2).skip(10).take(10) | |
query2 = collection1.distinct().map(to_square).filter(multiples_of_3).take_last(10) | |
query3 = collection1.distinct().map(divide_by_10).filter(multiples_of_2).take_last(15) | |
query4 = collection1.distinct().drop_while(less_than_0).map(divide_by_5).filter_out(values_i_dont_like).skip(10).take(15) | |
query5 = collection1.distinct().take_while(less_than_0).map(to_square).reduce(operator.add) | |
### | |
# Collection #2 | |
# | |
collection2 = QueryableList([ | |
1, 2, 3, 4, 5, 6, # Integers | |
1.0001, 2.0002, 3.0003, 4.0004, 5.0005, 6.0006, # Floats | |
"one", "two", "three", "four", "five", "six", # Strings | |
(1, 2, 3), # Tuple | |
(4, 5, 6) # Tuple | |
]) | |
query6 = collection2.filter_by_type(int).map(add_one).reduce(operator.mul) | |
query7 = collection2.filter_by_type(float).map(to_square).reduce(operator.add) | |
query8 = collection2.filter_by_type(str).map(to_int).map(to_square).filter_out(less_than_10) | |
query9_a = collection2.filter_by_type(tuple).reduce(merge_tuples) | |
query9_b = QueryableList(query9_a).reduce(operator.add) | |
### | |
# Prepare Testing Data | |
# | |
assertions = [ | |
{'name': 'Query #1', 'value': query1, 'expected': [-80.0, -78.0, -76.0, -74.0, -72.0, -70.0, -68.0, -66.0, -64.0, -62.0]}, | |
{'name': 'Query #2', 'value': query2, 'expected': [944784, 950625, 956484, 962361, 968256, 974169, 980100, 986049, 992016, 998001]}, | |
{'name': 'Query #3', 'value': query3, 'expected': [70.0, 72.0, 74.0, 76.0, 78.0, 80.0, 82.0, 84.0, 86.0, 88.0, 90.0, 92.0, 94.0, 96.0, 98.0]}, | |
{'name': 'Query #4', 'value': query4, 'expected': [2.4, 2.6, 2.8, 3.2, 3.4, 3.6, 3.8, 4.2, 4.4, 4.6, 4.8, 5.0, 5.2, 5.4, 5.6]}, | |
{'name': 'Query #5', 'value': query5, 'expected': 333833500}, | |
{'name': 'Query #6', 'value': query6, 'expected': 5040}, | |
{'name': 'Query #7', 'value': query7, 'expected': 91.01820091}, | |
{'name': 'Query #8', 'value': query8, 'expected': [16, 25, 36]}, | |
{'name': 'Query #9.A', 'value': query9_a, 'expected': (1, 2, 3, 4, 5, 6)}, | |
{'name': 'Query #9.B', 'value': query9_b, 'expected': 21} | |
] | |
### | |
# Print Results and do some basic equality assertions | |
# | |
for assertion in assertions: | |
if assertion['value'] != assertion['expected']: | |
raise ValueError("Wrong query result at {}".format(assertion['name'])) | |
print("[*]\t{}:\r\n\tQuery result: {}\r\n\tExpected result: {}\r\n\tLength of query result: {}\r\n".format( | |
assertion['name'], assertion['value'], assertion['expected'], | |
len(assertion['value']) if hasattr(assertion['value'], "__len__") else 1)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Excellent!