Skip to content

Instantly share code, notes, and snippets.

@KixPanganiban
Created August 16, 2016 06:46
Show Gist options
  • Save KixPanganiban/744237838ce2111f58800da307b5c25d to your computer and use it in GitHub Desktop.
Save KixPanganiban/744237838ce2111f58800da307b5c25d to your computer and use it in GitHub Desktop.
# Zask Test File for fix/exceptions
import json
import gevent
from zask import Zask, _request_ctx
from zask.ext.zerorpc import ZeroRPC
from zerorpc.exceptions import RemoteError
# Application object that the ZeroRPC extension below is bound to.
app = Zask('testapp')

# Settings merged into the app's configuration before the extension is
# initialised.
# NOTE(review): ZERORPC_TESTAPP presumably maps the service version '1.0' to
# its endpoint and carries the access-key settings for the 'testapp'
# service -- confirm against the zask ZeroRPC extension.
config = {
    'DEBUG': False,
    'ERROR_LOG': '/tmp/zask.error.log',
    'ZERORPC_TESTAPP': {
        '1.0': 'tcp://0.0.0.0:9999',
        'client_keys': ['dev'],
        'access_key': 'testapp',
    },
}
app.config.update(config)

# ZeroRPC extension instance; used later to build the server and the client.
rpc = ZeroRPC()
rpc.init_app(app)
class TestApp(object):
    """Service object served through ``rpc.Server`` in this test script.

    Exposes a single endpoint, ``ping``.
    """

    # Service name and version; the same values are used by the client in
    # ``test_ping`` ('testapp', version '1.0').
    __service_name__ = 'testapp'
    __version__ = '1.0'

    def ping(self):
        """Print the current request event's header and reply with a pong.

        Returns:
            str: The JSON text ``{"response": "pong"}``.
        """
        # NOTE(review): get_request_event is not defined on this class;
        # presumably the zask server attaches it to the served object --
        # confirm. Without it this call raises AttributeError.
        request_event = self.get_request_event()
        print(request_event.header)
        return json.dumps({"response": "pong"})
def test_ping():
    """Call ``ping`` on the 'testapp' service and report a remote failure.

    Builds a client for version '1.0' of 'testapp' and invokes ``ping``,
    discarding the reply. If the call raises ``RemoteError``, the error's
    name and message are printed instead of propagating; any other
    exception is not handled here.
    """
    client = rpc.Client('testapp', version='1.0')
    try:
        client.ping()
    except RemoteError as err:
        # Surface the server-side failure as "<name>: <msg>" on stdout.
        print("%s: %s" % (err.name, err.msg))
# Serve a TestApp instance and run the client check as two greenlets in
# this same process.
server = rpc.Server(TestApp())
# Spawn order is preserved: the server's run loop first, then the client.
tasks = list(map(gevent.spawn, (server.run, test_ping)))
# Wait for both greenlets.
# NOTE(review): server.run presumably never returns on its own, so this
# call would block until the process is interrupted -- confirm.
gevent.joinall(tasks)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment