Skip to content

Instantly share code, notes, and snippets.

@mthh
Last active February 17, 2016 20:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mthh/d62f90f5cf68259917c8 to your computer and use it in GitHub Desktop.
Save mthh/d62f90f5cf68259917c8 to your computer and use it in GitHub Desktop.
noname_0
# -*- coding: utf-8 -*-
"""
Three very basic variations to launch a R instance and connect
a python "rClient" instance to it (1-to-1) via a ZMQ (ipc) socket in order
to get results from statements evaluated by the R instance.
@author: mz
"""
from psutil import Popen, Process
import zmq
import sys
import os
if not os.path.isdir('/tmp/feeds'):
try:
os.mkdir('/tmp/feeds')
except Exception as err:
print(err)
sys.exit()
class rClient:
"""Class for connecting with R zmq socket (REQ/REP pattern)"""
def __init__(self, port, init, key=None, pid=None):
global g
if init:
self.process = Popen([
'Rscript',
'--vanilla',
'R/Rserv_zmq2.R',
'{}'.format(port)])
elif pid:
self.process = Process(pid)
else:
raise AttributeError('Something wrong happened, the client have '
'to be initialised or have a pid')
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.connect('ipc:///tmp/feeds/'+str(port))
self.key = key
def rEval(self, command):
self.socket.send(command)
self.last_reply = self.socket.recv()
return self.last_reply
def disconnect(self):
command = b"exitR()"
self.socket.send(command)
self.socket.recv()
try:
if self.key:
g.keys_mapping.pop(self.key)
print('Exited session from {}'.format(self.key or self))
self.process.kill()
self.process.wait()
self.socket.close()
except Exception as err:
print(err)
def manual_disconnect(self):
self.process.kill()
self.process.wait()
self.socket.close()
if self.key:
g.keys_mapping.pop(self.key)
class rClient_pushpull:
"""Class for connecting with R zmq socket (PULL/PUSH pattern)"""
def __init__(self, port, init, key=None, pid=None):
global g
if init:
self.process = Popen([
'Rscript',
'--vanilla',
'R/Rserv_zmq_push_pull.R',
'{}'.format(port)])
elif pid:
self.process = Process(pid)
else:
raise AttributeError('Something wrong happened, the client have '
'to be initialised or have a pid')
self.context = zmq.Context()
self.socket_recv = self.context.socket(zmq.PULL)
self.socket_send = self.context.socket(zmq.PUSH)
self.socket_recv.connect('ipc:///tmp/feeds/'+str(port))
self.socket_send.connect('ipc:///tmp/feeds/'+str(port+1))
self.key = key
def rEval(self, command):
self.socket_send.send(command)
self.last_reply = self.socket_recv.recv()
return self.last_reply
def disconnect(self):
command = b"exitR()"
self.socket_send.send(command)
self.socket_recv.recv()
try:
if self.key:
g.keys_mapping.pop(self.key)
print('Exited session from {}'.format(self.key or self))
self.process.kill()
self.process.wait()
self.socket_send.close()
self.socket_recv.close()
except Exception as err:
print(err)
def manual_disconnect(self):
self.socket_send.close()
self.socket_recv.close()
self.process.kill()
self.process.wait()
if self.key:
g.keys_mapping.pop(self.key)
import asyncio
import zmq.asyncio
class rClient_async:
"""Class for connecting with R zmq.asyncio socket (REQ/REP pattern)"""
def __init__(self, port, init, ctx, key=None, pid=None):
if init:
self.process = Popen([
'Rscript',
'--vanilla',
'R/Rserv_zmq2.R',
'{}'.format(port)])
elif pid:
self.process = Process(pid)
else:
raise AttributeError('Something wrong happened, the client have '
'to be initialised or have a pid')
self.ctx = ctx
self.socket = self.ctx.socket(zmq.REQ)
self.socket.connect('ipc:///tmp/feeds/'+str(port))
self.key = key
@asyncio.coroutine
def rEval(self, command):
yield from self.socket.send(command)
return (yield from self.socket.recv())
@asyncio.coroutine
def disconnect(self):
command = b"exitR()"
yield from self.socket.send(command)
yield from self.socket.recv()
try:
if self.key:
g.keys_mapping.pop(self.key)
print('Exited session from {}'.format(self.key or self))
self.process.kill()
self.process.wait()
self.socket.close()
except Exception as err:
print(err)
def manual_disconnect(self):
self.process.kill()
self.process.wait()
self.socket.close()
if self.key:
g.keys_mapping.pop(self.key)
if __name__ == '__main__': # Quick and dirty tests ...
from random import randint
from threading import Thread, Lock
from time import time
class g:
keys_mapping = {}
seen_port = set()
lock = Lock()
def choose_port():
while True:
p = randint(1, 9999)
if p not in seen_port:
with lock:
seen_port.add(p)
return p
def test_rep_req():
p = choose_port()
r = rClient(p, True)
print(r.process)
result = r.rEval(b'a <- c(1,2,3)*2')
assert result == b'2, 4, 6' # The server is running correctly,
result = r.rEval(b'a * 10') # ... success to do multiplication
assert result == b'20, 40, 60' # ... and to remember variable
r.disconnect() # Let's try to disconnect it
assert not r.process.is_running() # Now the R process shouldn't be running
return
def test_pull_push():
p = choose_port()
r = rClient_pushpull(p, True)
print(r.process)
result = r.rEval(b'a <- c(1,2,3)*2')
assert result == b'2, 4, 6' # The server is running correctly,
result = r.rEval(b'a * 10') # ... success to do multiplication
assert result == b'20, 40, 60' # ... and to remember variable
r.disconnect() # Let's try to disconnect it
assert not r.process.is_running() # Now the R process shouldn't be running
return
def test_async():
ctx = zmq.asyncio.Context() # Could probably have been in the rClient_async class too ?
loop = zmq.asyncio.ZMQEventLoop() # zmq.asyncio provides his own EventLoop
asyncio.set_event_loop(loop)
@asyncio.coroutine
def process():
p = choose_port()
r = rClient_async(p, True, ctx)
print(r.process)
result = yield from r.rEval(b'a <- c(1,2,3)*2')
assert result == b'2, 4, 6' # The server is running correctly,
result = yield from r.rEval(b'a * 10') # ... success to do multiplication
assert result == b'20, 40, 60' # ... and to remember variable
yield from r.disconnect() # Let's try to disconnect it (its also a coroutine)
assert not r.process.is_running() # Now the R process shouldn't be running
return
loop.run_until_complete(process())
return
def test_both():
"""
Use python threads to launch the 3 tests together
"""
st = time()
threads = [
Thread(target=test_rep_req),
Thread(target=test_pull_push),
Thread(target=test_async)]
[t.start() for t in threads]
[t.join() for t in threads]
return '{:.3f} s\n'.format(time()-st)
def test_many(fun=test_rep_req, launcher_threads=4):
"""
Use python threads to launch many concurrent R process
(-> high CPU usage)
"""
st = time()
threads = [Thread(target=fun) for i in range(launcher_threads)]
[t.start() for t in threads]
[t.join() for t in threads]
return '{:.3f} s\n'.format(time()-st)
def test_more(nb=50, **kwargs):
for i in range(nb):
test_many(**kwargs)
#!/usr/bin/env Rscript
require(base)
require(methods)
require(stats)
require(pbdZMQ)
require(jsonlite)
startServer_test <- function(port){
ctx = zmq.ctx.new()
socket = zmq.socket(ctx, .pbd_env$ZMQ.ST$REP)
zmq.bind(socket, paste0("ipc:///tmp/feeds/", port))
while(TRUE){
buf = zmq.recv(socket);
msg = buf$buf
if(class(msg) == 'raw'){msg = rawToChar(msg)}
print(msg)
result <- rEval(msg)
zmq.send(socket, result[2])
if(result[1] == -1) break
}
zmq.close(socket)
zmq.ctx.destroy(ctx)
q("no")
}
rEval <- function(message){
validateOuput <- function(output)encodeString(toString(format(output)))
if(grepl('system', message) | grepl('exitR', message)){
out <- c(-1, "Now exiting R\n")
}
else{
output <- tryCatch(
eval(parse(text=message), envir = .GlobalEnv),
error = function(e)paste0(e$message, "\n")
)
print(output)
out <- c(0, validateOuput(output))
}
return(out)
}
startServer_test(commandArgs(TRUE)[1])
#!/usr/bin/env Rscript
require(base)
require(methods)
require(stats)
require(pbdZMQ)
require(jsonlite)
startServer_test <- function(port){
ctx = zmq.ctx.new()
socket_recv = zmq.socket(ctx, .pbd_env$ZMQ.ST$PULL)
socket_send = zmq.socket(ctx, .pbd_env$ZMQ.ST$PUSH)
zmq.bind(socket_recv, paste0("ipc:///tmp/feeds/", as.character(as.numeric(port)+1)))
zmq.bind(socket_send, paste0("ipc:///tmp/feeds/", port))
while(TRUE){
buf = zmq.recv(socket_recv);
msg = buf$buf
if(class(msg) == 'raw'){msg = rawToChar(msg)}
print(msg)
result <- rEval(msg)
zmq.send(socket_send, result[2])
if(result[1] == -1) break
}
zmq.close(socket_send)
zmq.close(socket_recv)
zmq.ctx.destroy(ctx)
q("no")
}
rEval <- function(message){
validateOuput <- function(output)encodeString(toString(format(output)))
if(grepl('system', message) | grepl('exitR', message)){
out <- c(-1, "Now exiting R\n")
}
else{
output <- tryCatch(
eval(parse(text=message), envir = .GlobalEnv),
error = function(e)paste0(e$message, "\n")
)
print(output)
out <- c(0, validateOuput(output))
}
return(out)
}
startServer_test(commandArgs(TRUE)[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment