Last active
August 29, 2015 13:57
-
-
Save spikeekips/9364007 to your computer and use it in GitHub Desktop.
naughty rpc client for `serf` for python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import msgpack | |
import socket | |
import threading | |
import string | |
# Defaults used when talking to a `serf` agent over its RPC socket.
DEFAULT_IPC_VERSION = 1
DEFAULT_READ_BUFFER_SIZE = 100 * 8192  # bytes requested per `recv` call
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 7373
DEFAULT_TIMEOUT = 2  # handed to `socket.settimeout`
PAYLOAD_SIZE_LIMIT = 256  # upper bound on the packed size of an `event` request

# Commands for which the client keeps reading responses after the first one.
STREAM_COMMANDS = ('stream', 'monitor')

# Command names that `Client` exposes as attributes.
COMMAND_LIST = (
    'handshake',
    'event',
    'force_leave',
    'join',
    'members',
    'stream',
    'monitor',
    'stop',
    'leave',
)

log = logging.getLogger('serf-rpc-client')
class Client(threading.local):
    """Chainable msgpack RPC client for a `serf` agent.

    Every name in ``COMMAND_LIST`` is available as a method (resolved by
    ``__getattr__``).  Calling one only *queues* a request and returns the
    client, so calls can be chained and then sent with ``request()``::

        client.members().add_callback(handler).request()

    The class derives from ``threading.local``, so each thread works with its
    own connection, queue and sequence counter.
    """

    def __init__(self,
                 host=DEFAULT_HOST,
                 port=DEFAULT_PORT,
                 ipc_version=DEFAULT_IPC_VERSION):
        self._conn = Connection(host, port)
        self.ipc_version = ipc_version
        self.seq = 0  # next sequence number to assign
        self._command_handlers = dict()  # seq -> request that was sent
        self._requests_container = list()  # queued, not yet answered requests
        self._got_first_stream_response = False
        # One unpacker for the lifetime of the socket: a single `recv` may
        # carry several responses (or only part of one), and a throw-away
        # unpacker would silently drop whatever follows the first message.
        self._unpacker = msgpack.Unpacker(use_list=True)

    def connect(self):
        """Open the underlying socket (done by touching the property)."""
        self._conn.connection

    def disconnect(self):
        """Close the underlying socket and drop any buffered response bytes."""
        self._conn.disconnect()
        self._unpacker = msgpack.Unpacker(use_list=True)

    def __getattr__(self, command):
        """Turn a known command name into a callable that queues it."""
        # convert python attribute to the real command
        if command not in COMMAND_LIST:
            return super(Client, self).__getattribute__(command)

        _func = lambda **kw: self.__call__(command, **kw)
        _func.__name__ = command
        return _func

    def __call__(self, command, **body):
        """Queue `command` with `body`; returns the client for chaining."""
        self.request_by_request(
            BaseRequest.get_request_class(command)(**body),
        )
        return self

    def add_callback(self, handler):
        """Attach `handler` to the most recently queued request.

        The handler is called with the response object once it arrives.
        """
        assert len(self._requests_container) > 0
        self._requests_container[-1].callbacks.append(handler)
        return self

    def _request_handshake(self):
        """Send a `handshake` and read its response right away.

        Raises:
            RpcError: the agent answered with an error.
        """
        _request = BaseRequest.get_request_class('handshake')(
            Version=self.ipc_version,
        )
        self._request(_request)

        _response = self._get_response()
        if not _response.is_success:
            raise RpcError('failed to call `handshake`, %s.' % _response.error)

        log.debug('successfully handshaked')
        return _response.is_success

    def request_by_request(self, request):
        """Queue an already built request object, replacing any queued
        request for the same command."""
        # remove the duplicated command, unperiodically `serf` rpc server miss
        # the some responses.
        # Build the filtered list first: deleting from the list while
        # enumerating it skips the element that follows each deletion.
        self._requests_container[:] = [
            i for i in self._requests_container
            if i.command != request.command
        ]
        self._requests_container.append(request)
        return self

    def request(self, timeout=DEFAULT_TIMEOUT):
        """Send every queued request and collect the responses.

        A `handshake` is sent automatically on a fresh connection unless the
        first queued request already is one.

        Raises:
            RpcError: nothing was queued, or a response could not be matched.
        """
        if not self._requests_container:
            raise RpcError('no requests registered.')

        _missing_handshake = self._requests_container[0].command != 'handshake'
        _stream_request = None
        for i in self._requests_container:
            # check whether the connection is still available or not.
            self._conn.connection
            if self._conn.reconnected:
                # Nothing has been written on this socket yet, so bytes
                # buffered from an earlier socket cannot belong to it.
                self._unpacker = msgpack.Unpacker(use_list=True)
                if _missing_handshake:
                    self._request_handshake()

            if i.is_stream:
                _stream_request = i

            self._request(i)

        return self._handle_response(_stream_request, timeout=timeout)

    def _handle_response(self, stream_request=None, timeout=DEFAULT_TIMEOUT):
        """Read responses, run their callbacks and return them as a list.

        Without a stream request the loop ends once every queued request has
        been answered.  With a stream request it keeps reading without a
        timeout and leaves only through an exception.
        """
        if stream_request:
            self._got_first_stream_response = False

        _responses = list()
        while True:
            if not stream_request and not self._requests_container:
                return _responses

            # The first answer to a stream command is a plain acknowledgement;
            # only the following ones are decoded as stream results.
            _response = self._get_response(
                is_stream=bool(
                    stream_request and self._got_first_stream_response),
                timeout=None if stream_request is not None else timeout,
            )
            _response.callback()
            _responses.append(_response)

            if stream_request:
                self._got_first_stream_response = True

            if _response.request.seq in self._command_handlers:
                try:
                    self._requests_container.remove(_response.request)
                except ValueError:
                    pass

        # NOTE(review): indentation was lost in the source this was recovered
        # from.  As reconstructed the loop above never breaks, so the lines
        # below are unreachable; they are kept verbatim - confirm against the
        # original before relying on or removing them.
        self._requests_container = list()
        self._command_handlers = dict()

        if stream_request:
            self._got_first_stream_response = False
            return self.request_by_request(stream_request).request()

    def _request(self, request):
        """Assign the next sequence number to `request` and send it.

        A method named ``_request__<command>`` takes precedence over the
        default sender when one exists.
        """
        request.seq = self.seq
        self._command_handlers[self.seq] = request
        self.seq += 1

        log.debug('> call %s' % (repr(request), ))

        _method = getattr(
            self, '_request__%s' % request.command, self._request_default)
        _method(request)

    def _request_default(self, request):
        """Write the packed request to the connection."""
        self._conn.write(request)

    def _next_message(self, timeout):
        """Return the next complete msgpack object, reading more bytes from
        the connection only when the buffered ones are exhausted."""
        while True:
            try:
                return next(self._unpacker)
            except StopIteration:
                self._unpacker.feed(self._conn.read(timeout=timeout))

    def _get_response(self, is_stream=False, timeout=DEFAULT_TIMEOUT):
        """Read one response: its header and, when the handler expects one,
        its body.

        Raises:
            RpcError: the header's `Seq` matches no request that was sent.
        """
        _header = self._next_message(timeout)
        _seq = _header.get('Seq')
        if _seq not in self._command_handlers:
            raise RpcError('got invalid response: %s' % _header)

        _request = self._command_handlers[_seq]
        _response_class = _request.get_response_handler(is_stream=is_stream)

        _body = None
        if _response_class.has_body:
            _body = self._next_message(timeout)

        return _response_class(_request, _header, _body)
class InvalidRequest(Exception):
    """A request body failed validation."""


class Disconnected(Exception):
    """The connection was closed, or the peer sent no data."""


class ConnectionError(Exception):
    """Opening the socket failed.

    NOTE: on Python 3 this name shadows the builtin `ConnectionError`.
    """


class RpcError(Exception):
    """The RPC exchange itself went wrong."""
class Connection(object):
    """Lazily opened TCP socket to the agent.

    The socket is created on first access of ``connection``.  ``reconnected``
    is True from the moment a socket is opened until the first successful
    ``write`` on it, which lets the caller know a handshake is still due.
    """

    def __init__(self, host='127.0.0.1', port=7373, timeout=DEFAULT_TIMEOUT):
        self._host = host
        self._port = port
        self._timeout = timeout
        self._conn = None
        self.reconnected = True
        self._disconnected = False  # set by an explicit `disconnect()`

    def __del__(self):
        # Best effort only: finalizers may run while the interpreter is
        # shutting down, so failures here are deliberately ignored.
        try:
            self.disconnect()
        except Exception:
            pass

    def _get_timeout(self):
        return self._timeout

    def _set_timeout(self, t):
        # A falsy timeout (None or 0) puts the socket in blocking mode.
        self._timeout = t
        self.connection.settimeout(
            float(self._timeout) if self._timeout else None)

    timeout = property(_get_timeout, _set_timeout)

    @property
    def connection(self):
        """The open socket, connecting first if necessary.

        Raises:
            Disconnected: `disconnect()` was called on this object.
            ConnectionError: the socket could not be opened.
        """
        if self._disconnected:
            raise Disconnected('already disconnected, (%s:%d)' % (
                self._host, self._port))

        if self._conn:
            return self._conn

        try:
            self._conn = self._connect()
        except socket.error as e:  # `socket.timeout` is a subclass
            raise ConnectionError('%s (%s:%d)' % (e, self._host, self._port))

        return self._conn

    def _connect(self):
        """Open and return a new connected socket."""
        log.debug('> trying to connect to %s:%d', self._host, self._port)
        _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            _sock.settimeout(self.timeout)
            _sock.connect((self._host, self._port))
        except Exception:
            # Do not leak the descriptor when the connection attempt fails.
            _sock.close()
            raise
        log.debug('< connected to %s:%d', self._host, self._port)

        self.reconnected = True
        return _sock

    def _disconnect(self):
        """Close the socket, if any, without marking the object as closed."""
        log.debug('> trying to disconnect connection of %s:%d',
                  self._host, self._port)
        if self._conn is None:
            return

        try:
            self._conn.close()
        except socket.error:
            pass

        self._conn = None

    def disconnect(self):
        """Close the socket for good; `connection` raises afterwards."""
        self._disconnect()
        self._disconnected = True

    def reconnect(self):
        """Drop the current socket and open a new one.

        Only the socket is replaced.  Going through `disconnect()` here would
        set the closed flag and make every later `connection` access raise
        `Disconnected`, defeating the retry in `write`.
        """
        log.debug('> trying to re-connect to %s:%d', self._host, self._port)
        self._disconnect()
        self._disconnected = False
        self._conn = self._connect()

    def write(self, request):
        """Send the packed form of `request`, retrying once on a new socket
        when the first attempt fails with a socket error."""
        _data = str(request)

        log.debug('> trying to request command: %s', repr(request))
        try:
            self.connection.sendall(_data)
        except socket.error:
            self.reconnect()
            self.connection.sendall(_data)

        self.reconnected = False
        log.debug('> send data: %s', (_data, ))

    def read(self, buflen=DEFAULT_READ_BUFFER_SIZE, timeout=DEFAULT_TIMEOUT):
        """Receive up to `buflen` bytes.

        Raises:
            Disconnected: the peer returned no data.
        """
        assert DEFAULT_READ_BUFFER_SIZE > 10
        assert timeout is None or timeout > 0

        self.timeout = timeout
        _data = self.connection.recv(buflen)
        if not _data:
            raise Disconnected()

        return _data
class BaseRequest(object):
    """A single RPC request: command name, sequence number and body.

    Subclasses override `check()` to validate `self.body`; it runs from the
    constructor.
    """

    command = None

    @classmethod
    def get_request_class(cls, command):
        """Create a request class bound to `command`.

        The base is looked up in ``REQUEST_HANDLER`` and falls back to this
        class; the generated name is ``Request`` plus the CamelCased command.
        """
        _base = REQUEST_HANDLER.get(command, cls)
        _name = 'Request%s' % ''.join(
            part.capitalize() for part in command.split('_'))
        return type(_name, (_base, ), dict(command=command))

    def __init__(self, **body):
        self.seq = None
        self.body = body
        self.callbacks = list()

        self.check()

    def check(self):
        """Validate `self.body`; the base class accepts anything."""
        return

    def __getstate__(self):
        return dict(
            command=self.command,
            seq=self.seq,
            body=self.body,
            callbacks=self.callbacks,
        )

    def __repr__(self):
        return '<%s: %s, %s, %s>' % (
            self.__class__.__name__,
            self.command,
            self.seq,
            str(self.body) if self.body else '',
        )

    def __str__(self):
        """Return the msgpack-encoded header followed by the body, if any.

        An unassigned sequence number is sent as 0.  The request itself is
        left untouched, so merely printing or measuring it has no side
        effect.
        """
        _header = msgpack.packb(dict(
            Command=self.command.replace('_', '-'),
            Seq=0 if self.seq is None else self.seq,
        ))
        if not self.body:
            return _header

        return _header + msgpack.packb(self.body)

    @property
    def is_stream(self):
        """True when the command is one of ``STREAM_COMMANDS``."""
        return self.command in STREAM_COMMANDS

    def get_response_handler(self, is_stream=False):
        """Pick the response class for this command.

        With `is_stream` set, the ``<command>_result`` entry of
        ``RESPONSE_HANDLER`` is preferred when it exists.  Unknown commands
        get ``ResponseWithoutBody``.
        """
        if is_stream:
            _stream_key = '%s_result' % (self.command, )
            if _stream_key in RESPONSE_HANDLER:
                return RESPONSE_HANDLER[_stream_key]

        return RESPONSE_HANDLER.get(self.command, ResponseWithoutBody)
class RequestHandshake(BaseRequest):
    """Body of the `handshake` command.

    Example body::

        {"Version": 1}
    """

    def check(self):
        """Require the `Version` key."""
        if 'Version' in self.body:
            return

        raise InvalidRequest('`Version` is missing.')
class RequestEvent(BaseRequest):
    """Body of the `event` command.

    Example body::

        {"Name": "foo", "Payload": "test payload", "Coalesce": true}

    `Name` and `Payload` are required; `Coalesce` is optional but must be a
    bool when given.
    """

    _available_body_parameters = (
        'Name',
        'Payload',
        'Coalesce',
    )

    def check(self):
        """Validate keys, value types and the packed message size.

        Raises:
            InvalidRequest: on any violation.
        """
        _text_types = (str, type(u''))  # `str` and, on Python 2, `unicode`

        if set(self.body) - set(self._available_body_parameters):
            raise InvalidRequest('invalid request')

        if 'Name' not in self.body or 'Payload' not in self.body:
            raise InvalidRequest('invalid request, some key is missing.')

        if not isinstance(self.body['Name'], _text_types):
            raise InvalidRequest('invalid request, `Name` must be str.')

        if not isinstance(self.body['Payload'], _text_types):
            raise InvalidRequest('invalid request, `Payload` must be str.')

        # Only validate `Coalesce` when the caller supplied it; it is not
        # part of the required keys above.
        if 'Coalesce' in self.body and \
                not isinstance(self.body['Coalesce'], bool):
            raise InvalidRequest('invalid request, `Coalesce` must be bool.')

        # payload
        if len(str(self)) > PAYLOAD_SIZE_LIMIT:
            raise InvalidRequest(
                'invalid request, message size must be smaller than %s.' % (
                    PAYLOAD_SIZE_LIMIT,
                ))
class RequestStream(BaseRequest):
    """Body of the `stream` command.

    Example body::

        {"Type": "member-join,user:deploy"}
    """

    _available_body_parameters = (
        'Type',
    )

    def check(self):
        """Require `Type` to be a comma separated string with at least one
        non-blank entry.

        Raises:
            InvalidRequest: on any violation.
        """
        if set(self.body) - set(self._available_body_parameters):
            raise InvalidRequest('invalid request')

        if 'Type' not in self.body:
            raise InvalidRequest('invalid request, some key is missing.')

        _value = self.body['Type']
        if not isinstance(_value, (str, type(u''))):
            raise InvalidRequest('invalid request, `Type` must be str.')

        # Entries that are empty or whitespace-only do not count.
        if not any(entry.strip() for entry in _value.split(',')):
            raise InvalidRequest('invalid request, `Type` must be filled.')
class RequestLeave(BaseRequest):
    """Request body that names a node.

    Example body::

        {"Node": "failed-node-name"}
    """

    _available_body_parameters = (
        'Node',
    )

    def check(self):
        """Require `Node` to be present and a string.

        Raises:
            InvalidRequest: on any violation.
        """
        if set(self.body) - set(self._available_body_parameters):
            raise InvalidRequest('invalid request')

        if 'Node' not in self.body:
            raise InvalidRequest('invalid request, some key is missing.')

        if not isinstance(self.body['Node'], (str, type(u''))):
            # The message used to blame `Type`, which this body does not have.
            raise InvalidRequest('invalid request, `Node` must be str.')
class RequestMonitor(BaseRequest):
    """Body of the `monitor` command.

    Example body::

        {"LogLevel": "DEBUG"}
    """

    _available_body_parameters = (
        'LogLevel',
    )

    def check(self):
        """Require `LogLevel` to be present and a string.

        Raises:
            InvalidRequest: on any violation.
        """
        if set(self.body) - set(self._available_body_parameters):
            raise InvalidRequest('invalid request')

        if 'LogLevel' not in self.body:
            raise InvalidRequest('invalid request, some key is missing.')

        if not isinstance(self.body['LogLevel'], (str, type(u''))):
            # The message used to blame `Type`, which this body does not have.
            raise InvalidRequest('invalid request, `LogLevel` must be str.')
class RequestStop(BaseRequest):
    """Body of the `stop` command.

    Example body::

        {"Stop": 50}
    """

    _available_body_parameters = (
        'Stop',
    )

    def check(self):
        """Require `Stop` to be present and an integer (bools are rejected).

        Raises:
            InvalidRequest: on any violation.
        """
        import numbers  # covers `int` and, on Python 2, `long`

        if set(self.body) - set(self._available_body_parameters):
            raise InvalidRequest('invalid request')

        if 'Stop' not in self.body:
            raise InvalidRequest('invalid request, some key is missing.')

        _value = self.body['Stop']
        if isinstance(_value, bool) or \
                not isinstance(_value, numbers.Integral):
            # The message used to blame `Type`, which this body does not have.
            raise InvalidRequest('invalid request, `Stop` must be int.')
class RequestJoin(BaseRequest):
    """Body of the `join` command.

    Example body::

        {"Existing": ["192.168.0.1:6000", "192.168.0.2:6000"], "Replay": false}

    `Existing` is required; `Replay` is optional.
    """

    _available_body_parameters = (
        'Existing',
        'Replay',
    )

    def check(self):
        """Validate keys and value types.

        Raises:
            InvalidRequest: on any violation.
        """
        if set(self.body) - set(self._available_body_parameters):
            raise InvalidRequest('invalid request')

        if 'Existing' not in self.body:
            raise InvalidRequest('invalid request, some key is missing.')

        if not isinstance(self.body['Existing'], (list, tuple)):
            raise InvalidRequest(
                'invalid request, `Existing` must be list or tuple.')

        # Test for presence rather than truthiness, so that falsy non-bool
        # values such as 0 or '' are rejected too.  None still counts as
        # "not given".
        _replay = self.body.get('Replay')
        if _replay is not None and not isinstance(_replay, bool):
            raise InvalidRequest('invalid request, `Replay` must be bool.')
class BaseResponse(object):
    """One response from the agent: the request it answers, the decoded
    header and the decoded body (None when the response has none).

    `has_body` tells the reader whether a body object follows the header.
    """

    has_body = True

    def __init__(self, request, header, body):
        self.request = request
        self.header = header
        self._body = body

    def __repr__(self):
        # `%s` instead of `%d`: a header without `Seq` must not break repr().
        return '<Response: %s, seq:%s, has_body:%s>' % (
            repr(self.request),
            self.header.get('Seq'),
            self.has_body,
        )

    def _parse_body(self):
        """Hook for subclasses that post-process the raw body."""
        return self._body

    @property
    def body(self):
        return self._parse_body()

    @property
    def seq(self):
        return self.header.get('Seq')

    @property
    def error(self):
        return self.header.get('Error')

    @property
    def is_success(self):
        """True when the header carries no error text.

        A missing `Error` field and a blank one (text or bytes) both count
        as success.
        """
        _error = self.error
        if _error is None:
            return True

        if hasattr(_error, 'strip'):
            return not _error.strip()

        return not _error

    def callback(self):
        """Call every callback registered on the request with this response."""
        for _handler in self.request.callbacks:
            _handler(self)
class ResponseWithoutBody(BaseResponse):
    """Response made of a header only."""

    has_body = False
class ResponseWithBody(BaseResponse):
    """Response whose header is followed by a body object."""

    has_body = True
class ResponseJoin(BaseResponse):
    """Response to `join`; its body is read for a `Num` count."""

    has_body = True

    @property
    def is_success(self):
        """True only when the header reports no error, the body is a dict
        and its `Num` value is greater than zero."""
        _header_ok = super(ResponseJoin, self).is_success
        if _header_ok and type(self._body) in (dict, ):
            return self._body.get('Num', 0) > 0

        return False
class ResponseMembers(ResponseWithBody):
    """Response to `members`; normalizes each member's `Addr` to a list of
    integers and caches the result."""

    def __init__(self, *a, **kw):
        super(ResponseMembers, self).__init__(*a, **kw)

        self._body_parsed = None

    @staticmethod
    def _normalize_addr(addr):
        """Return `addr` as a sequence of integers.

        * list / tuple: returned unchanged;
        * dotted quad text such as ``"10.0.0.1"``: split into its numbers;
        * any other byte string: one integer per byte, which for a packed
          4-byte address equals the dotted-quad numbers.
        """
        if isinstance(addr, (list, tuple)):
            return addr

        _dot = b'.' if isinstance(addr, bytes) else u'.'
        _parts = addr.split(_dot)
        # A packed 4-byte address can never split into four numeric parts,
        # so this test cannot mistake it for dotted text.
        if len(_parts) == 4 and all(p.isdigit() for p in _parts):
            return [int(p) for p in _parts]

        return list(bytearray(addr))

    def _parse_body(self):
        # FIXME: in the current `serf` has bugs, https://github.com/hashicorp/serf/issues/158 .
        if self._body_parsed is None:
            _members = list()
            # Tolerate a missing body or a missing `Members` entry.
            for i in (self._body or {}).get('Members') or ():
                if i.get('Addr') is not None:
                    i['Addr'] = self._normalize_addr(i['Addr'])

                _members.append(i)

            self._body_parsed = dict(Members=_members)

        return self._body_parsed
def normalize_command(c):
    """Return `c` with every underscore turned into a hyphen."""
    return '-'.join(c.split('_'))
# Response class per command.  The `<command>_result` entries are the ones
# `BaseRequest.get_response_handler` prefers when asked for a stream result.
RESPONSE_HANDLER = dict(
    handshake=ResponseWithoutBody,
    event=ResponseWithoutBody,
    force_leave=ResponseWithoutBody,
    join=ResponseJoin,
    members=ResponseMembers,
    stream=ResponseWithoutBody,
    stream_result=ResponseWithBody,
    monitor=ResponseWithoutBody,
    monitor_result=ResponseWithBody,
    stop=ResponseWithoutBody,
    leave=ResponseWithoutBody,
)
# Request class per command; commands not listed here (e.g. `members`) fall
# back to `BaseRequest` in `BaseRequest.get_request_class`.
REQUEST_HANDLER = {
    'handshake': RequestHandshake,
    'event': RequestEvent,
    'force_leave': RequestLeave,
    'join': RequestJoin,
    'stream': RequestStream,
    'monitor': RequestMonitor,
    'stop': RequestStop,
    # `leave` takes no request body in the serf RPC protocol.  Mapping it to
    # `RequestLeave` (which demands `Node`) made a plain `leave()` impossible.
    'leave': BaseRequest,
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment