Skip to content

Instantly share code, notes, and snippets.

@jongwook
Created January 31, 2018 23:36
Show Gist options
  • Save jongwook/5f20946128cacc3ccc74a6e21060deb9 to your computer and use it in GitHub Desktop.
Save jongwook/5f20946128cacc3ccc74a6e21060deb9 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Proof-of-Concept: The Functional Streamer\n",
"\n",
"Below is an example of `Streamer` class that contains a finite or infinite stream of items. It wraps any Python iterable or generator and supports various functional transformations.\n",
"\n",
"### Transformations on a Streamer\n",
"\n",
"The following methods return another instance of `Streamer`.\n",
"\n",
"* `map(f)`: apply a function `f` to each element\n",
"* `flatmap(f)`: apply a function `f` returning a list to each element, and concatenate them\n",
"* `filter(f)`: filter the elements so that `f` returns `True` for all elements\n",
"* `take(n)`: take the first `n` elements\n",
"* `skip(n)`: skip the first `n` elements and get the others\n",
"* `repeat(c)`: loop the iterable multiple times, infinitely by default\n",
"* `group(size)`: group the adjacent elements by tuples of size `size`\n",
"* `shuffle(buffer_size)`: shuffle the elements using a buffer of size `buffer_size`\n",
"* `counting()`: returns a Streamer that has attribute 'count' that contains the number of items that it has passed\n",
"\n",
"These two are a bit tricky but useful for some complex use cases:\n",
"\n",
"* `mapstream(f)`: for each item, lazily call `f` (which is a generator function or returns an iterable) to build a stream and return a stream of the streams\n",
"* `cache()`: eagerly execute all necessary iterations and save the elements in an array; useful for operations having side-effects or large computational costs\n",
"\n",
"### Terminal Operations on a Streamer\n",
"\n",
"The following methods are meant to be the last stage of transformations\n",
"\n",
"* `foreach`: call a function with element\n",
"* `list`: for a finite streamer, return all elements as a list\n",
"* `__iter__`: return the underlying iterable for consuming data\n",
"\n",
"### Combining Streamers\n",
"\n",
"The following methods create a new Streamer from one or more Streamers\n",
"\n",
"* `concat`: concatenate multiple streamers, assuming all but the last streamers are finite\n",
"* `zip`: create a Streamer that contains tuples containing one element from each streamer\n",
"* `roundrobin`: create a Streamer that pulls one element from each streamer in a round-robin manner"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from random import Random\n",
"\n",
"class Streamer:\n",
" def __init__(self, iterable_supplier, *args, **kwargs):\n",
" self._iterable_supplier = iterable_supplier\n",
" self._args = args\n",
" self._kwargs = kwargs\n",
" \n",
" if not callable(self._iterable_supplier):\n",
" self._iterable_supplier = lambda: iterable_supplier\n",
"\n",
" def map(self, func):\n",
" return Streamer(lambda: (func(item) for item in self))\n",
"\n",
" def flatmap(self, func):\n",
" return Streamer(lambda: (element for item in self for element in func(item)))\n",
"\n",
" def filter(self, func):\n",
" return Streamer(lambda: (item for item in self if func(item)))\n",
"\n",
" def foreach(self, func):\n",
" for item in self:\n",
" func(item)\n",
"\n",
" def take(self, n):\n",
" return Streamer(lambda: (item for _, item in zip(range(n), self)))\n",
"\n",
" def skip(self, n):\n",
" return Streamer(lambda: (item for i, item in enumerate(self) if i >= n))\n",
"\n",
" def repeat(self, loops=-1):\n",
" if loops == -1:\n",
" return Streamer(lambda: (item for _ in iter(int, 1) for item in self))\n",
" else:\n",
" return Streamer(lambda: (item for _ in range(loops) for item in self))\n",
"\n",
" def group(self, size):\n",
" def generator(iterator, size):\n",
" while True:\n",
" try:\n",
" yield tuple([next(iterator) for _ in range(size)])\n",
" except StopIteration:\n",
" return\n",
" return Streamer(generator, iter(self), size)\n",
"\n",
" def shuffle(self, buffer_size, seed=None):\n",
" def generator(buffer_size, seed):\n",
" random = Random(seed)\n",
"\n",
" # fill the buffer\n",
" iterator = iter(self)\n",
" buffer = [item for _, item in zip(range(buffer_size), iterator)]\n",
" random.shuffle(buffer)\n",
" \n",
" # sample one from the buffer and replace with a new one pulled from the iterator\n",
" for item in iterator:\n",
" i = random.randrange(buffer_size)\n",
" yield buffer[i]\n",
" buffer[i] = item\n",
" \n",
" # drain any remaining items\n",
" for item in buffer:\n",
" if item is not None:\n",
" yield item \n",
" \n",
" return Streamer(lambda: generator(buffer_size, seed))\n",
"\n",
" def mapstream(self, iterable_supplier):\n",
" return Streamer(lambda: (Streamer(iterable_supplier, item) for item in self))\n",
" \n",
" def cache(self):\n",
" return Streamer(self.list())\n",
" \n",
" def counting(self):\n",
" class CountingStreamer(Streamer): \n",
" def __init__(self, streamer):\n",
" super().__init__(streamer._iterable_supplier, *streamer._args, **streamer._kwargs)\n",
" self.count = 0\n",
" \n",
" def __iter__(self):\n",
" for item in super().__iter__():\n",
" self.count += 1\n",
" yield item \n",
" \n",
" return CountingStreamer(self)\n",
" \n",
" def list(self):\n",
" return [item for item in self]\n",
"\n",
" @classmethod\n",
" def concat(cls, streamers):\n",
" def generator(streamers):\n",
" for streamer in streamers:\n",
" for item in cls._materialize(streamer):\n",
" yield item\n",
" return Streamer(generator, streamers)\n",
"\n",
" @classmethod\n",
" def zip(cls, streamers):\n",
" return Streamer(lambda: zip(*map(iter, map(cls._materialize, streamers))))\n",
" \n",
" @classmethod\n",
" def roundrobin(cls, streamers):\n",
" def generator(streamers):\n",
" iterators = list(map(iter, map(cls._materialize, streamers)))\n",
" while len(iterators) > 0:\n",
" for iterator in iterators:\n",
" try:\n",
" yield next(iterator)\n",
" except StopIteration:\n",
" iterators.remove(iterator)\n",
" return Streamer(generator, streamers)\n",
" \n",
" @classmethod\n",
" def shufflemux(cls, streamers, buffer_size=None, seed=None):\n",
" def generator(streamers, buffer_size, seed):\n",
" random = Random(seed)\n",
" # fill the buffer of iterators\n",
" streamer_iterator = iter(streamers)\n",
" if buffer_size is None:\n",
" iterators = list(map(iter, map(cls._materialize, streamer_iterator)))\n",
" else:\n",
" iterators = [iter(cls._materialize(iterable)) for _, iterable in zip(range(buffer_size), streamer_iterator)]\n",
" \n",
"            # keep choosing a random iterator and yield one item from it\n",
" while len(iterators) > 0:\n",
" i = random.randrange(len(iterators))\n",
" iterator = iterators[i]\n",
" try:\n",
" yield next(iterator)\n",
" except StopIteration:\n",
" try:\n",
" iterators[i] = iter(next(streamer_iterator))\n",
" except StopIteration:\n",
" iterators.pop(i)\n",
" \n",
" return Streamer(generator, streamers, buffer_size, seed)\n",
" \n",
" @classmethod\n",
" def _materialize(cls, iterable):\n",
" if callable(iterable):\n",
" iterable = iterable()\n",
" try:\n",
" return iter(iterable)\n",
" except TypeError:\n",
" raise RuntimeError('iterable expected')\n",
" \n",
" def __iter__(self):\n",
" return iter(self._iterable_supplier(*self._args, **self._kwargs))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 1. Simple Examples"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`Streamer`'s initializer accepts any iterable (including generators), a function returning an iterable, or (equivalently) a generator function.\n",
"\n",
"You can apply a function for each of the elements, by calling **`foreach`** with the function as the argument:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1\n",
"2\n",
"3\n",
"4\n",
"5\n"
]
}
],
"source": [
"Streamer([1, 2, 3, 4, 5]).foreach(print)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**`map()`** takes a function and builds a new `Streamer` object composed of the results of the function applied to each element in the original `Streamer`:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[2, 4, 6, 8, 10]"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"Streamer([1, 2, 3, 4, 5]).map(lambda x: x * 2).list()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**`filter()`** takes a function returning a bool value, and keeps only those the function returned `True` for."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[1, 3, 5, 7, 9]"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"Streamer(range(10)).filter(lambda x: x % 2 != 0).list()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**`repeat()`** makes the `Streamer` to loop over its elements infinitely. You can give the number of loops as an optional argument. **`take()`** returns a new `Streamer` instance that only contains a given number of the first elements:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"Streamer(range(5)).repeat().take(12).list()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**`flatmap()`** takes a function that returns a list, which can have zero or more elements. The resulting `Streamer` contains a concatenation of the returned lists. This is suitable for use with a function that loads a data file and returns a list of data points, or a function that takes a data point and returns a list of augmented points."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['0a', '0b', '0c', '1a', '1b', '1c', '2a', '2b', '2c']"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"Streamer(range(3)).map(str).flatmap(lambda x: [x + 'a', x + 'b', x + 'c']).list()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**`group()`** can group adjacent items to make tuples of given size:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[(0, 1, 2, 3, 4),\n",
" (5, 6, 7, 8, 9),\n",
" (10, 11, 12, 13, 14),\n",
" (15, 16, 17, 18, 19),\n",
" (20, 21, 22, 23, 24),\n",
" (25, 26, 27, 28, 29),\n",
" (30, 31, 32, 33, 34),\n",
" (35, 36, 37, 38, 39)]"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"Streamer(range(42)).group(5).list()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Another example of using **`group()`**, combined with two **`map()`** calls with type constructor `str` and bound method `\"\\t\".join`:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1\t2\t3\t4\t5\t6\t7\t8\t9\n",
"2\t4\t6\t8\t10\t12\t14\t16\t18\n",
"3\t6\t9\t12\t15\t18\t21\t24\t27\n",
"4\t8\t12\t16\t20\t24\t28\t32\t36\n",
"5\t10\t15\t20\t25\t30\t35\t40\t45\n",
"6\t12\t18\t24\t30\t36\t42\t48\t54\n",
"7\t14\t21\t28\t35\t42\t49\t56\t63\n",
"8\t16\t24\t32\t40\t48\t56\t64\t72\n",
"9\t18\t27\t36\t45\t54\t63\t72\t81\n"
]
}
],
"source": [
"Streamer(x * y for x in range(1, 10) for y in range(1, 10)).map(str).group(9).map(\"\\t\".join).foreach(print)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Shuffle and Repeat\n",
"\n",
"**`shuffle()`** accepts a buffer size which may be useful when we want to ensure a full dataset sweep for an epoch. Note the difference of behavior according to the order of **`shuffle()`** and **`repeat()`**:"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[(3, 2, 4, 1), (2, 1, 3, 4), (3, 2, 4, 1), (1, 4, 2, 3), (4, 1, 2, 3), (3, 4, 2, 1), (1, 3, 4, 2)]\n",
"[(4, 1, 2, 1), (3, 4, 2, 2), (1, 4, 1, 3), (2, 3, 3, 4), (2, 1, 3, 3), (4, 1, 4, 1), (4, 3, 2, 2)]\n"
]
}
],
"source": [
"# there are unique numbers in each group\n",
"print(Streamer([1, 2, 3, 4]).shuffle(4).repeat().group(4).take(7).list())\n",
"\n",
"# there can be duplicate numbers in each group\n",
"print(Streamer([1, 2, 3, 4]).repeat().shuffle(4).group(4).take(7).list())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Filtering prime numbers"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71]"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def integers(starting=2):\n",
" while True:\n",
" yield starting\n",
" starting += 1\n",
" \n",
"def sieve(streamer):\n",
" head = next(iter(streamer.take(1)))\n",
" tail = streamer.filter(lambda n: n % head != 0)\n",
" return Streamer.concat([Streamer([head]), lambda: sieve(tail)])\n",
"\n",
"streamer = Streamer(integers(starting=2))\n",
"\n",
"sieve(streamer).take(20).list()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### FizzBuzz"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1\n",
"2\n",
"Fizz\n",
"4\n",
"Buzz\n",
"Fizz\n",
"7\n",
"8\n",
"Fizz\n",
"Buzz\n",
"11\n",
"Fizz\n",
"13\n",
"14\n",
"FizzBuzz\n",
"16\n",
"17\n",
"Fizz\n",
"19\n"
]
}
],
"source": [
"Streamer(range(1, 20)).map(lambda x: [x, 'Fizz', 'Buzz', 'FizzBuzz'][bool(x % 3 == 0) + 2 * bool(x % 5 == 0)]).foreach(print)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Counting\n",
"\n",
"**`counting()`** will give a `Streamer` containing the same elements, and has an attribute `count` that contains how many elements it has iterated over. This may be useful when we want to track how many items have been supplied to an algorithm at any point in the program."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" before consuming: 0\n",
" after consuming: 5\n",
" after consuming twice: 10\n",
"after taking 42 more elements: 52\n"
]
}
],
"source": [
"streamer = Streamer([1, 2, 3, 4, 5]).counting()\n",
"print(\" before consuming:\", streamer.count)\n",
"\n",
"# Streamer works lazily and the counter works only when the content is consumed\n",
"streamer.list()\n",
"print(\" after consuming:\", streamer.count)\n",
"\n",
"# If the streamer is consumed multiple times, the counter can be larger than the actual number of elements in the streamer\n",
"streamer.list()\n",
"print(\" after consuming twice:\", streamer.count)\n",
"\n",
"# downstream transforms can make the counter even larger\n",
"streamer.repeat().take(42).list()\n",
"print(\"after taking 42 more elements:\", streamer.count)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 2. Combining Streamers"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 2, 3]\n",
"[4, 5, 6]\n",
"[7, 8, 9, 10]\n"
]
}
],
"source": [
"A = Streamer([1, 2, 3])\n",
"B = Streamer([4, 5, 6])\n",
"C = Streamer([7, 8, 9, 10])\n",
"\n",
"print(A.list())\n",
"print(B.list())\n",
"print(C.list())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**`concat()`** simply concatenates given streamers."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"Streamer.concat([A, B, C]).list()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**`zip()`** repeats taking one element from each of given streamers and building a tuple from them. The streamer will reach the end when any of the streamers stops:"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[(1, 4, 7), (2, 5, 8), (3, 6, 9)]"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"Streamer.zip([A, B, C]).list()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**`roundrobin()`** is like **`zip()`**, but does not build tuples and returns the items one by one from each streamer. It will skip already finished streamers, and continue until all streamers finish:"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[1, 4, 7, 2, 5, 8, 3, 6, 9, 10]"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"Streamer.roundrobin([A, B, C]).list()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 3. File Reading Examples"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"400 lines found\n",
"File A.txt line 2\n",
"File A.txt line 43\n",
"File B.txt line 3\n",
"File A.txt line 9\n",
"File B.txt line 97\n",
"File B.txt line 68\n",
"File B.txt line 73\n",
"File A.txt line 40\n",
"File B.txt line 46\n",
"File B.txt line 32\n"
]
}
],
"source": [
"def fake_read_file(filename):\n",
" for row in range(100):\n",
" yield \"File %s line %d\" % (filename, row)\n",
" \n",
"files = Streamer([\"A.txt\", \"B.txt\", \"C.txt\", \"D.txt\"])\n",
"\n",
"# contains all lines from all files\n",
"lines = files.flatmap(fake_read_file)\n",
"print('%d lines found' % len(list(lines)))\n",
"\n",
"# this is basically how TensorFlow does shuffling; only shows the random samples from A.txt and B.txt for a while\n",
"lines.shuffle(200).take(10).foreach(print)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"File C.txt line 2\n",
"File B.txt line 92\n",
"File B.txt line 18\n",
"File C.txt line 84\n",
"File C.txt line 90\n",
"File C.txt line 10\n",
"File C.txt line 17\n",
"File B.txt line 65\n",
"File B.txt line 5\n",
"File D.txt line 6\n"
]
}
],
"source": [
"# this is an alternative, but still just reading from two random files\n",
"files.shuffle(4).flatmap(fake_read_file).shuffle(200).take(10).foreach(print)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 4. Multiplexing by Stream Combination\n",
"\n",
"It is possible to chain a combination of methods in `Streamer` for multiplexing:"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"File A.txt line 0\n",
"File B.txt line 0\n",
"File C.txt line 0\n",
"File D.txt line 0\n",
"File A.txt line 1\n",
"File B.txt line 1\n",
"File C.txt line 1\n",
"File D.txt line 1\n",
"File A.txt line 2\n",
"File B.txt line 2\n"
]
}
],
"source": [
"files = Streamer([\"A.txt\", \"B.txt\", \"C.txt\", \"D.txt\"])\n",
"streams = files.mapstream(fake_read_file) # equivalent to: Streamer(Streamer(fake_read_file, file) for file in files)\n",
"\n",
"# equivalent to RoundRobinMux\n",
"Streamer.roundrobin(streams).take(10).foreach(print)"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"File B.txt line 23\n",
"File B.txt line 24\n",
"File B.txt line 25\n",
"File B.txt line 26\n",
"File B.txt line 27\n",
"File B.txt line 28\n",
"File B.txt line 29\n",
"File B.txt line 30\n",
"File B.txt line 31\n",
"File B.txt line 32\n"
]
}
],
"source": [
"# sequential sweep of all streams\n",
"Streamer.concat(streams).repeat().skip(923).take(10).foreach(print)"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"File A.txt line 0\n",
"File A.txt line 1\n",
"File A.txt line 2\n",
"File C.txt line 0\n",
"File C.txt line 1\n",
"File A.txt line 3\n",
"File A.txt line 4\n",
"File D.txt line 0\n",
"File A.txt line 5\n",
"File C.txt line 2\n",
"File D.txt line 1\n",
"File C.txt line 3\n",
"File B.txt line 0\n",
"File D.txt line 2\n",
"File B.txt line 1\n",
"File C.txt line 4\n",
"File C.txt line 5\n",
"File B.txt line 2\n",
"File C.txt line 6\n",
"File B.txt line 3\n"
]
}
],
"source": [
"# repeatedly draw one from a randomly chosen file, sweeping sequentially\n",
"Streamer.shufflemux(streams).take(20).foreach(print)"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"File B.txt line 54\n",
"File B.txt line 78\n",
"File A.txt line 45\n",
"File C.txt line 86\n",
"File C.txt line 21\n",
"File D.txt line 28\n",
"File C.txt line 38\n",
"File D.txt line 88\n",
"File A.txt line 8\n",
"File B.txt line 77\n",
"File A.txt line 22\n",
"File B.txt line 81\n",
"File A.txt line 50\n",
"File A.txt line 40\n",
"File A.txt line 16\n",
"File B.txt line 71\n",
"File C.txt line 45\n",
"File C.txt line 56\n",
"File C.txt line 50\n",
"File D.txt line 75\n"
]
}
],
"source": [
"# by shuffling each file\n",
"streams = streams.map(lambda stream: stream.shuffle(100))\n",
"Streamer.shufflemux(streams).take(20).foreach(print)"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"File F.txt line 98\n",
"File A.txt line 83\n",
"File C.txt line 99\n",
"File C.txt line 77\n",
"File C.txt line 58\n",
"File H.txt line 41\n",
"File E.txt line 68\n",
"File H.txt line 77\n",
"File H.txt line 92\n",
"File H.txt line 26\n",
"File E.txt line 83\n",
"File H.txt line 9\n",
"File E.txt line 92\n",
"File E.txt line 55\n",
"File G.txt line 64\n",
"File D.txt line 5\n",
"File G.txt line 2\n",
"File G.txt line 86\n",
"File D.txt line 97\n",
"File D.txt line 84\n"
]
}
],
"source": [
"# similar to StochasticMux\n",
"from numpy.random import poisson\n",
"\n",
"k = 2 # number of active streams\n",
"rate = 4 # the Poisson rate\n",
"\n",
"filenames = [\"A.txt\", \"B.txt\", \"C.txt\", \"D.txt\", \"E.txt\", \"F.txt\", \"G.txt\", \"H.txt\"]\n",
"\n",
"files = Streamer(filenames).shuffle(buffer_size=len(filenames)).repeat()\n",
"\n",
"# now we have a stream of streams of data\n",
"streams = files.mapstream(fake_read_file)\n",
"\n",
"# cut each stream according to Poisson distribution\n",
"streams = streams.map(lambda s: s.shuffle(100).take(poisson(rate)))\n",
"\n",
"data = Streamer.shufflemux(streams, buffer_size=k)\n",
"\n",
"data.take(20).foreach(print)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 5. Multiplexing by Subclassing\n",
"\n",
"Using specialized subclasses might be a better idea than forcing users to always use cleverly chained calls, both for easier usage and extending features:\n",
"\n",
"### RoundRobinMux"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"class RoundRobinMux(Streamer):\n",
" def __init__(self, streamers):\n",
" self.streamers = streamers\n",
"\n",
" def __iter__(self):\n",
" iterators = list(map(iter, map(Streamer._materialize, self.streamers)))\n",
" while len(iterators) > 0:\n",
" for iterator in iterators:\n",
" try:\n",
" yield next(iterator)\n",
" except StopIteration:\n",
" iterators.remove(iterator)"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"File A.txt line 0\n",
"File B.txt line 0\n",
"File C.txt line 0\n",
"File D.txt line 0\n",
"File A.txt line 1\n",
"File B.txt line 1\n",
"File C.txt line 1\n",
"File D.txt line 1\n",
"File A.txt line 2\n",
"File B.txt line 2\n"
]
}
],
"source": [
"files = Streamer([\"A.txt\", \"B.txt\", \"C.txt\", \"D.txt\"])\n",
"streamers = [Streamer(fake_read_file, file) for file in files]\n",
"\n",
"RoundRobinMux(streamers).take(10).foreach(print)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### ChainMux"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"class ChainMux(Streamer):\n",
" def __init__(self, streamers):\n",
" self.streamers = streamers\n",
"\n",
" def __iter__(self):\n",
" for stream in self.streamers:\n",
" for item in stream:\n",
" yield item"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"length: 400\n",
"File A.txt line 0\n",
"File A.txt line 1\n",
"File A.txt line 2\n",
"File A.txt line 3\n",
"File A.txt line 4\n",
"File A.txt line 5\n",
"File A.txt line 6\n",
"File A.txt line 7\n",
"File A.txt line 8\n",
"File A.txt line 9\n"
]
}
],
"source": [
"files = Streamer([\"A.txt\", \"B.txt\", \"C.txt\", \"D.txt\"])\n",
"streamers = [Streamer(fake_read_file, file) for file in files]\n",
"\n",
"chain = ChainMux(streamers)\n",
"print('length:', len(list(chain)))\n",
"chain.take(10).foreach(print)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### ShuffleMux"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [],
"source": [
"class ShuffleMux(Streamer):\n",
" def __init__(self, streamers, buffer_size=None, seed=None, streamer_shuffle_size=None):\n",
" self.streamers = streamers\n",
" \n",
" if streamer_shuffle_size:\n",
" self.streamers = [streamer.shuffle(streamer_shuffle_size) for streamer in self.streamers]\n",
" \n",
" self.buffer_size = buffer_size\n",
" self.seed = seed\n",
" \n",
" def __iter__(self):\n",
" random = Random(self.seed)\n",
" # fill the buffer of iterators\n",
" streamer_iterator = iter(self.streamers)\n",
" if self.buffer_size is None:\n",
" iterators = list(map(iter, map(Streamer._materialize, streamer_iterator)))\n",
" else:\n",
"            iterators = [iter(Streamer._materialize(iterable)) for _, iterable in zip(range(self.buffer_size), streamer_iterator)]\n",
"\n",
"        # keep choosing a random iterator and yield one item from it\n",
" while len(iterators) > 0:\n",
" i = random.randrange(len(iterators))\n",
" iterator = iterators[i]\n",
" try:\n",
" yield next(iterator)\n",
" except StopIteration:\n",
" try:\n",
" iterators[i] = iter(next(streamer_iterator))\n",
" except StopIteration:\n",
" iterators.pop(i)"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"File A.txt line 0\n",
"File D.txt line 0\n",
"File D.txt line 1\n",
"File B.txt line 0\n",
"File D.txt line 2\n",
"File B.txt line 1\n",
"File B.txt line 2\n",
"File C.txt line 0\n",
"File D.txt line 3\n",
"File D.txt line 4\n"
]
}
],
"source": [
"files = Streamer([\"A.txt\", \"B.txt\", \"C.txt\", \"D.txt\"])\n",
"streamers = files.mapstream(fake_read_file) # equivalent to: Streamer(Streamer(fake_read_file, file) for file in files)\n",
"\n",
"ShuffleMux(streamers).take(10).foreach(print)"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"length: 400\n",
"File B.txt line 84\n",
"File B.txt line 10\n",
"File A.txt line 79\n",
"File A.txt line 39\n",
"File D.txt line 64\n",
"File B.txt line 20\n",
"File C.txt line 96\n",
"File C.txt line 53\n",
"File C.txt line 4\n",
"File B.txt line 12\n"
]
}
],
"source": [
"# shuffling in each file\n",
"mux = ShuffleMux(streamers, streamer_shuffle_size=100)\n",
"print('length:', len(list(mux)))\n",
"mux.take(10).foreach(print)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### StochasticMux"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from numpy.random import poisson\n",
"\n",
"class StochasticMux(Streamer):\n",
" def __init__(self, streamers, k, rate, seed=None, streamer_shuffle_size=100):\n",
" stream_processor = lambda s: s.shuffle(streamer_shuffle_size).take(poisson(rate))\n",
" self.streamers = streamers.map(stream_processor)\n",
" self.seed = seed\n",
" self.k = k\n",
" \n",
" def __iter__(self):\n",
" random = Random(self.seed)\n",
" # fill the buffer of iterators\n",
" streamer_iterator = iter(self.streamers)\n",
" iterators = [iter(Streamer._materialize(iterable)) for _, iterable in zip(range(self.k), streamer_iterator)]\n",
"\n",
"        # keep choosing a random iterator and yield one item from it\n",
" while len(iterators) > 0:\n",
" i = random.randrange(len(iterators))\n",
" iterator = iterators[i]\n",
" try:\n",
" yield next(iterator)\n",
" except StopIteration:\n",
" try:\n",
" iterators[i] = iter(next(streamer_iterator))\n",
" except StopIteration:\n",
" iterators.pop(i)"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"File B.txt line 10\n",
"File A.txt line 93\n",
"File B.txt line 85\n",
"File B.txt line 19\n",
"File B.txt line 76\n",
"File A.txt line 25\n",
"File B.txt line 55\n",
"File A.txt line 83\n",
"File C.txt line 73\n",
"File C.txt line 67\n",
"File A.txt line 89\n",
"File A.txt line 72\n",
"File A.txt line 29\n",
"File D.txt line 5\n",
"File D.txt line 84\n",
"File C.txt line 66\n",
"File C.txt line 46\n",
"File A.txt line 97\n",
"File C.txt line 1\n",
"File A.txt line 32\n",
"File C.txt line 90\n",
"File C.txt line 72\n",
"File C.txt line 25\n",
"File A.txt line 43\n",
"File A.txt line 46\n",
"File A.txt line 7\n",
"File B.txt line 22\n",
"File B.txt line 59\n",
"File B.txt line 56\n",
"File C.txt line 70\n"
]
}
],
"source": [
"files = Streamer([\"A.txt\", \"B.txt\", \"C.txt\", \"D.txt\"])\n",
"streamers = files.mapstream(fake_read_file).repeat()\n",
"\n",
"StochasticMux(streamers, k=2, rate=5).take(30).foreach(print)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment