public
Last active

  • Download Gist
nginx-gridfs-test.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
# ----------------------------------------------- #
# Title: nginx-gridfs-test.py
# Author: Christopher Triolo
# Description: This tests the nginx-gridfs module.
# ----------------------------------------------- #
 
import StringIO
import random
import pycurl
from pymongo import Connection
from gridfs import GridFS
 
# ------- TEST CONFIGURATION ------- #
HOSTNAME = "127.0.0.1:80/"
FILENAME_LOC = "gridfs/filename/"
AUTH_REQUIRED = True
USER = "user"
PASSWORD = "pass"
_ID_LOC = "gridfs/_id/"
UPPER_BOUND = 1024*256 * 4
LOWER_BOUND = 1024*128
DELTA = 1024*128
TYPES = ["application/pdf", "application/zip", "audio/midi", "audio/mpeg", "images/jpeg", "images/png", "image/tiff", "text/plain", "text/html", "text/richtext", "video/mp4", "video/mpeg"]
POSSIBLE_CONTENT = "abcdefghijklmonpqrstuvwxyz ABCDEFGHIJKLMNOPQRSTUVWXYZ"
 
#------------------------------------#
# Test access by _id #
#------------------------------------#
def test_access_by_id( fs ):
 
for size in range(LOWER_BOUND, UPPER_BOUND, DELTA):
data = ""
file = fs.new_file(content_type="text/plain", filename="garbage")
for i in range(size):
data = data + random.choice(POSSIBLE_CONTENT)
file.write(data)
file.close()
b = StringIO.StringIO()
c = pycurl.Curl()
c.setopt(pycurl.URL, HOSTNAME + _ID_LOC + str(file._id))
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.perform()
if data != b.getvalue():
print "Data inconsistent! Size: " + str(size) + " Access by: _id"
else : print "Passed! Size: " + str(size) + " Access by: _id"
fs.delete(file._id)
 
#------------------------------------#
# Test access by filename #
#------------------------------------#
def test_access_by_filename( fs ):
 
for size in range(LOWER_BOUND, UPPER_BOUND, DELTA):
data = ""
file = fs.new_file(content_type="text/plain", filename="garbage")
for i in range(size):
data = data + random.choice(POSSIBLE_CONTENT)
file.write(data)
file.close()
b = StringIO.StringIO()
c = pycurl.Curl()
c.setopt(pycurl.URL, HOSTNAME + FILENAME_LOC + "garbage")
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.perform()
if data != b.getvalue():
print "Data inconsistent! Size: " + str(size) + " Access by: filename"
else : print "Passed! Size: " + str(size) + " Access by: filename"
fs.delete(file._id)
#------------------------------------#
# Test content types #
#------------------------------------#
def test_content_types( fs ):
 
for type in TYPES:
file = fs.new_file(content_type=type)
file.write("hello, world!")
file.close()
b = StringIO.StringIO()
c = pycurl.Curl()
c.setopt(pycurl.URL, HOSTNAME + _ID_LOC + str(file._id))
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.perform()
recieved_type = c.getinfo(pycurl.CONTENT_TYPE)
if type != recieved_type: print "Wrong Content Type! Expected: " + type + " Recieved: " + recieved_type
else : print "Passed! Type: " + type
fs.delete(file._id)
#------------------------------------#
# Test http range request #
# DOESNT WORK YET #
#------------------------------------#
def test_range_request( fs ):
data = ""
file = fs.new_file(content_type="text/plain", filename="garbage")
for i in range(UPPER_BOUND):
data = data + random.choice(POSSIBLE_CONTENT)
file.write(data)
file.close()
b = StringIO.StringIO()
c = pycurl.Curl()
c.setopt(pycurl.URL, HOSTNAME + FILENAME_LOC + "garbage")
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.setopt(pycurl.RANGE, "10 - 200")
c.perform()
if data != b.getvalue():
print "Data inconsistent! Size: " + str(UPPER_BOUND) + " Access by: filename"
else : print "Passed! Size: " + str(UPPER_BOUND) + " Access by: filename"
fs.delete(file._id)
#------------------------------------#
# Main Script #
#------------------------------------#
 
db = Connection().test
if (AUTH_REQUIRED):
db.authenticate(USER,PASSWORD)
fs = GridFS(db)
 
test_access_by_id(fs)
test_access_by_filename(fs)
test_content_types(fs)
#test_range_request(fs)

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.