## import the necessary libraries
from io import BytesIO
from pymongo import MongoClient
import gridfs
import random
from math import ceil
## delete all versions of the file
def deleteFile(db, fs, name):
    oldFile = db['fs.files'].find_one({"filename": name})
    while oldFile:
        fs.delete(oldFile['_id'])
        oldFile = db['fs.files'].find_one({"filename": name})
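
## A minimal alternative sketch with the same effect as deleteFile above:
## collect the _id of every stored version in one query, then delete each.
## (deleteFileAlt is illustrative and not used by the rest of this script.)
def deleteFileAlt(db, fs, name):
    ids = [doc['_id'] for doc in db['fs.files'].find({"filename": name})]
    for fileId in ids:
        fs.delete(fileId)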
mongoHost = 'localhost'
mongoPort = 27017
mongo = MongoClient(mongoHost, mongoPort)
db = mongo.workingArea
fs = gridfs.GridFS(db)

filename = "test.bin"
data = [123, 3, 255, 0, 100]
metadata = {"a": 1, "b": 2, "c": 3}

deleteFile(db, fs, filename)
## the GridFS spec recommends storing application fields under the file
## document's "metadata" field rather than as top-level keyword arguments
f = fs.new_file(filename=filename, metadata=metadata)
try:
    ba = bytearray(data)
    stream = BytesIO()
    stream.write(ba)
    stream.seek(0)
    f.write(stream)  ## GridIn.write accepts bytes or a file-like object
finally:
    f.close()
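
## Equivalent one-shot sketch: GridFS.put writes the data and closes the
## file in a single call, so the try/finally above is not needed.
## fs.put(bytes(bytearray(data)), filename=filename, metadata=metadata)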
print("Write binary array to DB")

## read the file back and compare byte-for-byte against the source data;
## checking the length first avoids an IndexError if the sizes ever differ
ba = bytearray(fs.get_last_version(filename=filename).read())
allTrue = (len(ba) == len(data)) and all(v == d for v, d in zip(ba, data))
print("Write and Retrieve validation test: ", allTrue)
print("")
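## The stored custom fields can also be read back: GridOut exposes the file
## document's metadata field as an attribute (illustrative sketch):
## gf = fs.get_last_version(filename=filename)
## print("stored metadata:", gf.metadata)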
filename = "binaryRandom.bin"
deleteFile(db, fs, filename)

## running tallies, updated as each random byte is generated
count = 0
total = 0
def genAndTrackRand():
    global count
    global total
    v = random.randint(0, 255)
    count = count + 1
    total = total + int(v)
    return v
f = fs.new_file(filename=filename, metadata=metadata)
thisId = f._id
try:
    ## write 1000 blocks of 1024 random bytes (~1 MB total)
    for i in range(0, 1000):
        ba = bytearray([genAndTrackRand() for p in range(0, 1024)])
        stream = BytesIO()
        stream.write(ba)
        stream.seek(0)
        f.write(stream)
finally:
    f.close()
print("Write large binary array to DB")
print("")
go = fs.get(thisId)
fileLength = go.length
chunkSize = go.chunk_size
chunks = ceil(fileLength / chunkSize)
go.close()
print("total file size: ", fileLength)
print("chunk count: ", chunks)
print("chunk size: ", chunkSize)
print("")
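## The default GridFS chunk size is 255 kB; it can be overridden per file
## when the file is created (illustrative, using GridIn's chunk_size keyword):
## f = fs.new_file(filename=filename, chunk_size=1024 * 1024)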
computedCount = 0
computedTotal = 0
## Compute a running total in parts, reading one chunk at a time.
## A real workload could do this across multiple threads or processes for
## parallel computing (see the multiprocessing sketch at the end of this
## script); this sample keeps it sequential.
for cIdx in range(0, chunks):
    go = fs.get(thisId)
    go.seek(cIdx * chunkSize)
    position = go.tell()
    goodEnd = min([(position + chunkSize - 1), fileLength])
    ## print("reading from ", position, " to ", goodEnd)
    localCount = 0
    localTotal = 0
    for v in bytearray(go.readchunk()):
        localCount = localCount + 1
        localTotal = localTotal + v
    ## print("now at: ", go.tell())
    computedCount = computedCount + localCount
    computedTotal = computedTotal + localTotal
    go.close()
## print confirmation that computed results match expectations
print("")
print("-- Tracked stat: computed when data was being generated")
print("tracked count: ", count)
print("tracked total: ", total)
print("")
print("compare to")
print("")
print("-- Computed stat: computed after insertion of binary stream into database")
print("computed count: ", computedCount)
print("computed total: ", computedTotal)
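
## A minimal multiprocessing sketch of the parallel per-chunk computation
## mentioned above. Each worker opens its own MongoClient (clients should not
## be shared across forked processes), seeks to its chunk, and returns a
## (count, total) pair. The names sumChunk and poolSize are illustrative and
## not part of the original sample. Because the rest of this script runs at
## module level without a __main__ guard, this sketch assumes a fork-based
## multiprocessing start method (the Linux default).
## from multiprocessing import Pool
##
## def sumChunk(args):
##     fileId, chunkIdx, size = args
##     client = MongoClient(mongoHost, mongoPort)
##     grid = gridfs.GridFS(client.workingArea)
##     go = grid.get(fileId)
##     go.seek(chunkIdx * size)
##     chunk = bytearray(go.readchunk())
##     go.close()
##     client.close()
##     return (len(chunk), sum(chunk))
##
## poolSize = 4
## with Pool(poolSize) as pool:
##     results = pool.map(sumChunk, [(thisId, i, chunkSize) for i in range(chunks)])
## print("parallel count: ", sum(r[0] for r in results))
## print("parallel total: ", sum(r[1] for r in results))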