Skip to content

Instantly share code, notes, and snippets.

@swilcox
Created May 2, 2017 13:26
Show Gist options
  • Save swilcox/e203daf58f1c9914f4f64458780a844b to your computer and use it in GitHub Desktop.
Save swilcox/e203daf58f1c9914f4f64458780a844b to your computer and use it in GitHub Desktop.
Getting a connection to redis
import redis
import mockredis
from django.conf import settings
from django.core.signals import request_finished
try:
from eventlet.corolocal import local
except ImportError:
from threading import local
# Connection URL for Redis: taken from settings.REDIS_URL when that setting
# is defined, otherwise a server on localhost:6379 using database 0.
REDIS_URL = getattr(settings, 'REDIS_URL', 'redis://localhost:6379/0')
# Context-local storage (eventlet's corolocal variant when eventlet is
# importable, threading.local otherwise) that holds the cached client.
REDIS_LOCAL = local()
def get_redis():
    """Return the Redis client cached for the current thread/greenlet.

    On first use in a given context a client is created, stored on
    ``REDIS_LOCAL`` and returned; later calls return that same object.

    When ``settings.TESTING`` is truthy a ``mockredis`` client is created,
    otherwise a real client is built from ``REDIS_URL``. A missing
    ``TESTING`` setting is treated as ``False`` so projects that never
    define it get the real client instead of an ``AttributeError``.

    Returns:
        The cached (or newly created) Redis client object.
    """
    try:
        return REDIS_LOCAL.client
    except AttributeError:
        # No client cached yet for this context; build one below. Doing the
        # construction outside the handler keeps any connection error from
        # being chained onto this expected AttributeError.
        pass

    if getattr(settings, 'TESTING', False):
        client = mockredis.mock_redis_client()
    else:
        client = redis.from_url(REDIS_URL)
    REDIS_LOCAL.client = client
    return client
def cleanup(sender, **kwargs):
    """Drop and disconnect the Redis client cached for the current context.

    Written as a Django signal receiver. If no client has been cached on
    ``REDIS_LOCAL`` this is a no-op. Otherwise the cached reference is
    removed and the client's connections are closed on a best-effort
    basis: unexpected errors are logged rather than raised, so a failed
    disconnect never breaks the caller.

    Args:
        sender: The signal sender; unused.
        **kwargs: Additional signal arguments; unused.
    """
    import logging  # local import so the block needs no new file-level import

    try:
        client = REDIS_LOCAL.client
    except AttributeError:
        # Nothing was cached in this context, so there is nothing to release.
        return
    del REDIS_LOCAL.client

    # Prefer a disconnect() on the client itself; otherwise fall back to the
    # connection pool. NOTE(review): redis-py appears to expose disconnect()
    # on the pool rather than on the client, in which case the original
    # client.disconnect() call failed silently -- confirm against the
    # installed redis-py version.
    disconnect = getattr(client, 'disconnect', None)
    if disconnect is None:
        pool = getattr(client, 'connection_pool', None)
        disconnect = getattr(pool, 'disconnect', None)
    if disconnect is None:
        # e.g. a mock client with nothing to close.
        return

    try:
        disconnect()
    except MemoryError:
        # Preserved from the original: never hide memory exhaustion.
        raise
    except Exception:
        # KeyboardInterrupt/SystemExit are not Exception subclasses and so
        # still propagate; anything else is logged and suppressed.
        logging.getLogger(__name__).warning(
            'Failed to disconnect Redis client', exc_info=True)
# Register cleanup() as a receiver of Django's request_finished signal so the
# cached client is dropped each time that signal is sent.
request_finished.connect(cleanup)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment