Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save LinuxIsCool/15a5df910e9b231dbc36f509cb9ca7dd to your computer and use it in GitHub Desktop.
Save LinuxIsCool/15a5df910e9b231dbc36f509cb9ca7dd to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from datetime import datetime\n",
"import json\n",
"import random\n",
"\n",
"i = 0\n",
"record_names = ['Alice', 'Bob', 'Charlie']\n",
"\n",
"def create_record():\n",
"    \"\"\"Build one synthetic record and return it serialized as a JSON string.\n",
"\n",
"    Every call increments the module-level counter ``i`` and stamps the\n",
"    record with the current local time.\n",
"    \"\"\"\n",
"    global i\n",
"    i += 1\n",
"    # Draw the random fields in a fixed order: name, then x, then y.\n",
"    name = random.choice(record_names)\n",
"    x = random.random()\n",
"    y = random.randint(0, 10)\n",
"    timestamp = str(datetime.now())\n",
"    payload = {'name': name, 'i': i, 'x': x, 'y': y, 'time': timestamp}\n",
"    return json.dumps(payload)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"create_record()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"type(create_record())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Basic Streams and Map"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from streamz import Stream\n",
"from tornado.ioloop import IOLoop\n",
"\n",
"source = Stream()\n",
"source"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"records = source.map(json.loads)\n",
"records"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"names = records.map(lambda r: r['name'])\n",
"names"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"records.map(lambda r: r['time'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"record = create_record()\n",
"record"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"source.visualize()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"record = create_record()\n",
"source.emit(record) # push data into front side of stream"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Continuous updates\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from tornado import gen\n",
"from tornado.ioloop import IOLoop\n",
"\n",
"async def f():\n",
"    \"\"\"Emit a freshly generated record into `source` after each 0.1 s sleep, forever.\"\"\"\n",
"    while True:\n",
"        await gen.sleep(0.100)\n",
"        # Wait for the emit to finish before starting the next sleep.\n",
"        await source.emit(create_record(), asynchronous=True)\n",
"\n",
"# Schedule the coroutine on the current IOLoop.\n",
"IOLoop.current().add_callback(f)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Accumulators"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"records"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def add(acc, new):\n",
"    \"\"\"Fold step: return the running value `acc` combined with `new` via `+`.\"\"\"\n",
"    total = acc + new\n",
"    return total\n",
"\n",
"records.map(lambda d: d['x']).accumulate(add, start=0)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def accumulator(acc, new):\n",
"    \"\"\"Return a copy of the count mapping `acc` with the tally for `new` raised by one.\n",
"\n",
"    The incoming mapping is never modified; a key seen for the first time\n",
"    starts at 1.\n",
"    \"\"\"\n",
"    counts = acc.copy()\n",
"    counts[new] = counts.get(new, 0) + 1\n",
"    return counts\n",
" \n",
" \n",
"names.accumulate(accumulator, start={})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Streams of Dataframes"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"batches = records.timed_window('200ms')\n",
"dfs = batches.map(list).map(pd.DataFrame)\n",
"dfs"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def query(df):\n",
"    \"\"\"Return the rows of `df` whose `name` column equals 'Alice'.\n",
"\n",
"    A frame built from an empty batch (`pd.DataFrame([])`) has no columns at\n",
"    all, so attribute access to `name` would raise. Such frames are answered\n",
"    with an empty selection instead.\n",
"    \"\"\"\n",
"    if 'name' not in df.columns:\n",
"        return df.iloc[:0]\n",
"    return df[df['name'] == 'Alice']\n",
"\n",
"def aggregate(acc, new):\n",
"    \"\"\"Add the sum of column `x` in frame `new` to the running total `acc`.\n",
"\n",
"    An empty frame leaves the total unchanged.\n",
"    \"\"\"\n",
"    if len(new):\n",
"        return acc + new.x.sum()\n",
"    return acc\n",
"\n",
"dfs.map(query).accumulate(aggregate, start=0)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Streaming Dataframes"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from streamz.dataframe import DataFrame\n",
"\n",
"example = pd.DataFrame([json.loads(create_record())])\n",
"\n",
"df = DataFrame(stream=dfs, example=example)\n",
"# df.tail(5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df[df.name == 'Alice'].x.sum()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df['time'] = df['time'].astype('M8[ns]')\n",
"df = df.set_index('time')\n",
"df.tail(5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df.window('5s').groupby('name')[['x', 'y']].mean()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import streamz.dataframe.holoviews"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df.window('5s').groupby('name')[['x', 'y']].mean().plot.bar()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df.x.plot.hist()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"source.visualize()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.0"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment