Last active
December 14, 2018 22:00
-
-
Save malcolmgreaves/91734551fd7271d40a6df92b53c85a26 to your computer and use it in GitHub Desktop.
Data type class for Python's Iterable & pySpark's RDD. Inspired by the Scala Data typeclass: https://github.com/malcolmgreaves/data-tc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
from typing import TypeVar, Generic, Iterable, Union, Callable, Sequence, Optional, Iterator, Tuple | |
from pyspark.rdd import RDD | |
# Type variable for the element type held by a data container.
A = TypeVar("A")
# Type variable for a second type, e.g. the result of a transformation or a grouping key.
B = TypeVar("B")
def new_instance(
    # Quoted so that defining this function does not require `RDD` to be subscriptable.
    elements: 'Union[RDD[A], Iterable[A], Callable[[], Iterable[A]]]',
) -> 'DataTypeclass[A]':
    """Constructs an appropriate implementation given the input data's type.

    :param elements: A pySpark ``RDD``, a local iterable, or a zero-argument function that
        produces an iterable when called.
    :return: An :class:`Rdd` when `elements` is an ``RDD``; a :class:`Local` otherwise.
    :raises TypeError: Propagated from :class:`Local` when `elements` is neither iterable
        nor callable.
    """
    if isinstance(elements, RDD):
        return Rdd(elements)
    # Anything that is not an RDD is handled in-process.
    return Local(elements)
class DataTypeclass(Generic[A]):
    """A one-size-fits-all API for data transformations.

    This base class only declares the operations: every method other than the `__iter__`
    and `__len__` aliases raises `NotImplementedError` and is meant to be overridden by a
    concrete implementation.
    """

    def map(self, apply: Callable[[A], B]) -> 'DataTypeclass[B]':
        """Apply the function to every element in the data.
        """
        raise NotImplementedError

    def map_partition(self, apply: Callable[[Iterator[A]], Iterator[B]]) -> 'DataTypeclass[B]':
        """Apply the function to each partition of the internal data.
        """
        raise NotImplementedError

    def flat_map(self,
                 apply: Callable[[A], Union[Optional[B], Iterable[B]]]) -> 'DataTypeclass[B]':
        """Apply the function to every element, flattening results as appropriate.
        """
        raise NotImplementedError

    def aggregate(self,
                  zero: B,
                  seq_op: Callable[[B, A], B],
                  comb_op: Callable[[B, B], B]) -> B:
        """Fold all elements into a single value of a possibly different type.

        :param zero: The initial accumulator value.
        :param seq_op: Folds one data element into an accumulator.
        :param comb_op: Merges two accumulators into one.
        """
        raise NotImplementedError

    def reduce(self, comb_op: Callable[[A, A], A]) -> A:
        """Combine elements together, yielding a single value.

        NOTE: The combination operator, `comb_op`, must be a commutative and associative
        operation.
        """
        raise NotImplementedError

    def collect(self) -> Sequence[A]:
        """Obtain all data elements into a single indexable sequence.

        NOTE: There are no ordering guarantees on the resulting `Sequence`.
        """
        raise NotImplementedError

    def take(self, n: int) -> Sequence[A]:
        """Obtain at most `n` arbitrary elements from the data.

        NOTE: Will obtain fewer than `n` elements iff `n` > :func:`size`.
        """
        raise NotImplementedError

    def group_by(self, key: Callable[[A], B]) -> 'DataTypeclass[Tuple[B, Sequence[A]]]':
        """Group data elements that have the same value using the `key` function.

        NOTE: The groups have arbitrary ordering.
        """
        raise NotImplementedError

    def iter(self) -> Iterator[A]:
        """Iterate through all data elements, in an arbitrary order.
        """
        raise NotImplementedError

    def __iter__(self) -> Iterator[A]:
        """Alias for :func:`iter`.
        """
        return self.iter()

    # `self` is required here: `__len__` invokes this as a bound method.
    def size(self) -> int:
        """The number of data elements.
        """
        raise NotImplementedError

    def __len__(self) -> int:
        """Alias for :func:`size`.
        """
        return self.size()
class Local(DataTypeclass[A]):
    """In-process implementation backed by a tuple or by a lazily evaluated function.

    An iterable source is copied into a tuple when the instance is constructed. A callable
    source is invoked each time the elements are needed; if a call returns a `Sequence`,
    that sequence replaces the callable and is reused from then on.
    """

    def __init__(self, source: Union[Iterable[A], Callable[[], Iterable[A]]]) -> None:
        """Store the data source.

        :param source: Either an iterable of elements or a zero-argument function that
            returns an iterable of elements.
        :raises TypeError: If `source` is neither iterable nor callable.
        """
        if isinstance(source, Iterable):
            self._source = tuple(source)
            self._size = len(self._source)
        elif callable(source):
            self._source = source
            # -1 marks "not counted yet"; see :func:`size`.
            self._size = -1
        else:
            raise TypeError(f"Unexpected source type: neither an Iterable nor a "
                            f"() -> Iterable: {type(source)}")

    def _elements(self) -> Iterable[A]:
        """The sequence of data items or an evaluation of the Iterable-producing source function.
        """
        if not callable(self._source):
            return self._source
        elements = self._source()
        if isinstance(elements, Sequence):
            # The lazy source produced a re-iterable sequence, so keep the sequence itself
            # instead of calling the function again next time. Its length is known as well.
            self._source = elements
            self._size = len(elements)
        return elements

    def iter(self) -> Iterator[A]:
        """Iterate through all data elements.
        """
        return iter(self._elements())

    def flat_map(self,
                 apply: Callable[[A], Union[Optional[B], Iterable[B]]]) -> 'Local[B]':
        """Apply the function to every element, flattening the results.

        A `None` result contributes nothing, an iterable result contributes each of its
        items, and any other result contributes itself.
        """
        flattened = []
        for element in self._elements():
            result = apply(element)
            if result is None:
                continue
            if isinstance(result, Iterable):
                flattened.extend(result)
            else:
                # Keep the function's result, not the input element.
                flattened.append(result)
        return Local(tuple(flattened))

    def map_partition(self, apply: Callable[[Iterator[A]], Iterator[B]]) -> 'Local[B]':
        """Apply the function to the whole data set, treated as one partition.

        The application is deferred: `apply` runs when the returned instance's elements
        are requested.
        """
        # Hand over a real iterator, as the signature promises, so `apply` may call `next`.
        return Local(lambda: apply(iter(self._elements())))

    def map(self, apply: Callable[[A], B]) -> 'Local[B]':
        """Apply the function to every element in the data.

        Applied immediately when the data is already a sequence; deferred when the data
        comes from a lazily evaluated function.
        """
        if isinstance(self._source, Sequence):
            return Local(tuple(map(apply, self._source)))

        def delay_map_application():
            for x in self._elements():
                yield apply(x)

        return Local(delay_map_application)

    def aggregate(self,
                  zero: B,
                  seq_op: Callable[[B, A], B],
                  comb_op: Optional[Callable[[B, B], B]] = None) -> B:
        """Fold all elements into `zero` using `seq_op`.

        `comb_op` is accepted for interface compatibility but is not used: the fold below
        produces a single accumulator, so there is nothing to merge.
        """
        accumulated = zero
        for element in self._elements():
            accumulated = seq_op(accumulated, element)
        return accumulated

    def reduce(self, comb_op: Callable[[A, A], A]) -> A:
        """Combine elements together, yielding a single value.

        A single-element collection yields that element without calling `comb_op`.

        :raises ValueError: If there are no elements to combine.
        """
        remaining = iter(self._elements())
        try:
            combined = next(remaining)
        except StopIteration:
            raise ValueError("Cannot reduce an empty collection") from None
        for element in remaining:
            combined = comb_op(combined, element)
        return combined

    def collect(self) -> Sequence[A]:
        """Obtain all data elements as a new list.
        """
        return list(self._elements())

    def take(self, n: int) -> Sequence[A]:
        """Obtain at most `n` elements from the data; empty when `n` is not positive.
        """
        taken = []
        if n <= 0:
            return taken
        for element in self._elements():
            taken.append(element)
            if len(taken) == n:
                # Stop early so a lazy source is not consumed further than needed.
                break
        return taken

    def group_by(self, key: Callable[[A], B]) -> 'Local[Tuple[B, Sequence[A]]]':
        """Group data elements that have the same value under the `key` function.

        Each resulting element is a `(key value, list of elements)` pair.
        """
        groups = defaultdict(list)
        for element in self._elements():
            groups[key(element)].append(element)
        return Local(tuple(groups.items()))

    def __str__(self) -> str:
        """Displays the underlying collection's `__str__` representation and this class's name.
        """
        return f"{type(self).__name__}({str(self._source)})"

    def size(self) -> int:
        """The number of data elements, counted once and then remembered.
        """
        if self._size == -1:
            # Unknown so far: count by walking through the elements.
            self._size = sum(1 for _ in self._elements())
        return self._size
class Rdd(DataTypeclass[A]):
    """Implementation that delegates every operation to a wrapped pySpark ``RDD``.

    Transformations return a new :class:`Rdd` wrapping the transformed ``RDD``.
    """

    # The annotation is quoted so that defining this class does not require `RDD` to be
    # subscriptable.
    def __init__(self, rdd: 'RDD[A]') -> None:
        """Wrap the given ``RDD``.
        """
        self._rdd = rdd

    def iter(self) -> Iterator[A]:
        """Iterate through all data elements via the RDD's `toLocalIterator`.
        """
        return self._rdd.toLocalIterator()

    def aggregate(self,
                  zero: B,
                  seq_op: Callable[[B, A], B],
                  comb_op: Callable[[B, B], B]) -> B:
        """Fold all elements into a single value; delegates to the RDD's `aggregate`.
        """
        return self._rdd.aggregate(zero, seq_op, comb_op)

    def reduce(self, comb_op: Callable[[A, A], A]) -> A:
        """Combine elements together; delegates to the RDD's `reduce`.
        """
        return self._rdd.reduce(comb_op)

    def collect(self) -> Sequence[A]:
        """Obtain all data elements; delegates to the RDD's `collect`.
        """
        return self._rdd.collect()

    def take(self, n: int) -> Sequence[A]:
        """Obtain at most `n` elements; delegates to the RDD's `take`.
        """
        return self._rdd.take(n)

    def map(self, apply: Callable[[A], B]) -> 'Rdd[B]':
        """Apply the function to every element in the data.
        """
        # Re-wrap so callers keep working against the DataTypeclass API.
        return Rdd(self._rdd.map(apply))

    def map_partition(self,
                      apply: Callable[[Iterator[A]], Iterator[B]]) -> 'Rdd[B]':
        """Apply the function to each partition of the RDD.
        """
        return Rdd(self._rdd.mapPartitions(apply))

    def flat_map(self,
                 apply: Callable[[A], Union[Optional[B], Iterable[B]]]) -> 'Rdd[B]':
        """Apply the function to every element, flattening the results.

        A `None` result contributes nothing, an iterable result contributes each of its
        items, and any other result contributes itself.
        """
        def as_iterable(element):
            # `flatMap` is always handed an iterable: `None` and scalar results are
            # normalized here, so all three result shapes in the signature are accepted.
            result = apply(element)
            if result is None:
                return ()
            if isinstance(result, Iterable):
                return result
            return (result,)

        return Rdd(self._rdd.flatMap(as_iterable))

    def group_by(self, key: Callable[[A], B]) -> 'Rdd[Tuple[B, Sequence[A]]]':
        """Group data elements that have the same value under the `key` function.

        Each resulting element is a `(key value, list of elements)` pair.
        """
        # Turn each group into a list so the values are real sequences, as declared.
        return Rdd(self._rdd.groupBy(key).mapValues(list))

    def size(self) -> int:
        """The number of data elements; delegates to the RDD's `count`.
        """
        return self._rdd.count()

    def __str__(self) -> str:
        """Display the underlying RDD's :func:`__str__` representation & this class name.
        """
        return f"{type(self).__name__}({str(self._rdd)})"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment