Skip to content

Instantly share code, notes, and snippets.

@harlowja
Created October 13, 2014 17:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save harlowja/a0647af2736427bc4438 to your computer and use it in GitHub Desktop.
Save harlowja/a0647af2736427bc4438 to your computer and use it in GitHub Desktop.
Redis-tooz-thing
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from concurrent import futures
import redis
from redis import lock as redis_lock
import tooz
from tooz import coordination
from tooz import locking
from tooz import utils
class RedisLock(locking.Lock):
    """Tooz lock implemented on top of a redis client lock object.

    Acquired locks register themselves with the owning coordinator (in its
    ``_acquired_locks`` list) so that the coordinator can heartbeat them and
    release them when it is stopped.
    """

    _LOCK_PREFIX = b'__TOOZ_LOCK:'

    def __init__(self, coord, client, name, timeout):
        """Create (but do not acquire) a lock.

        :param coord: coordinator that tracks the locks it has acquired in
                      a ``_acquired_locks`` list
        :param client: redis client used to build the underlying lock via
                       ``client.lock()``
        :param name: lock name; it is appended to ``_LOCK_PREFIX`` so it
                     must be concatenable with a byte string
        :param timeout: timeout handed to the underlying redis lock
        """
        self.name = self._LOCK_PREFIX + name
        self._lock = client.lock(self.name, timeout=timeout)
        self._coord = coord

    def acquire(self, blocking=True):
        """Attempt to acquire the lock.

        :param blocking: ``True`` to wait until the lock is acquired,
                         ``False`` to try once without waiting, or a number
                         giving the maximum number of seconds to wait
        :returns: the (truthy/falsy) result reported by the underlying
                  redis lock
        """
        if isinstance(blocking, bool):
            blocking_timeout = None
        else:
            # A numeric value means "block, but only for this long"; the
            # underlying lock wants that expressed as blocking=True plus a
            # separate blocking_timeout.
            blocking_timeout = float(blocking)
            blocking = True
        acquired = self._lock.acquire(blocking=blocking,
                                      blocking_timeout=blocking_timeout)
        if acquired:
            self._coord._acquired_locks.append(self)
        return acquired

    def release(self):
        """Release the lock and stop tracking it in the coordinator.

        This is best-effort: a lock that is not tracked by the coordinator,
        or that the underlying redis lock refuses to release, is ignored.
        """
        try:
            self._coord._acquired_locks.remove(self)
        except ValueError:
            # Was never (or is no longer) tracked by the coordinator.
            pass
        try:
            self._lock.release()
        except redis_lock.LockError:
            pass

    def heartbeat(self):
        """Extend the underlying lock by its configured timeout."""
        self._lock.extend(self._lock.timeout)
class RedisDriver(coordination.CoordinationDriver):
    """Tooz coordination driver that stores groups and members in redis.

    Each group is stored as a redis hash (member id -> capabilities) under
    ``_GROUP_PREFIX + group_id``, and the set of known group ids is kept
    under ``_GROUP_LIST_KEY``. Requests are submitted to a single-worker
    thread pool and handed back wrapped in ``RedisFutureResult`` objects.
    """

    _GROUP_LIST_KEY = b'__TOOZ_GROUP_LIST__'

    # Redis deletes dictionaries that have no keys in them, which means the
    # key will disappear which means we can't tell the difference between
    # a group not existing and a group being empty without this key being
    # saved...
    _GROUP_EXISTS_KEY = b'__TOOZ_GROUP__'
    _GROUP_LEADER_PREFIX = b'__TOOZ_GROUP_LEADER:'
    _GROUP_PREFIX = b'__TOOZ_GROUP:'

    def __init__(self, member_id, parsed_url, options):
        """Record connection settings; no connection is made until start().

        :param member_id: identifier of this member
        :param parsed_url: parsed url supplying ``hostname`` and ``port``
                           (defaulting to ``localhost`` and ``6379``)
        :param options: mapping of option name to a list of values; the
                        last value of ``timeout`` (default ``30``) and of
                        ``lock_timeout`` (default: the ``timeout`` values)
                        is used
        """
        super(RedisDriver, self).__init__()
        self.host = (parsed_url.hostname or "localhost",
                     parsed_url.port or 6379)
        default_timeout = options.get('timeout', ['30'])
        self.timeout = int(default_timeout[-1])
        self.lock_timeout = int(options.get(
            'lock_timeout', default_timeout)[-1])
        self._client = None
        self._member_id = member_id
        self._acquired_locks = []
        self._executor = None

    def get_lock(self, name):
        """Return a (not yet acquired) ``RedisLock`` with the given name."""
        return RedisLock(self, self._client, name, self.lock_timeout)

    def start(self):
        """Connect to redis and prepare the driver for use.

        :raises: ``coordination.ToozConnectionError`` if the client can not
                 be created or the server does not answer a ping
        """
        try:
            client = redis.StrictRedis(
                host=self.host[0], port=self.host[1],
                socket_connect_timeout=self.timeout,
                socket_timeout=self.timeout)
            # The ping is issued inside the ``try`` so that an unreachable
            # server is reported as a ToozConnectionError as well, instead
            # of leaking a raw redis exception out of start().
            client.ping()
        except Exception as e:
            raise coordination.ToozConnectionError(utils.exception_message(e))
        self._client = client
        self._acquired_locks = []
        self._executor = futures.ThreadPoolExecutor(max_workers=1)

    @classmethod
    def _encode_member_id(cls, member_id):
        """Validate a member id for use as a field of a group hash.

        :raises: ``ValueError`` if the id collides with the private marker
                 field that keeps empty groups alive
        """
        if member_id == cls._GROUP_EXISTS_KEY:
            raise ValueError("Not allowed to use private keys as a member id")
        return member_id

    @staticmethod
    def _decode_member_id(member_id):
        """Inverse of ``_encode_member_id`` (currently the identity)."""
        return member_id

    @classmethod
    def _encode_group_id(cls, group_id):
        """Return the redis key under which the group's hash is stored."""
        return cls._GROUP_PREFIX + group_id

    @classmethod
    def _encode_group_leader(cls, group_id):
        """Return the redis key used for the group's leader."""
        return cls._GROUP_LEADER_PREFIX + group_id

    def heartbeat(self):
        """Ping the server and heartbeat every lock currently held."""
        self._client.ping()
        for lock in self._acquired_locks:
            lock.heartbeat()

    def stop(self):
        """Release held locks, shut the executor down and drop the client."""
        # Iterate over a copy, releasing a lock removes it from the list.
        for lock in list(self._acquired_locks):
            lock.release()
        if self._executor is not None:
            self._executor.shutdown(wait=True)
            self._executor = None
        self._client = None

    @staticmethod
    def watch_join_group(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    def create_group(self, group_id):
        """Asynchronously create a group.

        The result raises ``coordination.GroupAlreadyExist`` if the group
        is already present.
        """
        encoded_group = self._encode_group_id(group_id)

        def _create_group(p):
            if p.exists(encoded_group):
                raise coordination.GroupAlreadyExist(group_id)
            # Buffer the writes so they are applied atomically, and only if
            # the watched keys were not modified since the check above.
            p.multi()
            p.sadd(self._GROUP_LIST_KEY, group_id)
            p.hset(encoded_group, self._GROUP_EXISTS_KEY, '1')

        return RedisFutureResult(
            self._executor.submit(self._client.transaction,
                                  _create_group,
                                  encoded_group, self._GROUP_LIST_KEY,
                                  value_from_callable=True))

    def update_capabilities(self, group_id, capabilities):
        """Asynchronously replace this member's capabilities in a group.

        The result raises ``coordination.GroupNotCreated`` if the group
        does not exist and ``coordination.MemberNotJoined`` if this member
        is not part of it.
        """
        encoded_group = self._encode_group_id(group_id)
        encoded_member_id = self._encode_member_id(self._member_id)

        def _update_capabilities(p):
            if not p.exists(encoded_group):
                raise coordination.GroupNotCreated(group_id)
            if not p.hexists(encoded_group, encoded_member_id):
                raise coordination.MemberNotJoined(group_id,
                                                   self._member_id)
            # Apply the write atomically with respect to the watched key.
            p.multi()
            p.hset(encoded_group, encoded_member_id, capabilities)

        return RedisFutureResult(
            self._executor.submit(self._client.transaction,
                                  _update_capabilities,
                                  encoded_group, value_from_callable=True))

    def leave_group(self, group_id):
        """Asynchronously remove this member from a group.

        The result raises ``coordination.GroupNotCreated`` if the group
        does not exist and ``coordination.MemberNotJoined`` if this member
        is not part of it.
        """
        encoded_group = self._encode_group_id(group_id)
        encoded_member_id = self._encode_member_id(self._member_id)

        def _leave_group(p):
            if not p.exists(encoded_group):
                raise coordination.GroupNotCreated(group_id)
            # Membership is checked up front because the deletion below is
            # buffered, so its deleted-count is not available here.
            if not p.hexists(encoded_group, encoded_member_id):
                raise coordination.MemberNotJoined(group_id,
                                                   self._member_id)
            p.multi()
            p.hdel(encoded_group, encoded_member_id)

        return RedisFutureResult(
            self._executor.submit(self._client.transaction,
                                  _leave_group,
                                  encoded_group, value_from_callable=True))

    def get_members(self, group_id):
        """Asynchronously fetch the list of member ids of a group.

        The result raises ``coordination.GroupNotCreated`` if the group
        does not exist.
        """
        encoded_group = self._encode_group_id(group_id)

        def _get_members(p):
            if not p.exists(encoded_group):
                raise coordination.GroupNotCreated(group_id)
            # Skip the private marker field, it is not a real member.
            return [self._decode_member_id(k)
                    for k in p.hkeys(encoded_group)
                    if k != self._GROUP_EXISTS_KEY]

        return RedisFutureResult(
            self._executor.submit(self._client.transaction,
                                  _get_members,
                                  encoded_group, value_from_callable=True))

    def get_member_capabilities(self, group_id, member_id):
        """Asynchronously fetch the capabilities of a member of a group.

        The result raises ``coordination.GroupNotCreated`` if the group
        does not exist and ``coordination.MemberNotJoined`` if the member
        is not part of it.
        """
        encoded_group = self._encode_group_id(group_id)
        encoded_member_id = self._encode_member_id(member_id)

        def _get_member_capabilities(p):
            if not p.exists(encoded_group):
                raise coordination.GroupNotCreated(group_id)
            capabilities = p.hget(encoded_group, encoded_member_id)
            if capabilities is None:
                raise coordination.MemberNotJoined(group_id, member_id)
            return capabilities

        return RedisFutureResult(
            self._executor.submit(self._client.transaction,
                                  _get_member_capabilities,
                                  encoded_group, value_from_callable=True))

    def join_group(self, group_id, capabilities=b""):
        """Asynchronously add this member (and its capabilities) to a group.

        The result raises ``coordination.GroupNotCreated`` if the group
        does not exist.
        """
        encoded_group = self._encode_group_id(group_id)
        encoded_member_id = self._encode_member_id(self._member_id)

        def _join_group(p):
            if not p.exists(encoded_group):
                raise coordination.GroupNotCreated(group_id)
            # Apply the write atomically with respect to the watched key.
            p.multi()
            p.hset(encoded_group, encoded_member_id, capabilities)

        return RedisFutureResult(
            self._executor.submit(self._client.transaction,
                                  _join_group,
                                  encoded_group, value_from_callable=True))

    def get_groups(self):
        """Asynchronously fetch the list of known group ids."""
        return RedisFutureResult(
            self._executor.submit(self._client.smembers,
                                  self._GROUP_LIST_KEY),
            result_processor=list)

    @staticmethod
    def unwatch_join_group(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    @staticmethod
    def watch_leave_group(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    @staticmethod
    def unwatch_leave_group(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    @staticmethod
    def watch_elected_as_leader(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    @staticmethod
    def unwatch_elected_as_leader(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    def run_watchers(self):
        """Return an empty list; watchers are not supported by this driver."""
        return []
class RedisFutureResult(coordination.CoordAsyncResult):
    """Asynchronous result wrapping a ``concurrent.futures`` future.

    An optional ``result_processor`` callable may be supplied; when given it
    is applied to the future's result before that result is returned.
    """

    def __init__(self, fut, result_processor=None):
        self._result_processor = result_processor
        self._fut = fut

    def get(self, timeout=10):
        """Wait up to ``timeout`` seconds and return the (processed) result.

        :raises: ``coordination.OperationTimedOut`` if the future does not
                 complete in time
        """
        try:
            outcome = self._fut.result(timeout=timeout)
        except futures.TimeoutError as e:
            raise coordination.OperationTimedOut(utils.exception_message(e))
        if not self._result_processor:
            return outcome
        return self._result_processor(outcome)

    def done(self):
        """Report whether the wrapped future has finished."""
        return self._fut.done()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment