Skip to content

Instantly share code, notes, and snippets.

@mthh
Last active February 25, 2016 10:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mthh/df1161a7708fe500732b to your computer and use it in GitHub Desktop.
Save mthh/df1161a7708fe500732b to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
"""
R client (through rpy2) meant to run in the background of a web page
providing an interactive R console.
@author: mz
"""
import threading
import time
import zmq
import sys
import os
from collections import deque
from psutil import Popen, Process
from zmq.eventloop.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream
# Make sure the directory holding the ipc:// socket files exists.
# makedirs(exist_ok=True) avoids the check-then-create race of isdir() + mkdir().
try:
    os.makedirs('/tmp/feeds', exist_ok=True)
except OSError as err:
    print(err)
    # Exit with a non-zero status so the failure is visible to the caller.
    sys.exit(1)

# ipc endpoint on which the queue's backend socket is bound.
url_worker = "ipc:///tmp/feeds/rpy2_workers"
class client_Rpy:
    """
    Basic request-reply ZMQ client sending R expression to evaluate
    by a Rpy2 worker and receiving json response (console output / status)

    Parameters
    ----------
    client_url : str
        ZMQ endpoint the REQ socket connects to.
    i : object
        Value formatted with ``'{}'.format(i)`` and used as the socket identity.
    init : bool
        When true, a worker process and a queue thread are started through
        ``launch_rpy_queue_worker(client_url)``; otherwise the client attaches
        to the ones described by ``worker_pid`` / ``thread_q``.
    worker_pid : int or None
        PID of an already running worker (only read when ``init`` is false).
    thread_q : threading.Thread or None
        Already running queue thread (only read when ``init`` is false).
    """

    def __init__(self, client_url, i, init=True, worker_pid=None, thread_q=None):
        if init:
            self.worker_process, self.thread_q = launch_rpy_queue_worker(client_url)
        else:
            # psutil.Process(None) means "the current process": without this
            # guard disconnect_close() would terminate the caller itself.
            if worker_pid is not None:
                self.worker_process = Process(worker_pid)
            else:
                self.worker_process = None
            self.thread_q = thread_q
        self.context = zmq.Context.instance()
        self.socket = self.context.socket(zmq.REQ)
        self.socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
        self.socket.connect(client_url)

    def reval(self, expression):
        """Send ``expression`` (str), print the decoded reply and return the raw reply bytes."""
        self.socket.send(expression.encode())
        # The last reply is kept on the instance as well as returned.
        self.reply = self.socket.recv()
        print(self.reply.decode())
        return self.reply

    def disconnect_close(self):
        """
        Send ``CLOSE``, stop the worker process, release the ZMQ resources,
        join the queue thread and finally exit the interpreter.

        Raises
        ------
        SystemExit
            Always, through ``sys.exit(0)``, once the cleanup is done.
        """
        self.socket.send(b'CLOSE')
        # Pause before terminating the worker (presumably so the CLOSE
        # message has time to travel through the queue).
        time.sleep(0.5)
        if self.worker_process is not None:
            self.worker_process.terminate()
            self.worker_process.wait()
        self.socket.close()
        self.context.term()
        if self.thread_q is not None:
            self.thread_q.join()
        sys.exit(0)
class Rpy2_Queue(object):
    """Worker Queue class using ZMQStream/IOLoop for event dispatching

    Two ROUTER sockets are bound: the frontend on ``url_client`` and the
    backend on ``url_worker``. Available worker addresses are kept in a deque
    and handed out in FIFO order to incoming client requests.
    """

    def __init__(self, url_client, url_worker, total_workers):
        """
        Bind both sockets and register the backend handler.

        url_client    -- endpoint the frontend ROUTER socket is bound to
        url_worker    -- endpoint the backend ROUTER socket is bound to
        total_workers -- upper bound on simultaneously available workers
        """
        self.total_workers = total_workers
        self.available_workers = 0
        self.workers = deque()

        context = zmq.Context()
        frontend = context.socket(zmq.ROUTER)
        frontend.bind(url_client)
        backend = context.socket(zmq.ROUTER)
        backend.bind(url_worker)

        self.backend = ZMQStream(backend)
        self.frontend = ZMQStream(frontend)
        # Only the backend is listened to at first: the frontend handler is
        # registered by handle_backend() once a worker is available.
        self.backend.on_recv(self.handle_backend)
        self.loop = IOLoop.instance()

    def handle_backend(self, msg):
        """
        Handle a multipart message coming from a worker.

        ``msg`` is ``[worker_addr, b'', b'READY']`` or
        ``[worker_addr, b'', client_addr, b'', reply]``; in the second form
        the reply is forwarded to ``client_addr`` on the frontend.
        """
        # Queue worker address for routing clients/workers:
        worker_addr, empty, client_addr = msg[:3]

        # Validate the whole envelope *before* touching the bookkeeping, so a
        # malformed message cannot leave a bogus worker in the pool.
        assert self.available_workers < self.total_workers
        assert empty == b""
        # Third frame is READY or else a client reply address
        reply = None
        if client_addr != b"READY":
            delimiter, reply = msg[3:]
            assert delimiter == b""

        # Add worker back to the list of workers
        self.available_workers += 1
        self.workers.append(worker_addr)

        # If client reply, send rest back to frontend
        if reply is not None:
            self.frontend.send_multipart([client_addr, b'', reply])

        if self.available_workers == 1:
            # on first recv, start accepting frontend messages
            self.frontend.on_recv(self.handle_frontend)

    def handle_frontend(self, msg):
        """
        Forward a client request ``[client_addr, b'', request]`` to the worker
        that has been waiting the longest; a ``CLOSE`` request also stops the loop.
        """
        client_addr, empty, request = msg
        assert empty == b""

        # Pop first: if no worker is queued the counter is left untouched.
        worker_id = self.workers.popleft()
        self.available_workers -= 1

        self.backend.send_multipart([worker_id, b'', client_addr, b'', request])
        if request == b'CLOSE':
            self.loop.stop()
            return
        if self.available_workers == 0:
            # stop receiving until workers become available again
            self.frontend.stop_on_recv()
def prepare_worker(nb_worker, worker_dir='/home/mz/p2/rpy2'):
    """
    Start the R worker script as a child process and return its handle.

    nb_worker  -- value passed to the script as its single command-line argument
    worker_dir -- directory containing ``rpy2_session_console_worker.py``;
                  used as the working directory of the child only

    Returns the ``Popen`` object of the started process.
    """
    # Start the R worker. The directory is given through ``cwd`` rather than
    # os.chdir() so the working directory of the calling process is unchanged.
    r_process = Popen(
        ['python3', 'rpy2_session_console_worker.py', '{}'.format(nb_worker)],
        cwd=worker_dir)
    # Short pause before returning (presumably to let the worker start up).
    time.sleep(0.3)
    return r_process
def launch_queue(url_client):
    """
    Build a single-worker ``Rpy2_Queue`` on ``url_client`` / ``url_worker``
    and run its event loop; blocks until the loop is stopped.
    """
    queue = Rpy2_Queue(url_client, url_worker, 1)
    # Start the loop held by the queue: it is the one the queue stops when it
    # receives CLOSE. start() returns nothing, so no result is kept.
    queue.loop.start()
def launch_rpy_queue_worker(url_client):
    """
    Start one worker process and one queue thread.

    Returns a ``(worker_process, queue_thread)`` tuple; the thread runs
    ``launch_queue(url_client)``.
    """
    worker_proc = prepare_worker(1)
    queue_thread = threading.Thread(target=launch_queue, args=(url_client,))
    queue_thread.start()
    # Brief pause before handing both back (presumably to let the queue bind).
    time.sleep(0.3)
    return worker_proc, queue_thread
def test():
    """Smoke test: evaluate a few R expressions in order, then shut everything down."""
    expressions = (
        'R.Version()',
        'a<-c(1,2,3)',
        'b<- c(19,22,33) * a *9',
        'd <- data.frame(b)',
        'print(d)',
        'plot(d)',
        'd <- data.frame(b)',
    )
    crp = client_Rpy("ipc:///tmp/feeds/rpy2_clients", 789)
    for r_expression in expressions:
        crp.reval(r_expression)
    crp.disconnect_close()


if __name__ == '__main__':
    test()
# -*- coding: utf-8 -*-
"""
Rpy2 worker, meant to be launched as a separate process and to communicate
with the client through ZMQ, emulating an R console.
@author: mz
"""
from collections import deque
import rpy2.rinterface as rinterface
import os
import zmq
import json
import sys
class Rpy2_worker:
    """
    Rpy2 worker trying to emulate a remote R console

    The worker connects a ZMQ REQ socket to the queue, announces itself with
    ``READY`` and then answers every received R expression with a JSON
    document of the form ``{"Result": ..., "Status": ...}``.
    """

    # SVG file the R graphics device writes to; its mtime is used to detect plots.
    graphics_path = '/tmp/r_output.svg'

    def __init__(self, ident, worker_url, env='.GlobalEnv'):
        """
        ident      -- value used (formatted as a string) as the socket identity
        worker_url -- ZMQ endpoint of the queue backend to connect to
        env        -- currently unused

        Raises RuntimeError when the R session cannot be initialized.
        """
        import rpy2.robjects as robjects
        from rpy2.robjects.packages import importr
        self.importr = importr
        self.r = robjects.r
        self.reval = robjects.reval
        self.console_output = deque()
        self.context = zmq.Context.instance()
        self.socket = self.context.socket(zmq.REQ)
        self.socket.setsockopt_string(zmq.IDENTITY, '{}'.format(ident))
        self.socket.connect(worker_url)
        try:
            self.rversion = self.r('R.version.string')[0]
            print("Worker {} initialized on {}".format(
                self.socket.identity.decode(), self.rversion))
            self.capture_R_output()
        except Exception as err:
            self.socket.close()
            raise RuntimeError(
                "Something wrong happened while initializing the Rpy2 executor"
            ) from err
        # Announce availability only once R is known to work.
        self.socket.send(b"READY")

    def fun_output(self, x):
        """Console callback: store one chunk of R console output."""
        self.console_output.append(x)

    def capture_R_output(self):
        """Route R console writes to ``fun_output``."""
        rinterface.set_writeconsole(self.fun_output)

    def uncapture_R_output(self):
        """Restore rpy2's ``consolePrint`` as console writer."""
        rinterface.set_writeconsole(rinterface.consolePrint)

    def _drain_console_output(self):
        """Return the buffered console chunks as a list and empty the buffer."""
        chunks = list(self.console_output)
        self.console_output.clear()
        return chunks

    def _collect_graphics(self, grdevices):
        """
        Return the content of the SVG file if it was modified since the mtime
        stored in ``last_modified_graphics``, else ``''``.
        """
        if self.last_modified_graphics == os.stat(self.graphics_path).st_mtime:
            return ''
        grdevices.dev_off()  # presumably flushes the graphical output to disk
        with open(self.graphics_path, "r") as f:
            print('There is a graphical output...Returning it in '
                  'the console field')
            content = f.read()
        grdevices.svg(file=self.graphics_path)  # And restart it ...
        return content

    @staticmethod
    def _build_response(result_r, console_chunks, graphics, error=None):
        """
        Build the JSON reply for one evaluated expression.

        result_r       -- value returned by the evaluation (None after an error)
        console_chunks -- list of console output chunks captured meanwhile
        graphics       -- SVG text, or '' when nothing was drawn
        error          -- text of the R error, or None on success

        Console output, when present, takes precedence over the graphics as
        "Result" (except when the evaluation did not return an rpy2 object).
        """
        console_text = ' '.join(console_chunks)
        if error is not None:
            status = "Rpy2 intercepted error : " + error
            result = console_text if console_chunks else graphics
        elif 'rpy2' not in str(type(result_r)):
            # The evaluation is expected to hand back an rpy2 object.
            status = 'Something went wrong...'
            result = graphics
        elif console_chunks:
            status = 'OK'
            result = console_text
        else:
            status = 'OK - No console output'
            result = graphics
        return json.dumps({'Result': result,
                           'Status': status})

    def listen(self):
        """
        Serve requests until ``CLOSE`` is received, the ZMQ context is
        terminated or the process is interrupted.

        R runtime errors are reported to the client in the "Status" field.
        Any other exception propagates to the caller after the graphics
        device, the socket and the context have been released.
        """
        grdevices = None
        try:
            grdevices = self.importr('grDevices')
            grdevices.svg(file=self.graphics_path)
            if not os.path.exists(self.graphics_path):
                with open(self.graphics_path, 'w') as f:
                    f.write('')
            while True:
                self.last_modified_graphics = os.stat(self.graphics_path).st_mtime
                header, empty, request = self.socket.recv_multipart()
                if request == b"CLOSE":
                    print('CLOSE_OK')
                    break
                # Reset per request so nothing leaks from the previous one.
                result_r = None
                error = None
                try:
                    print(request.decode())
                    result_r = self.reval(request.decode())
                    print(result_r)
                except rinterface.RRuntimeError as err:
                    error = str(err)
                    print("Rpy2 intercepted error : " + error)
                graphics = self._collect_graphics(grdevices)
                # Always drain, so output never carries over to the next reply.
                console_chunks = self._drain_console_output()
                response = self._build_response(
                    result_r, console_chunks, graphics, error)
                print(response)
                self.socket.send_multipart([header, b'', response.encode()])
        except (zmq.ContextTerminated, KeyboardInterrupt):
            # Normal ways of ending the session; cleanup happens below.
            pass
        finally:
            # grdevices is still None when importing grDevices itself failed.
            if grdevices is not None:
                grdevices.dev_off()
            self.socket.close()
            self.context.term()
            print('Session over')
if __name__ == '__main__':
    # Explicit argument check: an assert would be stripped under ``python -O``
    # and the script would then fail later with an IndexError.
    if len(sys.argv) != 2:
        sys.exit("Usage: {} <worker_id>".format(sys.argv[0]))
    # The single argument is used as the worker's socket identity.
    rpw = Rpy2_worker(sys.argv[1], "ipc:///tmp/feeds/rpy2_workers")
    rpw.listen()
#import rpy2.rinterface as rinterface
#from collections import deque
#class Rpy2_worker:
# """
# Rpy2 worker trying to emulate an interactive R console
# """
# def __init__(self, port, env='.GlobalEnv'):
# import rpy2.robjects as robjects
# self.r = robjects.r
# self.context = zmq.Context.instance()
# self.socket = self.context.socket(zmq.REP)
## self.socket.setsockopt_string(zmq.IDENTITY, '{}'.format(ident))
# self.socket.connect("ipc:///tmp/rp"+str(port))
# self.console_output = []
# try:
# self.rversion = self.r('R.version.string')[0]
# print("Worker {} initialized on {}".format(port, self.rversion))
## self.capture_R_output()
# except Exception as err:
# raise err(
# "Something wrong happened while initializing the Rpy2 executor")
# return -1
#
## def fun_output(self, x):
## self.console_output.append(x)
##
## def capture_R_output(self):
## rinterface.set_writeconsole(self.fun_output)
##
## def uncapture_R_output(self):
## rinterface.set_writeconsole(rinterface.consolePrint)
#
# def listen(self):
# try:
# while True:
# request = self.socket.recv()
# print('Received ', request.decode())
# if b'CLOSE' in request:
# self.socket.send(b'OK')
# self.socket.close()
# # self.context
# sys.exit()
# try:
# print(request.decode())
# result = self.reval(request.decode())
# print(result)
# except self.rinterface.RRuntimeError as err:
# status = "Rpy2 intercepted error : " + str(err)
# print(status)
#
# if len(self.console_output) == 0 and 'rpy2' in str(type(result)):
# status = 'OK - No console output'
# elif len(self.console_output) > 0 and 'rpy2' in str(type(result)):
# status = 'OK'
# result = ' '.join([self.console_output.popleft() for i in range(len(self.console_output))])
# else:
# status = 'Something went wrong...'
# response = json.dumps({'Result': result,
# 'Status': status})
# print(response)
# self.socket.send(response)
# except zmq.ContextTerminated:
# return
# except KeyboardInterrupt:
# self.socket.close()
# return
#
## def send_console_output(self):
## pass
##
## def listen_expression(self):
## pass
#
#if __name__ == '__main__':
# assert len(sys.argv) == 2
# rp = Rpy2_worker(sys.argv[1])
# rp.listen()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment