Skip to content

Instantly share code, notes, and snippets.

@plq plq/output
Last active Aug 29, 2015

Embed
What would you like to do?
INFO:root:Spyne 2.11.0 initialized
INFO:root:create_incident: Parameters:<ns0:IncidentParams xmlns:ns0="tns">
<ns0:number>INC0000001</ns0:number>
</ns0:IncidentParams>
.
----------------------------------------------------------------------
Ran 1 test in 0.009s
OK
('200 OK', [('Content-Length', '313'), ('Content-Type', 'text/xml; charset=utf-8')])
<senv:Envelope xmlns:tns="tns" xmlns:senv="http://schemas.xmlsoap.org/soap/envelope/">
<senv:Body>
<tns:create_incidentResponse>
<tns:create_incidentResult>
<tns:number>INC0000001</tns:number>
</tns:create_incidentResult>
</tns:create_incidentResponse>
</senv:Body>
</senv:Envelope>
#!/usr/bin/env python2
import unittest
from lxml import etree
from StringIO import StringIO
from spyne.application import Application
from spyne.service import ServiceBase
from spyne.protocol.soap import Soap11
from spyne.decorator import srpc, rpc
from spyne.model.complex import ComplexModel
from spyne.model.primitive import String, Unicode
from spyne.server.wsgi import WsgiApplication
from spyne.util.xml import get_object_as_xml
import logging
logging.basicConfig(level=logging.INFO)
import spyne
logging.info("Spyne %s initialized", spyne.__version__)
#complex model definition, parameters that will be recieved in SOAP call,
#idea is to save model to the DB.
class IncidentParams(ComplexModel):
__type_name__ = 'IncidentParams'
__namespace__ = 'tns'
IncNum = Unicode.customize(
max_len = 15,
pattern = 'INC[0-9]+',
type_name = 'MajorIncidentNumber')
number = IncNum
def start_response(code, headers):
print(code, headers)
class TestService(unittest.TestCase):
def test_soap_bare_with_complexInput(self):
#service definition
class IncidentService(ServiceBase):
@srpc(IncidentParams, _returns=IncidentParams)
def create_incident(the_incident):
# This RPC logs the input and passes it through as the return value.
s = etree.tostring(get_object_as_xml(the_incident), pretty_print=True)
logging.info("create_incident: Parameters:%s" % s)
# Code_to_save_incident_to_DB(the_incident)
return the_incident
app = Application([IncidentService], 'tns', in_protocol=Soap11(),
out_protocol=Soap11(cleanup_namespaces=True))
req = """
<senv:Envelope xmlns:senv="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:tns="tns">
<senv:Body>
<tns:create_incident>
<tns:the_incident>
<tns:number>INC0000001</tns:number>
</tns:the_incident>
</tns:create_incident>
</senv:Body>
</senv:Envelope>
"""
server = WsgiApplication(app)
resp = etree.fromstring(''.join(server({
'QUERY_STRING': '',
'PATH_INFO': '/create',
'REQUEST_METHOD': 'GET',
'SERVER_NAME': 'localhost',
'wsgi.input': StringIO(req)
}, start_response, "http://null")))
print etree.tostring(resp, pretty_print=True)
if __name__ == '__main__':
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.