Skip to content

Instantly share code, notes, and snippets.

@csmr
Created March 14, 2019 22:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save csmr/ffc1c691c24cc9bf7b0f2e9bd729c471 to your computer and use it in GitHub Desktop.
Save csmr/ffc1c691c24cc9bf7b0f2e9bd729c471 to your computer and use it in GitHub Desktop.
Process spawning and named-pipe inter-process messaging between them (infinite loops)
# Module for handling process spawning, interproc messaging, and process logging
module PipedProcs
module NamedPipe
## IO methods for IPC over named pipes (FIFOs).
## Requires an OS that supports File.mkfifo.
# Opens one end of a named pipe (fifo), creating the fifo first if needed.
#
# The returned handle gets two singleton helpers:
#   put!(msg) - writes msg followed by a newline
#   get!      - returns the next line, stripped | nil if nothing becomes
#               readable within 1 ms
#
# @argument pipe_path - path to fifo
# @argument flag - r, w, w+ - open in read/write mode
# @argument wait_on_str - reserved for a reader/writer handshake; the
#   handshake is disabled, so the value is currently ignored
# @returns the opened pipe handle
# @raises RuntimeError if pipe_path exists but is not a fifo, or if mkfifo
#   does not report success
def named_pipe( pipe_path, flag, wait_on_str=nil )
  unless File.pipe? pipe_path
    # File.exist? - the File.exists? alias was removed in Ruby 3.2
    raise "non-pipe path #{pipe_path} exists - fail." if File.exist? pipe_path
    status = File.mkfifo pipe_path
    raise "cannot mkfifo #{pipe_path} - fail & exit." unless status == 0
  end
  # File.open rather than Kernel#open: the latter treats a path with a
  # leading "|" as a command to run
  pipe_handle = File.open(pipe_path, flag)
  # TODO implement serialization
  pipe_handle.define_singleton_method(:put!) do |arf=nil|
    # interpolation tolerates the nil default and non-String payloads
    syswrite "#{arf}\n"
    flush
  end
  # TODO implement deserialization
  pipe_handle.define_singleton_method(:get!) do |arf=nil|
    # poll for at most 1 ms so callers can spin without stalling
    rwe = IO.select([self], [], [], 0.001)
    msg = rwe[0][0].gets unless rwe.nil?
    msg.strip unless msg.nil?
  end
  pipe_handle
end
# Reads up to readlen_bytes from a named pipe without blocking.
# @argument pipe_handle_io - IO to read from
# @argument timeout_secs_int - timeout for waiting on something to read
# @argument readlen_bytes - maximum number of bytes to read
# @return the bytes read, stripped | nil if nothing to read (timeout or
#   end of file)
def read_bytes_nonblocking( pipe_handle_io, timeout_secs_int=0, readlen_bytes=2 )
  return nil if IO.select([pipe_handle_io], [], [], timeout_secs_int).nil?
  # IO#read(n) would wait until all n bytes (or EOF) arrive; read_nonblock
  # hands back whatever is available right now. With exception: false it
  # yields nil at EOF and :wait_readable when no data is ready.
  msg = pipe_handle_io.read_nonblock(readlen_bytes, exception: false)
  msg.strip if msg.is_a?(String)
end
# Fetches one line from a named pipe, giving up once the timeout passes.
# @argument pipe_handle_io - IO to poll
# @argument timeout_secs_int - seconds to wait for the pipe to become readable
# @return the line with surrounding whitespace stripped | nil if nothing to read
def read_line_nonblocking( pipe_handle_io, timeout_secs_int=0 )
  # TODO: take an array of pipe handles, hand them all to select and
  # return an array of message strings
  ready = IO.select([pipe_handle_io], [], [], timeout_secs_int)
  return nil if ready.nil?
  source = ready.first.first
  return nil if source.nil?
  line = source.gets
  line.nil? ? nil : line.strip
end
# Reads one line from the pipe and breaks it into words.
# @argument pipe_handle - IO handed on to read_line_nonblocking
# @return array of the line's whitespace-separated words | nil if nothing to read
def read_message pipe_handle
  line = read_line_nonblocking pipe_handle
  return nil if line.nil?
  line.split(" ")
end
# Deletes the files named by a hash's values; paths that do not exist are
# skipped.
# @argument pipehash - hash of { key => file path }; the keys are ignored
# @return pipehash
def remove_named_pipe_files pipehash
  pipehash.each_value { |fpath|
    # File.exist? - the File.exists? alias was removed in Ruby 3.2
    File.delete fpath if File.exist? fpath
  }
end
end # mod Utils
# A message box that hides named pipes behind per-peer send/receive calls.
#
# Usage:
#   mybox = MessageBox.new 'ui', { 'engine' => 'w+', 'state' => 'r' }, '/tmp'
#   mybox.send 'ack', 'engine'   # write one line to engine
#   mybox.next_msg               # => ['state', ['word', ...]] | nil if none
#   mybox.next_msg 'state'       # => oldest queued message from state | nil
#
# Idea not implemented yet - a callback to call once the peer replies:
#   mybox.write('server', get_user(n), updateHighscore)
class MessageBox
  include PipedProcs::NamedPipe

  # Upper bound on queued inbound messages; the oldest are dropped beyond it.
  MSG_MAX_LEN = 64

  attr_reader :nicks, :pipedir

  # @argument nick - this process' name, used to build pipe file names
  # @argument targets - hash of { peer_nick => open flag (r, w, w+) };
  #   nil is treated as no peers
  # @argument path - directory holding the pipe files, "." when nil
  def initialize nick, targets, path
    @msgarr = []
    @nick = nick.to_s
    @pipedir = path || "."
    @nicks = {
      # r||w: { "nik" => io_obj, ... }
      r: {},
      w: {}
    }
    (targets || {}).each_pair { |n, f|
      p "msg box init: " + n + " - " + f
      pipe(n, f)
    }
  end

  # Queue of received [nick, words] pairs, oldest first.
  def msg_arr
    @msgarr
  end

  # Opens the pipe shared with target_nick and registers the handle under
  # @nicks[:r] or @nicks[:w], chosen by the first character of flag.
  # The file is always named <writer>_w_<reader>.pipe, so both ends agree.
  def pipe target_nick, flag
    direction = flag[0]
    parts = [@nick, "w", target_nick]
    parts.reverse! if direction == 'r'
    fifo_path = [@pipedir, '/', parts.join('_'), '.pipe'].join
    @nicks[direction.to_sym][target_nick] = named_pipe( fifo_path, flag )
    p "pipe, @nicks " + @nicks.to_s
  end

  # Polls the read pipes, then removes and returns the oldest queued message.
  # @argument nick - when given, only messages from that peer are considered
  # @return [nick, words] pair | nil if there is no matching message
  def next_msg nick=nil
    download
    return @msgarr.shift if nick.nil?
    idx = @msgarr.index { |from, _words| from == nick }
    idx && @msgarr.delete_at(idx)
  end

  # Reads at most one pending line from every read pipe into the queue and
  # trims the queue to MSG_MAX_LEN.
  def download
    @nicks[:r].each { |from, handle|
      inmsg = read_message handle
      @msgarr.push [ from, inmsg ] unless inmsg.nil?
    }
    garbage_clean
  end

  # Writes str as one line to to_nick; does nothing if no write pipe to
  # that peer was opened.
  # NOTE: this shadows Object#send on MessageBox instances; use __send__
  # for dynamic dispatch.
  def send str, to_nick
    ref = @nicks[:w][to_nick]
    ref.put! str unless ref.nil?
  end

  # Drops the oldest messages until at most MSG_MAX_LEN remain.
  def garbage_clean
    excess = @msgarr.length - MSG_MAX_LEN
    @msgarr.shift(excess) if excess > 0
  end
end # MessageBox
# Handles process spawning and IPC messaging mapping.
# Every spawned process gets a MessageBox built from its entry in the pipe
# map, and has its stdout/stderr appended to a shared log file.
class ProcRunner
  include PipedProcs::NamedPipe

  # Directory holding the pipes, the log file and the pid file.
  PIPE_DIR = "/tmp/procrunner"

  # @argument map - hash with pattern { me => { other => flag } }
  def initialize map = {}
    @proc_map = map
    @logfile = "#{PIPE_DIR}/logfile"
    @pidfile = "#{PIPE_DIR}/pidfile"
    # File.exist? - the File.exists? alias was removed in Ruby 3.2
    Dir.mkdir PIPE_DIR unless File.exist? PIPE_DIR
    # reset both files to a single newline, as `echo '' > file` did,
    # without going through a shell
    File.write(@logfile, "\n")
    File.write(@pidfile, "\n")
    p map.to_s
  end

  # Spawns a daemonized, detached process named nick and runs the given
  # block in it, passing the process' MessageBox.
  # NOTE: this shadows Kernel#spawn inside ProcRunner.
  # @argument nick - process name, also the key into the pipe map
  # @argument targets - currently unused
  def spawn( nick, targets = [], &block )
    spawn_process nick, &block
  end

  # Forks a child, optionally daemonizes it, redirects its output to the log
  # file, builds its MessageBox and calls the block with it.
  # @argument nick - process name, also the key into the pipe map
  # @argument daemonize - call Process.daemon in the child when true
  # @argument detach - detach the forked pid in the parent when true
  # @return the result of Process.detach | nil when detach is false
  def spawn_process( nick='spyops', daemonize=true, detach=true, &block )
    p "spawn " + nick
    pid = fork do
      # TODO handle cleanup trap('EXIT') { exit }
      # (true, true): keep the working directory and the std streams
      Process.daemon(true, true) if daemonize
      Process.setproctitle("ruby_#{nick}")
      # IO
      File.open(@pidfile, 'a') { |f| f.puts(Process.pid) }
      $stdout.reopen(@logfile, "a")
      # assign, not just read, the flag so log lines are written unbuffered
      $stdout.sync = true
      $stderr.reopen($stdout)
      # a nick missing from the map gets a box with no pipes
      msgs = PipedProcs::MessageBox.new nick, (@proc_map[nick] || {}), PIPE_DIR
      p "msg_box " + msgs.to_s
      # explicit &block: Proc.new without a block raises on Ruby 3.0+
      block.call msgs if block
      p "[exit fork]", nick
    end
    Process.detach(pid) if detach
  end
end
end # mod PipedProcs
# Demo wiring: for each process, the peers it writes to ('w+') or reads
# from ('r').
pipe_map = {
  'ui'     => { 'engine' => 'w+', 'state' => 'r' },
  'engine' => { 'ui' => 'r', 'state' => 'w+' },
  'state'  => { 'ui' => 'w+' }
}
app = PipedProcs::ProcRunner.new pipe_map

# ui: on every millionth spin, print the next inbound message and greet engine
app.spawn 'ui' do |mb|
  n = 0
  loop do
    if (n % 1_000_000).zero?
      p "ui; #{mb.next_msg}"
      mb.send "helo#{n}", 'engine'
    end
    n += 1
  end
end

# engine: on every millionth spin, print the next inbound message and greet state
app.spawn 'engine' do |mb|
  n = 0
  loop do
    if (n % 1_000_000).zero?
      p "ng got:#{mb.next_msg}"
      mb.send "helo#{n}", 'state'
    end
    n += 1
  end
end

# state: on every millionth spin, greet ui
app.spawn 'state' do |mb|
  n = 0
  loop do
    mb.send "helo#{n}", 'ui' if (n % 1_000_000).zero?
    n += 1
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment