Created
March 14, 2019 22:07
-
-
Save csmr/ffc1c691c24cc9bf7b0f2e9bd729c471 to your computer and use it in GitHub Desktop.
Process spawning and named-pipe inter-process messaging between them (infinite loops)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Module for handling process spawning, interproc messaging, and process logging | |
module PipedProcs | |
module NamedPipe | |
## IO methods for named pipe IPC fifo | |
## requires OS supports File.mkfifo. | |
# Open one end of named pipe | |
# @argument pipe_path - path to fifo | |
# @argument flag - r, w, w+ - open in read/write mode | |
# @argument wait_on_str - reader waits until writer puts this string in | |
# @returns named_pipe -handle | nil if cannot make pipe | |
def named_pipe( pipe_path, flag, wait_on_str=nil ) | |
unless File.pipe? pipe_path | |
raise "non-pipe path #{pipe_path} exists - fail." if File.exists? pipe_path | |
status = File.mkfifo pipe_path | |
raise "cannot mkfifo #{pipe_path} - fail & exit." unless status == 0 | |
end | |
pipe_handle = open(pipe_path, flag) | |
# TODO implement serialization | |
pipe_handle.define_singleton_method(:put!) do |arf=nil| | |
syswrite arf + "\n" | |
flush | |
end | |
# TODO implement deserialization | |
pipe_handle.define_singleton_method(:get!) do |arf=nil| | |
rwe = IO.select([self], [], [], 0.001) | |
msg = rwe[0][0].gets unless rwe==nil || rwe[0][0]==nil | |
msg.strip if msg!=nil | |
end | |
if false #|| wait_on_str | |
if flag.include? "w" #rite end | |
pipe_handle.put! wait_on_str | |
else # read-end | |
msg = pipe_handle.get! | |
# raise "no handshake #{msg} in #{pipe_path} - fail" unless msg.include? wait_on_str | |
end | |
end | |
pipe_handle | |
end | |
# Reads from a named pipe | |
# @agument timeout_secs_int - timeout for waiting on something to read | |
# @argument readlen_bytes - number of bytes to read | |
# @return nil if nothing to read | |
def read_bytes_nonblocking( pipe_handle_io, timeout_secs_int=0, readlen_bytes=2 ) | |
msg = nil | |
rwe = IO.select([pipe_handle_io], [], [], timeout_secs_int) | |
msg = rwe[0][0].read(readlen_bytes) unless rwe == nil | |
msg.strip if msg != nil | |
end | |
# Reads from a named pipe | |
# @agument timeout_secs_int - timeout for waiting on something to read | |
# @return nil if nothing to read | |
def read_line_nonblocking( pipe_handle_io, timeout_secs_int=0 ) | |
# todo all pipe_handles as arr into select | |
rwe = IO.select([pipe_handle_io], [], [], timeout_secs_int) | |
# todo flatten result arr unless nil | |
# todo select !nil? | |
# todo return arr of msg str | |
msg = rwe[0][0].gets unless rwe==nil || rwe[0][0]==nil | |
msg.strip if msg!=nil | |
end | |
def read_message pipe_handle | |
msg = read_line_nonblocking pipe_handle | |
msg != nil ? msg = msg.split(" ") : nil | |
end | |
def remove_named_pipe_files pipehash | |
pipehash.each_pair { |k, fpath| | |
File.delete fpath if File.exists? fpath | |
} | |
end | |
end # mod Utils | |
# A class to abstract using named pipes behind a message box mixin. | |
# note: mostly pseudocode | |
# Maybe implement something like: | |
# mybox = MessageBox.new my_proc_id | |
# mybox.pipe( 'ui', :r ) | |
# mybox.pipe( 'ui', :w ) | |
# mybox.read( 'ui' ) # => nil if none | |
# mybox.send( 'ui', 'ack') ) | |
# # or with a callback to call once reply | |
# mybox.write('server', get_user(n), updateHighscore) | |
class MessageBox | |
include PipedProcs::NamedPipe | |
attr_reader :msg_arr, :nicks, :pipedir | |
def initialize nick, targets, path | |
@@msg_max_len = 64 | |
@msgarr = [] | |
@nick = nick.to_s | |
@pipedir = path || "." | |
@nicks = { | |
# r||w: { "nik" => io_obj, ... } | |
r: {}, | |
w: {} | |
} | |
targets.each_pair{ |n, f| | |
p "msg box init: " + n + " - " + f | |
pipe(n, f) | |
} | |
end | |
def pipe target_nick, flag | |
_p = [@nick, "w", target_nick] | |
_f = flag[0] | |
_p.reverse! if _f == 'r' | |
_p = [@pipedir, '/', _p.join('_'), '.pipe'].join | |
_p = named_pipe( _p, flag ) | |
@nicks[_f.to_sym][target_nick] = _p | |
p "pipe, @nicks " + @nicks.to_s | |
end | |
# returns last message [from nick] | |
def next_msg nick=nil | |
download | |
return nil if @msgarr.empty? | |
return @msgarr.shift if nick == nil | |
@msgarr.reject! {|n, m| nick == n } | |
end | |
def download | |
@nicks[:r].map { |n, p| | |
inmsg = read_message p | |
@msgarr.push [ n, inmsg ] unless inmsg == nil | |
} | |
garbage_clean | |
end | |
def send str, to_nick | |
ref = @nicks[:w][to_nick] | |
unless ref == nil | |
ref.put! str | |
end | |
end | |
def garbage_clean | |
while @msgarr.length > @@msg_max_len | |
# dump em! | |
@msgarr.shift | |
end | |
end | |
end # MessageBox | |
# Handles process spawning and IPC messaging mapping | |
class ProcRunner | |
include PipedProcs::NamedPipe | |
# arg is hash with pattern { me: [ other => flag ] } | |
def initialize map = {} | |
@@pipedir = "/tmp/procrunner" | |
@proc_map = map | |
@logfile = "#{@@pipedir}/logfile" | |
@pidfile = "#{@@pipedir}/pidfile" | |
Dir.mkdir @@pipedir unless File.exists? @@pipedir | |
system("echo '' > #{@logfile}") # empty it | |
system("echo '' > #{@pidfile}") # empty it | |
p map.to_s | |
end | |
def spawn( nick, targets = [] ) | |
procblock = Proc.new if block_given? | |
spawn_process nick, &procblock | |
end | |
# @@method to daemonize processes | |
# @@arguments name, daemonize, detach [block] | |
def spawn_process( nick='spyops', daemonize=true, detach=true ) | |
p "spawn " + nick | |
pid = fork do | |
# TODO handle cleanup trap('EXIT') { exit } | |
Process.daemon(true, true) if daemonize | |
Process.setproctitle("ruby_#{nick}") | |
# IO | |
File.open(@pidfile, 'a') { |f| f.puts(Process.pid) } | |
$stdout.reopen(@logfile, "a") | |
$stdout.sync | |
$stderr.reopen($stdout ) | |
msgs = PipedProcs::MessageBox.new nick, @proc_map[nick], @@pipedir | |
p "msg_box " + msgs.to_s | |
if block_given? | |
Proc.new.call msgs | |
end | |
p "[exit fork]", nick | |
end | |
Process.detach(pid) if detach | |
end | |
end | |
end # mod PipedProcs | |
pipe_map = { | |
'ui' => {'engine' => 'w+', 'state' => 'r'}, | |
'engine' => {'ui' => 'r', 'state' => 'w+'}, | |
'state' => {'ui' => 'w+'} | |
} | |
app = PipedProcs::ProcRunner.new pipe_map | |
app.spawn 'ui' do |mb| | |
n = 0 | |
while true | |
if n%1000000==0 | |
p "ui; " + mb.next_msg.to_s | |
mb.send "helo" + n.to_s, 'engine' | |
end | |
n += 1 | |
end | |
end | |
app.spawn 'engine' do |mb| | |
n = 0 | |
while true | |
if n%1000000==0 | |
p "ng got:" + mb.next_msg.to_s | |
mb.send "helo" + n.to_s, 'state' | |
end | |
n += 1 | |
end | |
end | |
app.spawn 'state' do |mb| | |
n = 0 | |
while true | |
if n%1000000==0 | |
mb.send "helo" + n.to_s, 'ui' | |
end | |
n += 1 | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment