Last active
February 25, 2016 10:06
-
-
Save mthh/df1161a7708fe500732b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
R client (through Rpy2) trying to be used in a background for a WebPage | |
providing a R interactive console. | |
@author: mz | |
""" | |
import threading | |
import time | |
import zmq | |
import sys | |
import os | |
from collections import deque | |
from psutil import Popen, Process | |
from zmq.eventloop.ioloop import IOLoop | |
from zmq.eventloop.zmqstream import ZMQStream | |
if not os.path.isdir('/tmp/feeds'): | |
try: | |
os.mkdir('/tmp/feeds') | |
except Exception as err: | |
print(err) | |
sys.exit() | |
url_worker = "ipc:///tmp/feeds/rpy2_workers" | |
class client_Rpy: | |
""" | |
Basic request-reply ZMQ client sending R expression to evaluate | |
by a Rpy2 worker and receiving json response (console output / status) | |
""" | |
def __init__(self, client_url, i, init=True, worker_pid=None, thread_q=None): | |
if init: | |
self.worker_process, self.thread_q = launch_rpy_queue_worker(client_url) | |
else: | |
self.worker_process, self.thread_q = Process(worker_pid), thread_q | |
self.context = zmq.Context.instance() | |
self.socket = self.context.socket(zmq.REQ) | |
self.socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i)) | |
self.socket.connect(client_url) | |
def reval(self, expression): | |
self.socket.send(expression.encode()) | |
self.reply = self.socket.recv() | |
print(self.reply.decode()) | |
return self.reply | |
def disconnect_close(self): | |
self.socket.send(b'CLOSE') | |
time.sleep(0.5) | |
self.worker_process.terminate() | |
self.worker_process.wait() | |
self.socket.close() | |
self.context.term() | |
if self.thread_q != None: | |
self.thread_q.join() | |
sys.exit(0) | |
class Rpy2_Queue(object): | |
"""Worker Queue class using ZMQStream/IOLoop for event dispatching""" | |
def __init__(self, url_client, url_worker, total_workers): | |
self.total_workers = total_workers | |
self.available_workers = 0 | |
self.workers = deque() | |
context = zmq.Context() | |
frontend = context.socket(zmq.ROUTER) | |
frontend.bind(url_client) | |
backend = context.socket(zmq.ROUTER) | |
backend.bind(url_worker) | |
self.backend = ZMQStream(backend) | |
self.frontend = ZMQStream(frontend) | |
self.backend.on_recv(self.handle_backend) | |
self.loop = IOLoop.instance() | |
def handle_backend(self, msg): | |
# Queue worker address for routing clients/workers: | |
worker_addr, empty, client_addr = msg[:3] | |
assert self.available_workers < self.total_workers | |
# Add worker back to the list of workers | |
self.available_workers += 1 | |
self.workers.append(worker_addr) | |
assert empty == b"" | |
# Third frame is READY or else a client reply address | |
# If client reply, send rest back to frontend | |
if client_addr != b"READY": | |
empty, reply = msg[3:] | |
assert empty == b"" | |
self.frontend.send_multipart([client_addr, b'', reply]) | |
if self.available_workers == 1: | |
# on first recv, start accepting frontend messages | |
self.frontend.on_recv(self.handle_frontend) | |
def handle_frontend(self, msg): | |
client_addr, empty, request = msg | |
assert empty == b"" | |
self.available_workers -= 1 | |
worker_id = self.workers.popleft() | |
self.backend.send_multipart([worker_id, b'', client_addr, b'', request]) | |
if request == b'CLOSE': | |
self.loop.stop() | |
return | |
if self.available_workers == 0: | |
# stop receiving until workers become available again | |
self.frontend.stop_on_recv() | |
def prepare_worker(nb_worker): | |
os.chdir('/home/mz/p2/rpy2') | |
# Start the R worker : | |
r_process = Popen(['python3', 'rpy2_session_console_worker.py', '{}'.format(nb_worker)]) | |
time.sleep(0.3) | |
return r_process | |
def launch_queue(url_client): | |
queue = Rpy2_Queue(url_client, url_worker, 1) | |
loop = IOLoop.instance().start() | |
def launch_rpy_queue_worker(url_client): | |
rpy2_worker = prepare_worker(1) | |
thread_q = threading.Thread(target=launch_queue, args=(url_client, )) | |
thread_q.start() | |
time.sleep(0.3) | |
return rpy2_worker, thread_q | |
def test(): | |
crp = client_Rpy("ipc:///tmp/feeds/rpy2_clients", 789) | |
crp.reval('R.Version()') | |
crp.reval('a<-c(1,2,3)') | |
crp.reval('b<- c(19,22,33) * a *9') | |
crp.reval('d <- data.frame(b)') | |
crp.reval('print(d)') | |
crp.reval('plot(d)') | |
crp.reval('d <- data.frame(b)') | |
crp.disconnect_close() | |
if __name__ == '__main__': | |
test() | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
Rpy2 worker, aimed to be launched as a separate process and to communicate | |
with the client through ZMQ, trying to emulate a R console. | |
@author: mz | |
""" | |
from collections import deque | |
import rpy2.rinterface as rinterface | |
import os | |
import zmq | |
import json | |
import sys | |
class Rpy2_worker: | |
""" | |
Rpy2 worker trying to emulate a remote R console | |
""" | |
def __init__(self, ident, worker_url, env='.GlobalEnv'): | |
import rpy2.robjects as robjects | |
from rpy2.robjects.packages import importr | |
self.importr = importr | |
self.r = robjects.r | |
self.reval = robjects.reval | |
self.context = zmq.Context.instance() | |
self.socket = self.context.socket(zmq.REQ) | |
self.socket.setsockopt_string(zmq.IDENTITY, '{}'.format(ident)) | |
self.socket.connect("ipc:///tmp/feeds/rpy2_workers") | |
self.socket.send(b"READY") | |
self.console_output = deque() | |
try: | |
self.rversion = self.r('R.version.string')[0] | |
print("Worker {} initialized on {}".format( | |
self.socket.identity.decode(), self.rversion)) | |
self.capture_R_output() | |
except Exception as err: | |
err( | |
"Something wrong happened while initializing the Rpy2 executor") | |
return -1 | |
def fun_output(self, x): | |
self.console_output.append(x) | |
def capture_R_output(self): | |
rinterface.set_writeconsole(self.fun_output) | |
def uncapture_R_output(self): | |
rinterface.set_writeconsole(rinterface.consolePrint) | |
def listen(self): | |
try: | |
# self.last_modified_graphics = os.stat('/tmp/r_output.svg').st_mtime | |
grdevices = self.importr('grDevices') | |
grdevices.svg(file='/tmp/r_output.svg') | |
if not os.path.exists('/tmp/r_output.svg'): | |
with open('/tmp/r_output.svg', 'w') as f: | |
f.write('') | |
while True: | |
self.last_modified_graphics = os.stat('/tmp/r_output.svg').st_mtime | |
result = '' | |
header, empty, request = self.socket.recv_multipart() | |
if request == b"CLOSE": | |
print('CLOSE_OK') | |
break | |
try: | |
print(request.decode()) | |
result_r = self.reval(request.decode()) | |
print(result_r) | |
except self.rinterface.RRuntimeError as err: | |
status = "Rpy2 intercepted error : " + str(err) | |
print(status) | |
if self.last_modified_graphics != os.stat('/tmp/r_output.svg').st_mtime: | |
grdevices.dev_off() # Flush the graphical output ? | |
with open("/tmp/r_output.svg", "r") as f: | |
print('There is a graphical output...Returning it in ' | |
'the console field') | |
result = f.read() | |
grdevices.svg(file='/tmp/r_output.svg') # And restart it ... | |
if len(self.console_output) == 0 and 'rpy2' in str(type(result_r)): | |
status = 'OK - No console output' | |
elif len(self.console_output) > 0 and 'rpy2' in str(type(result_r)): | |
status = 'OK' | |
result = ' '.join([self.console_output.popleft() for i in range(len(self.console_output))]) | |
else: | |
status = 'Something went wrong...' | |
response = json.dumps({'Result': result, | |
'Status': status}) | |
print(response) | |
self.socket.send_multipart([header, b'', response.encode()]) | |
except zmq.ContextTerminated: | |
self.socket.close() | |
except KeyboardInterrupt: | |
self.socket.close() | |
self.context.term() | |
return | |
finally: | |
grdevices.dev_off() | |
self.socket.close() | |
self.context.term() | |
print('Session over') | |
return | |
if __name__ == '__main__': | |
assert len(sys.argv) == 2 | |
rpw = Rpy2_worker(sys.argv[1], "ipc:///tmp/feeds/rpy2_workers") | |
rpw.listen() | |
#import rpy2.rinterface as rinterface | |
#from collections import deque | |
#class Rpy2_worker: | |
# """ | |
# Rpy2 worker trying to emulate an interactive R console | |
# """ | |
# def __init__(self, port, env='.GlobalEnv'): | |
# import rpy2.robjects as robjects | |
# self.r = robjects.r | |
# self.context = zmq.Context.instance() | |
# self.socket = self.context.socket(zmq.REP) | |
## self.socket.setsockopt_string(zmq.IDENTITY, '{}'.format(ident)) | |
# self.socket.connect("ipc:///tmp/rp"+str(port)) | |
# self.console_output = [] | |
# try: | |
# self.rversion = self.r('R.version.string')[0] | |
# print("Worker {} initialized on {}".format(port, self.rversion)) | |
## self.capture_R_output() | |
# except Exception as err: | |
# raise err( | |
# "Something wrong happened while initializing the Rpy2 executor") | |
# return -1 | |
# | |
## def fun_output(self, x): | |
## self.console_output.append(x) | |
## | |
## def capture_R_output(self): | |
## rinterface.set_writeconsole(self.fun_output) | |
## | |
## def uncapture_R_output(self): | |
## rinterface.set_writeconsole(rinterface.consolePrint) | |
# | |
# def listen(self): | |
# try: | |
# while True: | |
# request = self.socket.recv() | |
# print('Received ', request.decode()) | |
# if b'CLOSE' in request: | |
# self.socket.send(b'OK') | |
# self.socket.close() | |
# # self.context | |
# sys.exit() | |
# try: | |
# print(request.decode()) | |
# result = self.reval(request.decode()) | |
# print(result) | |
# except self.rinterface.RRuntimeError as err: | |
# status = "Rpy2 intercepted error : " + str(err) | |
# print(status) | |
# | |
# if len(self.console_output) == 0 and 'rpy2' in str(type(result)): | |
# status = 'OK - No console output' | |
# elif len(self.console_output) > 0 and 'rpy2' in str(type(result)): | |
# status = 'OK' | |
# result = ' '.join([self.console_output.popleft() for i in range(len(self.console_output))]) | |
# else: | |
# status = 'Something went wrong...' | |
# response = json.dumps({'Result': result, | |
# 'Status': status}) | |
# print(response) | |
# self.socket.send(response) | |
# except zmq.ContextTerminated: | |
# return | |
# except KeyboardInterrupt: | |
# self.socket.close() | |
# return | |
# | |
## def send_console_output(self): | |
## pass | |
## | |
## def listen_expression(self): | |
## pass | |
# | |
#if __name__ == '__main__': | |
# assert len(sys.argv) == 2 | |
# rp = Rpy2_worker(sys.argv[1]) | |
# rp.listen() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment