Skip to content

Instantly share code, notes, and snippets.

@aalhour
Last active March 25, 2016 03:03
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save aalhour/f80b874add5c34f7c8b7 to your computer and use it in GitHub Desktop.
A Queryable List for Python 3
from functools import reduce
from itertools import islice, takewhile, dropwhile
from types import FunctionType, BuiltinFunctionType, BuiltinMethodType
class QueryableList(list):
    """
    A ``list`` subclass with chainable query helpers (filter, map, take, ...).

    Query methods never modify the receiver. They return either a new
    ``QueryableList`` or, for ``first_or_default``, ``last_or_default`` and
    ``reduce``, a single value.
    """

    # Retained for backward compatibility with code that inspects it; callable
    # detection itself is done with the built-in callable().
    _function_types = (FunctionType, BuiltinFunctionType, BuiltinMethodType)

    def __init__(self, iterable=()):
        """
        Builds the list from ``iterable``.
        :param iterable: Items to store. Defaults to empty, mirroring ``list()``.
        """
        super().__init__(iterable)

    def _is_function(self, function):
        """
        Helper method that checks whether a given object can be called.
        :param function: The object to check.
        :return: Boolean.
        """
        return callable(function)

    def _require_function(self, candidate, message):
        """
        Raises ValueError carrying ``message`` unless ``candidate`` is callable.
        :param candidate: The object expected to be callable.
        :param message: Text of the ValueError raised on failure.
        """
        if not self._is_function(candidate):
            raise ValueError(message)

    @staticmethod
    def _require_positive_int(number):
        """
        Raises ValueError unless ``number`` is an integer greater than zero.
        :param number: The value to validate.
        """
        if not isinstance(number, int) or number <= 0:
            raise ValueError("Number must be a positive integer.")

    def _as_matcher(self, predicate):
        """
        Normalizes ``predicate`` into a one-argument boolean function.
        :param predicate: Either a callable, or a plain value to compare items against with ``==``.
        :return: A function that accepts one value and returns a boolean.
        """
        if self._is_function(predicate):
            return predicate
        return lambda item: item == predicate

    def first_or_default(self, predicate, default=None):
        """
        Returns the first match of a predicate or "default" if no match was found.
        :param predicate: A function that accepts one value and returns a boolean, or a plain value that
        items are compared against for equality.
        :param default: Default value to return if no match was found.
        :return: Either the firstly matched value or default.
        """
        match = self._as_matcher(predicate)
        return next((item for item in self if match(item)), default)

    def last_or_default(self, predicate, default=None):
        """
        Returns the last match of a predicate or "default" if no match was found.
        :param predicate: A function that accepts one value and returns a boolean, or a plain value that
        items are compared against for equality.
        :param default: Default value to return if no match was found.
        :return: Either the lastly matched value or default.
        """
        match = self._as_matcher(predicate)
        # The last match going forwards is the first match going backwards, so
        # searching from the tail can stop as soon as a match is found.
        return next((item for item in reversed(self) if match(item)), default)

    def filter(self, predicate):
        """
        Processes all items in the collection and takes only the ones that match the predicate.
        :param predicate: A function that accepts one value and returns a boolean.
        :return: A new QueryableList.
        :raises ValueError: If predicate is not callable.
        """
        self._require_function(predicate, "Predicate must be a function.")
        return QueryableList(item for item in self if predicate(item))

    def filter_out(self, predicate):
        """
        Processes all items in the collection and takes only the ones that do not match the predicate, leaving the
        ones that match the predicate out.
        :param predicate: A function that accepts one value and returns a boolean.
        :return: A new QueryableList.
        :raises ValueError: If predicate is not callable.
        """
        self._require_function(predicate, "Predicate must be a function.")
        return QueryableList(item for item in self if not predicate(item))

    def filter_not_none(self, predicate=None):
        """
        Processes all items in the collection and takes only the ones that are both not None and match the predicate.
        If the predicate was not given, this method filters out the None values from the collection.
        :param predicate: A function that accepts one value and returns a boolean, or None.
        :return: A new QueryableList.
        :raises ValueError: If predicate is neither None nor callable.
        """
        if predicate is None:
            return QueryableList(item for item in self if item is not None)
        # Checked against None explicitly so that falsy non-callables (0, "")
        # are rejected instead of being silently treated as "no predicate".
        self._require_function(predicate, "Predicate can only be either None or a function.")
        return QueryableList(item for item in self if item is not None and predicate(item))

    def filter_by_type(self, instance_type):
        """
        Processes all items in the collection and takes only the ones that are instances of a given class or type.
        :param instance_type: The class or type to filter for.
        :return: A new QueryableList.
        :raises ValueError: If instance_type is None, or isinstance() rejects it while filtering.
        """
        if instance_type is None:
            raise ValueError("Instance Type cannot be None.")
        try:
            return QueryableList(item for item in self if isinstance(item, instance_type))
        except TypeError as error:
            raise ValueError("Instance Type is neither a class nor a type.") from error

    def filter_out_by_type(self, instance_type):
        """
        Processes all items in the collection and takes only the ones that are not instances of a given class or type.
        :param instance_type: The class or type to filter against.
        :return: A new QueryableList.
        :raises ValueError: If instance_type is None, or isinstance() rejects it while filtering.
        """
        if instance_type is None:
            raise ValueError("Instance Type cannot be None.")
        try:
            return QueryableList(item for item in self if not isinstance(item, instance_type))
        except TypeError as error:
            raise ValueError("Instance Type is neither a class nor a type.") from error

    def map(self, function):
        """
        Processes all items in the collection, applying the given function to every element.
        :param function: A function that maps a single value to another one.
        :return: A new QueryableList.
        :raises ValueError: If function is not callable.
        """
        self._require_function(function, "Function must be a function.")
        return QueryableList(function(item) for item in self)

    def map_not_none(self, function):
        """
        Processes all items in the collection that are not None, applying the given function to every element.
        :param function: A function that maps a single value to another one.
        :return: A new QueryableList.
        :raises ValueError: If function is not callable.
        """
        self._require_function(function, "Function must be a function.")
        return QueryableList(function(item) for item in self if item is not None)

    def reduce(self, function, initial_value=None):
        """
        Reduces the collection to a single value using a given function and an initial value. If the initial value is
        None, the first item in the collection will be considered the initial value.
        :param function: A function that reduces two values to a single value.
        :param initial_value: The initial_value to use with function.
        :return: A single value. The result of reducing the collection with "function" and "initial_value".
        :raises ValueError: If function is not callable.
        """
        self._require_function(function, "Function must be a function.")
        if initial_value is None:
            # functools.reduce raises TypeError here when the collection is empty.
            return reduce(function, self)
        return reduce(function, self, initial_value)

    def take(self, number):
        """
        Returns a specific number of elements from the head of the collection (starting at the beginning).
        :param number: A positive integer denoting the number of elements to return.
        :return: A new QueryableList.
        :raises ValueError: If number is not a positive integer.
        """
        self._require_positive_int(number)
        return QueryableList(islice(self, number))

    def take_last(self, number):
        """
        Returns a specific number of elements from the tail of the collection.
        :param number: A positive integer denoting the number of elements to return.
        :return: A new QueryableList.
        :raises ValueError: If number is not a positive integer.
        """
        self._require_positive_int(number)
        return QueryableList(self[-number:])

    def skip(self, number):
        """
        Drops the first number of elements from the collection, starting at the beginning.
        :param number: A positive integer denoting the number of elements to drop.
        :return: A new QueryableList.
        :raises ValueError: If number is not a positive integer.
        """
        self._require_positive_int(number)
        return QueryableList(islice(self, number, None))

    def take_while(self, predicate):
        """
        Takes elements from the head of the collection for as long as they match the criterion, stopping at the
        first element that does not match.
        :param predicate: A function that accepts one value and returns a boolean
        :return: A new QueryableList.
        :raises ValueError: If predicate is not callable.
        """
        # Validated like every other method, so builtins, classes and partials
        # are accepted in addition to plain Python functions.
        self._require_function(predicate, "Predicate must be a function.")
        return QueryableList(takewhile(predicate, self))

    def drop_while(self, predicate):
        """
        Drops elements from the head of the collection for as long as they match the criterion, keeping everything
        from the first element that does not match onwards.
        :param predicate: A function that accepts one value and returns a boolean
        :return: A new QueryableList.
        :raises ValueError: If predicate is not callable.
        """
        self._require_function(predicate, "Predicate must be a function.")
        return QueryableList(dropwhile(predicate, self))

    def distinct(self):
        """
        Returns the unique elements from the collection, keeping the first occurrence of each in its original
        position.
        :return: A new QueryableList.
        """
        seen_hashable = set()
        # Unhashable items (lists, dicts, ...) cannot live in a set, so they are
        # tracked separately and compared with a linear scan.
        seen_unhashable = []
        unique = QueryableList()
        for item in self:
            try:
                if item in seen_hashable:
                    continue
                seen_hashable.add(item)
            except TypeError:
                if item in seen_unhashable:
                    continue
                seen_unhashable.append(item)
            unique.append(item)
        return unique
import math
import operator

from queryable_collections.queryable_list import QueryableList
###
# Predicates and mapping helpers used by the sample queries below.
# They are plain functions rather than lambdas bound to names, so each one
# carries a real __name__ in tracebacks.

# Lookup tables for to_int / to_str, built once instead of on every call.
_WORD_TO_INT = dict(one=1, two=2, three=3, four=4, five=5, six=6)
_INT_TO_WORD = {1: 'one', 2: 'two', 3: 'three', 4: 'four', 5: 'five', 6: 'six'}


def less_than_0(x):
    """Return True when x is negative."""
    return x < 0


def less_than_10(x):
    """Return True when x is below 10."""
    return x < 10


def multiples_of_2(x):
    """Return True when x leaves no remainder modulo 2."""
    return x % 2 == 0


def multiples_of_3(x):
    """Return True when x leaves no remainder modulo 3."""
    return x % 3 == 0


def values_i_dont_like(x):
    """Return True when x is a multiple of 2 or of 3."""
    return multiples_of_2(x) or multiples_of_3(x)


def merge_tuples(x, y):
    """Return x and y joined with the + operator."""
    return x + y


def add_one(x):
    """Return x + 1."""
    return x + 1


def to_square(x):
    """Return x multiplied by itself."""
    return x * x


def divide_by_5(x):
    """Return x / 5 as a float."""
    return float(x / 5.0)


def divide_by_10(x):
    """Return x / 10 as a float."""
    return float(x / 10.0)


def to_int(string):
    """Map an English word 'one'..'six' to its integer, or None if unknown."""
    return _WORD_TO_INT.get(string, None)


def to_str(integer):
    """Map an integer 1..6 to its English word, or None if unknown."""
    return _INT_TO_WORD.get(integer, None)
###
# Collection #1: every integer in [-1000, 1000) appears twice, so distinct()
# has real duplicates to remove.
#
collection1 = QueryableList(list(range(-1000, 1000)) * 2)

# Tenths that are multiples of 2: drop the first ten, keep the next ten.
query1 = (
    collection1.distinct()
    .map(divide_by_10)
    .filter(multiples_of_2)
    .skip(10)
    .take(10)
)

# The last ten squares that are multiples of 3.
query2 = (
    collection1.distinct()
    .map(to_square)
    .filter(multiples_of_3)
    .take_last(10)
)

# The last fifteen tenths that are multiples of 2.
query3 = (
    collection1.distinct()
    .map(divide_by_10)
    .filter(multiples_of_2)
    .take_last(15)
)

# Non-negative fifths that are multiples of neither 2 nor 3: drop ten, keep fifteen.
query4 = (
    collection1.distinct()
    .drop_while(less_than_0)
    .map(divide_by_5)
    .filter_out(values_i_dont_like)
    .skip(10)
    .take(15)
)

# Sum of the squares of the leading run of negative values.
query5 = (
    collection1.distinct()
    .take_while(less_than_0)
    .map(to_square)
    .reduce(operator.add)
)
###
# Collection #2: a mixed bag of integers, floats, strings and tuples.
#
collection2 = QueryableList(
    [1, 2, 3, 4, 5, 6]                                    # Integers
    + [1.0001, 2.0002, 3.0003, 4.0004, 5.0005, 6.0006]    # Floats
    + ["one", "two", "three", "four", "five", "six"]      # Strings
    + [(1, 2, 3), (4, 5, 6)]                              # Tuples
)

# Product of (n + 1) over the integers.
query6 = (
    collection2.filter_by_type(int)
    .map(add_one)
    .reduce(operator.mul)
)

# Sum of the squared floats.
query7 = (
    collection2.filter_by_type(float)
    .map(to_square)
    .reduce(operator.add)
)

# Squares of the numbers spelled out by the strings, keeping those >= 10.
query8 = (
    collection2.filter_by_type(str)
    .map(to_int)
    .map(to_square)
    .filter_out(less_than_10)
)

# Merge the tuples into one, then sum its members.
query9_a = collection2.filter_by_type(tuple).reduce(merge_tuples)
query9_b = QueryableList(query9_a).reduce(operator.add)
###
# Prepare Testing Data: one record per query, pairing its result with the
# value it is expected to equal.
#
assertions = [
    {'name': label, 'value': actual, 'expected': wanted}
    for label, actual, wanted in (
        ('Query #1', query1,
         [-80.0, -78.0, -76.0, -74.0, -72.0, -70.0, -68.0, -66.0, -64.0, -62.0]),
        ('Query #2', query2,
         [944784, 950625, 956484, 962361, 968256, 974169, 980100, 986049, 992016, 998001]),
        ('Query #3', query3,
         [70.0, 72.0, 74.0, 76.0, 78.0, 80.0, 82.0, 84.0, 86.0, 88.0, 90.0, 92.0, 94.0, 96.0, 98.0]),
        ('Query #4', query4,
         [2.4, 2.6, 2.8, 3.2, 3.4, 3.6, 3.8, 4.2, 4.4, 4.6, 4.8, 5.0, 5.2, 5.4, 5.6]),
        ('Query #5', query5, 333833500),
        ('Query #6', query6, 5040),
        ('Query #7', query7, 91.01820091),
        ('Query #8', query8, [16, 25, 36]),
        ('Query #9.A', query9_a, (1, 2, 3, 4, 5, 6)),
        ('Query #9.B', query9_b, 21),
    )
]
###
# Print Results and do some basic equality assertions
#
def _results_match(actual, expected):
    """
    Tell whether a query result agrees with its expected value.

    A scalar float expectation is compared with a relative tolerance, because
    a sum accumulated in floating point need not be bit-identical to the
    decimal literal written in the table. Everything else uses plain equality.
    """
    if isinstance(expected, float) and isinstance(actual, (int, float)):
        return math.isclose(actual, expected, rel_tol=1e-9)
    return actual == expected


for assertion in assertions:
    name = assertion['name']
    value = assertion['value']
    expected = assertion['expected']
    if not _results_match(value, expected):
        raise ValueError("Wrong query result at {}".format(name))
    # Scalars have no length; report them as a single result.
    result_length = len(value) if hasattr(value, "__len__") else 1
    print("[*]\t{}:\r\n\tQuery result: {}\r\n\tExpected result: {}\r\n\tLength of query result: {}\r\n".format(
        name, value, expected, result_length))
@Jdsleppy
Copy link

Excellent!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment