Skip to content

Instantly share code, notes, and snippets.

@SiggyF
Last active August 7, 2017 14:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save SiggyF/523e461e2d3b329d2a2b99c439a192ec to your computer and use it in GitHub Desktop.
Save SiggyF/523e461e2d3b329d2a2b99c439a192ec to your computer and use it in GitHub Desktop.
import jsonrpc
# Dispatcher on which calculate() is registered below; it is later handed to
# JSONRPCResponseManager.handle() together with the serialized request.
dispatcher = jsonrpc.Dispatcher()
def calculate(a):
    """Return ``a`` increased by 2.

    Args:
        a: value supporting ``+`` with an int (the example request sends 2).

    Returns:
        ``a + 2``.
    """
    return a + 2
# Register calculate() so the request below can address it by name.
dispatcher.add_method(calculate)
import json

# JSON-RPC 2.0 request calling "calculate" with the named parameter a=2.
request = {"jsonrpc": "2.0", "method": "calculate", "params": {"a": 2}, "id": 3}

# Serialize the request, let the response manager dispatch it, and show the
# JSON text of the response.
msg = json.dumps(request)
resp = jsonrpc.JSONRPCResponseManager.handle(msg, dispatcher)
reply = resp.json
print(reply)
@YoeriKraak
Copy link

import zmq
import logging
import jsonrpc
import json
import numpy

def create_connectivity(adress):
    """
    Create a bound ZeroMQ REP socket and a poller watching it.

    Args:
        adress: endpoint string passed to ``socket.bind``,
            e.g. "tcp://127.0.0.1:5000".

    Returns:
        socket: REP socket bound to ``adress``
        poller: poller with ``socket`` registered for zmq.POLLIN

    Raises:
        zmq.ZMQError: if binding fails; the socket and context are released
            before the error is re-raised.
    """
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    try:
        socket.bind(adress)
    except zmq.ZMQError:
        # Do not leak the half-initialised socket when the endpoint is unusable.
        socket.close(linger=0)
        context.term()
        raise
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    return socket, poller

def load_dispatcher():
    """Build the JSON-RPC dispatcher used by the server loop.

    Returns:
        A ``jsonrpc.Dispatcher`` on which ``calculate``,
        ``modify_numpy_array`` and ``modify_dictionary`` have been added
        with ``add_method``.
    """
    dispatcher = jsonrpc.Dispatcher()
    for method in (calculate, modify_numpy_array, modify_dictionary):
        dispatcher.add_method(method)
    return dispatcher

def calculate(a):
    """Return ``a`` increased by 3.

    Args:
        a: value supporting ``+`` with an int.

    Returns:
        ``a + 3``.
    """
    return a + 3

def modify_numpy_array(array):
    """Add 10 to the second element (index 1) of ``array`` in place.

    Args:
        array: mutable indexable object with at least two elements whose
            items support ``+= 10``.

    Returns:
        The same ``array`` object that was passed in, after modification.

    Raises:
        IndexError: if a list or numpy array with fewer than two elements
            is passed.
    """
    array[1] += 10
    return array

def modify_dictionary(dictionary):
    """Set the key ``'Two'`` to ``'2nd'`` in ``dictionary`` in place.

    Args:
        dictionary: mutable mapping; an existing ``'Two'`` entry is
            overwritten.

    Returns:
        The same ``dictionary`` object that was passed in.
    """
    dictionary['Two'] = '2nd'
    return dictionary

# Endpoint the REP socket is bound to.
adress = "tcp://127.0.0.1:5000"
#adress="tcp://*:8080"
socket, poller = create_connectivity(adress)
# Single-argument print(...) runs unchanged on Python 2 and Python 3.
print("Server started")
dispatcher = load_dispatcher()
print("Methods loaded")

# Listen loop: poll with a 1000 ms timeout, answer one request per iteration.
try:
    while True:
        incomming_packet = poller.poll(1000)
        if not incomming_packet:
            # Timeout without traffic; poll again.
            continue
        message_received = socket.recv()
        print("Received  %s" % (message_received,))
        resp = jsonrpc.JSONRPCResponseManager.handle(message_received, dispatcher)
        # handle() returns None for notifications (requests without an id).
        # A REP socket must still answer every request, so send an empty
        # reply instead of failing on ``None.json``.
        reply = resp.json if resp is not None else ""
        socket.send(str(reply).encode(), zmq.NOBLOCK)
        print("Sending reply:  %s" % (reply,))
finally:
    poller.unregister(socket)
    socket.unbind(adress)
    # Release the socket itself, not only its binding.
    socket.close()

@YoeriKraak
Copy link

`import zmq
import logging
import jsonrpc
import json
import numpy

def create_connectivity(adress):
    """
    Create a bound ZeroMQ REP socket and a poller watching it.

    Args:
        adress: endpoint string passed to ``socket.bind``,
            e.g. "tcp://127.0.0.1:5000".

    Returns:
        socket: REP socket bound to ``adress``
        poller: poller with ``socket`` registered for zmq.POLLIN

    Raises:
        zmq.ZMQError: if binding fails; the socket and context are released
            before the error is re-raised.
    """
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    try:
        socket.bind(adress)
    except zmq.ZMQError:
        # Do not leak the half-initialised socket when the endpoint is unusable.
        socket.close(linger=0)
        context.term()
        raise
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    return socket, poller

def load_dispatcher():
    """Build the JSON-RPC dispatcher used by the server loop.

    Returns:
        A ``jsonrpc.Dispatcher`` on which ``calculate``,
        ``modify_numpy_array`` and ``modify_dictionary`` have been added
        with ``add_method``.
    """
    dispatcher = jsonrpc.Dispatcher()
    for method in (calculate, modify_numpy_array, modify_dictionary):
        dispatcher.add_method(method)
    return dispatcher

def calculate(a):
    """Return ``a`` increased by 3.

    Args:
        a: value supporting ``+`` with an int.

    Returns:
        ``a + 3``.
    """
    return a + 3

def modify_numpy_array(array):
    """Add 10 to the second element (index 1) of ``array`` in place.

    Args:
        array: mutable indexable object with at least two elements whose
            items support ``+= 10``.

    Returns:
        The same ``array`` object that was passed in, after modification.

    Raises:
        IndexError: if a list or numpy array with fewer than two elements
            is passed.
    """
    array[1] += 10
    return array

def modify_dictionary(dictionary):
    """Set the key ``'Two'`` to ``'2nd'`` in ``dictionary`` in place.

    Args:
        dictionary: mutable mapping; an existing ``'Two'`` entry is
            overwritten.

    Returns:
        The same ``dictionary`` object that was passed in.
    """
    dictionary['Two'] = '2nd'
    return dictionary

# Endpoint the REP socket is bound to.
adress = "tcp://127.0.0.1:5000"
#adress="tcp://*:8080"
socket, poller = create_connectivity(adress)
# Single-argument print(...) runs unchanged on Python 2 and Python 3.
print("Server started")
dispatcher = load_dispatcher()
print("Methods loaded")

# Listen loop: poll with a 1000 ms timeout, answer one request per iteration.
try:
    while True:
        incomming_packet = poller.poll(1000)
        if not incomming_packet:
            # Timeout without traffic; poll again.
            continue
        message_received = socket.recv()
        print("Received  %s" % (message_received,))
        resp = jsonrpc.JSONRPCResponseManager.handle(message_received, dispatcher)
        # handle() returns None for notifications (requests without an id).
        # A REP socket must still answer every request, so send an empty
        # reply instead of failing on ``None.json``.
        reply = resp.json if resp is not None else ""
        socket.send(str(reply).encode(), zmq.NOBLOCK)
        print("Sending reply:  %s" % (reply,))
finally:
    poller.unregister(socket)
    socket.unbind(adress)
    # Release the socket itself, not only its binding.
    socket.close()

`

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment