-
command interface
most commands should be web services; all commands should use the same interface. like a Rack application, but for commands!
for instance, i could define a WebServiceCommand class that would just take the url of an oblique-ish service and execute on it; or a more typical command class which does its own computation in a block; etc.
-
"database" side
will marshaled files do? as long as only one process is responsible for each file, and they don't grow too large, yes.
-
periodic / scheduler
already have this, kinda: @042171:73-80. runs every two seconds -- when the select timeout runs out. but putting things like .in there would be very ughy, and limit their time resolution to 2secs. or, have the timeout constantly running out and manually polling for periodics, but that's what select is designed to avoid.
put scheduler in its own process, with queues going either way. a pipe to insert things into the scheduler queue, and a pipe to let the main process know when a task is ready, which is waited on in the main select loop. loop of the scheduler looks like this:
- select on the pipe with a timeout of the time until the next task will
be ready. when it returns:
- if it's a new task from the pipe, add it to the task list;
- if the timeout went, look for any tasks that need doing and pass them back to the main process before deleting them from the list.
- repeat ad infinitum
- select on the pipe with a timeout of the time until the next task will
be ready. when it returns:
-
catch worker crashes
listen for SIGCHLD, use the self-pipe trick to handle them in the main loop. restart as appropriate.
-
refactor, kinda.
move things that should be in their own methods into their own method.
-
name
something suitably pseudo-arthurian
Created
November 14, 2012 21:20
-
-
Save dpk/4074919 to your computer and use it in GitHub Desktop.
Fauxlot, a sketch for a duxlot/phenny-like bot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'socket' | |
class Fauxlot | |
@@linere = /(?::([^ ]+) +)?((?:.(?! :))+.)(?: +:?(.*))?/ | |
def initialize server, port=6667, options = {} | |
@server = server | |
@port = port | |
@nick = (options[:nick] or 'fauxlot') | |
@nprocesses = (options[:processes] or 4) | |
@prefix = (options[:prefix] or '\'') | |
@channels = (options[:channels] or ['#swhack']) | |
@log = (options[:log] or STDERR) | |
@timeout = (options[:timeout] or 30) | |
end | |
def start | |
@socket = TCPSocket.new @server, @port | |
send 'NICK', [], @nick | |
send 'USER', ['fauxlot', '0', '*'], 'Faux Lot' | |
trap('INT') { stop } | |
start_processes | |
@channels.each {|channel| join channel } | |
handle_loop | |
end | |
def stop | |
stop_processes | |
send 'QUIT' | |
@socket = nil | |
end | |
def join channel | |
send 'JOIN', [], channel | |
end | |
def handle_loop | |
@commands = {} | |
@command_ctr = 0 | |
loop do | |
rsocks = @processes.map(&:outqueue) | |
rsocks << @socket | |
selected = Queue.select(rsocks, [], [], 2) | |
if selected | |
rs, ws, es = selected | |
rs.each do |r| | |
if r == @socket | |
line = recv | |
if command = parse_command(line) | |
process = @processes[@command_ctr % @processes.length] | |
process.inqueue.send command | |
command.handler_process = process | |
end | |
elsif r.is_a? Queue | |
response = r.receive | |
if response.is_a? CommandStatus | |
command = @commands[response.command_id] | |
if response.status == :started | |
command.started = Time.now | |
else response.status == :done | |
command.done! | |
end | |
elsif response.is_a? Response | |
command = @commands[response.command_id] | |
send 'PRIVMSG', [command.channel], response.message | |
end | |
end | |
end | |
else | |
@commands.each do |id, command| | |
if command.started? and not command.done? and (Time.now - command.started) >= @timeout | |
deal_with_runaway command | |
elsif command.done? # to prevent this process taking longer as the bot gets older | |
@log.puts "GC: reaped #{id}" | |
@commands.delete id | |
end | |
end | |
end | |
end | |
end | |
def parse_command line | |
m = @@linere.match line.chomp | |
user = m[1] | |
event, *eargs = m[2].split ' ' | |
text = m[3] | |
nick = user.split('!').first rescue nil | |
if event == 'PRIVMSG' and text[0...(@prefix.length)] == @prefix | |
command, args = text.split(' ', 2) | |
command = command[(@prefix.length)..-1] | |
command = Command.new(@command_ctr += 1, command, args, nick, eargs.first) | |
@commands[command.id] = command | |
command | |
else | |
if event == 'PING' | |
send 'PONG', [@nick] | |
end | |
nil | |
end | |
end | |
private | |
def send command, params=[], text=nil | |
tosend = "#{command.upcase} #{params.join ' '}#{" :#{text}" if text}\r\n" | |
@socket.write tosend | |
@log.puts 'SENT:' + tosend.inspect | |
end | |
def recv | |
line = @socket.gets | |
@log.puts 'RECV:' + line.inspect | |
line | |
end | |
def start_processes | |
@log.puts "Starting processes ..." | |
@processes = [] | |
@nprocesses.times do | |
start_process | |
end | |
end | |
def stop_processes | |
@log.puts "Stopping processes ..." | |
@processes.each(&:stop) | |
while (pid, stat = Process.wait2(-1) rescue nil) | |
@log.puts "#{pid}: #{status.exitstatus}" | |
end | |
end | |
def deal_with_runaway command | |
@log.puts "**RUNAWAY COMMAND!** #{command.command} for #{command.user} on #{command.channel} in process #{command.handler_process.pid}" | |
command.done! | |
movecomms = [] | |
@commands.each do |id, oc| | |
if not oc.done? and oc.handler_process == command.handler_process | |
movecomms << oc | |
end | |
end | |
@log.puts "#{movecomms.length} queued command(s) need moving to another process." | |
send 'PRIVMSG', [command.channel], "#{command.user}: Sorry, that command (#@prefix#{command.command}) took too long to process." | |
command.handler_process.kill | |
::Process.wait2 command.handler_process.pid | |
@processes.delete command.handler_process | |
@log.puts "#{command.handler_process.pid} killed" | |
start_process | |
moven = 0 | |
movecomms.each do |mc| | |
process = @processes[moven % @processes.length] | |
mc.handler_process = nil # prevent trying to dump IOs | |
process.inqueue.send mc | |
mc.handler_process = process | |
@log.puts "Moved command #{mc.id} to pid #{process.pid}" | |
moven += 1 | |
end | |
end | |
def start_process | |
inqueue = Queue.new | |
outqueue = Queue.new | |
pid = fork do | |
inqueue.close_write | |
outqueue.close_read | |
@process = Process.new $$, inqueue, outqueue | |
trap('INT') { } | |
trap('TERM') { @process.stop } | |
@log.puts "Started #$$" | |
outqueue.send true | |
@process.loop | |
end | |
inqueue.close_read | |
outqueue.close_write | |
outqueue.receive or return @log.puts "something went wrong with starting #$$ ..." | |
@processes << Process.new(pid, inqueue, outqueue) | |
end | |
class Queue | |
def self.select r, w=[], e=[], timeout=nil | |
rmaps = {} | |
wmaps = {} | |
emaps = {} | |
rios = r.map do |q| | |
if q.is_a? Queue | |
rmaps[q.reader] = q | |
q.reader | |
else | |
q | |
end | |
end | |
wios = w.map do |q| | |
if q.is_a? Queue | |
wmaps[q.writer] = q | |
q.writer | |
else | |
q | |
end | |
end | |
eios = e.map do |q| | |
if q.is_a? Queue | |
emaps[q.writer] = q | |
emaps[q.reader] = q | |
[q.reader, q.writer] | |
else | |
q | |
end | |
end.flatten | |
result = IO.select(rios, wios, eios, timeout) | |
return nil if result.nil? | |
result[0].map! {|io| rmaps[io] or io } | |
result[1].map! {|io| wmaps[io] or io } | |
result[2].map! {|io| emaps[io] or io } | |
result | |
end | |
def initialize | |
@read, @write = IO.pipe | |
end | |
def send obj | |
rep = Marshal.dump obj | |
@write.write rep | |
end | |
def read | |
Marshal.load @read | |
end | |
alias receive read | |
alias recv read | |
def send_nonblock obj | |
IO.select([], [@write], [], 0) and send obj | |
end | |
def read_nonblock | |
IO.select([@read], [], [], 0) and send obj | |
end | |
alias receive_nonblock read_nonblock | |
alias recv_nonblock read | |
def each &block | |
until @read.eof? | |
block.call receive | |
end | |
end | |
def close_read | |
@read.close | |
end | |
def close_write | |
@write.close | |
end | |
def reader | |
@read | |
end | |
def writer | |
@write | |
end | |
def inspect | |
"#<#{self.class} fd:#{@read.inspect},#{@write.inspect}>" | |
end | |
end | |
class Process | |
attr_reader :pid, :inqueue, :outqueue | |
def initialize pid, inqueue, outqueue | |
@pid = pid | |
@inqueue = inqueue | |
@outqueue = outqueue | |
@busy = false | |
end | |
def stop | |
if @pid == $$ | |
if @busy | |
@stopasap = true | |
else | |
finishup | |
end | |
else | |
::Process.kill 'TERM', @pid | |
end | |
end | |
def kill | |
if @pid == $$ | |
exit 1 | |
else | |
::Process.kill 'KILL', @pid | |
end | |
end | |
def loop | |
@inqueue.each do |command| | |
begin | |
@busy = true | |
@outqueue.send CommandStatus.new :started, command.id | |
if command.command == 'randomwait' | |
dur = rand 10 | |
sleep dur | |
@outqueue.send Response.new(command.id, "Waited #{dur} seconds for #{command.args.inspect}") | |
elsif command.command == 'runaway' | |
sleep 60 | |
@outqueue.send Response.new(command.id, "You should never see this.") | |
else | |
@outqueue.send Response.new(command.id, "You used: #{command.command} (responded from pid #$$)") | |
end | |
@outqueue.send CommandStatus.new :done, command.id | |
ensure | |
@busy = false | |
stop if @stopasap | |
end | |
end | |
end | |
def finishup | |
return nil unless @pid == $$ | |
@inqueue.close_read | |
@outqueue.close_write | |
exit 0 | |
end | |
end | |
class Command | |
attr_accessor :id, :command, :args, :user, :channel, :handler_process, :started | |
def initialize id, command, args, user, channel | |
@id = id | |
@command = command | |
@args = args | |
@user = user | |
@channel = channel | |
@done = false | |
end | |
def started?; not not @started; end | |
def done?; @done; end | |
def done!; @done = true; end | |
end | |
class Response | |
attr_accessor :command_id, :message | |
def initialize command_id, message | |
@command_id = command_id | |
@message = message | |
end | |
end | |
class CommandStatus | |
attr_reader :status, :command_id | |
def initialize status, command_id | |
@status = status | |
@command_id = command_id | |
end | |
end | |
end | |
if $0 == __FILE__ | |
f = Fauxlot.new 'irc.freenode.net' | |
f.start | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment