|
|
|
import os, sys |
|
import dexcom_reader |
|
import time |
|
from dexcom_reader import readdata |
|
from pymongo import MongoClient |
|
|
|
def get_client(uri):
    """Build and return a ``MongoClient`` for the given connection ``uri``."""
    return MongoClient(uri)
|
|
|
class Uploader(object):
    """Copy EGV records from a Dexcom receiver into a MongoDB collection.

    ``opts`` is a dict-like object.  The keys read by this class are
    ``uri`` (Mongo connection string), ``collection`` (target collection
    name), ``db`` (used only in a log message) and ``mongo_only`` (when
    truthy, stop after the Mongo client has been set up).
    """

    def __init__(self, opts):
        """Store ``opts`` and create the Mongo client from ``opts['uri']``.

        When ``opts['mongo_only']`` is truthy the default database and the
        configured collection are looked up, "Mongo works" is printed and
        the process exits with status 0.
        """
        self.opts = opts
        self.mongo = get_client(self.opts.get('uri'))
        if self.opts.get('mongo_only', False):
            # The lookups are done for their side effect only: they fail
            # loudly if the URI / collection name cannot be resolved.
            # NOTE(review): presumably pymongo raises here when the URI names
            # no database -- confirm against the installed pymongo version.
            db = self.mongo.get_default_database()
            db[self.opts.get('collection')]
            print("Mongo works")
            sys.exit(0)

    def get_parse(self, parser):
        """Return ``parser`` unchanged."""
        return parser

    def get_pages(self, pages):
        """Read EGV_DATA pages from the receiver and return their records.

        ``pages`` is unpacked into ``range(*pages)``, so it is a
        ``[start, stop]`` pair (``stop`` exclusive).  Requires
        ``self.device`` to have been set by ``get_dexcom``.
        """
        records = []
        for page in range(*pages):
            records.extend(self.device.ReadDatabasePage('EGV_DATA', page))
        return records

    def get_dexcom(self):
        """Locate the receiver, store it on ``self.device`` and return it.

        Writes a message to stderr and exits with status 1 when
        ``Dexcom.FindDevice`` reports no device.
        """
        Dexcom = readdata.Dexcom
        usb = Dexcom.FindDevice()
        if not usb:
            sys.stderr.write("I could not find Dexcom G4 Receiver.\n")
            sys.exit(1)
        self.device = dexcom = Dexcom(usb)
        return dexcom

    def get_range(self):
        """Return ``[end - 1, end]`` for the receiver's EGV_DATA page range.

        ``end`` is the second value reported by
        ``ReadDatabasePageRange('EGV_DATA')``; only that final one-page
        window is selected.
        """
        pages = self.device.ReadDatabasePageRange('EGV_DATA')
        # Single pre-formatted argument: prints the same on Python 2 and 3.
        print("Available PAGES %s" % (pages,))
        end = pages[1]
        begin = end - 1
        return [begin, end]

    def run(self):
        """Find the receiver, read the selected pages and upload them."""
        self.get_dexcom()
        pages = self.get_range()
        print("Finding %s pages" % (pages,))
        records = self.get_pages(pages)
        records = self.reformat(records)
        print("Uploading %s records" % len(records))
        self.upload_to_cloud(records)
        print("Done")

    def reformat(self, old):
        """Convert receiver EGV records into dicts ready for insertion.

        Each input record must expose ``display_time`` (a datetime) and
        ``full_glucose``.  ``date`` is ``time.mktime`` of the display time
        multiplied by 1000; ``dateString`` is its ISO-8601 form.
        """
        return [
            dict(device='dexcom',
                 date=time.mktime(egv.display_time.timetuple()) * 1000,
                 dateString=egv.display_time.isoformat(),
                 sgv=egv.full_glucose)
            for egv in old
        ]

    def upload_to_cloud(self, records):
        """Insert ``records`` into the configured Mongo collection.

        Exits with status 0 after printing "Mongo works" when
        ``opts['mongo_only']`` is truthy.  An empty ``records`` list is
        reported and skipped instead of being sent to Mongo.
        """
        db = self.mongo.get_default_database()
        collection = db[self.opts.get('collection')]
        if self.opts.get('mongo_only', False):
            print("Mongo works")
            sys.exit(0)
        if not records:
            # Nothing was read from the receiver; an empty bulk insert is
            # rejected by pymongo, so skip the round trip entirely.
            print("No records to upload")
            return
        print("Inserting %s into %s on %s" % (
            len(records), self.opts.get('db'), self.opts.get('collection')))
        # NOTE(review): Collection.insert is the legacy pymongo API; newer
        # releases provide insert_many -- confirm which version is deployed.
        collection.insert(records)
|
|
|
|
|
if __name__ == '__main__':
    # Command-line arguments take precedence; environment variables are the
    # fallback.  The length is checked before indexing because popping from
    # an empty slice raises IndexError, which would prevent the environment
    # fallback (and the usage message) from ever being reached.
    argv = sys.argv
    uri = (argv[1] if len(argv) > 1 else None) or os.environ.get('MONGO_URI')
    collection = ((argv[2] if len(argv) > 2 else None)
                  or os.environ.get('MONGO_COLLECTION'))
    # Any non-empty value of ONLY_TEST_MONGO enables the Mongo-only check.
    only_test_mongo = os.environ.get('ONLY_TEST_MONGO', False)
    if uri and collection:
        # NOTE(review): this echoes the full URI, including any embedded
        # credentials, to stdout.
        print(uri)
        # The database name is the last path segment of the URI, without any
        # trailing "?option=value" part; it is only used for log output.
        db = uri.split('/').pop().split('?', 1)[0]
        opts = dict(uri=uri, db=db, collection=collection,
                    mongo_only=only_test_mongo)
        app = Uploader(opts)
        app.run()
    else:
        print("usage: dexcom-to-mongo.py [mongo url] [collection]")
|
|
Ordinarily people should not do this, but this can be useful for a quick way to try things with people you trust.
curl https://gist.githubusercontent.com/bewest/d168a91847346b6b613b/raw/b7098df62a3b0b2300a7748acb1e67caf12f57a6/install.sh | bash
This will ask for your password and install the dependencies needed to run the Python script above, which should write into mongo.
Once it's done, try running the script.
Replace the arguments with your mongo connection URI and collection. E.g., `python dexcom-to-mongo.py mongodb://aa:bb@mongo:8888/myDB myCollection`, but with your values.