Skip to content

Instantly share code, notes, and snippets.

@malcolmgreaves
Last active December 14, 2018 22:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save malcolmgreaves/91734551fd7271d40a6df92b53c85a26 to your computer and use it in GitHub Desktop.
Data type class for Python's Iterable & pySpark's RDD. Inspired by the Scala Data typeclass: https://github.com/malcolmgreaves/data-tc
from collections import defaultdict
from typing import TypeVar, Generic, Iterable, Union, Callable, Sequence, Optional, Iterator, Tuple
from pyspark.rdd import RDD
A = TypeVar('A')
B = TypeVar('B')
def new_instance(elements: Union[RDD[A], Iterable[A], Callable[[], Iterable[A]]],
                 ) -> 'DataTypeclass[A]':
    """Constructs an appropriate implementation given the input data's type.

    :param elements: a pySpark RDD, an in-memory Iterable, or a zero-argument
                     callable that produces an Iterable (for lazy evaluation).
    :returns: an :class:`Rdd` wrapper for RDD input, otherwise a :class:`Local`
              wrapper around the in-memory / lazy data.
    """
    if isinstance(elements, RDD):
        # BUG FIX: the original returned `RddDataEvidence` / `LocalDataEvidence`,
        # names that do not exist in this file. The implementations defined here
        # are named `Rdd` and `Local`.
        return Rdd(elements)
    else:
        return Local(elements)
class DataTypeclass(Generic[A]):
    """A one-size-fits-all API for data transformations.

    Concrete subclasses (an in-memory implementation, a pySpark RDD wrapper, ...)
    supply the behavior; every abstract method here raises NotImplementedError.
    """

    def map(self, apply: Callable[[A], B]) -> 'DataTypeclass[B]':
        """Apply the function to every element in the data.
        """
        raise NotImplementedError

    def map_partition(self, apply: Callable[[Iterator[A]], Iterator[B]]) -> 'DataTypeclass[B]':
        """Apply the function to each partition of the internal data.
        """
        raise NotImplementedError

    def flat_map(self, apply: Callable[[A], Union[Optional[B], Iterable[B]]]) -> 'DataTypeclass[B]':
        """Apply the function to every element, flattening results as appropriate.
        """
        raise NotImplementedError

    def aggregate(self,
                  zero: B,
                  seq_op: Callable[[B, A], B],
                  comb_op: Callable[[B, B], B]) -> B:
        """Fold all elements into a single value of a possibly different type.

        :param zero: initial accumulator value.
        :param seq_op: folds one element into an accumulator.
        :param comb_op: merges two accumulators (used by partitioned implementations).
        """
        raise NotImplementedError

    def reduce(self, comb_op: Callable[[A, A], A]) -> A:
        """Combine elements together, yielding a single value.

        NOTE: The combination operator, `comb_op`, must be a commutative and
        associative operation.
        """
        raise NotImplementedError

    def collect(self) -> Sequence[A]:
        """Obtain all data elements into a single indexable sequence.

        NOTE: There are no ordering guarantees on the resulting `Sequence`.
        """
        raise NotImplementedError

    def take(self, n: int) -> Sequence[A]:
        """Obtain at most `n` arbitrary distinct elements from the data.

        NOTE: Will obtain less than `n` elements iff `n` > :func:`size()`.
        """
        raise NotImplementedError

    def group_by(self, key: Callable[[A], B]) -> 'DataTypeclass[Tuple[B, Sequence[A]]]':
        """Group data elements that have the same value using the `key` function.

        NOTE: The groups have arbitrary ordering.
        """
        raise NotImplementedError

    def iter(self) -> Iterator[A]:
        """Iterate through all data elements, in an arbitrary order.
        """
        raise NotImplementedError

    def __iter__(self) -> Iterator[A]:
        """Alias for :func:`iter`.
        """
        return self.iter()

    def size(self) -> int:
        """The number of distinct data elements.

        BUG FIX: the original signature omitted `self`, so `self.size()` from
        `__len__` raised TypeError instead of NotImplementedError.
        """
        raise NotImplementedError

    def __len__(self) -> int:
        """Alias for :func:`size`.
        """
        return self.size()
class Local(DataTypeclass[A]):
    """In-memory implementation of :class:`DataTypeclass`.

    Accepts either a concrete Iterable (materialized eagerly into a tuple) or a
    zero-argument callable producing an Iterable (kept lazy until first use).
    """

    def __init__(self, source: Union[Iterable[A], Callable[[], Iterable[A]]]) -> None:
        if isinstance(source, Iterable):
            # eager: materialize once, so the size is known up-front
            self._source = tuple(source)
            self._size = len(self._source)
        elif callable(source):
            # lazy: size is unknown (-1 sentinel) until first computed
            self._source = source
            self._size = -1
        else:
            raise TypeError(f"Unexpected source type: not a Sequence nor a () -> Iterable: "
                            f"{type(source)}")

    def _elements(self) -> Iterable[A]:
        """The sequence of data items or an evaluation of the Iterable-producing source function.
        """
        if callable(self._source):
            elements = self._source()
            if isinstance(elements, Sequence):
                # The lazy () -> Iterable[A] source happened to produce a
                # Sequence: cache it so future calls skip re-evaluation.
                self._source = elements
        else:
            elements = self._source
        return elements

    def iter(self) -> Iterator[A]:
        return iter(self._elements())

    def flat_map(self,
                 apply: Callable[[A], Union[Optional[B], Iterable[B]]]) -> 'Local[B]':
        """Apply `apply` to every element; None results are dropped, Iterable
        results are flattened, scalar results are kept as-is.

        NOTE(review): a `str` result is Iterable and will be flattened into
        characters — wrap strings in a list if that is not desired.
        """
        new_elements = []
        for e in self._elements():
            res = apply(e)
            if res is not None:
                if isinstance(res, Iterable):
                    new_elements.extend(res)
                else:
                    # BUG FIX: the original appended the *input* element `e`
                    # here instead of the mapped result `res`.
                    new_elements.append(res)
        return Local(tuple(new_elements))

    def map_partition(self, apply: Callable[[Iterator[A]], Iterator[B]]) -> 'Local[B]':
        # Local data is a single "partition"; stay lazy by wrapping in a
        # callable. `iter(...)` matches the Iterator contract of `apply`.
        return Local(lambda: apply(iter(self._elements())))

    def map(self, apply: Callable[[A], B]) -> 'Local[B]':
        if isinstance(self._source, Sequence):
            # eager source: map eagerly
            return Local(tuple(map(apply, self._source)))

        # lazy source: defer the mapping until the elements are requested
        def delay_map_application() -> Iterator[B]:
            for x in self._elements():
                yield apply(x)
        return Local(delay_map_application)

    def aggregate(self,
                  zero: B,
                  seq_op: Callable[[B, A], B],
                  comb_op: Optional[Callable[[B, B], B]] = None) -> B:
        """Sequential left-fold of all elements starting from `zero`.

        `comb_op` is unused here: local data is a single partition, so there
        are no partial accumulators to merge.
        """
        acc = zero
        for x in self._elements():
            acc = seq_op(acc, x)
        return acc

    def reduce(self, comb_op: Callable[[A, A], A]) -> A:
        """Combine all elements pairwise into a single value.

        :raises ValueError: if the data is empty.

        BUG FIX: the original crashed on single-element data (IndexError for
        eager sources, raw StopIteration for lazy ones); standard reduce
        semantics return the lone element unchanged.
        """
        elmns = iter(self._elements())
        try:
            acc = next(elmns)
        except StopIteration:
            raise ValueError("Cannot reduce() empty data.")
        for x in elmns:
            acc = comb_op(acc, x)
        return acc

    def collect(self) -> Sequence[A]:
        if isinstance(self._source, Sequence):
            return list(self._source)
        else:
            return list(self._elements())

    def take(self, n: int) -> Sequence[A]:
        if isinstance(self._source, Sequence):
            # eager source: a slice is at most `n` elements
            return list(self._source[0:n])
        elements = []
        for e in self._elements():
            if len(elements) >= n:
                break
            elements.append(e)
        return elements

    def group_by(self, key: Callable[[A], B]) -> 'Local[Tuple[B, Sequence[A]]]':
        groups = defaultdict(list)
        for e in self._elements():
            groups[key(e)].append(e)
        return Local(tuple(groups.items()))

    def __str__(self) -> str:
        """Displays the underlying collection's `__str__` representation and this class's name.
        """
        return f"{type(self).__name__}({str(self._source)})"

    def size(self) -> int:
        # BUG FIX: the original signature omitted `self` yet the body used it,
        # guaranteeing a NameError on any call.
        if self._size == -1:
            # need to calculate size by iterating through elements
            self._size = sum(1 for _ in self._elements())
        return self._size
class Rdd(DataTypeclass[A]):
    """pySpark RDD-backed implementation of :class:`DataTypeclass`.

    Thin adapter: every method delegates to the wrapped RDD, re-wrapping
    RDD-producing results so the typeclass API is preserved across calls.
    """

    def __init__(self, rdd: RDD[A]) -> None:
        self._rdd = rdd

    def iter(self) -> Iterator[A]:
        return self._rdd.toLocalIterator()

    def aggregate(self,
                  zero: B,
                  seq_op: Callable[[B, A], B],
                  comb_op: Callable[[B, B], B]) -> B:
        return self._rdd.aggregate(zero, seq_op, comb_op)

    def reduce(self, comb_op: Callable[[A, A], A]) -> A:
        return self._rdd.reduce(comb_op)

    def collect(self) -> Sequence[A]:
        return self._rdd.collect()

    def take(self, n: int) -> Sequence[A]:
        return self._rdd.take(n)

    def map(self, apply: Callable[[A], B]) -> 'Rdd[B]':
        # BUG FIX: the original returned the raw RDD, not a DataTypeclass,
        # so chained typeclass calls (e.g. `.map(...).size()`) broke.
        return Rdd(self._rdd.map(apply))

    def map_partition(self,
                      apply: Callable[[Iterator[A]], Iterator[B]]) -> 'Rdd[B]':
        # BUG FIX: the original called the misspelled `mapPartiions`
        # (AttributeError) and returned the unwrapped RDD.
        return Rdd(self._rdd.mapPartitions(apply))

    def flat_map(self,
                 apply: Callable[[A], Union[Optional[B], Iterable[B]]]) -> 'Rdd[B]':
        # NOTE(review): Spark's flatMap expects `apply` to return an iterable;
        # unlike Local.flat_map, a None result is not filtered here — confirm
        # callers never return None on the RDD path.
        return Rdd(self._rdd.flatMap(apply))

    def group_by(self, key: Callable[[A], B]) -> 'Rdd[Tuple[B, Sequence[A]]]':
        return Rdd(self._rdd.groupBy(key))

    def size(self) -> int:
        # BUG FIX: the original signature omitted `self` yet the body used it.
        return self._rdd.count()

    def __str__(self) -> str:
        """Display the underlying RDD's :func:`__str__` representation & this class name.
        """
        return f"{type(self).__name__}({str(self._rdd)})"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment