Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@starenka
Forked from sputnikus/simple_as_hell.py
Created November 7, 2011 20:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save starenka/1346000 to your computer and use it in GitHub Desktop.
Save starenka/1346000 to your computer and use it in GitHub Desktop.
SOAP in Python - examples
import logging
from rpclib.application import Application
from rpclib.decorator import srpc
from rpclib.service import ServiceBase
from rpclib.model.primitive import String
from rpclib.model.primitive import Integer
from rpclib.model.complex import Iterable
from rpclib.interface.wsdl import Wsdl11
from rpclib.protocol.soap import Soap11
from rpclib.server.wsgi import WsgiApplication
class HighFiveService(ServiceBase):
    # Service with a single remote operation, give_five.

    # Declared without `self`: the two parameters line up with the two
    # input types handed to @srpc (String -> name, Integer -> times).
    @srpc(String, Integer, _returns=Iterable(String))
    def give_five(name, times):
        '''
        This docstring appear as method documentation in the wsdl, which is
        <strong>awesome</strong>
        '''
        # Generator: the greeting is built afresh for each of the `times`
        # repetitions and handed out one at a time.
        for _round in xrange(times):
            greeting_parts = ("High five", name)
            yield ', '.join(greeting_parts)
if __name__ == "__main__":
    from wsgiref.simple_server import make_server

    # Root logger at DEBUG, and the rpclib XML protocol logger explicitly
    # at DEBUG as well.
    logging.basicConfig(level=logging.DEBUG)
    xml_log = logging.getLogger('rpclib.protocol.xml')
    xml_log.setLevel(logging.DEBUG)

    # One service, namespace 'tns', Wsdl11 as the interface and Soap11 as
    # both the inbound and the outbound protocol.
    app = Application(
        [HighFiveService],
        'tns',
        interface=Wsdl11(),
        in_protocol=Soap11(),
        out_protocol=Soap11(),
    )

    # Wrap the application for WSGI and bind it to 127.0.0.1:7789.
    wsgi_app = WsgiApplication(app)
    server = make_server('127.0.0.1', 7789, wsgi_app)

    # Single-argument print in parentheses emits the same text as the bare
    # Python 2 print statement.
    print("listening to http://127.0.0.1:7789")
    print("wsdl is at: http://localhost:7789/?wsdl")
    server.serve_forever()
import logging
from rpclib.application import Application
from rpclib.decorator import srpc
from rpclib.service import ServiceBase
from rpclib.model.primitive import String
from rpclib.model.primitive import Integer
from rpclib.model.complex import Array
from rpclib.interface.wsdl import Wsdl11
from rpclib.protocol.soap import Soap11
from rpclib.server.wsgi import WsgiApplication
class DB(object):
    '''Minimal in-memory value store ("like a bus").

    All state lives in the class-level list ``_db`` and is accessed only
    through classmethods, so every caller reads and writes the same
    sequence of values.
    '''

    # Backing list, defined on the class and therefore shared by all users.
    _db = []

    @classmethod
    def write(cls, values):
        '''Append every item of *values* to the store.

        ``None`` is treated as "nothing to write" rather than letting
        ``list.extend`` raise TypeError.
        '''
        if values is None:
            return
        cls._db.extend(values)

    @classmethod
    def read(cls, index):
        '''Return the value stored at *index*, or "" when there is none.

        Any index outside the stored range yields the empty string. This
        includes negative indexes below ``-len``, which previously escaped
        the bounds check and raised IndexError. In-range negative indexes
        still count from the end, as before.
        '''
        try:
            return cls._db[index]
        except IndexError:
            return ""
class RpcDbService(ServiceBase):
    '''Bullshit, but still good example'''

    # Both operations are thin pass-throughs to the DB classmethods.

    @srpc(Array(String))
    def write_values(values):
        '''Writes values for ya'''
        # No _returns is declared and nothing is returned.
        DB.write(values)

    @srpc(Integer, _returns=String)
    def read_value(index):
        '''Read just one value'''
        stored = DB.read(index)
        return stored
if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    # Root logger at DEBUG, and the rpclib XML protocol logger explicitly
    # at DEBUG as well.
    logging.basicConfig(level=logging.DEBUG)
    xml_log = logging.getLogger('rpclib.protocol.xml')
    xml_log.setLevel(logging.DEBUG)

    # One service, namespace 'tns', Wsdl11 as the interface and Soap11 as
    # both the inbound and the outbound protocol.
    app = Application(
        [RpcDbService],
        'tns',
        interface=Wsdl11(),
        in_protocol=Soap11(),
        out_protocol=Soap11(),
    )

    # Wrap the application for WSGI and bind it to 127.0.0.1:7789.
    wsgi_app = WsgiApplication(app)
    server = make_server('127.0.0.1', 7789, wsgi_app)

    # Single-argument print in parentheses emits the same text as the bare
    # Python 2 print statement.
    print("listening to http://127.0.0.1:7789")
    print("wsdl is at: http://localhost:7789/?wsdl")
    server.serve_forever()
from suds.client import Client
import logging
# Root logger at INFO; the 'suds.transport' logger is raised to DEBUG, so it
# is more verbose than the rest.
logging.basicConfig(level=logging.INFO)
_transport_log = logging.getLogger('suds.transport')
_transport_log.setLevel(logging.DEBUG)
class DBio(object):
    '''Client wrapper around a SOAP service offering the ``write_values``
    and ``read_value`` operations.
    '''

    def __init__(self, wsdl_url='http://localhost:7789/?wsdl'):
        '''Build the suds client from the WSDL found at *wsdl_url*.

        The default is the address that used to be hard-coded here, so
        ``DBio()`` behaves exactly as before.
        '''
        self._client = Client(wsdl_url)

    def send_values(self, values):
        '''Send *values* through the remote ``write_values`` operation.

        The items are placed into a ``stringArray`` object obtained from
        the client's type factory before the call is made.
        '''
        array_of_strings = self._client.factory.create('stringArray')
        array_of_strings.string.extend(values)
        self._client.service.write_values(array_of_strings)

    def receive_value(self, index):
        '''Return the result of the remote ``read_value`` call for *index*.'''
        return self._client.service.read_value(index)

    # Original, misspelled method name, kept as an alias so existing
    # callers continue to work.
    recieve_value = receive_value
# Sample payload passed to send_values by the script section of this file.
# NOTE(review): every entry is itself a one-element list rather than a bare
# string — confirm the nesting is intended for the 'stringArray' it fills.
VALUES = [
    ["Nekdo"],
    ["Neco"],
    ["Necojineho"],
]
if __name__ == '__main__':
    client = DBio()
    client.send_values(VALUES)

    # Read back index 0, then index 9, which lies beyond the three values
    # sent by this run. Single-argument print in parentheses emits the same
    # text as the bare Python 2 print statement.
    for position in (0, 9):
        print(client.recieve_value(position))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment