Created
June 18, 2010 22:00
-
-
Save ctriolo/444285 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ----------------------------------------------- #
# Title: nginx-gridfs-test.py
# Author: Christopher Triolo
# Description: This tests the nginx-gridfs module.
# ----------------------------------------------- #
import StringIO | |
import random | |
import pycurl | |
from pymongo import Connection | |
from gridfs import GridFS | |
# ------- TEST CONFIGURATION ------- # | |
HOSTNAME = "127.0.0.1:80/" | |
FILENAME_LOC = "gridfs/filename/" | |
AUTH_REQUIRED = True | |
USER = "user" | |
PASSWORD = "pass" | |
_ID_LOC = "gridfs/_id/" | |
UPPER_BOUND = 1024*256 * 4 | |
LOWER_BOUND = 1024*128 | |
DELTA = 1024*128 | |
TYPES = ["application/pdf", "application/zip", "audio/midi", "audio/mpeg", "images/jpeg", "images/png", "image/tiff", "text/plain", "text/html", "text/richtext", "video/mp4", "video/mpeg"] | |
POSSIBLE_CONTENT = "abcdefghijklmonpqrstuvwxyz ABCDEFGHIJKLMNOPQRSTUVWXYZ" | |
#------------------------------------#
# Test access by _id                 #
#------------------------------------#
def test_access_by_id(fs):
    """Store random payloads in GridFS and fetch each one back by its _id.

    For every size in range(LOWER_BOUND, UPPER_BOUND, DELTA) a text/plain
    file holding that many random characters is written through ``fs``,
    requested from HOSTNAME + _ID_LOC + <_id> with pycurl, and compared with
    what was written.  One "Passed!" or "Data inconsistent!" line is printed
    per size.

    fs -- GridFS handle used to create and delete the temporary files.
    """
    for size in range(LOWER_BOUND, UPPER_BOUND, DELTA):
        # One join instead of `size` successive string concatenations.
        data = "".join([random.choice(POSSIBLE_CONTENT) for _ in range(size)])
        grid_file = fs.new_file(content_type="text/plain", filename="garbage")
        try:
            grid_file.write(data)
            grid_file.close()

            body = StringIO.StringIO()
            curl = pycurl.Curl()
            try:
                curl.setopt(pycurl.URL, HOSTNAME + _ID_LOC + str(grid_file._id))
                curl.setopt(pycurl.WRITEFUNCTION, body.write)
                curl.perform()
            finally:
                # Release the libcurl handle even when the transfer fails.
                curl.close()

            if data != body.getvalue():
                print("Data inconsistent! Size: " + str(size) + " Access by: _id")
            else:
                print("Passed! Size: " + str(size) + " Access by: _id")
        finally:
            # Always remove the temporary file so a failed iteration does not
            # leave a stray "garbage" entry behind.
            fs.delete(grid_file._id)
#------------------------------------#
# Test access by filename            #
#------------------------------------#
def test_access_by_filename(fs):
    """Store random payloads in GridFS and fetch each one back by filename.

    For every size in range(LOWER_BOUND, UPPER_BOUND, DELTA) a text/plain
    file named "garbage" holding that many random characters is written
    through ``fs``, requested from HOSTNAME + FILENAME_LOC + "garbage" with
    pycurl, and compared with what was written.  One "Passed!" or
    "Data inconsistent!" line is printed per size.

    fs -- GridFS handle used to create and delete the temporary files.
    """
    for size in range(LOWER_BOUND, UPPER_BOUND, DELTA):
        # One join instead of `size` successive string concatenations.
        data = "".join([random.choice(POSSIBLE_CONTENT) for _ in range(size)])
        grid_file = fs.new_file(content_type="text/plain", filename="garbage")
        try:
            grid_file.write(data)
            grid_file.close()

            body = StringIO.StringIO()
            curl = pycurl.Curl()
            try:
                curl.setopt(pycurl.URL, HOSTNAME + FILENAME_LOC + "garbage")
                curl.setopt(pycurl.WRITEFUNCTION, body.write)
                curl.perform()
            finally:
                # Release the libcurl handle even when the transfer fails.
                curl.close()

            if data != body.getvalue():
                print("Data inconsistent! Size: " + str(size) + " Access by: filename")
            else:
                print("Passed! Size: " + str(size) + " Access by: filename")
        finally:
            # Every iteration reuses the name "garbage", so the file must be
            # removed even on failure or it would linger under that name.
            fs.delete(grid_file._id)
#------------------------------------#
# Test content types                 #
#------------------------------------#
def test_content_types(fs):
    """Check that each stored content type comes back in the HTTP response.

    For every entry of TYPES a small file is written through ``fs`` with that
    content type, requested from HOSTNAME + _ID_LOC + <_id> with pycurl, and
    the Content-Type reported by pycurl is compared with the stored one.  One
    "Passed!" or "Wrong Content Type!" line is printed per entry.

    fs -- GridFS handle used to create and delete the temporary files.
    """
    for content_type in TYPES:
        grid_file = fs.new_file(content_type=content_type)
        try:
            grid_file.write("hello, world!")
            grid_file.close()

            body = StringIO.StringIO()
            curl = pycurl.Curl()
            try:
                curl.setopt(pycurl.URL, HOSTNAME + _ID_LOC + str(grid_file._id))
                curl.setopt(pycurl.WRITEFUNCTION, body.write)
                curl.perform()
                # Must be read before the handle is closed.
                received_type = curl.getinfo(pycurl.CONTENT_TYPE)
            finally:
                curl.close()

            if content_type != received_type:
                # received_type is None when the response carried no
                # Content-Type header; str() keeps the report from raising
                # TypeError on string concatenation.
                print("Wrong Content Type! Expected: " + content_type
                      + " Recieved: " + str(received_type))
            else:
                print("Passed! Type: " + content_type)
        finally:
            # Remove the temporary file even when the request failed.
            fs.delete(grid_file._id)
#------------------------------------#
# Test http range request            #
# DOESN'T WORK YET                   #
#------------------------------------#
def test_range_request(fs):
    """Request a byte range of a stored file and compare it with that slice.

    A text/plain file named "garbage" holding UPPER_BOUND random characters
    is written through ``fs``.  Bytes 10 through 200 (inclusive) are then
    requested from HOSTNAME + FILENAME_LOC + "garbage" with pycurl, and the
    response body is compared with the same slice of the written data.  One
    "Passed!" or "Data inconsistent!" line is printed.

    fs -- GridFS handle used to create and delete the temporary file.
    """
    first_byte, last_byte = 10, 200
    data = "".join([random.choice(POSSIBLE_CONTENT) for _ in range(UPPER_BOUND)])
    # HTTP byte ranges include both end points, hence the +1 on the slice.
    expected = data[first_byte:last_byte + 1]

    grid_file = fs.new_file(content_type="text/plain", filename="garbage")
    try:
        grid_file.write(data)
        grid_file.close()

        body = StringIO.StringIO()
        curl = pycurl.Curl()
        try:
            curl.setopt(pycurl.URL, HOSTNAME + FILENAME_LOC + "garbage")
            curl.setopt(pycurl.WRITEFUNCTION, body.write)
            # libcurl expects the range as "X-Y" with no embedded spaces.
            curl.setopt(pycurl.RANGE, "%d-%d" % (first_byte, last_byte))
            curl.perform()
        finally:
            # Release the libcurl handle even when the transfer fails.
            curl.close()

        # Only the requested slice should come back, not the whole file.
        if expected != body.getvalue():
            print("Data inconsistent! Size: " + str(UPPER_BOUND) + " Access by: filename")
        else:
            print("Passed! Size: " + str(UPPER_BOUND) + " Access by: filename")
    finally:
        # Remove the temporary file even when the request failed.
        fs.delete(grid_file._id)
#------------------------------------#
# Main Script                        #
#------------------------------------#
# Connect to the default MongoDB instance and select its "test" database.
db = Connection().test
if AUTH_REQUIRED:
    db.authenticate(USER, PASSWORD)

# Every test shares one GridFS handle on that database.
fs = GridFS(db)

# test_range_request is deliberately left out of the run.
for run_test in (test_access_by_id, test_access_by_filename, test_content_types):
    run_test(fs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment