Skip to content

Instantly share code, notes, and snippets.

@wichert
Last active September 26, 2015 14:17
Show Gist options
  • Save wichert/1109971 to your computer and use it in GitHub Desktop.
Save wichert/1109971 to your computer and use it in GitHub Desktop.
Redis-backed implementation of deform.interfaces.FileUploadTempStore
import unittest
import pyramid.testing
class RedisTempStoreTests(unittest.TestCase):
    """Unit tests for ``RedisTempStore``.

    Every test runs against a ``DummyRedis`` instance (imported from
    ``.tmpstore``) that is registered under the ``'redis'`` key of the
    pyramid test registry settings.  The tests inspect its ``hashes``
    mapping directly to check what the store wrote.
    """

    def setUp(self):
        # Fresh pyramid test configuration plus a fresh fake redis per test.
        from .tmpstore import DummyRedis
        self.config = pyramid.testing.setUp()
        self.redis = DummyRedis()
        self.config.registry.settings['redis'] = self.redis

    def tearDown(self):
        pyramid.testing.tearDown()

    def RedisTempStore(self, *a, **kw):
        # Imported lazily so the class under test is only loaded when a
        # test actually needs it.
        from voipro.portal.ui.form import RedisTempStore
        return RedisTempStore(*a, **kw)

    def _stored_hash(self):
        # Check that the 'dummy' entry exists under its prefixed key in
        # the fake redis and hand back the hash stored there.
        self.assertTrue('tmpstore:dummy' in self.redis.hashes)
        return self.redis.hashes['tmpstore:dummy']

    # A value without any 'fp' entry is stored unchanged.
    def test_setitem_without_file(self):
        tmpstore = self.RedisTempStore()
        tmpstore['dummy'] = {'filename': 'x.py', 'uid': '123456'}
        stored = self._stored_hash()
        self.assertEqual(sorted(stored.keys()), ['filename', 'uid'])

    # An 'fp' of None is kept as-is and not converted into '_data'.
    def test_setitem_with_None_file(self):
        tmpstore = self.RedisTempStore()
        tmpstore['dummy'] = {'filename': 'x.py', 'fp': None}
        stored = self._stored_hash()
        self.assertEqual(sorted(stored.keys()), ['filename', 'fp'])
        self.assertEqual(stored['fp'], None)

    # A real file object is read and its contents stored under '_data'.
    def test_setitem_with_file(self):
        from cStringIO import StringIO
        tmpstore = self.RedisTempStore()
        upload = StringIO('This is the data')
        tmpstore['dummy'] = {'filename': 'x.py', 'fp': upload}
        stored = self._stored_hash()
        self.assertEqual(sorted(stored.keys()), ['_data', 'filename'])
        self.assertEqual(stored['_data'], 'This is the data')

    # Subscripting with a name that was never stored raises KeyError.
    def test_getitem_unknown_key(self):
        tmpstore = self.RedisTempStore()
        self.assertRaises(KeyError, lambda: tmpstore['dummy'])

    # Stored '_data' comes back as a readable 'fp' file object.
    def test_getitem_with_file(self):
        seeded = {'foo': 'bar', '_data': 'This is the data'}
        self.redis.hmset('tmpstore:dummy', seeded)
        tmpstore = self.RedisTempStore()
        loaded = tmpstore['dummy']
        self.assertTrue(isinstance(loaded, dict))
        self.assertEqual(sorted(loaded.keys()), ['foo', 'fp'])
        self.assertEqual(loaded['foo'], 'bar')
        self.assertEqual(loaded['fp'].read(), 'This is the data')

    # Without '_data' in the hash no 'fp' entry is invented.
    def test_getitem_without_file(self):
        self.redis.hmset('tmpstore:dummy', {'foo': 'bar'})
        tmpstore = self.RedisTempStore()
        loaded = tmpstore['dummy']
        self.assertTrue(isinstance(loaded, dict))
        self.assertEqual(sorted(loaded.keys()), ['foo'])

    # get() hands back the very default object for an unknown name.
    def test_get_unknown_key_with_default(self):
        tmpstore = self.RedisTempStore()
        marker = []
        found = tmpstore.get('dummy', marker)
        self.assertTrue(found is marker)

    def test_contains_unknown_key(self):
        tmpstore = self.RedisTempStore()
        self.assertFalse('dummy' in tmpstore)

    def test_contains_known_key(self):
        from cStringIO import StringIO
        tmpstore = self.RedisTempStore()
        upload = StringIO('This is the data')
        tmpstore['dummy'] = {'filename': 'x.py', 'fp': upload}
        self.assertTrue('dummy' in tmpstore)

    # preview_url() is a stub and always answers None.
    def test_preview_url_stub(self):
        tmpstore = self.RedisTempStore()
        self.assertEqual(tmpstore.preview_url(None), None)
from cStringIO import StringIO
from pyramid.threadlocal import get_current_registry
class RedisTempStore(object):
    """Redis-backed :py:class:`deform.interfaces.FileUploadTempStore`.

    Each entry lives in a redis hash whose key is ``prefix`` followed by
    the entry name.  A truthy ``fp`` value is read on the way in and its
    contents are stored in the ``_data`` hash field; on the way out
    ``_data`` is wrapped in a ``StringIO`` and exposed as ``fp`` again.
    """

    # Prepended to every entry name to build the redis key.
    prefix = 'tmpstore:'
    # Value handed to ``redis.expire`` after every write.
    ttl = 3600

    def __init__(self):
        # The redis client is taken from the ``'redis'`` entry of the
        # current pyramid registry settings.
        registry = get_current_registry()
        self.redis = registry.settings['redis']

    def __setitem__(self, name, value):
        """Store a copy of mapping *value* under *name* and reset its expiry.

        The caller's mapping is never modified.  A falsy ``fp`` (for
        example ``None``) is stored unchanged.
        """
        redis_key = self._key_name(name)
        fields = value.copy()
        if fields.get('fp'):
            upload = fields.pop('fp')
            fields['_data'] = upload.read()
        self.redis.hmset(redis_key, fields)
        self.redis.expire(redis_key, self.ttl)

    def __getitem__(self, name):
        """Return the entry stored under *name*; raise ``KeyError`` if absent."""
        entry = self.get(name)
        if entry is not None:
            return entry
        raise KeyError(name)

    def get(self, name, default=None):
        """Return the entry stored under *name*, or *default* if it is empty.

        When the hash has a ``_data`` field it is replaced by an ``fp``
        entry holding a ``StringIO`` over that data.
        """
        fields = self.redis.hgetall(self._key_name(name))
        if fields:
            if '_data' in fields:
                fields['fp'] = StringIO(fields.pop('_data'))
            return fields
        return default

    def __contains__(self, name):
        """Report whether redis has a key for *name*."""
        return self.redis.exists(self._key_name(name))

    def preview_url(self, name):
        """Stub: no preview URL is available, so always return ``None``."""
        return None

    def _key_name(self, name):
        """Build the redis key for entry *name*."""
        return self.prefix + name
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment