riko: A stream processing framework modeled after Yahoo! Pipes

(travis, versions, and pypi badges)

Index

Introduction | Requirements | Word Count | Motivation | Usage | Installation | Design Principles | Scripts | Command-line Interface | Contributing | Credits | More Info | Project Structure | License

Introduction

riko is a pure Python framework for analyzing and processing streams of structured data. riko has synchronous and asynchronous APIs, supports parallel execution, and is well suited for processing rss feeds [1]. riko also supplies a command-line interface for executing flows.

With riko, you can

  • Read csv/xml/json/html files
  • Create text and data processing flows via modular pipes
  • Parse, extract, and process rss feeds
  • Create awesome mashups [2], APIs, and maps
  • Perform parallel processing via cpus/processors or threads
  • and much more...

Notes

Requirements

riko has been tested and is known to work on Python 2.7 and PyPy2 5.1.1.

Optional Dependencies

Feature                  Dependency  Installation
Async API                Twisted     pip install riko[async]
Accelerated xml parsing  lxml [3]    pip install riko[lxml]

Notes

Word Count

In this example, we use several pipes to count the words on a webpage.

>>> ### Create a SyncPipe flow ###
>>> #
>>> # `SyncPipe` is a convenience class that creates chainable flows
>>> # and allows for parallel processing.
>>> from riko import get_path
>>> from riko.lib.collections import SyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> #   1. `get_path` just looks up a file in the `data` directory
>>> #   2. the `detag` option will strip all html tags from the result
>>> url = get_path('users.jyu.fi.html')                                            # 1
>>> fetch_conf = {'url': url, 'start': '<body>', 'end': '</body>', 'detag': True}  # 2
>>> replace_conf = {'rule': {'find': '\n', 'replace': ' '}}
>>>
>>> flow = (SyncPipe('fetchpage', conf=fetch_conf)
...     .strreplace(conf=replace_conf, assign='content')
...     .stringtokenizer(conf={'delimiter': ' '}, emit=True)
...     .count())
>>>
>>> stream = flow.output
>>> next(stream)
{'count': 70}

Motivation

Why I built riko

Yahoo! Pipes [4] was a user-friendly web application used to:

aggregate, manipulate, and mashup content from around the web

Wanting to create custom pipes, I came across pipe2py, which translated a Yahoo! Pipes pipe into Python code. pipe2py suited my needs at the time, but it was unmaintained and lacked asynchronous or parallel processing APIs.

riko addresses the shortcomings of pipe2py, but drops support for importing Yahoo! Pipes json workflow schemas. riko contains ~40 built-in modules, aka pipes, that allow you to programmatically perform most of the tasks Yahoo! Pipes allowed.

Why you should use riko

riko provides a number of benefits over, and differences from, other stream processing applications such as Huginn, Flink, Spark, and Storm [5]. Namely:

  • a small footprint (CPU and memory usage)
  • native RSS support
  • simple installation and usage
  • a pure python library with pypy support
  • modular pipes to filter, sort, and modify streams

In exchange, riko makes the following tradeoffs:

  • not distributed (i.e., unable to run on a cluster of servers)
  • no GUI for creating flows
  • doesn't continually monitor streams for new data
  • can't react to specific events
  • iterator (pull) based, so a stream only supports a single consumer [6]

The following table summarizes these observations:

Framework  Stream Type  Footprint  RSS   simple [7]  async  parallel  CEP [8]  distributed
riko       pull         small      √     √           √      √
pipe2py    pull         small      √     √
Huginn     push         med        √                 [9]              √
Others     push         large      [10]  [11]        [12]   √         √        √

For more detailed information, please check out the FAQ.

Notes

Usage

riko is intended to be used directly as a Python library.

Usage Index

Fetching streams

riko can fetch streams from both local and remote file paths via source pipes. Each source pipe returns a stream, i.e., an iterator of dictionaries, aka items.

>>> from riko.modules.pipefetch import pipe as fetch
>>> from riko.modules.pipefetchsitefeed import pipe as fetchsitefeed
>>>
>>> ### Fetch an rss feed ###
>>> stream = fetch(conf={'url': 'https://news.ycombinator.com/rss'})
>>>
>>> ### Fetch the first rss feed found ###
>>> stream = fetchsitefeed(conf={'url': 'http://www.bbc.com/news'})
>>>
>>> ### View the fetched rss feed(s) ###
>>> #
>>> # Note: regardless of how you fetch an rss feed, it will have the same
>>> # structure
>>> item = next(stream)
>>> sorted(item.keys())
[
    'author', 'author.name', 'author.uri', 'comments', 'content',
    'dc:creator', 'id', 'link', 'pubDate', 'summary', 'title',
    'updated', 'updated_parsed', 'y:id', 'y:published', 'y:title']
>>> item['title'], item['author'], item['link']
(
    'Using NFC tags in the car', 'Liam Green-Hughes',
    'http://www.greenhughes.com/content/using-nfc-tags-car')

Please see the FAQ for a complete list of supported file types and protocols. Please see Fetching data and feeds for more examples.

Synchronous processing

riko can modify streams by combining any of its ~40 built-in pipes.

>>> from itertools import chain
>>> from riko import get_path
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> #   1. `get_path` just looks up a file in the `data` directory
>>> #   2. the `dotall` option is used to match `.*` across newlines
>>> fetch_conf = {'url': get_path('feed.xml')}                                          # 1
>>> filter_rule = {'field': 'y:published', 'op': 'before', 'value': '2/5/09'}
>>> sub_conf = {'path': 'content.value'}
>>> match = r'(.*href=")([\w:/.@]+)(".*)'
>>> regex_rule = {'field': 'content', 'match': match, 'replace': '$2', 'dotall': True}  # 2
>>> sort_conf = {'rule': {'sort_key': 'content', 'sort_dir': 'desc'}}
>>>
>>> ### Create a SyncPipe flow ###
>>> #
>>> # `SyncPipe` is a convenience class that creates chainable flows
>>> # and allows for parallel processing.
>>> #
>>> # The following flow will:
>>> #   1. fetch the rss feed
>>> #   2. filter for items published before 2/5/2009
>>> #   3. extract the path `content.value` from each feed item
>>> #   4. replace the extracted text with the last href url contained
>>> #      within it
>>> #   5. reverse sort the items by the replaced url
>>> #   6. obtain the raw stream
>>> #
>>> # Note: sorting is not lazy so take caution when using this pipe
>>> from riko.lib.collections import SyncPipe
>>>
>>> flow = (SyncPipe('fetch', conf=fetch_conf)  # 1
...     .filter(conf={'rule': filter_rule})     # 2
...     .subelement(conf=sub_conf, emit=True)   # 3
...     .regex(conf={'rule': regex_rule})       # 4
...     .sort(conf=sort_conf))                  # 5
>>>
>>> stream = flow.output                        # 6
>>> next(stream)
{'content': 'mailto:mail@writetoreply.org'}

Please see Alternate workflow creation for an alternative (function based) method for creating a stream. Please see pipes for a complete list of available pipes.

Parallel processing

An example using riko's parallel API to spawn a ThreadPool [13].

>>> from riko import get_path
>>> from riko.lib.collections import SyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> #   1. `get_path` just looks up a file in the `data` directory
>>> #   2. the `dotall` option is used to match `.*` across newlines
>>> url = get_path('feed.xml')                                                          # 1
>>> filter_rule1 = {'field': 'y:published', 'op': 'before', 'value': '2/5/09'}
>>> sub_conf = {'path': 'content.value'}
>>> match = r'(.*href=")([\w:/.@]+)(".*)'
>>> regex_rule = {'field': 'content', 'match': match, 'replace': '$2', 'dotall': True}  # 2
>>> filter_rule2 = {'field': 'content', 'op': 'contains', 'value': 'file'}
>>> strtransform_conf = {'rule': {'transform': 'rstrip', 'args': '/'}}
>>>
>>> ### Create a parallel SyncPipe flow ###
>>> #
>>> # The following flow will:
>>> #   1. fetch the rss feed
>>> #   2. filter for items published before 2/5/2009
>>> #   3. extract the path `content.value` from each feed item
>>> #   4. replace the extracted text with the last href url contained
>>> #      within it
>>> #   5. filter for items with local file urls (which happen to be rss
>>> #      feeds)
>>> #   6. strip any trailing `/` from the url
>>> #   7. remove duplicate urls
>>> #   8. fetch each rss feed
>>> #   9. merge the feeds into a single stream of items
>>> flow = (SyncPipe('fetch', conf={'url': url}, parallel=True)  # 1
...     .filter(conf={'rule': filter_rule1})                     # 2
...     .subelement(conf=sub_conf, emit=True)                    # 3
...     .regex(conf={'rule': regex_rule})                        # 4
...     .filter(conf={'rule': filter_rule2})                     # 5
...     .strtransform(conf=strtransform_conf)                    # 6
...     .uniq(conf={'uniq_key': 'strtransform'})                 # 7
...     .fetch(conf={'url': {'subkey': 'strtransform'}}))        # 8
>>>
>>> stream = flow.list                                           # 9
>>> len(stream)
25

Asynchronous processing

To enable asynchronous processing, you must install the async module.

pip install riko[async]

An example using riko's asynchronous API.

>>> from twisted.internet.task import react
>>> from twisted.internet.defer import inlineCallbacks
>>> from riko import get_path
>>> from riko.twisted.collections import AsyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> #   1. `get_path` just looks up a file in the `data` directory
>>> #   2. the `dotall` option is used to match `.*` across newlines
>>> url = get_path('feed.xml')                                                          # 1
>>> filter_rule1 = {'field': 'y:published', 'op': 'before', 'value': '2/5/09'}
>>> sub_conf = {'path': 'content.value'}
>>> match = r'(.*href=")([\w:/.@]+)(".*)'
>>> regex_rule = {'field': 'content', 'match': match, 'replace': '$2', 'dotall': True}  # 2
>>> filter_rule2 = {'field': 'content', 'op': 'contains', 'value': 'file'}
>>> strtransform_conf = {'rule': {'transform': 'rstrip', 'args': '/'}}
>>>
>>> ### Create an AsyncPipe flow ###
>>> #
>>> # See `Parallel processing` above for an explanation of the steps this
>>> # performs
>>> @inlineCallbacks
... def run(reactor):
...     flow = yield (AsyncPipe('fetch', conf={'url': url})
...         .filter(conf={'rule': filter_rule1})
...         .subelement(conf=sub_conf, emit=True)
...         .regex(conf={'rule': regex_rule})
...         .filter(conf={'rule': filter_rule2})
...         .strtransform(conf=strtransform_conf)
...         .uniq(conf={'uniq_key': 'strtransform'})
...         .fetch(conf={'url': {'subkey': 'strtransform'}}))
...
...     stream = flow.list
...     print(len(stream))
...
>>> react(run)
25

Cookbook

Please see the cookbook or ipython notebook for more examples.

Notes

Installation

(You are using a virtualenv, right?)

At the command line, install riko using either pip (recommended)

pip install riko

or easy_install

easy_install riko

Please see the installation doc for more details.

Design Principles

The primary data structures in riko are the item and stream. An item is just a python dictionary, and a stream is an iterator of items. You can create a stream manually with something as simple as [{'content': 'hello world'}]. You manipulate streams in riko via pipes. A pipe is simply a function that accepts either a stream or item, and returns a stream. pipes are composable: you can use the output of one pipe as the input to another pipe.
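
Because every pipe speaks the same stream protocol, you can compose pipes by hand without any helper classes. Here is a minimal sketch of that idea, reusing the pipereverse and pipecount operators shown later in this section; it simply feeds the output stream of one pipe into the next:

>>> from riko.modules.pipereverse import pipe as reverse
>>> from riko.modules.pipecount import pipe as count
>>>
>>> # reverse the stream, then count the items in the reversed stream
>>> stream = [{'content': 'hello'}, {'content': 'world'}]
>>> next(count(reverse(stream)))
{'count': 2}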

riko pipes come in two flavors: operators and processors. operators operate on an entire stream at once and are unable to handle individual items. Example operators include pipecount, pipefilter, and pipereverse.

>>> from riko.modules.pipereverse import pipe
>>>
>>> stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
>>> next(pipe(stream))
{'title': 'riko pt. 2'}

processors process individual items and can be parallelized across threads or processes. Example processors include pipefetchsitefeed, pipehash, pipeitembuilder, and piperegex.

>>> from riko.modules.pipehash import pipe
>>>
>>> item = {'title': 'riko pt. 1'}
>>> stream = pipe(item, field='title')
>>> next(stream)
{'title': 'riko pt. 1', 'hash': 2853617420}

Some processors, e.g., pipestringtokenizer, return multiple results.

>>> from riko.modules.pipestringtokenizer import pipe
>>>
>>> item = {'title': 'riko pt. 1'}
>>> tokenizer_conf = {'delimiter': ' '}
>>> stream = pipe(item, conf=tokenizer_conf, field='title')
>>> next(stream)
{
    'title': 'riko pt. 1',
    'stringtokenizer': [
        {'content': 'riko'},
        {'content': 'pt.'},
        {'content': '1'}]}

>>> # In this case, if we just want the result, we can `emit` it instead
>>> stream = pipe(item, conf=tokenizer_conf, field='title', emit=True)
>>> next(stream)
{'content': 'riko'}

operators are split into sub-types of aggregators and composers. aggregators, e.g., pipecount, combine all items of an input stream into a new stream with a single item; while composers, e.g., pipefilter, create a new stream containing some or all items of an input stream.

>>> from riko.modules.pipecount import pipe
>>>
>>> stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
>>> next(pipe(stream))
{'count': 2}
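
For contrast, here is a hedged sketch of a composer. It assumes pipefilter accepts the same {'field', 'op', 'value'} rule format used by the filter steps in the earlier flows:

>>> from riko.modules.pipefilter import pipe
>>>
>>> stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
>>> rule = {'field': 'title', 'op': 'contains', 'value': 'pt. 2'}
>>> # keep only the items whose title contains 'pt. 2'
>>> next(pipe(stream, conf={'rule': rule}))
{'title': 'riko pt. 2'}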

processors are split into sub-types of source and transformer. sources, e.g., pipeitembuilder, can create a stream, while transformers, e.g., pipehash, can only transform items in a stream.

>>> from riko.modules.pipeitembuilder import pipe
>>>
>>> attrs = {'key': 'title', 'value': 'riko pt. 1'}
>>> next(pipe(conf={'attrs': attrs}))
{'title': 'riko pt. 1'}

The following table summarizes these observations:

type       sub-type     input   output       parallelizable?  creates streams?
operator   aggregator   stream  stream [14]
operator   composer     stream  stream
processor  source       item    stream       √                √
processor  transformer  item    stream       √

If you are unsure of the type of pipe you have, check its metadata.

>>> from riko.modules.pipefetchpage import asyncPipe
>>> from riko.modules.pipecount import pipe
>>>
>>> asyncPipe.__dict__
{'type': 'processor', 'name': 'fetchpage', 'sub_type': 'source'}
>>> pipe.__dict__
{'type': 'operator', 'name': 'count', 'sub_type': 'aggregator'}

The SyncPipe and AsyncPipe classes (among other things) perform this check for you to allow for convenient method chaining and transparent parallelization.

>>> from riko.lib.collections import SyncPipe
>>>
>>> attrs = [
...     {'key': 'title', 'value': 'riko pt. 1'},
...     {'key': 'content', 'value': "Let's talk about riko!"}]
>>> flow = SyncPipe('itembuilder', conf={'attrs': attrs}).hash()
>>> flow.list[0]
{
    'title': 'riko pt. 1',
    'content': "Let's talk about riko!",
    'hash': 1346301218}

Please see the cookbook for advanced examples, including how to wire in values from other pipes or accept user input.

Notes

Command-line Interface

riko provides a command, runpipe, to execute workflows. A workflow is simply a file containing a function named pipe that creates a flow and processes the resulting stream.

CLI Setup

flow.py

from __future__ import print_function
from riko.lib.collections import SyncPipe

conf1 = {'attrs': [{'value': 'https://google.com', 'key': 'content'}]}
conf2 = {'rule': [{'find': 'com', 'replace': 'co.uk'}]}

def pipe(test=False):
    flow = (SyncPipe('itembuilder', conf=conf1, test=test)
        .strreplace(conf=conf2))

    stream = flow.output
    for i in stream:
        print(i)

CLI Usage

Now to execute flow.py, type the command runpipe flow. You should then see the following output in your terminal:

https://google.co.uk

runpipe will also search the examples directory for workflows. Type runpipe demo and you should see the following output:

something...

Scripts

riko comes with a built-in task manager, manage.

Setup

pip install riko[develop]

Examples

Run python linter and nose tests

manage lint
manage test

Contributing

Please mimic the coding style/conventions used in this repo. If you add new classes or functions, please add the appropriate doc blocks with examples. Also, make sure the python linter and nose tests pass.

Please see the contributing doc for more details.

Credits

Shoutout to pipe2py for heavily inspiring riko. riko started out as a fork of pipe2py, but has since diverged so much that little (if any) of the original code-base remains.

More Info

Project Structure

├── benchmarks
│   ├── __init__.py
│   └── parallel.py
├── bin
│   └── run
├── data/*
├── docs
│   ├── AUTHORS.rst
│   ├── CHANGES.rst
│   ├── COOKBOOK.rst
│   ├── FAQ.rst
│   ├── INSTALLATION.rst
│   └── TODO.rst
├── examples/*
├── helpers/*
├── riko
│   ├── __init__.py
│   ├── lib
│   │   ├── __init__.py
│   │   ├── autorss.py
│   │   ├── collections.py
│   │   ├── dotdict.py
│   │   ├── log.py
│   │   ├── tags.py
│   │   └── utils.py
│   ├── modules/*
│   └── twisted
│       ├── __init__.py
│       ├── collections.py
│       └── utils.py
├── tests
│   ├── __init__.py
│   ├── standard.rc
│   └── test_examples.py
├── CONTRIBUTING.rst
├── dev-requirements.txt
├── LICENSE
├── Makefile
├── manage.py
├── MANIFEST.in
├── optional-requirements.txt
├── py2-requirements.txt
├── README.rst
├── requirements.txt
├── setup.cfg
├── setup.py
└── tox.ini

License

riko is distributed under the MIT License.


  1. Really Simple Syndication

  2. Mashup (web application hybrid)

  3. If lxml isn't present, riko will default to the builtin Python xml parser

  4. Yahoo discontinued Yahoo! Pipes in 2015, but you can view what remains

  5. Huginn, Flink, Spark, and Storm

  6. You can mitigate this via the split module

  7. Doesn't depend on outside services like MySQL, Kafka, YARN, ZooKeeper, or Mesos

  8. Complex Event Processing

  9. Huginn doesn't appear to make async web requests

  10. Many frameworks can't parse RSS streams without the use of 3rd party libraries

  11. While most frameworks offer a local mode, many require integrating with a data ingestor (e.g., Flume/Kafka) to do anything useful

  12. I can't find evidence that these frameworks offer an async api (apparently Spark doesn't)

  13. You can instead enable a ProcessPool by additionally passing threads=False to SyncPipe, i.e., SyncPipe('fetch', conf={'url': url}, parallel=True, threads=False).

  14. The output stream of an aggregator is an iterator of only one item.
