Created
March 6, 2012 17:10
-
-
Save kmerenkov/1987548 to your computer and use it in GitHub Desktop.
Соединятор с редисом, можно брать соединения для read+write или для readonly операций.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from datetime import datetime, timedelta | |
import itertools | |
import redis | |
import logging | |
from django.core.cache import cache | |
log = logging.getLogger('db.connection') | |
READ_WRITE = 1 | |
READ = 0 | |
ERR_NO_CONN = u"Cannot connect to database" | |
MASTER_STATE_KEY = 'master_is_down' | |
def error_msg_from_config(config): | |
return ERR_NO_CONN + ': ' + ('%s:%s' % (config.get('host', '<unknown>'), config.get('port', '<unknown>'))) | |
class DatabaseIsLoading(Exception): | |
pass | |
class NoConnection(Exception): | |
pass | |
class Connection(object): | |
def __init__(self, servers, master_death_timeout=15, client_constructor=None): | |
if not servers: | |
raise ValueError("servers can't be empty") | |
self.master = None | |
self.master_kwargs = servers[0] | |
self.death_timeout = master_death_timeout | |
self.client_constructor = client_constructor or redis.Redis | |
slaves = servers[1:] | |
self.slaves_count = len(slaves) | |
self.slaves = itertools.cycle(slaves) | |
self.current_client = None | |
def get_connection(self, server_kwargs): | |
try: | |
client = self.client_constructor(**server_kwargs) | |
self.is_alive(client) | |
return client | |
except (redis.ConnectionError, DatabaseIsLoading): | |
# log_warning() | |
return None | |
def is_alive(self, client): | |
try: | |
client.ping() | |
except redis.ConnectionError, e: | |
# log_warning() | |
if unicode(e).startswith(u'LOADING:'): | |
raise DatabaseIsLoading(u'%s:%s is loading dataset' % (client.host, client.port)) | |
raise | |
def forget_master_state(self): | |
forget_master_state() | |
def master_is_down(self): | |
if self.death_timeout: # NOTE такой костыль для тестов | |
cache.set(MASTER_STATE_KEY, 'yes', self.death_timeout) | |
def get_master_connection(self): | |
# Если соединение с мастером есть... | |
if self.master: | |
try: | |
# ...то проверяем, живо ли оно... | |
self.is_alive(self.master) | |
cache.delete(MASTER_STATE_KEY) | |
log.debug("get_master_connection: using master, old connection is alive") | |
# ...и возвращаем его. | |
return self.master | |
except (redis.ConnectionError, DatabaseIsLoading): | |
# В противном случае, помечаем мастер как мёртвый | |
self.master_is_down() | |
self.master = None | |
return None | |
else: | |
master_is_down = cache.get(MASTER_STATE_KEY) | |
if not master_is_down: | |
log.debug("get_master_connection: trying to use master again (it's time)") | |
self.master = self.get_connection(self.master_kwargs) | |
# Если соединение к мастеру так и не установилось (get_connection возвращает None в случае неудачи)... | |
if not self.master: | |
# ...то снова помечаем мастер как мёртвый. | |
log.warn("get_master_connection: failed to get master connection") | |
self.master_is_down() | |
return None | |
# Всё хорошо, отдаём соединение наружу :-) | |
log.info("get_master_connection: using master, new connection") | |
cache.delete(MASTER_STATE_KEY) | |
return self.master | |
def get_rw_connection(self): | |
master = self.get_master_connection() | |
if not master: | |
log.error("get_rw_connection: failed to get rw connection") | |
raise NoConnection(error_msg_from_config(self.master_kwargs)) | |
return master | |
def get_ro_connection(self): | |
log.debug("get_ro_connection: first trying to use master connection") | |
master = self.get_master_connection() | |
if master: | |
log.debug("get_ro_connection: using master connection") | |
return (READ_WRITE, master) | |
log.debug("get_ro_connection: failed to use master connection") | |
if self.current_client: | |
try: | |
self.is_alive(self.current_client) | |
log.debug("get_ro_connection: found working connection") | |
return (READ, self.current_client) | |
except redis.ConnectionError: | |
pass | |
except DatabaseIsLoading: | |
# Т.к. слейв ответил, что он загружает данные (99%, что из мастера) | |
# то пора переключаться на мастер. | |
# NOTE Тут может возникнуть полный отказ при схеме с двумя слейвами в следующем сценарии | |
# 1. Мастер не работает | |
# 2. Слейв перезагрузили (щито) | |
# 3. На второй слейв мы никогда не переключимся, ибо будем туда-сюда скакать между мастером и слейвом, | |
# т.к. первый не доступен и мы переключаемся на слейв, а второй LOADING. | |
# Худший вариант: Не работаем ровно столько времени, сколько слейв загружает данные | |
# Лучший вариант: Переключились на мастер сразу же после его поднятия | |
self.forget_master_state() | |
master = self.get_master_connection() | |
if master: | |
return (READ_WRITE, master) | |
log.warn("get_ro_connection: lost connection") | |
client_kwargs = {} | |
for _ in xrange(self.slaves_count): | |
client_kwargs = self.slaves.next() | |
client = self.get_connection(client_kwargs) | |
if client: | |
log.debug("get_ro_connection: found working connection") | |
self.current_client = client | |
return (READ, client) | |
log.error("get_rw_connection: failed to get ro connection") | |
raise NoConnection(error_msg_from_config(client_kwargs)) | |
def forget_master_state(): | |
log.debug("forget_master_state: reseted died_at time") | |
cache.delete(MASTER_STATE_KEY) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment