Created
October 13, 2014 17:05
-
-
Save harlowja/a0647af2736427bc4438 to your computer and use it in GitHub Desktop.
Redis-tooz-thing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-

#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
from __future__ import absolute_import | |
from concurrent import futures | |
import redis | |
from redis import lock as redis_lock | |
import tooz | |
from tooz import coordination | |
from tooz import locking | |
from tooz import utils | |
class RedisLock(locking.Lock):
    """Tooz lock that delegates to a lock object created by the redis client.

    The lock registers itself in the owning driver's ``_acquired_locks``
    list while held, which is the list the driver walks when heartbeating
    and when stopping.
    """

    _LOCK_PREFIX = b'__TOOZ_LOCK:'

    def __init__(self, coord, client, name, timeout):
        """Build the underlying redis lock.

        :param coord: owning driver; must expose an ``_acquired_locks`` list
        :param client: redis client used to create the lock (``client.lock``)
        :param name: lock name (bytes); it is prefixed with ``_LOCK_PREFIX``
        :param timeout: forwarded as the redis lock's ``timeout``
        """
        self.name = self._LOCK_PREFIX + name
        self._lock = client.lock(self.name, timeout=timeout)
        self._coord = coord

    def acquire(self, blocking=True):
        """Try to acquire the lock.

        :param blocking: ``True`` to block with no wait limit, ``False`` to
                         not block at all, or a number that is used as the
                         maximum time to block (``blocking_timeout``)
        :returns: the (truthy/falsy) result reported by the redis lock
        """
        if isinstance(blocking, bool):
            blocking_timeout = None
        else:
            # A numeric value means "block, but only for this long": the
            # number becomes the timeout and the blocking flag itself must
            # be a real boolean (previously the timeout was clobbered with
            # ``True`` and the raw number was passed as the flag).
            blocking_timeout = float(blocking)
            blocking = True
        acquired = self._lock.acquire(blocking=blocking,
                                      blocking_timeout=blocking_timeout)
        if acquired:
            self._coord._acquired_locks.append(self)
        # Report the outcome so callers can tell success from failure.
        return acquired

    def release(self):
        """Release the lock and unregister it from the owning driver."""
        try:
            self._coord._acquired_locks.remove(self)
        except ValueError:
            # Was never registered (or was already removed); nothing to do.
            pass
        try:
            self._lock.release()
        except redis_lock.LockError:
            # Deliberately best-effort: the redis lock refused the release
            # (for example because it is not currently held).
            pass

    def heartbeat(self):
        """Extend the redis lock by its own configured timeout."""
        self._lock.extend(self._lock.timeout)
class RedisDriver(coordination.CoordinationDriver):
    """Tooz coordination driver that keeps groups and locks in redis.

    Each group is stored as a redis hash (member id -> capabilities) under
    ``_GROUP_PREFIX + group_id`` and the known group ids are kept in the
    redis set ``_GROUP_LIST_KEY``. Group operations are submitted to a
    single-worker thread pool and returned wrapped in a
    ``RedisFutureResult``.
    """

    _GROUP_LIST_KEY = b'__TOOZ_GROUP_LIST__'

    # Redis deletes dictionaries that have no keys in them, which means the
    # key will disappear which means we can't tell the difference between
    # a group not existing and a group being empty without this key being
    # saved...
    _GROUP_EXISTS_KEY = b'__TOOZ_GROUP__'
    _GROUP_LEADER_PREFIX = b'__TOOZ_GROUP_LEADER:'
    _GROUP_PREFIX = b'__TOOZ_GROUP:'

    def __init__(self, member_id, parsed_url, options):
        """Record connection settings; no connection is made until start().

        :param member_id: id this driver uses for itself inside groups
        :param parsed_url: object exposing ``hostname`` and ``port``
                           (defaults: ``localhost`` / ``6379``)
        :param options: mapping whose values are sequences; the last item
                        of ``timeout`` (default ``'30'``) and of
                        ``lock_timeout`` (default: same as ``timeout``)
                        is converted with ``int``
        """
        super(RedisDriver, self).__init__()
        self.host = (parsed_url.hostname or "localhost",
                     parsed_url.port or 6379)
        default_timeout = options.get('timeout', ['30'])
        self.timeout = int(default_timeout[-1])
        self.lock_timeout = int(options.get(
            'lock_timeout', default_timeout)[-1])
        self._client = None
        self._member_id = member_id
        self._acquired_locks = []
        self._executor = None

    def get_lock(self, name):
        """Return a ``RedisLock`` named ``name`` bound to this driver."""
        return RedisLock(self, self._client, name, self.lock_timeout)

    def start(self):
        """Create the redis client and worker thread, then ping the server.

        :raises coordination.ToozConnectionError: if building the client
                                                  fails
        """
        try:
            self._client = redis.StrictRedis(
                host=self.host[0], port=self.host[1],
                socket_connect_timeout=self.timeout,
                socket_timeout=self.timeout)
        except Exception as e:
            raise coordination.ToozConnectionError(utils.exception_message(e))
        else:
            self._acquired_locks = []
            self._executor = futures.ThreadPoolExecutor(max_workers=1)
            self.heartbeat()

    @classmethod
    def _encode_member_id(cls, member_id):
        """Validate a member id before it is used as a hash field.

        :raises ValueError: if the id equals the private marker field
        """
        if member_id == cls._GROUP_EXISTS_KEY:
            raise ValueError("Not allowed to use private keys as a member id")
        return member_id

    @staticmethod
    def _decode_member_id(member_id):
        """Inverse of ``_encode_member_id`` (currently the identity)."""
        return member_id

    @classmethod
    def _encode_group_id(cls, group_id):
        """Return the redis key of the hash that stores ``group_id``."""
        return cls._GROUP_PREFIX + group_id

    @classmethod
    def _encode_group_leader(cls, group_id):
        """Return the group-leader key for ``group_id``."""
        return cls._GROUP_LEADER_PREFIX + group_id

    def heartbeat(self):
        """Ping the server and heartbeat every currently acquired lock."""
        self._client.ping()
        for lock in self._acquired_locks:
            lock.heartbeat()

    def stop(self):
        """Release held locks, shut the worker down and drop the client."""
        # Iterate over a copy: release() removes the lock from the list.
        for lock in list(self._acquired_locks):
            lock.release()
        if self._executor is not None:
            self._executor.shutdown(wait=True)
            self._executor = None
        self._client = None

    @staticmethod
    def watch_join_group(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    def create_group(self, group_id):
        """Asynchronously create ``group_id``.

        The returned result raises ``coordination.GroupAlreadyExist`` from
        ``get()`` if the group hash is already present.
        """
        encoded_group = self._encode_group_id(group_id)

        def _create_group(p):
            if p.exists(encoded_group):
                raise coordination.GroupAlreadyExist(group_id)
            # While watching, the pipeline runs commands immediately; call
            # multi() so the writes are queued and only applied (atomically)
            # if none of the watched keys changed in the meantime.
            p.multi()
            p.sadd(self._GROUP_LIST_KEY, group_id)
            # Marker field that keeps the hash alive while it has no members.
            p.hset(encoded_group, self._GROUP_EXISTS_KEY, '1')

        return RedisFutureResult(
            self._executor.submit(self._client.transaction,
                                  _create_group,
                                  encoded_group, self._GROUP_LIST_KEY,
                                  value_from_callable=True))

    def update_capabilities(self, group_id, capabilities):
        """Asynchronously replace this member's capabilities in a group.

        The returned result raises ``coordination.GroupNotCreated`` or
        ``coordination.MemberNotJoined`` from ``get()`` when applicable.
        """
        encoded_group = self._encode_group_id(group_id)
        encoded_member_id = self._encode_member_id(self._member_id)

        def _update_capabilities(p):
            if not p.exists(encoded_group):
                raise coordination.GroupNotCreated(group_id)
            if not p.hexists(encoded_group, encoded_member_id):
                raise coordination.MemberNotJoined(group_id,
                                                   self._member_id)
            # Queue the write so it is guarded by the watch on the group.
            p.multi()
            p.hset(encoded_group, encoded_member_id, capabilities)

        return RedisFutureResult(
            self._executor.submit(self._client.transaction,
                                  _update_capabilities,
                                  encoded_group, value_from_callable=True))

    def leave_group(self, group_id):
        """Asynchronously remove this member from ``group_id``.

        The returned result raises ``coordination.GroupNotCreated`` or
        ``coordination.MemberNotJoined`` from ``get()`` when applicable.
        """
        encoded_group = self._encode_group_id(group_id)
        encoded_member_id = self._encode_member_id(self._member_id)

        def _leave_group(p):
            if not p.exists(encoded_group):
                raise coordination.GroupNotCreated(group_id)
            # Membership is checked up front because the result of a queued
            # hdel is not available inside this callback.
            if not p.hexists(encoded_group, encoded_member_id):
                raise coordination.MemberNotJoined(group_id,
                                                   self._member_id)
            p.multi()
            p.hdel(encoded_group, encoded_member_id)

        return RedisFutureResult(
            self._executor.submit(self._client.transaction,
                                  _leave_group,
                                  encoded_group, value_from_callable=True))

    def get_members(self, group_id):
        """Asynchronously list the member ids of ``group_id``.

        The private marker field is filtered out. The returned result
        raises ``coordination.GroupNotCreated`` from ``get()`` if the group
        does not exist.
        """
        encoded_group = self._encode_group_id(group_id)

        def _get_members(p):
            if not p.exists(encoded_group):
                raise coordination.GroupNotCreated(group_id)
            return [self._decode_member_id(k)
                    for k in p.hkeys(encoded_group)
                    if k != self._GROUP_EXISTS_KEY]

        return RedisFutureResult(
            self._executor.submit(self._client.transaction,
                                  _get_members,
                                  encoded_group, value_from_callable=True))

    def get_member_capabilities(self, group_id, member_id):
        """Asynchronously fetch the capabilities stored for ``member_id``.

        The returned result raises ``coordination.GroupNotCreated`` or
        ``coordination.MemberNotJoined`` from ``get()`` when applicable.
        """
        encoded_group = self._encode_group_id(group_id)
        encoded_member_id = self._encode_member_id(member_id)

        def _get_member_capabilities(p):
            if not p.exists(encoded_group):
                raise coordination.GroupNotCreated(group_id)
            capabilities = p.hget(encoded_group, encoded_member_id)
            if capabilities is None:
                raise coordination.MemberNotJoined(group_id, member_id)
            return capabilities

        return RedisFutureResult(
            self._executor.submit(self._client.transaction,
                                  _get_member_capabilities,
                                  encoded_group, value_from_callable=True))

    def join_group(self, group_id, capabilities=b""):
        """Asynchronously add this member to ``group_id``.

        The returned result raises ``coordination.GroupNotCreated`` from
        ``get()`` if the group does not exist.
        """
        encoded_group = self._encode_group_id(group_id)
        encoded_member_id = self._encode_member_id(self._member_id)

        def _join_group(p):
            if not p.exists(encoded_group):
                raise coordination.GroupNotCreated(group_id)
            # Queue the write so it is guarded by the watch on the group.
            p.multi()
            p.hset(encoded_group, encoded_member_id, capabilities)

        return RedisFutureResult(
            self._executor.submit(self._client.transaction,
                                  _join_group,
                                  encoded_group, value_from_callable=True))

    def get_groups(self):
        """Asynchronously list the ids stored in the group-list set."""
        return RedisFutureResult(
            self._executor.submit(self._client.smembers,
                                  self._GROUP_LIST_KEY),
            result_processor=list)

    # The watch/unwatch hooks below are not supported by this driver; they
    # all raise the same exception as watch_join_group for consistency.

    @staticmethod
    def unwatch_join_group(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    @staticmethod
    def watch_leave_group(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    @staticmethod
    def unwatch_leave_group(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    @staticmethod
    def watch_elected_as_leader(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    @staticmethod
    def unwatch_elected_as_leader(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    def run_watchers(self):
        """Return an empty list; this driver has no watchers to run."""
        result = []
        return result
class RedisFutureResult(coordination.CoordAsyncResult):
    """Async result wrapper around a ``concurrent.futures`` future.

    An optional ``result_processor`` callable is applied to the future's
    value before it is handed back from ``get()``.
    """

    def __init__(self, fut, result_processor=None):
        self._fut = fut
        self._result_processor = result_processor

    def get(self, timeout=10):
        """Wait up to ``timeout`` for the future and return its value.

        :raises coordination.OperationTimedOut: if the future does not
                                                finish in time
        """
        try:
            outcome = self._fut.result(timeout=timeout)
        except futures.TimeoutError as exc:
            raise coordination.OperationTimedOut(utils.exception_message(exc))
        # Post-processing happens outside the try so its own errors are
        # never mistaken for a timeout.
        processor = self._result_processor
        if not processor:
            return outcome
        return processor(outcome)

    def done(self):
        """Tell whether the wrapped future has finished."""
        return self._fut.done()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment