-
-
Save SiggyF/523e461e2d3b329d2a2b99c439a192ec to your computer and use it in GitHub Desktop.
import jsonrpc

# Registry that maps JSON-RPC method names to the Python callables serving them.
dispatcher = jsonrpc.Dispatcher()
def calculate(a):
    """
    Return ``a`` increased by 2.

    Args:
        a: value that supports ``+`` with an int.

    Returns:
        ``a + 2``.
    """
    return a + 2
import json

# Expose calculate() to JSON-RPC callers under its function name.
dispatcher.add_method(calculate)

# A JSON-RPC 2.0 request that passes its parameter by name.
request = {
    "jsonrpc": "2.0",
    "method": "calculate",
    "params": {
        "a": 2,
    },
    "id": 3,
}
msg = json.dumps(request)

# Dispatch the serialized request in-process and print the serialized response.
resp = jsonrpc.JSONRPCResponseManager.handle(msg, dispatcher)
reply = resp.json
print(reply)
`import zmq
import logging
import jsonrpc
import json
import numpy
def create_connectivity(adress):
    """
    Create a ZeroMQ reply socket bound to ``adress`` and a poller watching it.

    Args:
        adress: endpoint to bind to, e.g. ``"tcp://127.0.0.1:5000"``.

    Returns:
        socket: ``zmq.REP`` socket bound to ``adress``; requests are received
            on it and the replies are sent back on it.
        poller: ``zmq.Poller`` with ``socket`` registered for ``zmq.POLLIN``,
            so the caller can wait for incoming requests with a timeout.

    Raises:
        zmq.ZMQError: if binding fails; the socket is closed before re-raising.
    """
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    try:
        socket.bind(adress)
    except zmq.ZMQError:
        # Do not leak the socket when the endpoint cannot be bound.
        socket.close()
        raise

    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    return socket, poller
def load_dispatcher():
    """
    Build the JSON-RPC dispatcher that exposes this module's remote methods.

    Returns:
        ``jsonrpc.Dispatcher`` with ``calculate``, ``modify_numpy_array`` and
        ``modify_dictionary`` registered.
    """
    dispatcher = jsonrpc.Dispatcher()
    for method in (calculate, modify_numpy_array, modify_dictionary):
        dispatcher.add_method(method)
    return dispatcher
def calculate(a):
    """
    Return ``a`` increased by 3.

    Args:
        a: value that supports ``+`` with an int.

    Returns:
        ``a + 3``.
    """
    return a + 3
def modify_numpy_array(array):
    """
    Add 10 to the second element (index 1) of ``array``, in place.

    Args:
        array: mutable, integer-indexable container whose item at index 1
            supports ``+= 10``.

    Returns:
        The same ``array`` object, after the modification.

    Raises:
        IndexError: if a list or 1-D array has fewer than two elements.
    """
    array[1] += 10
    return array
def modify_dictionary(dictionary):
    """
    Set the key ``'Two'`` of ``dictionary`` to ``'2nd'``, in place.

    Args:
        dictionary: mutable mapping to update; an existing ``'Two'`` entry is
            overwritten.

    Returns:
        The same ``dictionary`` object, after the modification.
    """
    dictionary['Two'] = '2nd'
    return dictionary
adress = "tcp://127.0.0.1:5000"
# To listen on every interface instead, use: adress = "tcp://*:8080"
socket, poller = create_connectivity(adress)
print("Server started")
dispatcher = load_dispatcher()
print("Methods loaded")

# Listen loop: wait up to one second for a request, answer it, repeat.
# print() is always called with one pre-formatted string so the output is the
# same under Python 2 and Python 3.
try:
    while True:
        incomming_packet = poller.poll(1000)
        if not incomming_packet:
            continue
        message_received = socket.recv()
        print("Received  %s" % (message_received,))
        resp = jsonrpc.JSONRPCResponseManager.handle(message_received, dispatcher)
        # handle() returns None for JSON-RPC notifications (requests without
        # an "id"). A REP socket must still answer every request, so send an
        # empty reply instead of crashing on ``None.json``.
        reply = resp.json if resp is not None else ""
        socket.send(str(reply).encode(), zmq.NOBLOCK)
        print("Sending reply:  %s" % (reply,))
finally:
    poller.unregister(socket)
    socket.unbind(adress)
    # Release the underlying socket as well; unbinding alone leaves it open.
    socket.close()
`
import zmq
import logging
import jsonrpc
import json
import numpy
def create_connectivity(adress):
    """
    Create a ZeroMQ reply socket bound to ``adress`` and a poller watching it.

    Args:
        adress: endpoint to bind to, e.g. ``"tcp://127.0.0.1:5000"``.

    Returns:
        socket: ``zmq.REP`` socket bound to ``adress``; requests are received
            on it and the replies are sent back on it.
        poller: ``zmq.Poller`` with ``socket`` registered for ``zmq.POLLIN``,
            so the caller can wait for incoming requests with a timeout.

    Raises:
        zmq.ZMQError: if binding fails; the socket is closed before re-raising.
    """
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    try:
        socket.bind(adress)
    except zmq.ZMQError:
        # Do not leak the socket when the endpoint cannot be bound.
        socket.close()
        raise

    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    return socket, poller
def load_dispatcher():
    """
    Build the JSON-RPC dispatcher that exposes this module's remote methods.

    Returns:
        ``jsonrpc.Dispatcher`` with ``calculate``, ``modify_numpy_array`` and
        ``modify_dictionary`` registered.
    """
    dispatcher = jsonrpc.Dispatcher()
    for method in (calculate, modify_numpy_array, modify_dictionary):
        dispatcher.add_method(method)
    return dispatcher
def calculate(a):
    """
    Return ``a`` increased by 3.

    Args:
        a: value that supports ``+`` with an int.

    Returns:
        ``a + 3``.
    """
    return a + 3
def modify_numpy_array(array):
    """
    Add 10 to the second element (index 1) of ``array``, in place.

    Args:
        array: mutable, integer-indexable container whose item at index 1
            supports ``+= 10``.

    Returns:
        The same ``array`` object, after the modification.

    Raises:
        IndexError: if a list or 1-D array has fewer than two elements.
    """
    array[1] += 10
    return array
def modify_dictionary(dictionary):
    """
    Set the key ``'Two'`` of ``dictionary`` to ``'2nd'``, in place.

    Args:
        dictionary: mutable mapping to update; an existing ``'Two'`` entry is
            overwritten.

    Returns:
        The same ``dictionary`` object, after the modification.
    """
    dictionary['Two'] = '2nd'
    return dictionary
adress = "tcp://127.0.0.1:5000"
# To listen on every interface instead, use: adress = "tcp://*:8080"
socket, poller = create_connectivity(adress)
print("Server started")
dispatcher = load_dispatcher()
print("Methods loaded")

# Listen loop: wait up to one second for a request, answer it, repeat.
# print() is always called with one pre-formatted string so the output is the
# same under Python 2 and Python 3.
try:
    while True:
        incomming_packet = poller.poll(1000)
        if not incomming_packet:
            continue
        message_received = socket.recv()
        print("Received  %s" % (message_received,))
        resp = jsonrpc.JSONRPCResponseManager.handle(message_received, dispatcher)
        # handle() returns None for JSON-RPC notifications (requests without
        # an "id"). A REP socket must still answer every request, so send an
        # empty reply instead of crashing on ``None.json``.
        reply = resp.json if resp is not None else ""
        socket.send(str(reply).encode(), zmq.NOBLOCK)
        print("Sending reply:  %s" % (reply,))
finally:
    poller.unregister(socket)
    socket.unbind(adress)
    # Release the underlying socket as well; unbinding alone leaves it open.
    socket.close()