Created
August 16, 2016 06:46
-
-
Save KixPanganiban/744237838ce2111f58800da307b5c25d to your computer and use it in GitHub Desktop.
Zask Test File for fix/exceptions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import gevent | |
from zask import Zask, _request_ctx | |
from zask.ext.zerorpc import ZeroRPC | |
from zerorpc.exceptions import RemoteError | |
app = Zask('testapp') | |
config = dict(DEBUG=False, | |
ERROR_LOG='/tmp/zask.error.log', | |
ZERORPC_TESTAPP={ | |
'1.0': 'tcp://0.0.0.0:9999', | |
'client_keys': ['dev'], | |
'access_key': 'testapp' | |
}) | |
app.config.update(config) | |
rpc = ZeroRPC() | |
rpc.init_app(app) | |
class TestApp(object): | |
__service_name__ = 'testapp' | |
__version__ = '1.0' | |
def ping(self): | |
request_event = self.get_request_event() | |
print(request_event.header) | |
return json.dumps({"response": "pong"}) | |
def test_ping(): | |
c = rpc.Client('testapp', version='1.0') | |
try: | |
c.ping() | |
except RemoteError as e: | |
print("%s: %s" % (e.name, e.msg)) | |
server = rpc.Server(TestApp()) | |
tasks = [gevent.spawn(server.run), gevent.spawn(test_ping)] | |
gevent.joinall(tasks) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment