Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
python xml-rpc
#!/usr/bin/env python
'''
@package test_client
@brief this module is for testing xmlrpc
Started : 2010/03/10
license: CQVista's NDA
@version: 1.0.0
@author: Moonchang Chae <mcchae@cqvista.com>
'''
__author__ = "MoonChang Chae <mcchae@cqvista.com>"
__date__ = "2010/03/10"
__version__ = "1.0.0"
__version_info__ = (1, 0, 0)
__license__ = "GCQVista's NDA"
import xmlrpclib
###########################################################################################
## connect xmlrpc
def connectService(url):
try:
s = xmlrpclib.ServerProxy(url)
s.system.listMethods()
return s
finally:
pass
###########################################################################################
## main test function
def main():
xrs = None
url = 'http://localhost:18789/_test_xmlrpc_'
try:
xrs = connectService(url)
xrs.do_a('a1')
xrs.do_b('a1', 'a2')
xrs.do_c('a1', 'a2', 'a3')
finally:
pass
###########################################################################################
if __name__ == '__main__':
main()
#!/usr/bin/env python
'''
@package test_service
@brief this module is for test xmlrpc
Started : 2010/03/10
@version: 1.0.0
@author: Moonchang Chae <mcchae@cqvista.com>
license: GCQVista's NDA
'''
__author__ = "MoonChang Chae <mcchae@cqvista.com>"
__date__ = "2010/03/10"
__version__ = "1.0.0"
__version_info__ = (1, 0, 0)
__license__ = "CQVista's NDA"
import threading
import time
from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
###########################################################################################
# Restrict to a particular path.
class RequestHandler(SimpleXMLRPCRequestHandler):
rpc_paths = ('/_test_xmlrpc_',)
###########################################################################################
def logOut(msg):
print msg
###########################################################################################
class doRpc:
def __init__(self):
pass
def do_a(self, p1):
s = "[%s] in do_a(p1=<%s>)" % (threading.current_thread().getName(), p1)
logOut(s)
time.sleep(1)
logOut("%s DONE!" % s)
return s
def do_b(self, p1, p2):
s = "[%s] in do_b(p1=<%s>, p2=<%s>)" % (threading.current_thread().getName(), p1, p2)
logOut(s)
time.sleep(2)
logOut("%s DONE!" % s)
return s
def do_c(self, p1, p2, p3):
s = "[%s] in do_c(p1=<%s>, p2=<%s>, p3=<%s>)" % (threading.current_thread().getName(), p1, p2, p3)
logOut(s)
time.sleep(3)
logOut("%s DONE!" % s)
return s
###########################################################################################
## main function
def main():
# Create server
server = SimpleXMLRPCServer(("0.0.0.0", 18789),
requestHandler=RequestHandler,
logRequests = False,
allow_none = True)
server.allow_none = True
server.logRequests = False
server.register_introspection_functions()
server.register_instance(doRpc())
try:
# Run the server's main loop
server.serve_forever()
except:
pass
finally:
logOut('test Closed.')
###########################################################################################
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment