Skip to content

Instantly share code, notes, and snippets.

@kmerenkov
Created March 6, 2012 17:10
Show Gist options
  • Save kmerenkov/1987548 to your computer and use it in GitHub Desktop.
Save kmerenkov/1987548 to your computer and use it in GitHub Desktop.
Соединятор с редисом, можно брать соединения для read+write или для readonly операций.
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
import itertools
import redis
import logging
from django.core.cache import cache
log = logging.getLogger('db.connection')
READ_WRITE = 1
READ = 0
ERR_NO_CONN = u"Cannot connect to database"
MASTER_STATE_KEY = 'master_is_down'
def error_msg_from_config(config):
return ERR_NO_CONN + ': ' + ('%s:%s' % (config.get('host', '<unknown>'), config.get('port', '<unknown>')))
class DatabaseIsLoading(Exception):
pass
class NoConnection(Exception):
pass
class Connection(object):
def __init__(self, servers, master_death_timeout=15, client_constructor=None):
if not servers:
raise ValueError("servers can't be empty")
self.master = None
self.master_kwargs = servers[0]
self.death_timeout = master_death_timeout
self.client_constructor = client_constructor or redis.Redis
slaves = servers[1:]
self.slaves_count = len(slaves)
self.slaves = itertools.cycle(slaves)
self.current_client = None
def get_connection(self, server_kwargs):
try:
client = self.client_constructor(**server_kwargs)
self.is_alive(client)
return client
except (redis.ConnectionError, DatabaseIsLoading):
# log_warning()
return None
def is_alive(self, client):
try:
client.ping()
except redis.ConnectionError, e:
# log_warning()
if unicode(e).startswith(u'LOADING:'):
raise DatabaseIsLoading(u'%s:%s is loading dataset' % (client.host, client.port))
raise
def forget_master_state(self):
forget_master_state()
def master_is_down(self):
if self.death_timeout: # NOTE такой костыль для тестов
cache.set(MASTER_STATE_KEY, 'yes', self.death_timeout)
def get_master_connection(self):
# Если соединение с мастером есть...
if self.master:
try:
# ...то проверяем, живо ли оно...
self.is_alive(self.master)
cache.delete(MASTER_STATE_KEY)
log.debug("get_master_connection: using master, old connection is alive")
# ...и возвращаем его.
return self.master
except (redis.ConnectionError, DatabaseIsLoading):
# В противном случае, помечаем мастер как мёртвый
self.master_is_down()
self.master = None
return None
else:
master_is_down = cache.get(MASTER_STATE_KEY)
if not master_is_down:
log.debug("get_master_connection: trying to use master again (it's time)")
self.master = self.get_connection(self.master_kwargs)
# Если соединение к мастеру так и не установилось (get_connection возвращает None в случае неудачи)...
if not self.master:
# ...то снова помечаем мастер как мёртвый.
log.warn("get_master_connection: failed to get master connection")
self.master_is_down()
return None
# Всё хорошо, отдаём соединение наружу :-)
log.info("get_master_connection: using master, new connection")
cache.delete(MASTER_STATE_KEY)
return self.master
def get_rw_connection(self):
master = self.get_master_connection()
if not master:
log.error("get_rw_connection: failed to get rw connection")
raise NoConnection(error_msg_from_config(self.master_kwargs))
return master
def get_ro_connection(self):
log.debug("get_ro_connection: first trying to use master connection")
master = self.get_master_connection()
if master:
log.debug("get_ro_connection: using master connection")
return (READ_WRITE, master)
log.debug("get_ro_connection: failed to use master connection")
if self.current_client:
try:
self.is_alive(self.current_client)
log.debug("get_ro_connection: found working connection")
return (READ, self.current_client)
except redis.ConnectionError:
pass
except DatabaseIsLoading:
# Т.к. слейв ответил, что он загружает данные (99%, что из мастера)
# то пора переключаться на мастер.
# NOTE Тут может возникнуть полный отказ при схеме с двумя слейвами в следующем сценарии
# 1. Мастер не работает
# 2. Слейв перезагрузили (щито)
# 3. На второй слейв мы никогда не переключимся, ибо будем туда-сюда скакать между мастером и слейвом,
# т.к. первый не доступен и мы переключаемся на слейв, а второй LOADING.
# Худший вариант: Не работаем ровно столько времени, сколько слейв загружает данные
# Лучший вариант: Переключились на мастер сразу же после его поднятия
self.forget_master_state()
master = self.get_master_connection()
if master:
return (READ_WRITE, master)
log.warn("get_ro_connection: lost connection")
client_kwargs = {}
for _ in xrange(self.slaves_count):
client_kwargs = self.slaves.next()
client = self.get_connection(client_kwargs)
if client:
log.debug("get_ro_connection: found working connection")
self.current_client = client
return (READ, client)
log.error("get_rw_connection: failed to get ro connection")
raise NoConnection(error_msg_from_config(client_kwargs))
def forget_master_state():
log.debug("forget_master_state: reseted died_at time")
cache.delete(MASTER_STATE_KEY)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment