Skip to content

Instantly share code, notes, and snippets.

@lJoublanc
Created December 29, 2014 14:50
Show Gist options
  • Save lJoublanc/c8591cd8e918024d505a to your computer and use it in GitHub Desktop.
Save lJoublanc/c8591cd8e918024d505a to your computer and use it in GitHub Desktop.
Serializing pandas df to mongodb using existing msgpack packers
Display the source blob
Display the rendered blob
Raw
{
"metadata": {
"name": "",
"signature": "sha256:6ed4f406d673811a2e33ede715811d88c22ccb308f597828557355ec72785ee7"
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "code",
"collapsed": false,
"input": [
"import numpy as np\n",
"import pandas as pd"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"orderbook = pd.DataFrame( np.around(np.random.uniform(size=(4,10)),4)*100000, \n",
" pd.date_range(\"2014-12-15 10:07:00\", periods=4, freq=\"s\"), \n",
" columns=pd.MultiIndex.from_arrays( [[\"bid\"]*5 + [\"ask\"]*5,\n",
" np.linspace(95.,99.5,10)]))"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "raw",
"metadata": {},
"source": [
"orderbook = pd.DataFrame( np.random.rand(3,2), pd.date_range(\"2014-12-15 10:07:00\", periods=3, freq=\"s\"), columns=[\"bid\",\"offer\"])"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"print(orderbook)"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"records = orderbook.to_records()\n",
"records"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%time orderbook.to_msgpack(\"msgpacktest.bin\")"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"An example using Blosc for compression\n",
"--------------------------------------\n",
"\n",
"This is pretty messy as we are using all the msgpack functionality to pack the dataframe, but overriding the serialization of numpy ararys with `Binary`, which by default would return a string (latin1 encoded)."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"import pandas.io.packers\n",
"from pandas.io.packers import encode\n",
"from pandas.io.packers import pack"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"#Monkey patch!\n",
"pandas.io.packers.compressor = \"blosc\""
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from bson.binary import Binary\n",
"import blosc\n",
"# ripped this out of pandas.io.packers\n",
"def myConvert(values):\n",
" \"\"\" convert the numpy values to a list \"\"\"\n",
"\n",
" dtype = values.dtype\n",
" if pandas.io.packers.needs_i8_conversion(dtype):\n",
" values = values.view('i8')\n",
" v = values.ravel()\n",
"\n",
" # convert object\n",
" if dtype == np.object_:\n",
" return v.tolist()\n",
"\n",
" if pandas.io.packers.compressor == 'zlib':\n",
"\n",
" # return string arrays like they are\n",
" if dtype == np.object_:\n",
" return v.tolist()\n",
"\n",
" # convert to a bytes array\n",
" v = v.tostring()\n",
" return zlib.compress(v)\n",
"\n",
" elif pandas.io.packers.compressor == 'blosc' and True:\n",
"\n",
" # return string arrays like they are\n",
" if dtype == np.object_:\n",
" return v.tolist()\n",
"\n",
" # convert to a bytes array\n",
" v = v.tostring()\n",
" return Binary(blosc.compress(v, typesize=dtype.itemsize))\n",
"\n",
" # ndarray (on original dtype)\n",
" return v.tostring()\n",
"pandas.io.packers.convert = myConvert"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def curseRecursion(iterable):\n",
" if isinstance(iterable,dict):\n",
" return dict(zip(iterable.keys(),map(curseRecursion,iterable.itervalues())))\n",
" elif isinstance(iterable,list):\n",
" return map(encode,map(curseRecursion,iterable))\n",
" else:\n",
" return encode(iterable)"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"encodedOrderbook = curseRecursion(encode(orderbook))\n",
"encodedOrderbook"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"orderbook.to_json()"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now save it to the DB"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"import pymongo"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"mongocli = pymongo.MongoClient()"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"db = mongocli.db[\"dev\"]"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"db.collection[\"historical_prices\"].insert(encodedOrderbook)"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Try a more serious example"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"fileList = [ \"../data/EURJPY_20140514.csv\"]\n",
"data = {}\n",
"for f in fileList :\n",
" print(\"Processing \" + f)\n",
" data[f] = pd.read_csv(f,header=[0,1,2],parse_dates=[0])\n",
" data[f].drop_duplicates(cols=data[f].columns[0],take_last=True,inplace=True)\n",
" data[f].set_index(data[f].columns[0],inplace=True)\n",
" data[f].index.name = \"Timestamp\"\n",
" if type(data[f].index) != pd.DatetimeIndex :\n",
" print(\"It seems that timestamps weren't correctly parsed for \" + f)\n",
"\n",
"ccy = pd.concat(data.values())"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"import sys\n",
"print(\"Num records : %i, size : %i\" % (len(ccy),ccy.values.size))\n",
"ccy.head()"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%time encodedOrderbook = curseRecursion(encode(ccy))"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%time db.collection[\"historical_prices\"].insert(encodedOrderbook)"
],
"language": "python",
"metadata": {},
"outputs": []
}
],
"metadata": {}
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment