Created
December 29, 2014 14:50
-
-
Save lJoublanc/c8591cd8e918024d505a to your computer and use it in GitHub Desktop.
Serializing pandas df to mongodb using existing msgpack packers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"metadata": { | |
"name": "", | |
"signature": "sha256:6ed4f406d673811a2e33ede715811d88c22ccb308f597828557355ec72785ee7" | |
}, | |
"nbformat": 3, | |
"nbformat_minor": 0, | |
"worksheets": [ | |
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"import numpy as np\n", | |
"import pandas as pd" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"orderbook = pd.DataFrame( np.around(np.random.uniform(size=(4,10)),4)*100000, \n", | |
" pd.date_range(\"2014-12-15 10:07:00\", periods=4, freq=\"s\"), \n", | |
" columns=pd.MultiIndex.from_arrays( [[\"bid\"]*5 + [\"ask\"]*5,\n", | |
" np.linspace(95.,99.5,10)]))" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "raw", | |
"metadata": {}, | |
"source": [ | |
"orderbook = pd.DataFrame( np.random.rand(3,2), pd.date_range(\"2014-12-15 10:07:00\", periods=3, freq=\"s\"), columns=[\"bid\",\"offer\"])" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"print(orderbook)" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"records = orderbook.to_records()\n", | |
"records" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"%time orderbook.to_msgpack(\"msgpacktest.bin\")" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"An example using Blosc for compression\n", | |
"--------------------------------------\n", | |
"\n", | |
"This is pretty messy: we reuse all of the msgpack functionality to pack the dataframe, but override the serialization of numpy arrays so that it returns a BSON `Binary`; by default it would return a string (latin1 encoded)." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"import pandas.io.packers\n", | |
"from pandas.io.packers import encode\n", | |
"from pandas.io.packers import pack" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"#Monkey patch!\n", | |
"pandas.io.packers.compressor = \"blosc\"" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"from bson.binary import Binary\n", | |
"import blosc\n", | |
"# ripped this out of pandas.io.packers\n", | |
"def myConvert(values):\n", | |
" \"\"\" convert the numpy values to a list \"\"\"\n", | |
"\n", | |
" dtype = values.dtype\n", | |
" if pandas.io.packers.needs_i8_conversion(dtype):\n", | |
" values = values.view('i8')\n", | |
" v = values.ravel()\n", | |
"\n", | |
" # convert object\n", | |
" if dtype == np.object_:\n", | |
" return v.tolist()\n", | |
"\n", | |
" if pandas.io.packers.compressor == 'zlib':\n", | |
"\n", | |
" # return string arrays like they are\n", | |
" if dtype == np.object_:\n", | |
" return v.tolist()\n", | |
"\n", | |
" # convert to a bytes array\n", | |
" v = v.tostring()\n", | |
" return zlib.compress(v)\n", | |
"\n", | |
" elif pandas.io.packers.compressor == 'blosc' and True:\n", | |
"\n", | |
" # return string arrays like they are\n", | |
" if dtype == np.object_:\n", | |
" return v.tolist()\n", | |
"\n", | |
" # convert to a bytes array\n", | |
" v = v.tostring()\n", | |
" return Binary(blosc.compress(v, typesize=dtype.itemsize))\n", | |
"\n", | |
" # ndarray (on original dtype)\n", | |
" return v.tostring()\n", | |
"pandas.io.packers.convert = myConvert" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"def curseRecursion(iterable):\n", | |
" if isinstance(iterable,dict):\n", | |
" return dict(zip(iterable.keys(),map(curseRecursion,iterable.itervalues())))\n", | |
" elif isinstance(iterable,list):\n", | |
" return map(encode,map(curseRecursion,iterable))\n", | |
" else:\n", | |
" return encode(iterable)" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"encodedOrderbook = curseRecursion(encode(orderbook))\n", | |
"encodedOrderbook" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"orderbook.to_json()" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Now save it to the DB" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"import pymongo" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"mongocli = pymongo.MongoClient()" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"db = mongocli.db[\"dev\"]" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"db.collection[\"historical_prices\"].insert(encodedOrderbook)" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Try a more serious example" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"fileList = [ \"../data/EURJPY_20140514.csv\"]\n", | |
"data = {}\n", | |
"for f in fileList :\n", | |
" print(\"Processing \" + f)\n", | |
" data[f] = pd.read_csv(f,header=[0,1,2],parse_dates=[0])\n", | |
" data[f].drop_duplicates(cols=data[f].columns[0],take_last=True,inplace=True)\n", | |
" data[f].set_index(data[f].columns[0],inplace=True)\n", | |
" data[f].index.name = \"Timestamp\"\n", | |
" if type(data[f].index) != pd.DatetimeIndex :\n", | |
" print(\"It seems that timestamps weren't correctly parsed for \" + f)\n", | |
"\n", | |
"ccy = pd.concat(data.values())" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"import sys\n", | |
"print(\"Num records : %i, size : %i\" % (len(ccy),ccy.values.size))\n", | |
"ccy.head()" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"%time encodedOrderbook = curseRecursion(encode(ccy))" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"%time db.collection[\"historical_prices\"].insert(encodedOrderbook)" | |
], | |
"language": "python", | |
"metadata": {}, | |
"outputs": [] | |
} | |
], | |
"metadata": {} | |
} | |
] | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment