Last active
February 25, 2016 13:20
-
-
Save mthh/d996530456b215750970 to your computer and use it in GitHub Desktop.
Some tests with rpy2 and ZMQ sockets...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
@author: mz | |
""" | |
import threading | |
import time | |
import zmq | |
import sys | |
import os | |
import pickle | |
import numpy as np | |
from random import randint | |
from collections import deque | |
from psutil import Popen, signal | |
from zmq.eventloop.ioloop import IOLoop | |
from zmq.eventloop.zmqstream import ZMQStream | |
if not os.path.isdir('/tmp/feeds'): | |
try: | |
os.mkdir('/tmp/feeds') | |
except Exception as err: | |
print(err) | |
sys.exit() | |
result_list = [] # Add the result to a list for the example | |
rlock = threading.RLock() | |
def client_thread_py(client_url, expression, i): | |
""" | |
Basic request-reply ZMQ client sending R expression to evaluate | |
by a Rpy2 worker and receiving serialized python object. | |
""" | |
context = zmq.Context.instance() | |
socket = context.socket(zmq.REQ) | |
socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i)) | |
socket.connect(client_url) | |
socket.send(expression.encode()) | |
reply = socket.recv() | |
reply = pickle.loads(reply) | |
with rlock: | |
result_list.append((i, reply)) | |
return | |
class Rpy2_Queue(object): | |
"""Worker Queue class using ZMQStream/IOLoop for event dispatching""" | |
def __init__(self, url_client, url_worker, total_workers): | |
self.total_workers = total_workers | |
self.available_workers = 0 | |
self.workers = deque() | |
context = zmq.Context() | |
frontend = context.socket(zmq.ROUTER) | |
frontend.bind(url_client) | |
backend = context.socket(zmq.ROUTER) | |
backend.bind(url_worker) | |
self.backend = ZMQStream(backend) | |
self.frontend = ZMQStream(frontend) | |
self.backend.on_recv(self.handle_backend) | |
self.loop = IOLoop.instance() | |
def handle_backend(self, msg): | |
# Queue worker address for routing clients/workers: | |
worker_addr, empty, client_addr = msg[:3] | |
assert self.available_workers < self.total_workers | |
# Add worker back to the list of workers | |
self.available_workers += 1 | |
self.workers.append(worker_addr) | |
assert empty == b"" | |
# Third frame is READY or else a client reply address | |
# If client reply, send rest back to frontend | |
if client_addr != b"READY": | |
empty, reply = msg[3:] | |
assert empty == b"" | |
self.frontend.send_multipart([client_addr, b'', reply]) | |
if self.available_workers == 1: | |
# on first recv, start accepting frontend messages | |
self.frontend.on_recv(self.handle_frontend) | |
def handle_frontend(self, msg): | |
client_addr, empty, request = msg | |
assert empty == b"" | |
self.available_workers -= 1 | |
worker_id = self.workers.popleft() | |
self.backend.send_multipart([worker_id, b'', client_addr, b'', request]) | |
if self.available_workers == 0: | |
# stop receiving until workers become available again | |
self.frontend.stop_on_recv() | |
def prepare_worker(nb_worker): | |
os.chdir('/home/mz/p2/rpy2') | |
# Start R workers : | |
r_process = [ | |
Popen(['python3', 'rpy2_worker.py', '{}'.format(i)]) | |
for i in range(nb_worker)] | |
time.sleep(2) | |
return r_process | |
if __name__ == "__main__": | |
url_worker = "ipc:///tmp/feeds/rpy2_workers" | |
url_client = "ipc:///tmp/feeds/rpy2_clients" | |
expressions = [ | |
'R.Version()', "b<-log1p(seq(1, 200))", "mat <- matrix(runif(400*400, min=-8, max=1379852), 400)", | |
"a <- c(1,2,3,4)", "mat12 <- matrix(runif(72*72), 72) / 6", "d<-log1p(seq(7, 250))", | |
"mat12 <- matrix(runif(90*90), 90) * matrix(runif(90*90), 90)", "mat4 <- log10(matrix(runif(455*455), 455))", | |
"d<-log1p(seq(77, 150))", "mat4 <- log10(matrix(runif(200*200), 200))", "mat4 *12", | |
"mat12 <- matrix(runif(72*72), 72) / 6", 'R.Version(); a<-c(1,2,3,4); a*8', | |
"sequ <- seq(1,1000)", "ct <- Sys.time()", "matz <- data.frame(matrix(runif(741*741), 654) / 8.6)", | |
"matx <- matrix(runif(200*200), 200)", "matz <- matrix(runif(555*555), 555) / 3.5", | |
"maty <- matrix(runif(200*200), 200)", "maty <- matrix(runif(200*200), 200)", | |
'R.Version()', "b<-log1p(seq(1, 200))", "mat <- matrix(runif(400*400, min=-8, max=1379852), 400)", | |
"a <- c(1,2,3,4)", "mat12 <- matrix(runif(72*72), 72) / 6", "d<-log1p(seq(7, 250))", | |
"mat12 <- matrix(runif(90*90), 90) * matrix(runif(90*90), 90)", "mat4 <- log10(matrix(runif(455*455), 455))", | |
"d<-log1p(seq(77, 150))", "mat4 <- log10(matrix(runif(200*200), 200))", | |
"mat12 <- matrix(runif(72*72), 72) / 6", 'R.Version(); a<-c(1,2,3,4); a*8', | |
"sequ <- seq(1,1000)", "ct <- Sys.time()", "matz <- data.frame(matrix(runif(741*741), 654) / 8.6)", | |
"matx <- matrix(runif(200*200), 200)", "matz <- matrix(runif(555*555), 555) / 3.5", | |
"maty <- matrix(runif(200*200), 200)", "maty <- matrix(runif(200*200), 200)" | |
] | |
# The number of worker will be used to launch each worker and by | |
# ... the broker to do the balance between them : | |
NBR_WORKERS = 12 | |
# Prepare the workers in separated process : | |
rpy2_workers = prepare_worker(NBR_WORKERS) | |
# Launch a bunch of clients (one per expression) : | |
threads = [] | |
for i in range(len(expressions)): | |
thread_c = threading.Thread(target=client_thread_py, | |
args=(url_client, expressions[i], i,)) | |
# thread_c.daemon = True | |
thread_c.start() | |
threads.append(thread_c) | |
# The queue/broker is here started in the same process as the clients | |
# ... but it can be launched (with workers) on background, | |
# ... waiting for requests coming from clients : | |
queue = Rpy2_Queue(url_client, url_worker, NBR_WORKERS) | |
# The example have to be shutdown using CTRL+C when all requests are done: | |
try: | |
IOLoop.instance().start() | |
except KeyboardInterrupt: | |
IOLoop.instance().stop() | |
print('Shuting down Rpy2 process...') | |
for process in rpy2_workers: # Shuting down worker process : | |
process.send_signal(signal.SIGTERM) | |
process.wait() | |
# Check the example has procuced exepcted results : | |
assert len(result_list) == len(expressions) | |
print("len(result_list) == len(expressions)") | |
smple = randint(0, len(expressions)) | |
print('Random result\nn#{}:\n\n{}' | |
.format(smple, str(result_list[smple]))) | |
#def client_thread_ro(client_url, expression, i): | |
# """ | |
# Basic request-reply ZMQ client sending R expression to evaluate | |
# by a Rpy2 worker and receiving serialized rpy2 object | |
# (less usefull in this testcase). | |
# """ context = zmq.Context.instance() | |
# socket = context.socket(zmq.REQ) | |
# | |
# socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i)) | |
# socket.connect(client_url) | |
# | |
# socket.send(expression.encode()) | |
# reply = socket.recv() | |
# | |
# try: | |
# reply = pickle.loads(reply) # rpy2 object have been sent serialized with pickle | |
# except Exception as err: | |
# print("Attempted to unpickle without RLock :\n", err) | |
# with rlock: | |
# reply = pickle.loads(reply) | |
# with rlock: | |
# result_list.append((i, reply)) | |
# return |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
@author: mthh | |
Quick example on how to use rpy2 (high-level interface) with some downloaded CRAN packages | |
(here following the examples from https://cran.r-project.org/web/packages/flows/vignettes/flows.html | |
and https://cran.r-project.org/web/packages/SpatialPosition/vignettes/SpatialPosition.html) to see | |
how smooth the integration between R and python is with rpy2 (http://rpy2.bitbucket.org/) | |
""" | |
import rpy2.robjects as robjects | |
import pandas as pd | |
from rpy2.robjects.packages import importr | |
from pandas.rpy import common as com | |
# Import R packages (flows + other necessary ones) in python : | |
r_flows = importr("flows") | |
r_base = importr("base") | |
grdevices = importr('grDevices') | |
# Read some data (in python) : | |
nav_data = pd.read_csv('/home/mz/nav.csv') | |
# Convert it to a RPy2 object : | |
Rnav_data = com.convert_to_r_dataframe(nav_data) | |
# Rnav_data is now a R data.frame but it is not in the ".GlobalEnv" of R | |
# ... until we put it explicitly in there | |
# Follow the "flows" package vignette to test this out : | |
myflows = r_flows.prepflows(Rnav_data, i = 'i', j = 'j', fij = 'fij') # R : myflows <- prepflows(mat = nav, i = "i", j = "j", fij = "fij") | |
grdevices.png(file="/tmp/flows_lorenz.png", width=512, height=512) | |
mystat = r_flows.statmat(myflows, output = "lorenz", verbose = True) # R : statmat(mat = myflows, output = "none", verbose = TRUE) | |
grdevices.dev_off() # The graphcial output has been saved in "/tmp/flows_lorenz.png" | |
# Ok, so let's try the example "Commuters flows in the French Grand Est" | |
myflows = r_flows.prepflows(Rnav_data, i = 'i', j = 'j', fij = 'fij') | |
robjects.globalenv['myflows'] = myflows # Well... the "diag" function is boring to call in python, so doing it in R | |
robjects.reval('diag(myflows) <- 0') # Let's evualuate it with a R syntax | |
myflows = robjects.globalenv['myflows'] # Fetch the modified variable in python environemment | |
# It's also possible to manipulate the "shallow" copy as a numpy array : | |
#myflows_np = np.asarray(myflows) # np.asarray yield a shallow copy (instead of np.array) | |
#np.fill_diagonal(myflows_np, 0) # Fill the diag with 0 | |
# -> the 'myflows' R matrix has been modified | |
flowSel1 = r_flows.firstflowsg(mat = myflows, method = "xfirst", k = 500) | |
flowSel2 = r_flows.firstflowsg(mat = myflows, method = "xfirst", k = 1000) | |
# Python can't use the '*' operator between two R 'Matrix' | |
# ... and the R '*' don't seems to be a dot product, so let's evaluate that | |
# ... with a R syntax : | |
robjects.globalenv['myflows'] = myflows # Pass the objects to R .GlobalEnv | |
robjects.globalenv['flowSel1'] = flowSel1 # Pass the objects to R .GlobalEnv | |
robjects.reval('tmp_mat <- myflows * flowSel1') | |
tmp_mat = robjects.globalenv['tmp_mat'] | |
res_comp = r_flows.compmat(mat1 = myflows, mat2 = tmp_mat, digits = 1) | |
#In [28]: print(res_comp) | |
# mat1 mat2 absdiff reldiff | |
# nblinks 3191.0000 137.000 3054.0 95.70668 | |
# sumflows 313298.6642 193196.227 120102.4 38.33481 | |
# connectcompx 1.0000 10.000 9.0 NA | |
# (...) | |
### Lets do it other way now, copy paste the code from the vignette to evaluate | |
### directly (and fetch the result map in a png file) : | |
grdevices.png(file="/tmp/flows_map.png", width=512, height=512) | |
robjects.r(""" | |
library(sp) | |
flowSel1 <- firstflows(mat = myflows/rowSums(myflows)*100, method = "xfirst", | |
k = 20); | |
flowSel2 <- domflows(mat = myflows, w = colSums(myflows), k = 1); | |
flowSel <- myflows * flowSel1 * flowSel2; | |
inflows <- data.frame(id = colnames(myflows), w = colSums(myflows)); | |
opar <- par(mar = c(0,0,2,0));plot(GE, col = "#cceae7", border = NA) | |
plotMapDomFlows(mat = flowSel, spdf = UA, spdfid = "ID", w = inflows, wid = "id", | |
wvar = "w", wcex = 0.05, add = TRUE, | |
legend.flows.pos = "topright", | |
legend.flows.title = "Nb. of commuters") | |
title("Dominant Flows of Commuters") | |
mtext(text = "INSEE, 2011", side = 4, line = -1, adj = 0.01, cex = 0.8) | |
""") | |
grdevices.dev_off() ## Output the same map as in the vignette | |
# Runtime execution approx. 3s (including rpy2 initialization) | |
############################################################################## | |
# Example with SpatialPosition and SpatialDataFrames | |
# Slightly adapted from the vignette : | |
# https://cran.r-project.org/web/packages/SpatialPosition/vignettes/SpatialPosition.html | |
############################################################################## | |
import rpy2.rinterface as rinterface | |
r_SpatialPosition = importr('SpatialPosition') | |
#r_geojsonio = importr('geojsonio') | |
r_sp = importr('sp') | |
rgdal = importr('rgdal') | |
graphics = importr('graphics') | |
#known_pts = r_geojsonio.geojson_read('/home/mz/code/noname/tmp/geom/hamburg_pts.geojson', what='sp') | |
#mask_layer = r_geojsonio.geojson_read('/home/mz/code/noname/tmp/geom/hamburg_mask.geojson', what='sp') | |
known_pts = rgdal.readOGR('/home/mz/code/RSpatialPosition/test/chinese_restaurant_hmbrg_selec3035.shp', 'chinese_restaurant_hmbrg_selec3035') | |
mask_layer = rgdal.readOGR('/home/mz/code/RSpatialPosition/test/hamburg_polyg3035.shp', 'hamburg_polyg3035') | |
globalAccessibility = r_SpatialPosition.stewart( | |
knownpts = known_pts, varname = "value1", typefct = "exponential", | |
span = 1000, beta = 3, resolution = 200, longlat = False, mask = mask_layer) | |
rasterAccessibility = r_SpatialPosition.rasterStewart(x = globalAccessibility, | |
mask = mask_layer) | |
# Now prepare the graphical output : | |
grdevices.png(file="/tmp/stewart_output.png", width=512, height=512) | |
graphics.par(mar = rinterface.IntSexpVector((4,2,2,1))) | |
r_SpatialPosition.plotStewart(x = rasterAccessibility, add = False, nclass = 6) | |
breakValues = r_SpatialPosition.plotStewart(x = rasterAccessibility, | |
add = False, nclass = 6) | |
contLines = r_SpatialPosition.contourStewart(x = rasterAccessibility, | |
breaks = breakValues) | |
r_sp.plot(contLines, add = True) | |
r_sp.plot(mask_layer, add = True) | |
graphics.mtext("Global Accessibility to Chinese Restaurants in Hamburg", | |
side = 3,cex = 1.5) | |
graphics.mtext("Potential nb. of 'star'\n (= random value/weight given to each restaurant) " | |
"distance function:\nexponential, span = 1 km, beta = 3", | |
side = 1, line = 1) | |
grdevices.dev_off() | |
# A great potential map computed though python thanks to R and rpy2 is now in the /tmp folder ! |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
@author: mz | |
""" | |
from rpy2.rinterface import RRuntimeError | |
import numpy as np | |
import pickle | |
import time | |
import sys | |
import zmq | |
class Rpy2_exec2py: | |
""" | |
Rpy2 worker returning pure python objects through a ZMQ socket. | |
""" | |
def __init__(self, ident, worker_url, env='.GlobalEnv'): | |
import rpy2.robjects as robjects | |
self.r = robjects.r | |
self.context = zmq.Context.instance() | |
self.socket = self.context.socket(zmq.REQ) | |
self.socket.setsockopt_string(zmq.IDENTITY, '{}'.format(ident)) | |
self.socket.connect(worker_url) | |
self.socket.send(b"READY") | |
try: | |
self.rversion = self.r('R.version.string')[0] | |
print("Worker {} initialized on {}".format( | |
self.socket.identity.decode(), self.rversion)) | |
except Exception as err: | |
err( | |
"Something wrong happened while initializing the Rpy2 executor") | |
return -1 | |
def listen(self): | |
try: | |
while True: | |
address, empty, request = self.socket.recv_multipart() | |
# print("Worker {} received {}\n".format( | |
# self.socket.identity.decode(), request.decode()), end='') | |
try: | |
result = self.r(request.decode()) | |
# According to the rpy2 doc, | |
# using numpy.array (instead of np.asarray) | |
# copy the data in the new object : | |
result = np.array(result) | |
except RRuntimeError as err: | |
result = "Rpy2 intercepted error : " + str(err) | |
print(result) | |
# Send back the result serialized with pickle : | |
result = pickle.dumps(result) | |
self.socket.send_multipart([address, b'', result]) | |
except zmq.ContextTerminated: | |
return | |
except KeyboardInterrupt: | |
self.socket.close() | |
return | |
if __name__ == '__main__': | |
if len(sys.argv) == 3: | |
rps = Rpy2_exec2py(sys.argv[1], sys.argv[2]) | |
rps.listen() | |
elif len(sys.argv) == 2: | |
worker_url = 'ipc:///tmp/feeds/rpy2_workers' | |
rps = Rpy2_exec2py(sys.argv[1], worker_url) | |
rps.listen() | |
else: | |
print('Usage : {} worker_id ipc_path'.format(sys.argv[0])) | |
sys.exit() | |
#class Rpy2_exec2ro: | |
# """ | |
# Rpy2 worker returning R/rpy2 objects serialized by pickle | |
# through a ZMQ socket (less useful for the intented purpose of this test) | |
# """ | |
# def __init__(self, ident, worker_url, env='.GlobalEnv'): | |
# import rpy2.robjects as robjects | |
# self.r = robjects.r | |
# try: | |
# self.rversion = self.r('R.version.string')[0] | |
# print("Initialized\n{}".format(self.rversion)) | |
# except Exception as err: | |
# err( | |
# "Something wrong happened while initializing the Rpy2 executor") | |
# return -1 | |
# self.context = zmq.Context.instance() | |
# self.socket = self.context.socket(zmq.REQ) | |
# self.socket.setsockopt_string(zmq.IDENTITY, '{}'.format(ident)) | |
# self.socket.connect(worker_url) | |
# self.socket.send(b"READY") | |
# | |
# def listen(self): | |
# try: | |
# while True: | |
# address, empty, request = self.socket.recv_multipart() | |
# print("%s: %s\n" % (self.socket.identity.decode(), | |
# request.decode()), end='') | |
# try: | |
# result = self.r(request.decode()) | |
# except RRuntimeError as err: | |
# result = "Rpy2 intercepted error : " + str(err) | |
# print(result) | |
# result = pickle.dumps(result) | |
# self.socket.send_multipart([address, b'', result]) | |
# except zmq.ContextTerminated: | |
# return | |
# except KeyboardInterrupt: | |
# self.socket.close() | |
# return |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment