Skip to content

Instantly share code, notes, and snippets.

@mthh
Last active February 18, 2016 19:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mthh/6cf8c0b3ffcc959786bb to your computer and use it in GitHub Desktop.
Save mthh/6cf8c0b3ffcc959786bb to your computer and use it in GitHub Desktop.
noname_1
require(pbdZMQ)
require(methods)
require(stats)
require(base)
# Run a ZMQ REQ worker that evaluates R expressions sent by a broker.
#
# The worker connects to `worker_url`, announces itself with a "READY"
# message, then loops: receive a multipart request, evaluate its third frame
# with rEval(), and send back a three-frame reply (client address, empty
# delimiter, result). The loop stops once rEval() returns the exit sentinel.
#
# Args:
#   identifiant: value set as the socket's ZMQ identity.
#   worker_url:  endpoint of the broker's worker-side socket.
#
# Returns: NULL, invisibly.
R_Worker <- function(identifiant, worker_url = 'ipc:///tmp/feeds/workers') {
  ctx <- zmq.ctx.new()
  socket <- zmq.socket(ctx, .pbd_env$ZMQ.ST$REQ)
  # Release the socket and the context however the function exits (normal
  # end of loop, error or interrupt) so that neither is leaked.
  on.exit({
    zmq.close(socket)
    zmq.ctx.destroy(ctx)
  }, add = TRUE)

  zmq.setsockopt(socket, .pbd_env$ZMQ.SO$IDENTITY, identifiant)
  zmq.connect(socket, worker_url)
  zmq.send(socket, charToRaw("READY"))

  repeat {
    ret <- zmq.recv.multipart(socket, FALSE)
    # Frame 1 is the client address, frame 3 the expression to evaluate.
    address <- ret[[1]]
    request <- rawToChar(ret[[3]])
    result <- rEval(request)
    print(paste('Worker', identifiant, ' received :', request, '\n',
                'Result : ', result, sep = ' '))
    msg <- list(
      address,
      charToRaw(''),
      charToRaw(result)
    )
    zmq.send.multipart(socket, msg, FALSE)
    # The exit sentinel is still sent back as a reply before leaving.
    if (result == "Now exiting R\n") break
  }
  invisible(NULL)
}
# Evaluate an R expression received as text and return the result as a string.
#
# Args:
#   message: character string holding the R code to evaluate.
#
# Returns: a single string. Either the exit sentinel "Now exiting R\n" when
#   the message contains "system" or "exitR", or the evaluation result (or
#   the error message followed by a newline) passed through
#   format() -> toString() -> encodeString().
#
# NOTE(security): eval(parse(text = ...)) runs arbitrary code received from
# the network in the global environment. The substring filter below is not a
# sandbox; only use this with trusted clients.
rEval <- function(message) {
  validate_output <- function(output) encodeString(toString(format(output)))

  # Scalar condition: use `||` (short-circuit) and literal matching.
  wants_exit <- grepl("system", message, fixed = TRUE) ||
    grepl("exitR", message, fixed = TRUE)
  if (wants_exit) {
    return("Now exiting R\n")
  }

  # parse() is inside the tryCatch so that syntax errors are reported too.
  output <- tryCatch(
    eval(parse(text = message), envir = .GlobalEnv),
    error = function(e) paste0(conditionMessage(e), "\n")
  )
  validate_output(output)
}
# Script entry point: start a worker whose identity is the first
# command-line argument; the default worker_url is used.
R_Worker(commandArgs(TRUE)[1])
require(pbdZMQ)
require(methods)
require(base)
require(R6)
# R6 class wrapping a ZMQ REQ worker that evaluates R expressions sent by a
# broker.
#
# Public fields:
#   identifiant: value set as the socket's ZMQ identity.
#   worker_url:  endpoint of the broker's worker-side socket.
#   histo_count: number of requests answered so far.
#
# Public methods:
#   bind_socket(): create context and socket, connect, send "READY".
#   listen():      receive/evaluate/reply loop; stops on the exit sentinel.
#   close():       close socket and context, print a summary line.
#   rEval(message): evaluate `message` as R code, return the result as text.
#
# The ZMQ context and socket live in the private fields `ctx` and `socket`
# instead of being written to the global environment.
R_Worker <- R6Class(
  classname = 'R_Worker',
  public = list(
    identifiant = NA,
    worker_url = NA,
    histo_count = 0,

    initialize = function(identifiant, worker_url) {
      if (!missing(identifiant)) self$identifiant <- identifiant
      if (!missing(worker_url)) self$worker_url <- worker_url
    },

    bind_socket = function() {
      private$ctx <- zmq.ctx.new()
      private$socket <- zmq.socket(private$ctx, .pbd_env$ZMQ.ST$REQ)
      zmq.setsockopt(private$socket, .pbd_env$ZMQ.SO$IDENTITY,
                     self$identifiant)
      zmq.connect(private$socket, self$worker_url)
      zmq.send(private$socket, charToRaw("READY"))
      print(paste0("Worker ", self$identifiant, " is ON"))
    },

    listen = function() {
      repeat {
        ret <- zmq.recv.multipart(private$socket, FALSE)
        # Frame 1 is the client address, frame 3 the expression to evaluate.
        address <- ret[[1]]
        request <- rawToChar(ret[[3]])
        result <- self$rEval(request)
        msg <- list(
          address,
          charToRaw(''),
          charToRaw(result)
        )
        zmq.send.multipart(private$socket, msg, FALSE)
        self$histo_count <- self$histo_count + 1
        # The exit sentinel is still sent back as a reply before leaving.
        if (result == "Now exiting R\n") break
      }
    },

    close = function() {
      # Guarded so that close() is safe when bind_socket() never ran or
      # close() is called twice.
      if (!is.null(private$socket)) {
        zmq.close(private$socket)
        private$socket <- NULL
      }
      if (!is.null(private$ctx)) {
        zmq.ctx.destroy(private$ctx)
        private$ctx <- NULL
      }
      exit_msg <- paste0("Worker ", self$identifiant, " closed after ",
                         self$histo_count, " requests")
      print(exit_msg)
    },

    # NOTE(security): eval(parse(text = ...)) runs arbitrary code received
    # from the network in the global environment; the substring filter is
    # not a sandbox. Only use this with trusted clients.
    rEval = function(message) {
      validate_output <- function(output) {
        encodeString(toString(format(output)))
      }
      # Scalar condition: use `||` (short-circuit) and literal matching.
      wants_exit <- grepl("system", message, fixed = TRUE) ||
        grepl("exitR", message, fixed = TRUE)
      if (wants_exit) {
        return("Now exiting R\n")
      }
      # parse() is inside the tryCatch so that syntax errors are reported too.
      output <- tryCatch(
        eval(parse(text = message), envir = .GlobalEnv),
        error = function(e) paste0(conditionMessage(e), "\n")
      )
      validate_output(output)
    }
  ),
  private = list(socket = NULL, ctx = NULL)
)
# Create a worker, serve requests until it stops, then quit R.
#
# Args:
#   identifiant: value used as the worker's ZMQ identity.
#   worker_url:  endpoint of the broker's worker-side socket.
#
# Does not return: the R session is terminated with q('no').
main <- function(identifiant, worker_url) {
  worker <- R_Worker$new(identifiant = identifiant, worker_url = worker_url)
  worker$bind_socket()
  # tryCatch is used to close nicely (releasing socket and context) when the
  # process receives SIGINT. This could probably lead to race conditions if
  # other signals are sent while this one is being handled.
  # The handler must be named `interrupt` (the condition class R signals on
  # SIGINT); a misspelt name is never matched, so the interrupt would
  # propagate and q('no') would be skipped.
  none <- tryCatch(
    worker$listen(),
    error = function(j) j,
    interrupt = function(i) i,
    finally = worker$close()
  )
  q('no')
}
# Script entry point: the worker identity is the first command-line argument;
# the worker endpoint is fixed to the IPC path below.
main(commandArgs(TRUE)[1], 'ipc:///tmp/feeds/workers')
# R_Worker <- setRefClass(
# 'R_Worker',
#
# fields = list(
# identifiant = 'character',
# worker_url = 'character',
# histo_count = 'numeric',
# socket = 'externalptr',
# ctx = 'externalptr'),
#
# methods = list(
# bind_socket = function(){
# histo_count <<- 0
# ctx <<- zmq.ctx.new()
# socket <<- zmq.socket(ctx, .pbd_env$ZMQ.ST$REQ)
# zmq.setsockopt(socket, .pbd_env$ZMQ.SO$IDENTITY, identifiant)
# zmq.connect(socket, worker_url)
# zmq.send(socket, charToRaw("READY"))
# print(paste0("Worker ", identifiant, " is ON"))
# },
#
# listen = function(){
# while(TRUE){
# ret <- zmq.recv.multipart(socket, FALSE)
# address <- ret[[1]]
# request <- rawToChar(ret[[3]])
# result <- rEval(request)
# # print(paste('Worker', identifiant, ' received :', request, '\n',
# # 'Result : ', result, sep = ' '))
# msg = list(
# address,
# charToRaw(''),
# charToRaw(result)
# )
# zmq.send.multipart(socket, msg, FALSE)
# histo_count <<- histo_count + 1
# if(result == "Now exiting R\n")break
# }
# },
#
# rEval = function(message){
# validateOuput <- function(output)encodeString(toString(format(output)))
# if(grepl('system', message) | grepl('exitR', message)){
# out <- "Now exiting R\n"
# }
# else{
# output <- tryCatch(
# eval(parse(text=message), envir = .GlobalEnv),
# error = function(e)paste0(e$message, "\n")
# )
# # print(output)
# out <- validateOuput(output)
# }
# return(out)
# },
#
# close = function(){
# zmq.close(socket)
# zmq.ctx.destroy(ctx)
# exit_msg <- paste0("Worker ", identifiant, " closed after ", histo_count, " requests")
# print(exit_msg)
# }
# ))
# -*- coding: utf-8 -*-
"""
Just some basic tests of a ZMQ broker doing load balancing
between "clients" (frontend) coming from a Python environment and requesting
computations to be done in an R environment (workers / backend side).
@author: mz
"""
from psutil import Popen, signal
from collections import deque
import threading
import time
import zmq
import sys
import os
# IPC endpoints bound by the broker: workers connect to the first one,
# clients to the second one.
url_worker = 'ipc:///tmp/feeds/workers'
url_client = 'ipc:///tmp/feeds/clients'

# Results are appended as soon as an R worker has sent back its reply.
result_list = []
# Guards concurrent appends to result_list from the client threads.
rlock = threading.RLock()

# The IPC endpoints live under /tmp/feeds. Create it in a single call rather
# than check-then-create, which races with any process doing the same.
try:
    os.makedirs('/tmp/feeds', exist_ok=True)
except OSError as err:
    print(err)
    # Non-zero status so that callers can tell the setup failed.
    sys.exit(1)
def R_client_thread(client_url, expression, context, i):
    """Basic client sending a request (REQ) to a ROUTER (the broker).

    Sends ``expression`` (encoded to bytes) to ``client_url``, waits for the
    reply and appends ``(i, reply)`` to the module-level ``result_list``
    while holding ``rlock``.

    :param client_url: endpoint of the broker's client-side socket.
    :param expression: R expression to evaluate, as a ``str``.
    :param context: the ``zmq.Context`` used to create the socket.
    :param i: client index; also used (as text) for the socket identity.
    """
    socket = context.socket(zmq.REQ)
    try:
        # The identity has to be set before connecting, otherwise it is not
        # applied to this connection.
        socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
        socket.connect(client_url)
        socket.send(expression.encode())
        reply = socket.recv()
        with rlock:
            result_list.append((i, reply))
    finally:
        # Always release the socket; an unclosed socket would make
        # context.term() block.
        socket.close()
def prepare_worker(nb_worker, work_dir='/home/mz/code/noname',
                   script='R/R_worker_class.R'):
    """Start ``nb_worker`` background R workers and return their processes.

    Each worker is launched as ``Rscript --vanilla <script> <index>``, with
    the index (0 .. nb_worker - 1) passed as its only argument.

    :param nb_worker: number of R worker processes to start.
    :param work_dir: directory the workers are started in; ``script`` is
        resolved relative to it.
    :param script: path of the R worker script.
    :return: list of ``Popen`` objects, one per worker.
    """
    # Set the working directory on the children only (cwd=) instead of
    # changing it for the whole Python process.
    r_process = [
        Popen(['Rscript', '--vanilla', script, '{}'.format(i)], cwd=work_dir)
        for i in range(nb_worker)]
    # Pause for a second after launching the workers.
    time.sleep(1)
    return r_process
def launch_broker(context, r_process, nb_clients):
    """Route client requests to R workers, least recently used worker first.

    Binds a ROUTER socket on ``url_client`` (frontend) and another on
    ``url_worker`` (backend), then forwards each client request to an
    available worker and each worker reply back to its client. Returns once
    ``nb_clients`` replies have been delivered, after closing both sockets
    and terminating ``context``.

    :param context: the ``zmq.Context`` used to create the sockets.
    :param r_process: list of worker processes; only its length is used.
    :param nb_clients: number of client replies to deliver before exiting.
    """
    nbr_workers = len(r_process)
    frontend = context.socket(zmq.ROUTER)
    backend = context.socket(zmq.ROUTER)
    try:
        frontend.bind(url_client)
        backend.bind(url_worker)

        # Queue of available workers (collections.deque for fast
        # append / popleft); its length is the number of idle workers.
        workers_list = deque()

        # init poller
        poller = zmq.Poller()
        poller.register(backend, zmq.POLLIN)
        poller.register(frontend, zmq.POLLIN)

        while True:
            socks = dict(poller.poll())

            # Handle worker activity on backend
            if socks.get(backend) == zmq.POLLIN:
                message = backend.recv_multipart()
                # A worker only talks when it is not queued, so there must
                # be room for it.
                assert len(workers_list) < nbr_workers
                # Queue worker address for LRU routing
                workers_list.append(message[0])
                # Second frame should always be empty :
                assert message[1] == b""
                # A 3-frame message is the worker's ready signal; a reply
                # has 5 frames: worker, empty, client address, empty, reply.
                if len(message) > 3:
                    client_addr = message[2]
                    assert message[3] == b""
                    reply = message[4]
                    # Send it back to the client :
                    frontend.send_multipart([client_addr, b"", reply])
                    nb_clients -= 1
                    if nb_clients < 1:
                        break

            # Take a client request only if a worker is available
            if workers_list and socks.get(frontend) == zmq.POLLIN:
                client_addr, empty, request = frontend.recv_multipart()
                assert empty == b""
                # Dequeue the worker that has been idle the longest (FIFO).
                worker_id = workers_list.popleft()
                backend.send_multipart(
                    [worker_id, b"", client_addr, b"", request])

            # As there is neither support for CTRL+C nor a real event loop
            # in this test:
            if nb_clients < 1:
                break
            # Throttle the loop: the frontend stays readable while no
            # worker is free, so poll() would otherwise spin.
            time.sleep(0.2)
    finally:
        frontend.close()
        backend.close()
    context.term()
def test(nb_worker = 4):
    """Run the whole load-balancing scenario once.

    Starts ``nb_worker`` R workers, the broker thread and one client thread
    per expression, waits for all of them, stops the workers with SIGINT and
    asserts that every expression produced a result in ``result_list``.

    :param nb_worker: number of background R workers to start.
    """
    result_list.clear()
    context = zmq.Context()
    # A bunch of expressions to evaluate (more or less costly) :
    expressions = [
        'R.Version()', "b<-log1p(seq(1, 200))", "mat <- matrix(runif(400*400), 400)",
        "a<-c(1,2,3,4)", "mat12 <- matrix(runif(72*72), 72) / 6", "d<-log1p(seq(7, 250))",
        "mat12 <- matrix(runif(90*90), 90) * matrix(runif(90*90), 90)",
        "d<-log1p(seq(77, 150))", "mat4 <- log10(matrix(runif(200*200), 200))",
        "log10(diag(matrix(123*123), 123))", "sequ <- seq(1,1000)",
        "abs(data.frame(log10(diag(matrix(runif(500*425), 500))) * cospi((1:500)**-0.3))) #! 0.01",
        "matx <- matrix(runif(200*200), 200)", "matz <- matrix(runif(555*555), 555) / 3.5",
        "maty <- matrix(runif(200*200), 200)", "maty <- matrix(runif(200*200), 200)",
        'R.Version()', "b<-log1p(seq(1, 200))", "mat <- matrix(runif(400*400), 400)",
        "a<-c(1,2,3,4)", "mat12 <- matrix(runif(72*72), 72) / 6", "d<-log1p(seq(7, 250))",
        "mat12 <- matrix(runif(90*90), 90) * matrix(runif(90*90), 90)",
        "d<-log1p(seq(77, 150))", "mat4 <- matrix(runif(200*200), 200)",
        "w <- data.frame(1:200 %*% matrix(runif(200*200), 200))",
        "matx <- matrix(runif(200*200), 200)", "matz <- matrix(runif(200*200), 200)",
        "maty <- matrix(runif(200*200), 200)"]

    # Launch background R workers :
    r_process = prepare_worker(nb_worker)
    try:
        # Launch the broker (already knowing the number of clients to come
        # for a more convenient exit) :
        broker = threading.Thread(
            target=launch_broker,
            args=(context, r_process, len(expressions)))
        broker.start()

        # Launch clients, one per expression to evaluate :
        threads = [
            threading.Thread(target=R_client_thread,
                             args=(url_client, expression, context, i))
            for i, expression in enumerate(expressions)
        ]
        # They should all try to connect to the broker and be routed to an
        # available worker ...
        for t in threads:
            t.start()
        # ... then each client adds its result to a list and quits
        for t in threads:
            t.join()

        # The broker should have returned :
        broker.join()
    finally:
        # Close the background R workers, even if something above failed,
        # so that no Rscript process is left behind :
        for process in r_process:
            process.send_signal(signal.SIGINT)
            # process.terminate()
            process.wait()

    # Each expression to evaluate should have yielded a result :
    assert len(result_list) == len(expressions)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment