Skip to content

Instantly share code, notes, and snippets.

@dpk
Created November 14, 2012 21:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dpk/4074919 to your computer and use it in GitHub Desktop.
Save dpk/4074919 to your computer and use it in GitHub Desktop.
Fauxlot, a sketch for a duxlot/phenny-like bot
require 'socket'
class Fauxlot
@@linere = /(?::([^ ]+) +)?((?:.(?! :))+.)(?: +:?(.*))?/
# Stores the connection settings; nothing is opened until #start is called.
#
# server  - host passed to TCPSocket.new by #start.
# port    - TCP port passed to TCPSocket.new (default 6667).
# options - Hash of optional settings. A missing, nil or false value falls
#           back to the default shown:
#           :nick      - nickname sent with NICK ('fauxlot')
#           :processes - number of worker processes to start (4)
#           :prefix    - string a PRIVMSG must start with to be a command (')
#           :channels  - channels joined by #start (['#swhack'])
#           :log       - object that receives log lines via #puts (STDERR)
#           :timeout   - seconds a started command may run before it is
#                        treated as a runaway (30)
def initialize(server, port = 6667, options = {})
  @server     = server
  @port       = port
  @nick       = options[:nick] || 'fauxlot'
  @nprocesses = options[:processes] || 4
  @prefix     = options[:prefix] || '\''
  @channels   = options[:channels] || ['#swhack']
  @log        = options[:log] || STDERR
  @timeout    = options[:timeout] || 30
end
# Connects to the server, registers the nick, starts the workers, joins the
# configured channels and then runs #handle_loop.
#
# An INT trap is installed that calls #stop.
def start
  @socket = TCPSocket.new(@server, @port)
  send('NICK', [], @nick)
  send('USER', ['fauxlot', '0', '*'], 'Faux Lot')
  trap('INT') { stop }
  start_processes
  @channels.each { |channel| join(channel) }
  handle_loop
end
# Shuts the bot down: stops the workers, sends QUIT and releases the socket.
#
# Invoked by the INT trap installed in #start. The socket is closed (not
# merely forgotten) so the descriptor is released, and the cleanup runs even
# if stopping the workers or writing QUIT raises.
def stop
  stop_processes
  send 'QUIT'
ensure
  socket = @socket
  @socket = nil
  socket.close if socket && !socket.closed?
end
# Sends a JOIN for +channel+; the channel name travels as the trailing text
# of the message, with no middle parameters.
def join(channel)
  send('JOIN', [], channel)
end
# Main event loop. Waits (2 second timeout) on the IRC socket and on every
# worker's outqueue, then:
#
# * socket readable     - read one line and dispatch any command it contains;
# * worker queue ready  - apply the CommandStatus / Response it delivers;
# * timeout (idle)      - reap finished commands and deal with runaways.
#
# Returns when the socket has gone away, either because #stop cleared it or
# because the server closed the connection.
#
# NOTE(review): a worker that exits unexpectedly leaves its outqueue at EOF,
# and reading it still raises; crash recovery is not handled here.
def handle_loop
  @commands = {}
  @command_ctr = 0
  loop do
    # #stop sets @socket to nil; passing nil on to select would raise.
    return if @socket.nil?
    readable = @processes.map(&:outqueue)
    readable << @socket
    selected = Queue.select(readable, [], [], 2)
    if selected
      selected.first.each do |source|
        if source == @socket
          return unless handle_server_line
        elsif source.is_a? Queue
          handle_worker_message source.receive
        end
      end
    else
      sweep_commands
    end
  end
end

# Reads one line from the server and hands any parsed command to a worker,
# chosen round-robin by the command counter. Returns false once the
# connection is gone (after stopping the workers), true otherwise.
def handle_server_line
  line = recv
  if line.nil?
    @log.puts 'Connection closed by server; shutting down.'
    stop_processes
    @socket.close unless @socket.closed?
    @socket = nil
    return false
  end
  command = parse_command(line)
  if command
    process = @processes[@command_ctr % @processes.length]
    # Queue the command before attaching its process: the process object
    # holds pipe IOs, which must not be part of the marshalled command.
    process.inqueue.send command
    command.handler_process = process
  end
  true
end

# Applies one message received from a worker: status updates are recorded on
# the command, responses are relayed to the command's channel.
def handle_worker_message response
  case response
  when CommandStatus
    command = @commands[response.command_id]
    if response.status == :started
      command.started = Time.now
    elsif response.status == :done
      command.done!
    end
  when Response
    command = @commands[response.command_id]
    send 'PRIVMSG', [command.channel], response.message
  end
end

# Idle-time housekeeping: forgets finished commands (so the table does not
# grow as the bot gets older), then handles commands that have been running
# for at least @timeout seconds.
def sweep_commands
  @commands.delete_if do |id, command|
    finished = command.done?
    @log.puts "GC: reaped #{id}" if finished
    finished
  end
  overdue = @commands.values.select do |command|
    command.started? && !command.done? && (Time.now - command.started) >= @timeout
  end
  overdue.each { |command| deal_with_runaway command }
end

private :handle_server_line, :handle_worker_message, :sweep_commands
# Parses one raw line from the server.
#
# line - the line as read from the socket (trailing newline allowed).
#
# Returns a new Command, also registered in @commands under its id, when
# the line is a PRIVMSG whose text starts with the command prefix.
# Otherwise returns nil; a PING is answered with a PONG on the way.
# Lines the message pattern cannot match (for example blank lines) and
# PRIVMSGs without trailing text are ignored rather than raising.
def parse_command line
  m = @@linere.match line.chomp
  return nil unless m
  user = m[1]
  event, *eargs = m[2].split ' '
  text = m[3]
  # Messages without a ":prefix" have no sender.
  nick = user ? user.split('!').first : nil
  if event == 'PRIVMSG' && text && text.start_with?(@prefix)
    command, args = text.split(' ', 2)
    command = command[(@prefix.length)..-1]
    command = Command.new(@command_ctr += 1, command, args, nick, eargs.first)
    @commands[command.id] = command
    command
  else
    # NOTE(review): the PONG carries our nick rather than the PING's own
    # parameter; confirm the target server accepts that.
    send 'PONG', [@nick] if event == 'PING'
    nil
  end
end
private
# Writes one IRC message to the socket and logs it.
#
# command - command name; upcased before sending.
# params  - Array of middle parameters (may be empty).
# text    - optional trailing parameter, sent after " :".
#
# Parts are joined with exactly one space, so an empty params list no longer
# produces "NICK  :nick" or a trailing space after "QUIT". Any CR/LF inside
# the parts is collapsed to a space so one call can only ever emit one line.
def send command, params=[], text=nil
  parts = [command.upcase, *params]
  parts << ":#{text}" if text
  tosend = parts.join(' ').gsub(/[\r\n]+/, ' ') + "\r\n"
  @socket.write tosend
  @log.puts 'SENT:' + tosend.inspect
end
# Reads the next line from the socket, logs its inspected form and returns
# it (nil when gets has nothing more to return).
def recv
  @socket.gets.tap { |line| @log.puts 'RECV:' + line.inspect }
end
# Resets the worker list and calls #start_process @nprocesses times.
def start_processes
  @log.puts "Starting processes ..."
  @processes = []
  @nprocesses.times { start_process }
end
# Asks every worker to stop, then waits for all child processes and logs
# each one's pid and exit status.
#
# ::Process is spelled out because the bare name resolves to the nested
# worker class inside Fauxlot, which has no wait2. Only Errno::ECHILD
# ("no children left") ends the wait loop; other errors are not swallowed.
def stop_processes
  @log.puts "Stopping processes ..."
  @processes.each(&:stop)
  loop do
    begin
      pid, stat = ::Process.wait2(-1)
    rescue Errno::ECHILD
      break
    end
    @log.puts "#{pid}: #{stat.exitstatus}"
  end
end
# Handles a command that has run past the timeout.
#
# The command is marked done, its sender is told on the channel, and the
# worker running it is killed, reaped and replaced via #start_process.
# Unfinished commands that were assigned to the killed worker are then
# re-sent to the remaining workers in round-robin order.
def deal_with_runaway command
  stuck = command.handler_process
  @log.puts "**RUNAWAY COMMAND!** #{command.command} for #{command.user} on #{command.channel} in process #{stuck.pid}"
  command.done!

  # Collected after done! so the runaway command itself is not re-queued.
  movecomms = @commands.values.select do |oc|
    !oc.done? && oc.handler_process == stuck
  end
  @log.puts "#{movecomms.length} queued command(s) need moving to another process."
  send 'PRIVMSG', [command.channel], "#{command.user}: Sorry, that command (#@prefix#{command.command}) took too long to process."

  stuck.kill
  ::Process.wait2 stuck.pid
  @processes.delete stuck
  @log.puts "#{stuck.pid} killed"
  start_process

  movecomms.each_with_index do |mc, moven|
    process = @processes[moven % @processes.length]
    mc.handler_process = nil # prevent trying to dump IOs
    process.inqueue.send mc
    mc.handler_process = process
    @log.puts "Moved command #{mc.id} to pid #{process.pid}"
  end
end
# Forks one worker and, once it reports ready, appends it to @processes.
#
# Two pipe-backed queues connect parent and child: inqueue carries commands
# to the worker and outqueue carries statuses and responses back. Each side
# closes the pipe ends it does not use. The child sends `true` on outqueue
# before entering its command loop; the parent waits for that message.
#
# If the child goes away before reporting ready, the failure is logged with
# the child's pid, the parent's pipe ends are closed, and nil is returned
# without registering the worker.
def start_process
  inqueue = Queue.new
  outqueue = Queue.new
  pid = fork do
    inqueue.close_write
    outqueue.close_read
    @process = Process.new $$, inqueue, outqueue
    # INT is ignored in the worker; presumably so that an interrupt is
    # handled only through the parent's trap.
    trap('INT') { }
    trap('TERM') { @process.stop }
    @log.puts "Started #$$"
    outqueue.send true
    @process.loop
  end
  inqueue.close_read
  outqueue.close_write
  started = begin
    outqueue.receive
  rescue EOFError
    # The child closed its end without ever sending the ready message.
    false
  end
  unless started
    inqueue.close_write
    outqueue.close_read
    return @log.puts("something went wrong with starting #{pid} ...")
  end
  @processes << Process.new(pid, inqueue, outqueue)
end
# A one-way message queue built on an IO.pipe; objects are carried across
# with Marshal, so it works between a parent and a forked child.
class Queue
  # IO.select that also accepts Queue objects.
  #
  # r, w, e - Arrays of IOs and/or Queues to watch for reading, writing and
  #           errors. A Queue is watched through its read end (r), its
  #           write end (w) or both ends (e).
  # timeout - seconds to wait, or nil to wait indefinitely.
  #
  # Returns nil on timeout, otherwise the three arrays from IO.select with
  # every pipe end replaced by the Queue it belongs to.
  def self.select r, w=[], e=[], timeout=nil
    rmaps = {}
    wmaps = {}
    emaps = {}
    rios = r.map do |q|
      next q unless q.is_a? Queue
      rmaps[q.reader] = q
      q.reader
    end
    wios = w.map do |q|
      next q unless q.is_a? Queue
      wmaps[q.writer] = q
      q.writer
    end
    eios = e.map do |q|
      next q unless q.is_a? Queue
      emaps[q.writer] = q
      emaps[q.reader] = q
      [q.reader, q.writer]
    end.flatten
    result = IO.select(rios, wios, eios, timeout)
    return nil if result.nil?
    result[0].map! { |io| rmaps[io] || io }
    result[1].map! { |io| wmaps[io] || io }
    result[2].map! { |io| emaps[io] || io }
    result
  end

  def initialize
    @read, @write = IO.pipe
  end

  # Marshals +obj+ and writes it to the pipe.
  def send obj
    rep = Marshal.dump obj
    @write.write rep
  end

  # Reads and returns the next marshalled object, waiting until one arrives.
  def read
    Marshal.load @read
  end
  alias receive read
  alias recv read

  # Sends +obj+ only if the pipe is writable right now; returns nil otherwise.
  def send_nonblock obj
    IO.select([], [@write], [], 0) and send obj
  end

  # Returns the next object if one is readable right now, nil otherwise.
  def read_nonblock
    IO.select([@read], [], [], 0) and read
  end
  alias receive_nonblock read_nonblock
  alias recv_nonblock read_nonblock

  # Yields every received object until the read end reaches end-of-file.
  def each &block
    until @read.eof?
      block.call receive
    end
  end

  def close_read
    @read.close
  end

  def close_write
    @write.close
  end

  # The pipe's read end.
  def reader
    @read
  end

  # The pipe's write end.
  def writer
    @write
  end

  def inspect
    "#<#{self.class} fd:#{@read.inspect},#{@write.inspect}>"
  end
end
# One worker process. The same class is used on both sides of the fork: in
# the parent it is a handle for signalling the child (pid differs from $$),
# in the child it runs the command loop (pid equals $$).
class Process
  attr_reader :pid, :inqueue, :outqueue

  def initialize(pid, inqueue, outqueue)
    @pid = pid
    @inqueue = inqueue
    @outqueue = outqueue
    @busy = false
  end

  # From the parent: sends TERM to the worker.
  # Inside the worker: exits at once when idle, otherwise flags the loop to
  # exit as soon as the current command has finished.
  def stop
    return ::Process.kill('TERM', @pid) unless @pid == $$

    if @busy
      @stopasap = true
    else
      finishup
    end
  end

  # From the parent: sends KILL to the worker.
  # Inside the worker: exits with status 1.
  def kill
    return ::Process.kill('KILL', @pid) unless @pid == $$

    exit 1
  end

  # Worker main loop: takes commands from the inqueue, reports :started,
  # sends a Response and then reports :done. Only demo commands exist:
  # 'randomwait' and 'runaway' sleep before answering, anything else is
  # echoed back.
  def loop
    @inqueue.each do |command|
      begin
        @busy = true
        @outqueue.send CommandStatus.new(:started, command.id)
        case command.command
        when 'randomwait'
          dur = rand 10
          sleep dur
          @outqueue.send Response.new(command.id, "Waited #{dur} seconds for #{command.args.inspect}")
        when 'runaway'
          sleep 60
          @outqueue.send Response.new(command.id, "You should never see this.")
        else
          @outqueue.send Response.new(command.id, "You used: #{command.command} (responded from pid #$$)")
        end
        @outqueue.send CommandStatus.new(:done, command.id)
      ensure
        @busy = false
        # A stop request that arrived mid-command is honoured now.
        stop if @stopasap
      end
    end
  end

  # Worker side only: closes this side's pipe ends and exits with status 0.
  def finishup
    return unless @pid == $$

    @inqueue.close_read
    @outqueue.close_write
    exit 0
  end
end
# One bot command: what was asked, by whom and where, plus bookkeeping for
# which worker has it, when it started and whether it has finished.
class Command
  attr_accessor :id, :command, :args, :user, :channel, :handler_process, :started

  def initialize(id, command, args, user, channel)
    @id, @command, @args, @user, @channel = id, command, args, user, channel
    @done = false
  end

  # true once a start time has been recorded.
  def started?
    !!@started
  end

  def done?
    @done
  end

  # Marks the command as finished.
  def done!
    @done = true
  end
end
# Text produced by a worker for the command with the given id.
class Response
  attr_accessor :command_id, :message

  def initialize(command_id, message)
    @command_id, @message = command_id, message
  end
end
# Progress report from a worker: a status (the workers send :started and
# :done) for the command with the given id.
class CommandStatus
  attr_reader :status, :command_id

  def initialize(status, command_id)
    @status, @command_id = status, command_id
  end
end
end
# When run as a script (not required as a library), start a bot against
# irc.freenode.net with the default settings.
if __FILE__ == $0
  Fauxlot.new('irc.freenode.net').start
end

To do

  • command interface

    most commands should be web services; all commands should use the same interface. like a Rack application, but for commands!

    for instance, i could define a WebServiceCommand class that would just take the url of an oblique-ish service and execute on it; or a more typical command class which does its own computation in a block; etc.

  • "database" side

    will marshaled files do? as long as only one process is responsible for each file, and they don't grow too large, yes.

  • periodic / scheduler

    already have this, kinda: @042171:73-80. runs every two seconds -- when the select timeout runs out. but putting things like .in there would be very ughy, and limit their time resolution to 2secs. or, have the timeout constantly running out and manually polling for periodics, but that's what select is designed to avoid.

    put scheduler in its own process, with queues going either way. a pipe to insert things into the scheduler queue, and a pipe to let the main process know when a task is ready, which is waited on in the main select loop. loop of the scheduler looks like this:

    • select on the pipe with a timeout of the time until the next task will be ready. when it returns:
      • if it's a new task from the pipe, add it to the task list;
      • if the timeout expired, look for any tasks that are due, pass them back to the main process, and then delete them from the list.
      • repeat ad infinitum
  • catch worker crashes

    listen for SIGCHLD, use the self-pipe trick to handle them in the main loop. restart as appropriate.

  • refactor, kinda.

    move things that should be in their own methods into their own methods.

  • name

    something suitably pseudo-arthurian

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment