Skip to content

Instantly share code, notes, and snippets.

@mthh
Last active February 25, 2016 13:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mthh/d996530456b215750970 to your computer and use it in GitHub Desktop.
Save mthh/d996530456b215750970 to your computer and use it in GitHub Desktop.
Some tests with rpy2 and ZMQ sockets...
# -*- coding: utf-8 -*-
"""
@author: mz
"""
import threading
import time
import zmq
import sys
import os
import pickle
import numpy as np
from random import randint
from collections import deque
from psutil import Popen, signal
from zmq.eventloop.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream
if not os.path.isdir('/tmp/feeds'):
try:
os.mkdir('/tmp/feeds')
except Exception as err:
print(err)
sys.exit()
result_list = [] # Add the result to a list for the example
rlock = threading.RLock()
def client_thread_py(client_url, expression, i):
"""
Basic request-reply ZMQ client sending R expression to evaluate
by a Rpy2 worker and receiving serialized python object.
"""
context = zmq.Context.instance()
socket = context.socket(zmq.REQ)
socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
socket.connect(client_url)
socket.send(expression.encode())
reply = socket.recv()
reply = pickle.loads(reply)
with rlock:
result_list.append((i, reply))
return
class Rpy2_Queue(object):
"""Worker Queue class using ZMQStream/IOLoop for event dispatching"""
def __init__(self, url_client, url_worker, total_workers):
self.total_workers = total_workers
self.available_workers = 0
self.workers = deque()
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind(url_client)
backend = context.socket(zmq.ROUTER)
backend.bind(url_worker)
self.backend = ZMQStream(backend)
self.frontend = ZMQStream(frontend)
self.backend.on_recv(self.handle_backend)
self.loop = IOLoop.instance()
def handle_backend(self, msg):
# Queue worker address for routing clients/workers:
worker_addr, empty, client_addr = msg[:3]
assert self.available_workers < self.total_workers
# Add worker back to the list of workers
self.available_workers += 1
self.workers.append(worker_addr)
assert empty == b""
# Third frame is READY or else a client reply address
# If client reply, send rest back to frontend
if client_addr != b"READY":
empty, reply = msg[3:]
assert empty == b""
self.frontend.send_multipart([client_addr, b'', reply])
if self.available_workers == 1:
# on first recv, start accepting frontend messages
self.frontend.on_recv(self.handle_frontend)
def handle_frontend(self, msg):
client_addr, empty, request = msg
assert empty == b""
self.available_workers -= 1
worker_id = self.workers.popleft()
self.backend.send_multipart([worker_id, b'', client_addr, b'', request])
if self.available_workers == 0:
# stop receiving until workers become available again
self.frontend.stop_on_recv()
def prepare_worker(nb_worker):
os.chdir('/home/mz/p2/rpy2')
# Start R workers :
r_process = [
Popen(['python3', 'rpy2_worker.py', '{}'.format(i)])
for i in range(nb_worker)]
time.sleep(2)
return r_process
if __name__ == "__main__":
url_worker = "ipc:///tmp/feeds/rpy2_workers"
url_client = "ipc:///tmp/feeds/rpy2_clients"
expressions = [
'R.Version()', "b<-log1p(seq(1, 200))", "mat <- matrix(runif(400*400, min=-8, max=1379852), 400)",
"a <- c(1,2,3,4)", "mat12 <- matrix(runif(72*72), 72) / 6", "d<-log1p(seq(7, 250))",
"mat12 <- matrix(runif(90*90), 90) * matrix(runif(90*90), 90)", "mat4 <- log10(matrix(runif(455*455), 455))",
"d<-log1p(seq(77, 150))", "mat4 <- log10(matrix(runif(200*200), 200))", "mat4 *12",
"mat12 <- matrix(runif(72*72), 72) / 6", 'R.Version(); a<-c(1,2,3,4); a*8',
"sequ <- seq(1,1000)", "ct <- Sys.time()", "matz <- data.frame(matrix(runif(741*741), 654) / 8.6)",
"matx <- matrix(runif(200*200), 200)", "matz <- matrix(runif(555*555), 555) / 3.5",
"maty <- matrix(runif(200*200), 200)", "maty <- matrix(runif(200*200), 200)",
'R.Version()', "b<-log1p(seq(1, 200))", "mat <- matrix(runif(400*400, min=-8, max=1379852), 400)",
"a <- c(1,2,3,4)", "mat12 <- matrix(runif(72*72), 72) / 6", "d<-log1p(seq(7, 250))",
"mat12 <- matrix(runif(90*90), 90) * matrix(runif(90*90), 90)", "mat4 <- log10(matrix(runif(455*455), 455))",
"d<-log1p(seq(77, 150))", "mat4 <- log10(matrix(runif(200*200), 200))",
"mat12 <- matrix(runif(72*72), 72) / 6", 'R.Version(); a<-c(1,2,3,4); a*8',
"sequ <- seq(1,1000)", "ct <- Sys.time()", "matz <- data.frame(matrix(runif(741*741), 654) / 8.6)",
"matx <- matrix(runif(200*200), 200)", "matz <- matrix(runif(555*555), 555) / 3.5",
"maty <- matrix(runif(200*200), 200)", "maty <- matrix(runif(200*200), 200)"
]
# The number of worker will be used to launch each worker and by
# ... the broker to do the balance between them :
NBR_WORKERS = 12
# Prepare the workers in separated process :
rpy2_workers = prepare_worker(NBR_WORKERS)
# Launch a bunch of clients (one per expression) :
threads = []
for i in range(len(expressions)):
thread_c = threading.Thread(target=client_thread_py,
args=(url_client, expressions[i], i,))
# thread_c.daemon = True
thread_c.start()
threads.append(thread_c)
# The queue/broker is here started in the same process as the clients
# ... but it can be launched (with workers) on background,
# ... waiting for requests coming from clients :
queue = Rpy2_Queue(url_client, url_worker, NBR_WORKERS)
# The example have to be shutdown using CTRL+C when all requests are done:
try:
IOLoop.instance().start()
except KeyboardInterrupt:
IOLoop.instance().stop()
print('Shuting down Rpy2 process...')
for process in rpy2_workers: # Shuting down worker process :
process.send_signal(signal.SIGTERM)
process.wait()
# Check the example has procuced exepcted results :
assert len(result_list) == len(expressions)
print("len(result_list) == len(expressions)")
smple = randint(0, len(expressions))
print('Random result\nn#{}:\n\n{}'
.format(smple, str(result_list[smple])))
#def client_thread_ro(client_url, expression, i):
# """
# Basic request-reply ZMQ client sending R expression to evaluate
# by a Rpy2 worker and receiving serialized rpy2 object
# (less usefull in this testcase).
# """ context = zmq.Context.instance()
# socket = context.socket(zmq.REQ)
#
# socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
# socket.connect(client_url)
#
# socket.send(expression.encode())
# reply = socket.recv()
#
# try:
# reply = pickle.loads(reply) # rpy2 object have been sent serialized with pickle
# except Exception as err:
# print("Attempted to unpickle without RLock :\n", err)
# with rlock:
# reply = pickle.loads(reply)
# with rlock:
# result_list.append((i, reply))
# return
# -*- coding: utf-8 -*-
"""
@author: mthh
Quick example on how to use rpy2 (high-level interface) with some downloaded CRAN packages
(here following the examples from https://cran.r-project.org/web/packages/flows/vignettes/flows.html
and https://cran.r-project.org/web/packages/SpatialPosition/vignettes/SpatialPosition.html) to see
how smooth the integration between R and python is with rpy2 (http://rpy2.bitbucket.org/)
"""
import rpy2.robjects as robjects
import pandas as pd
from rpy2.robjects.packages import importr
from pandas.rpy import common as com
# Import R packages (flows + other necessary ones) in python :
r_flows = importr("flows")
r_base = importr("base")
grdevices = importr('grDevices')
# Read some data (in python) :
nav_data = pd.read_csv('/home/mz/nav.csv')
# Convert it to a RPy2 object :
Rnav_data = com.convert_to_r_dataframe(nav_data)
# Rnav_data is now a R data.frame but it is not in the ".GlobalEnv" of R
# ... until we put it explicitly in there
# Follow the "flows" package vignette to test this out :
myflows = r_flows.prepflows(Rnav_data, i = 'i', j = 'j', fij = 'fij') # R : myflows <- prepflows(mat = nav, i = "i", j = "j", fij = "fij")
grdevices.png(file="/tmp/flows_lorenz.png", width=512, height=512)
mystat = r_flows.statmat(myflows, output = "lorenz", verbose = True) # R : statmat(mat = myflows, output = "none", verbose = TRUE)
grdevices.dev_off() # The graphcial output has been saved in "/tmp/flows_lorenz.png"
# Ok, so let's try the example "Commuters flows in the French Grand Est"
myflows = r_flows.prepflows(Rnav_data, i = 'i', j = 'j', fij = 'fij')
robjects.globalenv['myflows'] = myflows # Well... the "diag" function is boring to call in python, so doing it in R
robjects.reval('diag(myflows) <- 0') # Let's evualuate it with a R syntax
myflows = robjects.globalenv['myflows'] # Fetch the modified variable in python environemment
# It's also possible to manipulate the "shallow" copy as a numpy array :
#myflows_np = np.asarray(myflows) # np.asarray yield a shallow copy (instead of np.array)
#np.fill_diagonal(myflows_np, 0) # Fill the diag with 0
# -> the 'myflows' R matrix has been modified
flowSel1 = r_flows.firstflowsg(mat = myflows, method = "xfirst", k = 500)
flowSel2 = r_flows.firstflowsg(mat = myflows, method = "xfirst", k = 1000)
# Python can't use the '*' operator between two R 'Matrix'
# ... and the R '*' don't seems to be a dot product, so let's evaluate that
# ... with a R syntax :
robjects.globalenv['myflows'] = myflows # Pass the objects to R .GlobalEnv
robjects.globalenv['flowSel1'] = flowSel1 # Pass the objects to R .GlobalEnv
robjects.reval('tmp_mat <- myflows * flowSel1')
tmp_mat = robjects.globalenv['tmp_mat']
res_comp = r_flows.compmat(mat1 = myflows, mat2 = tmp_mat, digits = 1)
#In [28]: print(res_comp)
# mat1 mat2 absdiff reldiff
# nblinks 3191.0000 137.000 3054.0 95.70668
# sumflows 313298.6642 193196.227 120102.4 38.33481
# connectcompx 1.0000 10.000 9.0 NA
# (...)
### Lets do it other way now, copy paste the code from the vignette to evaluate
### directly (and fetch the result map in a png file) :
grdevices.png(file="/tmp/flows_map.png", width=512, height=512)
robjects.r("""
library(sp)
flowSel1 <- firstflows(mat = myflows/rowSums(myflows)*100, method = "xfirst",
k = 20);
flowSel2 <- domflows(mat = myflows, w = colSums(myflows), k = 1);
flowSel <- myflows * flowSel1 * flowSel2;
inflows <- data.frame(id = colnames(myflows), w = colSums(myflows));
opar <- par(mar = c(0,0,2,0));plot(GE, col = "#cceae7", border = NA)
plotMapDomFlows(mat = flowSel, spdf = UA, spdfid = "ID", w = inflows, wid = "id",
wvar = "w", wcex = 0.05, add = TRUE,
legend.flows.pos = "topright",
legend.flows.title = "Nb. of commuters")
title("Dominant Flows of Commuters")
mtext(text = "INSEE, 2011", side = 4, line = -1, adj = 0.01, cex = 0.8)
""")
grdevices.dev_off() ## Output the same map as in the vignette
# Runtime execution approx. 3s (including rpy2 initialization)
##############################################################################
# Example with SpatialPosition and SpatialDataFrames
# Slightly adapted from the vignette :
# https://cran.r-project.org/web/packages/SpatialPosition/vignettes/SpatialPosition.html
##############################################################################
import rpy2.rinterface as rinterface
r_SpatialPosition = importr('SpatialPosition')
#r_geojsonio = importr('geojsonio')
r_sp = importr('sp')
rgdal = importr('rgdal')
graphics = importr('graphics')
#known_pts = r_geojsonio.geojson_read('/home/mz/code/noname/tmp/geom/hamburg_pts.geojson', what='sp')
#mask_layer = r_geojsonio.geojson_read('/home/mz/code/noname/tmp/geom/hamburg_mask.geojson', what='sp')
known_pts = rgdal.readOGR('/home/mz/code/RSpatialPosition/test/chinese_restaurant_hmbrg_selec3035.shp', 'chinese_restaurant_hmbrg_selec3035')
mask_layer = rgdal.readOGR('/home/mz/code/RSpatialPosition/test/hamburg_polyg3035.shp', 'hamburg_polyg3035')
globalAccessibility = r_SpatialPosition.stewart(
knownpts = known_pts, varname = "value1", typefct = "exponential",
span = 1000, beta = 3, resolution = 200, longlat = False, mask = mask_layer)
rasterAccessibility = r_SpatialPosition.rasterStewart(x = globalAccessibility,
mask = mask_layer)
# Now prepare the graphical output :
grdevices.png(file="/tmp/stewart_output.png", width=512, height=512)
graphics.par(mar = rinterface.IntSexpVector((4,2,2,1)))
r_SpatialPosition.plotStewart(x = rasterAccessibility, add = False, nclass = 6)
breakValues = r_SpatialPosition.plotStewart(x = rasterAccessibility,
add = False, nclass = 6)
contLines = r_SpatialPosition.contourStewart(x = rasterAccessibility,
breaks = breakValues)
r_sp.plot(contLines, add = True)
r_sp.plot(mask_layer, add = True)
graphics.mtext("Global Accessibility to Chinese Restaurants in Hamburg",
side = 3,cex = 1.5)
graphics.mtext("Potential nb. of 'star'\n (= random value/weight given to each restaurant) "
"distance function:\nexponential, span = 1 km, beta = 3",
side = 1, line = 1)
grdevices.dev_off()
# A great potential map computed though python thanks to R and rpy2 is now in the /tmp folder !
# -*- coding: utf-8 -*-
"""
@author: mz
"""
from rpy2.rinterface import RRuntimeError
import numpy as np
import pickle
import time
import sys
import zmq
class Rpy2_exec2py:
"""
Rpy2 worker returning pure python objects through a ZMQ socket.
"""
def __init__(self, ident, worker_url, env='.GlobalEnv'):
import rpy2.robjects as robjects
self.r = robjects.r
self.context = zmq.Context.instance()
self.socket = self.context.socket(zmq.REQ)
self.socket.setsockopt_string(zmq.IDENTITY, '{}'.format(ident))
self.socket.connect(worker_url)
self.socket.send(b"READY")
try:
self.rversion = self.r('R.version.string')[0]
print("Worker {} initialized on {}".format(
self.socket.identity.decode(), self.rversion))
except Exception as err:
err(
"Something wrong happened while initializing the Rpy2 executor")
return -1
def listen(self):
try:
while True:
address, empty, request = self.socket.recv_multipart()
# print("Worker {} received {}\n".format(
# self.socket.identity.decode(), request.decode()), end='')
try:
result = self.r(request.decode())
# According to the rpy2 doc,
# using numpy.array (instead of np.asarray)
# copy the data in the new object :
result = np.array(result)
except RRuntimeError as err:
result = "Rpy2 intercepted error : " + str(err)
print(result)
# Send back the result serialized with pickle :
result = pickle.dumps(result)
self.socket.send_multipart([address, b'', result])
except zmq.ContextTerminated:
return
except KeyboardInterrupt:
self.socket.close()
return
if __name__ == '__main__':
if len(sys.argv) == 3:
rps = Rpy2_exec2py(sys.argv[1], sys.argv[2])
rps.listen()
elif len(sys.argv) == 2:
worker_url = 'ipc:///tmp/feeds/rpy2_workers'
rps = Rpy2_exec2py(sys.argv[1], worker_url)
rps.listen()
else:
print('Usage : {} worker_id ipc_path'.format(sys.argv[0]))
sys.exit()
#class Rpy2_exec2ro:
# """
# Rpy2 worker returning R/rpy2 objects serialized by pickle
# through a ZMQ socket (less useful for the intented purpose of this test)
# """
# def __init__(self, ident, worker_url, env='.GlobalEnv'):
# import rpy2.robjects as robjects
# self.r = robjects.r
# try:
# self.rversion = self.r('R.version.string')[0]
# print("Initialized\n{}".format(self.rversion))
# except Exception as err:
# err(
# "Something wrong happened while initializing the Rpy2 executor")
# return -1
# self.context = zmq.Context.instance()
# self.socket = self.context.socket(zmq.REQ)
# self.socket.setsockopt_string(zmq.IDENTITY, '{}'.format(ident))
# self.socket.connect(worker_url)
# self.socket.send(b"READY")
#
# def listen(self):
# try:
# while True:
# address, empty, request = self.socket.recv_multipart()
# print("%s: %s\n" % (self.socket.identity.decode(),
# request.decode()), end='')
# try:
# result = self.r(request.decode())
# except RRuntimeError as err:
# result = "Rpy2 intercepted error : " + str(err)
# print(result)
# result = pickle.dumps(result)
# self.socket.send_multipart([address, b'', result])
# except zmq.ContextTerminated:
# return
# except KeyboardInterrupt:
# self.socket.close()
# return
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment