Skip to content

Instantly share code, notes, and snippets.

@PCManticore
Created June 18, 2014 05:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save PCManticore/cf82ab421d4dc5c7f6ff to your computer and use it in GitHub Desktop.
Save PCManticore/cf82ab421d4dc5c7f6ff to your computer and use it in GitHub Desktop.
diff -r abcf17bc5eae Lib/test/test_xmlrpc.py
--- a/Lib/test/test_xmlrpc.py Tue Jun 17 00:34:56 2014 -0700
+++ b/Lib/test/test_xmlrpc.py Wed Jun 18 08:20:07 2014 +0300
@@ -12,6 +12,7 @@
import re
import io
import contextlib
+import inspect
from test import support
try:
@@ -497,6 +498,53 @@
PORT = None
evt.set()
+def http_magic_server(evt, numrequests, requestHandler=None):
+ """Serve `numrequests` requests from a MagicRPCServer bound to localhost.
+
+ The bound address is published through the module globals ADDR, PORT and
+ URL. `evt` is set once the server is ready and set again on exit, at
+ which point PORT is reset to None. `requestHandler` defaults to
+ xmlrpc.server.SimpleXMLRPCRequestHandler when not given.
+ """
+ class TestInstanceClass:
+ def div(self, x:1, y:2)->3:
+ "annotated div"
+ return x // y
+
+ class MyXMLRPCServer(xmlrpc.server.MagicRPCServer):
+ def get_request(self):
+ # Ensure the accepted socket is always blocking. On Linux, socket
+ # attributes are not inherited like they are on *BSD and Windows.
+ s, port = self.socket.accept()
+ s.setblocking(True)
+ return s, port
+ if not requestHandler:
+ requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
+ serv = MyXMLRPCServer(("localhost", 0), requestHandler,
+ logRequests=False, bind_and_activate=False)
+ try:
+ serv.server_bind()
+ global ADDR, PORT, URL
+ ADDR, PORT = serv.socket.getsockname()
+ # Connect to the IP address directly. This avoids
+ # socket.create_connection() trying to connect to "localhost" using
+ # all address families, which causes a slowdown e.g. on Vista, which
+ # supports AF_INET6. The server listens on AF_INET only.
+ URL = "http://%s:%d"%(ADDR, PORT)
+ serv.server_activate()
+ serv.register_introspection_functions()
+ serv.register_function(pow)
+ serv.register_function(lambda x,y: x+y, 'add')
+ testInstance = TestInstanceClass()
+ serv.register_instance(testInstance, allow_dotted_names=True)
+ # signal the waiting test that the server is ready
+ evt.set()
+
+ # handle up to 'numrequests' requests
+ while numrequests > 0:
+ serv.handle_request()
+ numrequests -= 1
+
+ except socket.timeout:
+ # a timed-out wait simply ends the serving loop
+ pass
+ finally:
+ serv.socket.close()
+ PORT = None
+ # signal the test that the server has shut down
+ evt.set()
+
+
# This function prevents errors like:
# <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
def is_unavailable_exception(e):
@@ -1117,6 +1165,58 @@
server.server_close()
self.assertTrue(server.use_builtin_types)
+class MagicServerTestCase(BaseServerTestCase):
+ """Check the metadata MagicProxy exposes for the server's methods.
+
+ The server side is http_magic_server, which registers 'pow', 'add'
+ and the 'div' method of an instance.
+ """
+ threadFunc = staticmethod(http_magic_server)
+
+ def test_in_dir(self):
+ # every registered method must show up as an attribute of the proxy
+ with xmlrpclib.MagicProxy(URL) as p:
+ for method in ('add', 'div', 'pow'):
+ self.assertIn(method, dir(p))
+
+ def test_signatures(self):
+ # 'pow' is expected to report the generic (*args, **kwargs) signature
+ with xmlrpclib.MagicProxy(URL) as p:
+ add_sig = inspect.signature(p.add)
+ div_sig = inspect.signature(p.div)
+ pow_sig = inspect.signature(p.pow)
+ self.assertEqual(str(add_sig), "(x, y)")
+ self.assertEqual(str(div_sig), "(x:1, y:2) -> 3")
+ self.assertEqual(str(pow_sig), "(*args, **kwargs)")
+
+ def test_doc(self):
+ # the lambda registered as 'add' has no docstring, hence ''
+ with xmlrpclib.MagicProxy(URL) as p:
+ self.assertEqual(p.add.__doc__, '')
+ self.assertEqual(p.pow.__doc__, pow.__doc__)
+ self.assertEqual(p.div.__doc__, "annotated div")
+
+ def test_annotations(self):
+ # annotations declared on TestInstanceClass.div must be mirrored
+ with xmlrpclib.MagicProxy(URL) as p:
+ self.assertIsNotNone(p.div.__annotations__)
+ for name, value in (('x', 1), ('y', 2), ('return', 3)):
+ self.assertIn(name, p.div.__annotations__)
+ self.assertEqual(p.div.__annotations__[name], value)
+
+ def test_arguments_errors(self):
+ # wrong arity is expected to raise TypeError from the local wrapper
+ with xmlrpclib.MagicProxy(URL) as p:
+ for meth in p.div, p.add:
+ with self.assertRaisesRegex(TypeError,
+ "missing 1 required positional "
+ "argument: 'y'"):
+ meth(1)
+ with self.assertRaisesRegex(TypeError,
+ "takes 2 positional arguments "
+ "but 3 were given"):
+ meth(1, 2, 3)
+
+class MagicServerTestCase1(BaseServerTestCase):
+ """Check that calls made through MagicProxy return the server's results."""
+ threadFunc = staticmethod(http_magic_server)
+ # NOTE(review): presumably one request for the introspection call made
+ # when MagicProxy is created plus the three calls below -- confirm
+ # against how BaseServerTestCase uses request_count.
+ request_count = 4
+
+ def test_calling_works(self):
+ with xmlrpclib.MagicProxy(URL) as p:
+ self.assertEqual(p.div(10, 4), 10 // 4)
+ self.assertEqual(p.pow(1, 1), pow(1, 1))
+ self.assertEqual(p.add(1, 2), 1 + 2)
+
@support.reap_threads
def test_main():
@@ -1125,7 +1225,7 @@
SimpleServerTestCase, KeepaliveServerTestCase1,
KeepaliveServerTestCase2, GzipServerTestCase,
MultiPathServerTestCase, ServerProxyTestCase, FailingServerTestCase,
- CGIHandlerTestCase)
+ CGIHandlerTestCase, MagicServerTestCase, MagicServerTestCase1)
if __name__ == "__main__":
diff -r abcf17bc5eae Lib/xmlrpc/client.py
--- a/Lib/xmlrpc/client.py Tue Jun 17 00:34:56 2014 -0700
+++ b/Lib/xmlrpc/client.py Wed Jun 18 08:20:07 2014 +0300
@@ -136,6 +136,8 @@
from xml.parsers import expat
import errno
from io import BytesIO
+import textwrap
+import inspect
try:
import gzip
except ImportError:
@@ -1459,6 +1461,67 @@
Server = ServerProxy
+# ------------------------- MagicProxy ------------------------------
+
+def _dispatch_rpc(name, signature, params, doc, rpc):
+    """Build a local function that forwards its arguments to an RPC method.
+
+    name      -- name of the generated function; the call is forwarded to
+                 the attribute ``_original_<name>`` of `rpc`
+    signature -- parameter list source text, e.g. ``"(x, y)"``
+    params    -- argument list source text used for the forwarded call,
+                 e.g. ``"x, y"``
+    doc       -- docstring assigned to the generated function
+    rpc       -- object that provides the ``_original_<name>`` callable
+
+    Returns the generated function. Raises SyntaxError (chained to the
+    original error, with the generated source appended to the message)
+    when the pieces do not form a valid function definition.
+    """
+    # SECURITY: name, signature and params are compiled with exec(). When
+    # they originate from a remote server this runs server-chosen code in
+    # the client process, so only use this with servers you trust.
+    #
+    # The docstring is deliberately NOT interpolated into the source: it
+    # is assigned to __doc__ below, and splicing arbitrary text between
+    # quotes breaks on docs containing quotes or backslashes.
+    function_definition = textwrap.dedent("""
+        def {name}{signature}:
+            return rpc._original_{name}({params})
+    """)
+    definition = function_definition.format(
+        name=name,
+        signature=signature,
+        params=params)
+    namespace = {'__name__': 'xmlrpc.client',
+                 'rpc': rpc}
+    try:
+        exec(definition, namespace)
+    except SyntaxError as e:
+        raise SyntaxError(e.msg + ':\n\n' + definition) from e
+
+    result = namespace[name]
+    try:
+        # report the caller's module as the function's home module
+        result.__module__ = (sys._getframe(1)
+                             .f_globals
+                             .get('__name__', '__main__'))
+    except (AttributeError, ValueError):
+        # frame introspection is unavailable; keep the default module
+        pass
+
+    result.__doc__ = doc
+    result.__name__ = name
+    return result
+
+
+class MagicProxy(ServerProxy):
+ """ServerProxy that mirrors the server's methods as local functions.
+
+ On construction the proxy calls the server's system.introspection
+ method and, for every entry returned, installs a generated wrapper
+ (see _dispatch_rpc) as an instance attribute of the same name. The
+ constructor arguments are passed through to ServerProxy unchanged.
+ """
+ def __init__(self, uri, transport=None, encoding=None, verbose=False,
+ allow_none=False, use_datetime=False,
+ use_builtin_types=False):
+ ServerProxy.__init__(self,
+ uri, transport=transport, encoding=encoding, verbose=verbose,
+ allow_none=allow_none, use_datetime=use_datetime,
+ use_builtin_types=use_builtin_types)
+ self._collect_methods()
+
+ def _collect_methods(self):
+ """Install one wrapper attribute per method the server describes.
+
+ Each entry of the introspection reply is unpacked into the
+ positional arguments of _dispatch_rpc (name, signature, params,
+ doc). If the introspection call raises Fault, no wrappers are
+ installed.
+ """
+ try:
+ methods = self.system.introspection()
+ except Fault as exc:
+ # The server probably does not support introspection, i.e. it
+ # is not a magic server; leave the proxy without wrappers.
+ return
+ for meth_info in methods:
+ method = _dispatch_rpc(*meth_info, rpc=self)
+ meth_name = meth_info[0]
+ # fetched before the name is shadowed by the wrapper below
+ original = getattr(self, meth_name)
+
+ # we need this in order to call the original RPC method
+ setattr(self, "_original_{}".format(meth_name), original)
+ setattr(self, meth_name, method)
+
+
+
# --------------------------------------------------------------------
# test code
diff -r abcf17bc5eae Lib/xmlrpc/server.py
--- a/Lib/xmlrpc/server.py Tue Jun 17 00:34:56 2014 -0700
+++ b/Lib/xmlrpc/server.py Wed Jun 18 08:20:07 2014 +0300
@@ -405,7 +405,6 @@
)
except AttributeError:
pass
-
if func is not None:
return func(*params)
else:
@@ -959,6 +958,60 @@
XMLRPCDocGenerator.__init__(self)
+class MagicRPCServer(SimpleXMLRPCServer):
+    """SimpleXMLRPCServer that also publishes ``system.introspection``.
+
+    The constructor takes the same arguments as SimpleXMLRPCServer and
+    registers system_introspection under the name 'system.introspection'.
+    """
+
+    def __init__(self, addr, requestHandler=SimpleXMLRPCRequestHandler,
+                 logRequests=True, allow_none=False, encoding=None,
+                 bind_and_activate=True, use_builtin_types=False):
+        SimpleXMLRPCServer.__init__(self, addr, requestHandler, logRequests,
+                                    allow_none, encoding, bind_and_activate,
+                                    use_builtin_types)
+        self.funcs['system.introspection'] = self.system_introspection
+
+    def system_introspection(self):
+        """system.introspection() => [(name, signature, parameters, doc), ...]
+
+        Returns one entry per method reported by system_listMethods(),
+        except the system_* introspection helpers themselves:
+
+        name       -- the registered method name
+        signature  -- str() of the method's inspect.signature, or
+                      '(*args, **kwargs)' when no signature is available
+        parameters -- comma separated argument list for forwarding a call
+                      with that signature ('*'/'**' markers are kept)
+        doc        -- inspect.getdoc() of the method, or '' if it has none
+
+        Methods of an instance that defines _dispatch, and names that
+        cannot be resolved on the instance, are skipped.
+        """
+        methods = []
+        ignored = frozenset((
+            'system_listMethods',
+            'system_methodSignature',
+            'system_methodHelp',
+            'system_introspection'))
+        # prefixes that must be repeated when forwarding variadic arguments
+        stars = {inspect.Parameter.VAR_POSITIONAL: '*',
+                 inspect.Parameter.VAR_KEYWORD: '**'}
+        for method_name in self.system_listMethods():
+            if method_name in self.funcs:
+                method = self.funcs[method_name]
+            elif self.instance is not None:
+                if hasattr(self.instance, '_dispatch'):
+                    # XXX: how about now?
+                    continue
+                try:
+                    method = resolve_dotted_attribute(
+                        self.instance,
+                        method_name
+                    )
+                except AttributeError:
+                    # XXX: what to do now?
+                    continue
+            else:
+                # Raised explicitly (not via assert) so the check also
+                # runs under -O instead of falling through with a stale
+                # or unbound 'method'.
+                raise AssertionError(
+                    "Could not find method in self.functions and no "
+                    "instance installed")
+            # Callables such as functools.partial have no __name__.
+            if getattr(method, '__name__', None) in ignored:
+                # ignore introspection
+                continue
+            try:
+                sig = inspect.signature(method)
+            except (TypeError, ValueError):
+                # probably a builtin, or a callable inspect cannot handle
+                signature = '(*args, **kwargs)'
+                parameters = '*args, **kwargs'
+            else:
+                signature = str(sig)
+                parameters = ", ".join(
+                    stars.get(param.kind, '') + param.name
+                    for param in sig.parameters.values())
+            doc = inspect.getdoc(method) or ""
+            methods.append((method_name, signature, parameters, doc))
+        return methods
+
+
if __name__ == '__main__':
import datetime
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment