Skip to content

Instantly share code, notes, and snippets.

@adoc
Created July 25, 2014 22:29
Show Gist options
  • Save adoc/4694d8072fb73b335d92 to your computer and use it in GitHub Desktop.
Save adoc/4694d8072fb73b335d92 to your computer and use it in GitHub Desktop.
formencode upload / image validators
#---
import os
import shutil
import pickle
import cgi
import tempfile
import formencode
import hashlib
import base64
class PersistentUpload(formencode.validators.FieldStorageUploadConverter):
"""Validate an upload and store it.
"""
messages = {
'too_large':'The image uploaded is too large.'}
min = 1024
#max = 10 * 1024 ** 2
max = 120 * 1024
dir = tempfile.gettempdir()
fileprefix = ''
upload_key = "upload"
stored_key = "stored"
file_key = "file"
@property
def checksum_filepath(self):
return os.path.join(self.dir, 'checksum.dat')
checksum_blocksize = 8192
checksum_map = {}
allow_duplicate = True
def __init__(self, *args, **kwa):
if 'min' in kwa:
self.min = kwa.pop('min')
if 'max' in kwa:
self.max = kwa.pop('max')
if 'dir' in kwa:
self.dir = kwa.pop('dir')
if 'upload_key' in kwa:
self.upload_key = kwa.pop('upload_key')
if 'stored_key' in kwa:
self.stored_key = kwa.pop('stored_key')
if 'checksum_filepath' in kwa:
self.checksum_filepath = kwa.pop('checksum_filepath')
if 'checksum_blocksize' in kwa:
self.checksum_blocksize = kwa.pop('checksum_blocksize')
if 'allow_duplicate' in kwa:
self.allow_duplicate = kwa.pop('allow_duplicate')
self.checksum_map = {}
try:
checksum_file = open(self.checksum_filepath, "rb")
except FileNotFoundError:
self.checksum_file = open(self.checksum_filepath, "wb")
else:
try:
self.checksum_map = pickle.load(checksum_file)
except EOFError:
pass
self.checksum_file = open(self.checksum_filepath, "r+b")
(formencode.validators.FieldStorageUploadConverter.
__init__(self, *args, **kwa))
def _checksum(self, fieldobj, rewind=True):
md5 = hashlib.md5()
fp = fieldobj.file
for block in iter(
lambda: fp.read(self.checksum_blocksize), ''):
if not block:
break
md5.update(block)
if rewind is True:
fieldobj.file.seek(0)
return md5.digest()
def _checkdupe(self, fieldobj):
digest = self._checksum(fieldobj)
if digest in self.checksum_map:
# We already have the file.
fieldobj.filename = self.checksum_map[digest]
return True
else:
self.checksum_map[digest] = fieldobj.filename
pickle.dump(self.checksum_map, self.checksum_file)
self.checksum_file.seek(0)
def _persist(self, fieldobj):
shutil.copyfileobj(fieldobj.file,
open(os.path.join(self.dir, fieldobj.filename), 'wb'))
def _retreive(self, filename):
fieldobj = cgi.FieldStorage()
fieldobj.file = open(os.path.join(self.dir, filename), 'rb')
fieldobj.filename = filename
#fieldobj.mimetype = ?
return fieldobj
def _convert_to_python(self, value, state):
value = (formencode.validators.FieldStorageUploadConverter.
_convert_to_python(self, value, state))
# Let's save the file.
uploadobj = value.get(self.upload_key)
fieldstored = value.get(self.stored_key)
if isinstance(uploadobj, cgi.FieldStorage):
# Do some validation here.
if uploadobj.bytes_read > self.max:
raise formencode.Invalid(self.message('too_large', state),
value, state)
(formencode.validators.UnicodeString(max=256, min=3).
to_python(uploadobj.filename))
if (self.allow_duplicate is True or
self._checkdupe(uploadobj) is not True):
self._persist(uploadobj)
value[self.file_key] = uploadobj
value[self.stored_key] = uploadobj.filename
elif fieldstored is not None:
fieldobj = self._retreive(fieldstored)
value[self.file_key] = fieldobj
value[self.stored_key] = fieldobj.filename
else:
return None
return value
#---
import os
import PIL
class ImageUpload(PersistentUpload):
"""
"""
dir = os.path.join(tempfile.gettempdir(), 'image_upload')
format = "JPEG"
quality = 95
lower_ext = True
random_name = True
random_length = 8
unlink_upload = False
make_dir = True
upload_key = "upload"
storage_key = "image"
def __init__(self, *args, **kwa):
if 'format' in kwa:
self.convert_format = kwa.pop('convert_format')
if 'quality' in kwa:
self.convert_quality = kwa.pop('convert_quality')
if 'lower_ext' in kwa:
self.lower_ext = kwa.pop('lower_ext')
if 'random_name' in kwa:
self.random_name = kwa.pop('random_name')
if 'random_length' in kwa:
self.random_length = kwa.pop('random_length')
if 'keep_upload' in kwa:
self.unlink_upload = kwa.pop('unlink_upload')
if 'make_dir' in kwa:
self.make_dir = kwa.pop('make_dir')
if self.make_dir is True:
if not os.path.exists(self.dir):
os.makedirs(self.dir)
PersistentUpload.__init__(self, *args, **kwa)
def _persist(self, fieldobj):
image = PIL.Image.open(fieldobj.file)
image.convert("RGB")
image.save(os.path.join(self.dir, fieldobj.filename), self.format,
quality=self.quality)
return fieldobj
def _convert_to_python(self, value, state):
fieldobj = value.get(self.upload_key)
if isinstance(fieldobj, cgi.FieldStorage):
filename, fileext = os.path.splitext(fieldobj.filename)
if self.lower_ext is True:
fileext = fileext.lower()
if self.random_name is True:
filename = oae.core.util.random_filename(self.random_length)
fieldobj.filename = filename+fileext
return PersistentUpload._convert_to_python(self, value, state)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment