Skip to content

Instantly share code, notes, and snippets.

@eldadcohen1
Created April 11, 2021 13:05
Show Gist options
  • Save eldadcohen1/ffd668268beeca920d692304007f41ce to your computer and use it in GitHub Desktop.
Save eldadcohen1/ffd668268beeca920d692304007f41ce to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"metadata": {
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.7-final"
},
"orig_nbformat": 2,
"kernelspec": {
"name": "python3",
"display_name": "Python 3.7.7 64-bit ('streamz': pipenv)",
"metadata": {
"interpreter": {
"hash": "b85b4cc3b9c9728aed8495a435dff41b758379143e0bed7a56d08d2a2e37def0"
}
}
}
},
"nbformat": 4,
"nbformat_minor": 2,
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client\n",
"client = Client()"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from streamz import Stream\n",
"stream = Stream()\n",
"dstream = stream.scatter()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from streamz.dataframe import Aggregation\n",
"\n",
"class Mean(Aggregation):\n",
"    def initial(self, new):\n",
"        state = new.iloc[:0].sum(), new.iloc[:0].count()\n",
"        return state\n",
"\n",
"    def on_new(self, state, new):\n",
"        total, count = state\n",
"        total = total + new.sum()\n",
"        count = count + new.count()\n",
"        new_state = (total, count)\n",
"        new_value = total / count\n",
"        return new_state, new_value\n",
"\n",
"    def on_old(self, state, old):\n",
"        total, count = state\n",
"        total = total - old.sum() # switch + for - here\n",
"        count = count - old.count() # switch + for - here\n",
"        new_state = (total, count)\n",
"        new_value = total / count\n",
"        return new_state, new_value"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"output_type": "display_data",
"data": {
"text/plain": "Output()",
"application/vnd.jupyter.widget-view+json": {
"version_major": 2,
"version_minor": 0,
"model_id": "7442e347b7f64d238260edc061698814"
}
},
"metadata": {}
}
],
"source": [
"import pandas as pd\n",
"from streamz.dataframe import DataFrame\n",
"\n",
"example = pd.DataFrame({'name': [], 'amount': []})\n",
"sdf = DataFrame(dstream, example=example)\n",
"\n",
"sdf[sdf.name == 'Alice'].amount.sum()\n"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"output_type": "display_data",
"data": {
"text/plain": "Output()",
"application/vnd.jupyter.widget-view+json": {
"version_major": 2,
"version_minor": 0,
"model_id": "ae8dcb1290d04228a374a57c99380c4f"
}
},
"metadata": {}
}
],
"source": [
"sdf[sdf.name == 'Alice'].amount.aggregate(Mean())"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"stream.emit(example.append({'name' : 'Alice' , 'amount':1},ignore_index=True))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment