This example is meant to demonstrate the separation of concerns between Data
Modeling (i.e. storing things) and business logic (i.e. doing things) by
creating a Python Package called InventoryDB
which implements data models and
can be imported into a script that implements some business logic (i.e. actual work!).
To minimize the amount of effort involved in storing a pandas DataFrame in a MongoDB Document and resroting it to another Pandas DataFrame instance, we start by definig custom field type for storing our pandas DataFrmaes, like so:
# [InventoryDB/fields/DataFrameField.py]
import mongoengine.fields
from numpy import generic
from pandas import DataFrame, MultiIndex
class DataFrameField(mongoengine.fields.DictField):
"""A pandas DataFrame field.
Looks to the outside world like a Pandas.DataFrame, but stores
in the database as an using Pandas.DataFrame.to_dict("list").
"""
def __init__(self, orient="list", *args, **kwargs):
if orient not in ('dict', 'list', 'series', 'split', 'records', 'index'):
raise ValueError(u"orient must be one of ('dict', 'list', 'series', 'split', 'records', 'index') but got: %s")
self.orient = orient
super(DataFrameField, self).__init__(*args, **kwargs)
def __get__(self, instance, owner):
print("__get__:",instance, owner)
return DataFrame.from_dict(_as_native(super(DataFrameField, self).__get__(instance, owner)))
def __set__(self, instance, value):
if value is None or isinstance(value, dict):
return super(DataFrameField, self).__set__(instance, value)
if not isinstance(value, DataFrame):
raise ValueError("value is not a pandas.DataFrame instance")
if isinstance(value.index, MultiIndex):
self.error(u'value.index is a MultiIndex; MultiIndex objects may not be stored in MongoDB. Consider using `value.reset_index()` to flatten')
if isinstance(value.keys(), MultiIndex):
self.error(u'value.keys() is a MultiIndex; MultiIndex objects may not be stored in MongoDB. Consider using `value.unstack().reset_index()` to flatten')
obj = value.to_dict(self.orient)
# coerce numpy objects into python objects for lack of the BSON-numpy package on windows
for col in obj.itervalues():
if len(col) and isinstance(col[0],generic):
for i in range(len(col)):
col[i] = col[i].item()
return super(DataFrameField, self).__set__(instance, obj)
def to_python(self, value):
return value
# A helper function for coercing nested dicts and lists to python lists
# from mongoengine basedict and baselist (which throw Pandas for a loop)
def _as_native(x):
if isinstance(x,dict):
return dict([ (k,_as_native(v)) for k,v in x.iteritems()])
elif isinstance(x,list):
return [ _as_native(v) for v in x]
else:
return x
and this trick lets us import the directly via from InventoryDB.fields import *
:
rather than the slighly obnoxious from InventoryDB.fields.DataFrameField import DataFrameField
:
# [InventoryDB/fields/__init__.py]
from .DataFrameField import DataFrameField
Note, the DataFrameField above has a curious quirk.
Next, we define some models that will be shared by all our business logic scripts. In this vignette we'll only create one Document model, but most databases would have several (or even dozens of) document models.
# [InventoryDB/documents/stock_fill_request.py]
from mongoengine import *
class stock_fill_request(Document):
req_no = IntField(default=False,required=True)
requisition_sent = BooleanField(default=False,required=True)
stock_data = DataFrameField(required=True)
this trick lets us import the our document classes directly from InventoryDB.documents
in our business logic:
# [InventoryDB/documents/__init__.py]
from .stock_fill_request import stock_fill_request
Next, we need an __init__.py
file to make InventoryDB
a python module
# [InventoryDB/__init__.py]
pass
-- or alternatively, we could define a connect method that contains (some of) the connection logic (host, port, db, etc.):
# [InventoryDB/__init__.py]
import mongoengine # includes connect, Document, and the various fields
def connect(test=False,db="Inventory",*args,**kwargs):
if test:
return mongoengine.connect(db,port=12345,host="localhost")
else:
return mongoengine.connect(db,port=port,host=host,*args,**kwargs)
In this script, we'll create a new stock_fill_request
Document and insert
it into the stock_fill_request
collection.
# [insert_a_request.py]
import pandas as pd
import numpy as np
from mongoengine import connect # includes connect, Document, and the various fields
from InventoryDB.documents import stock_fill_request
# CONNECT TO THE DATABASE CALLED "inventory" at the default host and port
connect("inventory")
# alternative: InventoryDB.connect(test=False) # We're dealing with real data here
# alternative: InventoryDB.connect(test=True) # Connect to the test environment
# CREATE THE INVENTORY REQUEST DataFrame
df = pd.DataFrame({
'goods': ['a', 'a', 'b', 'b', 'b'],
'stock': [5, 10, 30, 40, 10],
'category': ['c1', 'c2', 'c1', 'c2', 'c1'],
'date': pd.to_datetime(['2014-01-01', '2014-02-01', '2014-01-06', '2014-02-09', '2014-03-09'])
})
# BUILD THE REQUEST (Document) OBJECT
request_data = stock_fill_request(
req_no = 123,
requisition_sent=False)
request_data.stock_data = df
-- or equivalently --
request_data = stock_fill_request(
req_no = 123,
requisition_sent=False,
stock_data = df)
# NOTE THAT because the `df` object is coerced to a python dictionary on
# assigmentmet (via a the `__set__` method of `DataFrameField`) and a new
# Pandas DataFrame is created when we access the `stock_data` (via a the
# `__get__` method of `DataFrameField`) `df` and request_data.stock_data will
# refer to separate python objects.
print df is request_data.stock_data
#> False
# VALIDATE AND SAVE THE REQUEST
request_data.save()
# [service_a_request.py]
from mongoengine import connect # includes connect, Document, and the various fields
from InventoryDB.documents import stock_fill_request
# CONNECT TO THE DATABASE CALLED "inventory" at the default host and port
connect("inventory")
# alternative: InventoryDB.connect(test=False) # We're dealing with real data here
# alternative: InventoryDB.connect(test=True) # Connect to the test environment
# create a request for unfilled inventory requests
request = stock_fill_request.objects(requisition_sent=False)
# pull the first matching object
cursor = request[0]
# note that the `stock_data` property of our data is an object.
print cursor["stock_data"].__class__.__name__
print cursor["stock_data"]
-- do things --
cursor.requisition_sent = True
cursor.save()
# how many requests exist
stock_fill_request.objects.count()
# how unfilled many requests exist
stock_fill_request.objects(requisition_sent=False).count()
In this example we have a custom field type, which is why we have the custom fields sub-module:
InventoryDB/fields/DataFrameField.py
InventoryDB/fields/__init__.py
InventoryDB/documents/stock_fill_request.py
InventoryDB/documents/__init__.py
InventoryDB/__init__.py
This is likely a more rare scenario, and the definition of our
DataFrameField
class could be moved into a separate module, which would
reduce our InventoryDB
to containing just an __init__.py
exposing the
document classes, and a file for each document class, like so:
InventoryDB/stock_fill_request.py
InventoryDB/__init__.py
Thanks for writing this example. What licence is this code under? Can I use it my project. Specifically the
DataFrameField
.