Created
July 25, 2014 22:29
-
-
Save adoc/4694d8072fb73b335d92 to your computer and use it in GitHub Desktop.
formencode upload / image validators
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#--- | |
import os | |
import shutil | |
import pickle | |
import cgi | |
import tempfile | |
import formencode | |
import hashlib | |
import base64 | |
class PersistentUpload(formencode.validators.FieldStorageUploadConverter): | |
"""Validate an upload and store it. | |
""" | |
messages = { | |
'too_large':'The image uploaded is too large.'} | |
min = 1024 | |
#max = 10 * 1024 ** 2 | |
max = 120 * 1024 | |
dir = tempfile.gettempdir() | |
fileprefix = '' | |
upload_key = "upload" | |
stored_key = "stored" | |
file_key = "file" | |
@property | |
def checksum_filepath(self): | |
return os.path.join(self.dir, 'checksum.dat') | |
checksum_blocksize = 8192 | |
checksum_map = {} | |
allow_duplicate = True | |
def __init__(self, *args, **kwa): | |
if 'min' in kwa: | |
self.min = kwa.pop('min') | |
if 'max' in kwa: | |
self.max = kwa.pop('max') | |
if 'dir' in kwa: | |
self.dir = kwa.pop('dir') | |
if 'upload_key' in kwa: | |
self.upload_key = kwa.pop('upload_key') | |
if 'stored_key' in kwa: | |
self.stored_key = kwa.pop('stored_key') | |
if 'checksum_filepath' in kwa: | |
self.checksum_filepath = kwa.pop('checksum_filepath') | |
if 'checksum_blocksize' in kwa: | |
self.checksum_blocksize = kwa.pop('checksum_blocksize') | |
if 'allow_duplicate' in kwa: | |
self.allow_duplicate = kwa.pop('allow_duplicate') | |
self.checksum_map = {} | |
try: | |
checksum_file = open(self.checksum_filepath, "rb") | |
except FileNotFoundError: | |
self.checksum_file = open(self.checksum_filepath, "wb") | |
else: | |
try: | |
self.checksum_map = pickle.load(checksum_file) | |
except EOFError: | |
pass | |
self.checksum_file = open(self.checksum_filepath, "r+b") | |
(formencode.validators.FieldStorageUploadConverter. | |
__init__(self, *args, **kwa)) | |
def _checksum(self, fieldobj, rewind=True): | |
md5 = hashlib.md5() | |
fp = fieldobj.file | |
for block in iter( | |
lambda: fp.read(self.checksum_blocksize), ''): | |
if not block: | |
break | |
md5.update(block) | |
if rewind is True: | |
fieldobj.file.seek(0) | |
return md5.digest() | |
def _checkdupe(self, fieldobj): | |
digest = self._checksum(fieldobj) | |
if digest in self.checksum_map: | |
# We already have the file. | |
fieldobj.filename = self.checksum_map[digest] | |
return True | |
else: | |
self.checksum_map[digest] = fieldobj.filename | |
pickle.dump(self.checksum_map, self.checksum_file) | |
self.checksum_file.seek(0) | |
def _persist(self, fieldobj): | |
shutil.copyfileobj(fieldobj.file, | |
open(os.path.join(self.dir, fieldobj.filename), 'wb')) | |
def _retreive(self, filename): | |
fieldobj = cgi.FieldStorage() | |
fieldobj.file = open(os.path.join(self.dir, filename), 'rb') | |
fieldobj.filename = filename | |
#fieldobj.mimetype = ? | |
return fieldobj | |
def _convert_to_python(self, value, state): | |
value = (formencode.validators.FieldStorageUploadConverter. | |
_convert_to_python(self, value, state)) | |
# Let's save the file. | |
uploadobj = value.get(self.upload_key) | |
fieldstored = value.get(self.stored_key) | |
if isinstance(uploadobj, cgi.FieldStorage): | |
# Do some validation here. | |
if uploadobj.bytes_read > self.max: | |
raise formencode.Invalid(self.message('too_large', state), | |
value, state) | |
(formencode.validators.UnicodeString(max=256, min=3). | |
to_python(uploadobj.filename)) | |
if (self.allow_duplicate is True or | |
self._checkdupe(uploadobj) is not True): | |
self._persist(uploadobj) | |
value[self.file_key] = uploadobj | |
value[self.stored_key] = uploadobj.filename | |
elif fieldstored is not None: | |
fieldobj = self._retreive(fieldstored) | |
value[self.file_key] = fieldobj | |
value[self.stored_key] = fieldobj.filename | |
else: | |
return None | |
return value | |
#--- | |
import os | |
import PIL | |
class ImageUpload(PersistentUpload): | |
""" | |
""" | |
dir = os.path.join(tempfile.gettempdir(), 'image_upload') | |
format = "JPEG" | |
quality = 95 | |
lower_ext = True | |
random_name = True | |
random_length = 8 | |
unlink_upload = False | |
make_dir = True | |
upload_key = "upload" | |
storage_key = "image" | |
def __init__(self, *args, **kwa): | |
if 'format' in kwa: | |
self.convert_format = kwa.pop('convert_format') | |
if 'quality' in kwa: | |
self.convert_quality = kwa.pop('convert_quality') | |
if 'lower_ext' in kwa: | |
self.lower_ext = kwa.pop('lower_ext') | |
if 'random_name' in kwa: | |
self.random_name = kwa.pop('random_name') | |
if 'random_length' in kwa: | |
self.random_length = kwa.pop('random_length') | |
if 'keep_upload' in kwa: | |
self.unlink_upload = kwa.pop('unlink_upload') | |
if 'make_dir' in kwa: | |
self.make_dir = kwa.pop('make_dir') | |
if self.make_dir is True: | |
if not os.path.exists(self.dir): | |
os.makedirs(self.dir) | |
PersistentUpload.__init__(self, *args, **kwa) | |
def _persist(self, fieldobj): | |
image = PIL.Image.open(fieldobj.file) | |
image.convert("RGB") | |
image.save(os.path.join(self.dir, fieldobj.filename), self.format, | |
quality=self.quality) | |
return fieldobj | |
def _convert_to_python(self, value, state): | |
fieldobj = value.get(self.upload_key) | |
if isinstance(fieldobj, cgi.FieldStorage): | |
filename, fileext = os.path.splitext(fieldobj.filename) | |
if self.lower_ext is True: | |
fileext = fileext.lower() | |
if self.random_name is True: | |
filename = oae.core.util.random_filename(self.random_length) | |
fieldobj.filename = filename+fileext | |
return PersistentUpload._convert_to_python(self, value, state) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment