Skip to content

Instantly share code, notes, and snippets.

@spikeekips
Last active August 29, 2015 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save spikeekips/9364007 to your computer and use it in GitHub Desktop.
Save spikeekips/9364007 to your computer and use it in GitHub Desktop.
naughty rpc client for `serf` for python
import logging
import msgpack
import socket
import threading
import string
# `Version` value sent in the handshake request by `Client`.
DEFAULT_IPC_VERSION = 1

# Default `buflen` handed to `socket.recv()` by `Connection.read()`.
DEFAULT_READ_BUFFER_SIZE = 8192 * 100

# Default address `Client` passes to `Connection`.
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 7373

# Default socket timeout, as given to `socket.settimeout()`.
DEFAULT_TIMEOUT = 2

# Upper bound for the packed size of an `event` request.
PAYLOAD_SIZE_LIMIT = 256

# Commands for which `BaseRequest.is_stream` is true.
STREAM_COMMANDS = ('stream', 'monitor')

# Attribute names that `Client.__getattr__` turns into request builders.
COMMAND_LIST = (
    'handshake',
    'event',
    'force_leave',
    'join',
    'members',
    'stream',
    'monitor',
    'stop',
    'leave',
)

log = logging.getLogger('serf-rpc-client')
class Client(threading.local):
    """
    RPC client for a `serf` agent.

    Requests are queued by calling the client, or by calling one of the
    attributes named in `COMMAND_LIST` (e.g. ``client.members()``), and are
    sent together by `request()`. Queueing methods return the client itself
    so that calls can be chained::

        Client().handshake().members().add_callback(handler).request()

    The class derives from `threading.local`, so `__init__` runs again (with
    the same arguments) in every thread that touches the instance and each
    thread gets its own connection and queue.
    """

    def __init__(self,
                 host=DEFAULT_HOST,
                 port=DEFAULT_PORT,
                 ipc_version=DEFAULT_IPC_VERSION):
        """
        :param host: address of the agent's RPC endpoint.
        :param port: port of the agent's RPC endpoint.
        :param ipc_version: `Version` value sent with the handshake.
        """
        self._conn = Connection(host, port)
        self.ipc_version = ipc_version
        self.seq = 0  # sequence number assigned to the next request sent
        self._command_handlers = dict()  # seq -> request that was sent
        self._requests_container = list()  # requests queued for `request()`
        self._got_first_stream_response = False
        # A single unpacker is kept for the whole connection: one `recv()` may
        # deliver several responses at once, and the bytes that follow the
        # message currently being decoded must survive until the next call.
        self._unpacker = msgpack.Unpacker(use_list=True)

    def connect(self):
        """Open the connection now instead of on first use."""
        # reading the property is what triggers the connect.
        self._conn.connection

    def disconnect(self):
        """Close the connection and forget any partially received data."""
        self._conn.disconnect()
        self._unpacker = msgpack.Unpacker(use_list=True)

    def __getattr__(self, command):
        """
        Expose every name in `COMMAND_LIST` as a method that queues that
        command; any other unknown name raises `AttributeError` as usual.
        """
        # convert python attribute to the real command
        if command not in COMMAND_LIST:
            return super(Client, self).__getattribute__(command)

        def _func(**kw):
            return self.__call__(command, **kw)

        _func.__name__ = command
        return _func

    def __call__(self, command, **body):
        """
        Queue `command` with `body` as its request body.

        :returns: the client itself.
        :raises InvalidRequest: when the request class rejects `body`.
        """
        _request_class = BaseRequest.get_request_class(command)
        self.request_by_request(_request_class(**body))
        return self

    def add_callback(self, handler):
        """
        Attach `handler` to the most recently queued request; it is called
        with the response object once the response arrives.

        :returns: the client itself.
        :raises RpcError: when no request has been queued yet.
        """
        if not self._requests_container:
            raise RpcError('no requests registered.')

        self._requests_container[-1].callbacks.append(handler)
        return self

    def _request_handshake(self):
        """
        Send a handshake and wait for its response.

        :raises RpcError: when the agent answers with an error.
        """
        _request = BaseRequest.get_request_class('handshake')(
            Version=self.ipc_version,
        )
        self._request(_request)
        _response = self._get_response()
        if not _response.is_success:
            raise RpcError(
                'failed to call `handshake`, %s.' % _response.error)

        log.debug('successfully handshaked')
        return _response.is_success

    def request_by_request(self, request):
        """
        Queue an already built request object, replacing any queued request
        that has the same command.

        :returns: the client itself.
        """
        # remove the duplicated command, unperiodically `serf` rpc server miss
        # the some responses.
        # The queue is rebuilt rather than edited in place: deleting from a
        # list while enumerating it skips the element after each deletion.
        self._requests_container = [
            _queued for _queued in self._requests_container
            if _queued.command != request.command
        ]
        self._requests_container.append(request)
        return self

    def request(self, timeout=DEFAULT_TIMEOUT):
        """
        Send every queued request and collect the responses.

        A handshake is sent first when the connection is fresh and the queue
        does not already start with one.

        :param timeout: read timeout used for non-stream requests.
        :returns: list of response objects (see `_handle_response`).
        :raises RpcError: when nothing has been queued.
        """
        if not self._requests_container:
            raise RpcError('no requests registered.')

        _missing_handshake = (
            self._requests_container[0].command != 'handshake')
        _stream_request = None
        for _queued in self._requests_container:
            # check whether the connection is still available or not.
            self._conn.connection
            if self._conn.reconnected and _missing_handshake:
                self._request_handshake()

            if _queued.is_stream:
                _stream_request = _queued

            self._request(_queued)

        return self._handle_response(_stream_request, timeout=timeout)

    def _handle_response(self, stream_request=None, timeout=DEFAULT_TIMEOUT):
        """
        Read responses, run their callbacks and drop answered requests from
        the queue.

        Without `stream_request` the loop ends, returning the collected
        responses, as soon as the queue is empty. With `stream_request` the
        loop keeps reading without a timeout and only ends through an
        exception raised while reading.
        """
        if stream_request:
            self._got_first_stream_response = False

        _responses = list()
        while True:
            if not stream_request and not self._requests_container:
                return _responses

            _response = self._get_response(
                # only responses after the first one are decoded with the
                # `<command>_result` handler.
                is_stream=bool(
                    stream_request and self._got_first_stream_response),
                timeout=None if stream_request is not None else timeout,
            )
            _response.callback()
            _responses.append(_response)
            if stream_request:
                self._got_first_stream_response = True

            if _response.request.seq in self._command_handlers:
                try:
                    self._requests_container.remove(_response.request)
                except ValueError:
                    pass

        # NOTE(review): nothing above breaks out of the loop, so the reset
        # and re-subscribe below is never reached as written. It is kept
        # untouched until the intended trigger is confirmed.
        self._requests_container = list()
        self._command_handlers = dict()
        if stream_request:
            self._got_first_stream_response = False
            return self.request_by_request(stream_request).request()

    def _request(self, request):
        """
        Assign the next sequence number to `request`, remember it so that
        its response can be matched, and send it.
        """
        request.seq = self.seq
        self._command_handlers[self.seq] = request
        self.seq += 1
        log.debug('> call %s' % (repr(request), ))
        # a `_request__<command>` method, when defined, overrides the
        # default sender for that command.
        _method = getattr(
            self, '_request__%s' % request.command, self._request_default)
        _method(request)
        return

    def _request_default(self, request):
        """Write the packed request to the connection."""
        self._conn.write(request)
        return

    def _get_response(self, is_stream=False, timeout=DEFAULT_TIMEOUT):
        """
        Decode the next response: a header and, when the response class for
        the matching request has `has_body`, the body that follows it.

        Buffered bytes are consumed first; the socket is only read when the
        unpacker runs out of data.

        :param is_stream: passed to `get_response_handler` of the request.
        :param timeout: read timeout handed to the connection.
        :raises RpcError: when the header's `Seq` matches no sent request.
        :raises Disconnected: when the peer stops sending data.
        """
        _header = None
        _request = None
        _body = None
        _response_class = None
        while True:
            try:
                _parsed = next(self._unpacker)
            except StopIteration:
                _data = self._conn.read(timeout=timeout)
                if not _data:
                    # callers dereference the result right away, so an
                    # explicit error is raised instead of returning None.
                    raise Disconnected()

                self._unpacker.feed(_data)
                continue

            if _header is not None:
                _body = _parsed
                break

            _header = _parsed
            _seq = _header.get('Seq')
            if _seq not in self._command_handlers:
                raise RpcError('got invalid response: %s' % _header)

            _request = self._command_handlers.get(_seq)
            _response_class = _request.get_response_handler(
                is_stream=is_stream)
            if not _response_class.has_body:
                break

        if _response_class is None:
            raise RpcError('got invalid response')

        return _response_class(_request, _header, _body)
class InvalidRequest(Exception):
    """Raised by the `check()` of a request class for a rejected body."""
class Disconnected(Exception):
    """Raised when the connection was closed or the peer sent no data."""
class ConnectionError(Exception):
    """Raised when the socket to the agent cannot be established."""
class RpcError(Exception):
    """Raised for misuse of the client or an unusable response."""
class Connection(object):
    """
    Lazily connected TCP socket with a single automatic retry on write.

    The socket is opened the first time `connection` is read. After
    `disconnect()` the object refuses further use (raising `Disconnected`)
    until `reconnect()` is called.
    """

    def __init__(self, host='127.0.0.1', port=7373, timeout=DEFAULT_TIMEOUT):
        """
        :param host: address to connect to.
        :param port: port to connect to.
        :param timeout: socket timeout; a falsy value means blocking mode.
        """
        self._host = host
        self._port = port
        self._timeout = timeout
        self._conn = None
        # True from the moment a socket is opened until the first successful
        # `write()` on it.
        self.reconnected = True
        self._disconnected = False

    def __del__(self):
        # best effort only: nothing useful can be done with a failure here.
        try:
            self.disconnect()
        except Exception:
            pass

    @staticmethod
    def _socket_timeout(value):
        """Translate a timeout setting into a `settimeout()` argument."""
        return float(value) if value else None

    def _get_timeout(self):
        return self._timeout

    def _set_timeout(self, t):
        # note that reading `connection` opens the socket when needed.
        self._timeout = t
        self.connection.settimeout(self._socket_timeout(self._timeout))
        return

    timeout = property(_get_timeout, _set_timeout)

    @property
    def connection(self):
        """
        The connected socket, opened on first access.

        :raises Disconnected: after `disconnect()` has been called.
        :raises ConnectionError: when the socket cannot be opened.
        """
        if self._disconnected:
            raise Disconnected('already disconnected, (%s:%d)' % (
                self._host, self._port, ))

        if self._conn:
            return self._conn

        try:
            _sock = self._connect()
        except socket.error as e:
            # `socket.timeout` is a subclass of `socket.error`, so timeouts
            # are reported the same way.
            raise ConnectionError(
                '%s (%s:%d)' % (e, self._host, self._port, ))

        self._conn = _sock
        return self._conn

    def _connect(self):
        """Open and return a new socket; marks the connection as fresh."""
        log.debug('> trying to connect to %s:%d' % (self._host, self._port, ))
        _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # same normalisation as the `timeout` setter, so that a falsy
            # timeout does not put the socket into non-blocking mode.
            _sock.settimeout(self._socket_timeout(self._timeout))
            _sock.connect((self._host, self._port, ))
        except Exception:
            # do not leak the descriptor when the connect fails.
            _sock.close()
            raise

        log.debug('< connected to %s:%d' % (self._host, self._port, ))
        self.reconnected = True
        return _sock

    def _disconnect(self):
        """Close the socket, if any, without blocking later reconnects."""
        log.debug('> trying to disconnect connection of %s:%d' % (
            self._host, self._port, ))
        if self._conn is None:
            return

        try:
            self._conn.close()
        except socket.error:
            pass

        self._conn = None
        return

    def disconnect(self):
        """Close the socket and refuse further use of `connection`."""
        self._disconnect()
        self._disconnected = True
        return

    def reconnect(self):
        """
        Drop the current socket and open a new one.

        Only the socket is closed here; going through `disconnect()` would
        set the disconnected flag and make the retry in `write()` raise
        `Disconnected` every time.
        """
        log.debug('> trying to re-connect to %s:%d' % (
            self._host, self._port, ))
        self._disconnect()
        self._disconnected = False
        self._conn = self._connect()
        return

    def write(self, request):
        """
        Send `str(request)`; on a socket error reconnect and retry once.

        :raises Disconnected: after `disconnect()` has been called.
        :raises socket.error: when the retry fails as well.
        """
        _data = str(request)
        log.debug('> trying to request command: %s' % (repr(request), ))
        try:
            self.connection.sendall(_data)
        except socket.error:
            self.reconnect()
            self.connection.sendall(_data)

        self.reconnected = False
        log.debug('> send data: %s' % ((_data, ), ))
        return

    def read(self, buflen=DEFAULT_READ_BUFFER_SIZE, timeout=DEFAULT_TIMEOUT):
        """
        Receive up to `buflen` bytes.

        :param timeout: `None` to block, otherwise a positive number.
        :raises ValueError: when `timeout` is neither `None` nor positive.
        :raises Disconnected: when the peer sends no data.
        """
        # sanity check of the module level setting, not of the argument.
        assert DEFAULT_READ_BUFFER_SIZE > 10
        if timeout is not None and not timeout > 0:
            raise ValueError('`timeout` must be None or greater than 0.')

        self.timeout = timeout
        _data = self.connection.recv(buflen)
        if not _data:
            raise Disconnected()

        return _data
class BaseRequest(object):
    """
    One RPC request: a command name, a sequence number and a body.

    `str(request)` yields the msgpack encoded header, followed by the
    msgpack encoded body when the body is not empty.
    """

    command = None

    @classmethod
    def get_request_class(cls, command):
        """
        Build the request class for `command`.

        The new class derives from the entry registered in `REQUEST_HANDLER`
        (or from `cls` when there is none), is named after the command in
        camel case (`force_leave` -> `RequestForceLeave`) and has its
        `command` attribute set.
        """
        _base = REQUEST_HANDLER.get(command, cls)
        # str.capitalize() replaces the deprecated string.capitalize().
        _name = 'Request%s' % ''.join(
            _part.capitalize() for _part in command.split('_'))
        return type(_name, (_base, ), dict(command=command))

    def __init__(self, **body):
        """
        :param body: request body.
        :raises InvalidRequest: when `check()` rejects the body.
        """
        self.seq = None
        self.body = body
        self.callbacks = list()  # called with the response when it arrives
        self.check()

    def check(self):
        """Validate `self.body`; subclasses raise `InvalidRequest`."""
        return

    def __getstate__(self):
        return dict(
            command=self.command,
            seq=self.seq,
            body=self.body,
            callbacks=self.callbacks,
        )

    def __repr__(self):
        return '<%s: %s, %s, %s>' % (
            self.__class__.__name__,
            self.command,
            self.seq,
            str(self.body) if self.body else '',
        )

    def __str__(self):
        # a request that has not been given a sequence number yet is
        # serialised, and from then on numbered, as 0.
        if self.seq is None:
            self.seq = 0

        _header = msgpack.packb(dict(
            Command=self.command.replace('_', '-'),
            Seq=self.seq,
        ))
        if not self.body:
            return _header

        return _header + msgpack.packb(self.body)

    @property
    def is_stream(self):
        """Whether the command is one of `STREAM_COMMANDS`."""
        return self.command in STREAM_COMMANDS

    def get_response_handler(self, is_stream=False):
        """
        Return the response class for this request.

        With `is_stream` the `<command>_result` entry of `RESPONSE_HANDLER`
        is preferred; otherwise, or when there is no such entry, the entry
        for the command itself is used, defaulting to `ResponseWithoutBody`.
        """
        if is_stream:
            _stream_command = '%s_result' % (self.command, )
            if _stream_command in RESPONSE_HANDLER:
                return RESPONSE_HANDLER[_stream_command]

        return RESPONSE_HANDLER.get(self.command, ResponseWithoutBody)
class RequestHandshake(BaseRequest):
    """
    Handshake request; the body must carry `Version`.

    {"Version": 1}
    """

    def check(self):
        if 'Version' in self.body:
            return

        raise InvalidRequest('`Version` is missing.')
class RequestEvent(BaseRequest):
    """
    User event request. `Name` and `Payload` are required, `Coalesce` is
    optional.

    {"Name": "foo", "Payload": "test payload", "Coalesce": true}
    """

    _available_body_parameters = (
        'Name',
        'Payload',
        'Coalesce',
    )

    def check(self):
        """
        :raises InvalidRequest: on unknown keys, missing `Name`/`Payload`,
            wrongly typed values, or when the packed request is larger than
            `PAYLOAD_SIZE_LIMIT`.
        """
        if set(self.body.keys()) - set(self._available_body_parameters):
            raise InvalidRequest('invalid request')

        if 'Name' not in self.body or 'Payload' not in self.body:
            raise InvalidRequest('invalid request, some key is missing.')

        if not isinstance(self.body['Name'], (str, unicode, )):
            raise InvalidRequest('invalid request, `Name` must be str.')

        if not isinstance(self.body['Payload'], (str, unicode, )):
            raise InvalidRequest('invalid request, `Payload` must be str.')

        # `Coalesce` is not among the required keys, so its type is only
        # checked when it is given; before, leaving it out was reported as
        # a type error.
        if 'Coalesce' in self.body and type(self.body['Coalesce']) is not bool:
            raise InvalidRequest('invalid request, `Coalesce` must be bool.')

        # payload
        if len(str(self)) > PAYLOAD_SIZE_LIMIT:
            raise InvalidRequest(
                'invalid request, message size must be smaller than %s.' % (
                    PAYLOAD_SIZE_LIMIT,
                ), )

        return
class RequestStream(BaseRequest):
    """
    Event stream request; `Type` is a comma separated list of event types.

    {"Type": "member-join,user:deploy"}`
    """

    _available_body_parameters = (
        'Type',
    )

    def check(self):
        _unknown = set(self.body.keys()) - set(self._available_body_parameters)
        if _unknown:
            raise InvalidRequest('invalid request')

        if 'Type' not in self.body:
            raise InvalidRequest('invalid request, some key is missing.')

        _value = self.body.get('Type')
        if type(_value) not in (str, unicode, ):
            raise InvalidRequest('invalid request, `Type` must be str.')

        # at least one entry must be something other than whitespace.
        _types = [_name for _name in _value.split(',') if _name.strip()]
        if len(_types) < 1:
            raise InvalidRequest('invalid request, `Type` must be filled.')

        return
class RequestLeave(BaseRequest):
    """
    Request that names a node; the body must carry `Node`.

    {"Node": "failed-node-name"}
    """

    _available_body_parameters = (
        'Node',
    )

    def check(self):
        """
        :raises InvalidRequest: on unknown keys, a missing `Node`, or a
            `Node` that is not a string.
        """
        if set(self.body.keys()) - set(self._available_body_parameters):
            raise InvalidRequest('invalid request')

        if 'Node' not in self.body:
            raise InvalidRequest('invalid request, some key is missing.')

        if not isinstance(self.body['Node'], (str, unicode, )):
            # the message used to name `Type`, a key this request lacks.
            raise InvalidRequest('invalid request, `Node` must be str.')

        return
class RequestMonitor(BaseRequest):
    """
    Log monitor request; the body must carry `LogLevel`.

    {"LogLevel": "DEBUG"}
    """

    _available_body_parameters = (
        'LogLevel',
    )

    def check(self):
        """
        :raises InvalidRequest: on unknown keys, a missing `LogLevel`, or a
            `LogLevel` that is not a string.
        """
        if set(self.body.keys()) - set(self._available_body_parameters):
            raise InvalidRequest('invalid request')

        if 'LogLevel' not in self.body:
            raise InvalidRequest('invalid request, some key is missing.')

        if not isinstance(self.body['LogLevel'], (str, unicode, )):
            # the message used to name `Type`, a key this request lacks.
            raise InvalidRequest('invalid request, `LogLevel` must be str.')

        return
class RequestStop(BaseRequest):
    """
    Stop request; the body must carry an integer `Stop`.

    {"Stop": 50}
    """

    _available_body_parameters = (
        'Stop',
    )

    def check(self):
        """
        :raises InvalidRequest: on unknown keys, a missing `Stop`, or a
            `Stop` that is not an integer.
        """
        if set(self.body.keys()) - set(self._available_body_parameters):
            raise InvalidRequest('invalid request')

        if 'Stop' not in self.body:
            raise InvalidRequest('invalid request, some key is missing.')

        # an exact type check on purpose: `bool` is a subclass of `int` and
        # must not be accepted here.
        if type(self.body['Stop']) not in (int, long, ):
            # the message used to name `Type`, a key this request lacks.
            raise InvalidRequest('invalid request, `Stop` must be int.')

        return
class RequestJoin(BaseRequest):
    """
    Join request. `Existing` is required, `Replay` is optional.

    {"Existing": ["192.168.0.1:6000", "192.168.0.2:6000"], "Replay": false}
    """

    _available_body_parameters = (
        'Existing',
        'Replay',
    )

    def check(self):
        """
        :raises InvalidRequest: on unknown keys, a missing `Existing`, an
            `Existing` that is not a list or tuple, or a `Replay` that is
            not a bool.
        """
        if set(self.body.keys()) - set(self._available_body_parameters):
            raise InvalidRequest('invalid request')

        if 'Existing' not in self.body:
            raise InvalidRequest('invalid request, some key is missing.')

        if not isinstance(self.body['Existing'], (list, tuple, )):
            raise InvalidRequest(
                'invalid request, `Existing` must be list or tuple.')

        # tested by presence rather than by truthiness, so that falsy
        # non-bool values such as 0 or '' are rejected as well.
        if 'Replay' in self.body and type(self.body['Replay']) is not bool:
            raise InvalidRequest('invalid request, `Replay` must be bool.')

        return
class BaseResponse(object):
    """
    One decoded RPC response: the request it answers, the header and,
    for classes with `has_body`, the body.
    """

    # whether a body follows the header on the wire.
    has_body = True

    def __init__(self, request, header, body):
        """
        :param request: the request object this response belongs to.
        :param header: decoded header, a dict with `Seq` and `Error`.
        :param body: decoded body, or `None`.
        """
        self.request = request
        self.header = header
        self._body = body

    def __repr__(self):
        # `%s` rather than `%d`: a header without `Seq` must not make
        # repr() itself fail.
        return '<Response: %s, seq:%s, has_body:%s>' % (
            repr(self.request),
            self.header.get('Seq'),
            self.has_body,
        )

    def _parse_body(self):
        """Hook for subclasses that post-process the body."""
        return self._body

    @property
    def body(self):
        return self._parse_body()

    @property
    def seq(self):
        return self.header.get('Seq')

    @property
    def error(self):
        return self.header.get('Error')

    @property
    def is_success(self):
        """
        True when the header reports no error, i.e. `Error` is absent,
        empty or blank.

        :raises TypeError: when `Error` is neither text nor bytes.
        """
        _error = self.error
        if _error is None:
            return True

        # byte and text strings of the running interpreter, so that the
        # check does not depend on how the decoder returns strings.
        if not isinstance(_error, (type(b''), type(u''), )):
            raise TypeError(
                '`Error` must be a string, not %s.' % type(_error).__name__)

        return not _error.strip()

    def callback(self):
        """Call every callback of the request with this response."""
        for _handler in self.request.callbacks or ():
            _handler(self)

        return
class ResponseWithoutBody(BaseResponse):
    """Response that consists of a header only."""

    has_body = False
class ResponseWithBody(BaseResponse):
    """Response whose header is followed by a body."""

    has_body = True
class ResponseJoin(BaseResponse):
    """Response to `join`; the body is expected to carry `Num`."""

    has_body = True

    @property
    def is_success(self):
        """
        True only when the header reports no error, the body is a dict and
        its `Num` is greater than 0.
        """
        if not super(ResponseJoin, self).is_success:
            return False

        if type(self._body) not in (dict, ):
            return False

        _joined = self._body.get('Num', 0)
        return _joined > 0
class ResponseMembers(ResponseWithBody):
    """
    Response to `members`; `body` returns the member list with every
    `Addr` normalised to a list of integers. The result is computed once
    and cached.
    """

    def __init__(self, *a, **kw):
        super(ResponseMembers, self).__init__(*a, **kw)
        self._body_parsed = None

    @staticmethod
    def _normalize_addr(addr):
        """
        Convert one `Addr` value to a list of integer octets.

        Lists and tuples (and a missing address) are returned unchanged, a
        4 byte packed address is expanded with `inet_ntoa`, and anything
        else is treated as a dotted string. The shortest dotted string is
        7 characters long, so a length of 4 cannot be one.
        """
        if addr is None or isinstance(addr, (list, tuple, )):
            return addr

        if len(addr) == 4:
            addr = socket.inet_ntoa(addr)

        return [int(_octet) for _octet in addr.split('.')]

    def _parse_body(self):
        # FIXME: in the current `serf` has bugs, https://github.com/hashicorp/serf/issues/158 .
        if self._body_parsed is None:
            _members = list()
            # a missing body or `Members` key yields an empty list.
            for _member in (self._body or {}).get('Members') or ():
                if 'Addr' in _member:
                    # the member dict is updated in place, as before.
                    _member['Addr'] = self._normalize_addr(_member['Addr'])

                _members.append(_member)

            self._body_parsed = dict(Members=_members)

        return self._body_parsed
def normalize_command(c):
    """Return `c` with every underscore replaced by a hyphen."""
    return '-'.join(c.split('_'))
# Response class per command. The `<command>_result` entries are the ones
# `BaseRequest.get_response_handler()` picks when called with `is_stream`.
RESPONSE_HANDLER = dict(
    handshake=ResponseWithoutBody,
    event=ResponseWithoutBody,
    force_leave=ResponseWithoutBody,
    join=ResponseJoin,
    members=ResponseMembers,
    stream=ResponseWithoutBody,
    stream_result=ResponseWithBody,
    monitor=ResponseWithoutBody,
    monitor_result=ResponseWithBody,
    stop=ResponseWithoutBody,
    leave=ResponseWithoutBody,
)
# Base class per command for `BaseRequest.get_request_class()`; commands
# without an entry fall back to the class the method is called on.
REQUEST_HANDLER = dict(
    handshake=RequestHandshake,
    event=RequestEvent,
    force_leave=RequestLeave,
    join=RequestJoin,
    stream=RequestStream,
    monitor=RequestMonitor,
    stop=RequestStop,
    leave=RequestLeave,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment