Skip to content

Instantly share code, notes, and snippets.

@kingcu
Created June 20, 2011 20:58
Show Gist options
  • Save kingcu/1036565 to your computer and use it in GitHub Desktop.
Save kingcu/1036565 to your computer and use it in GitHub Desktop.
Mailinator - email processing server: listens for emails piped over a socket and enqueues each email to Resque to be processed
require 'fcntl'
require 'logger'
require 'socket'
require 'tempfile'
require 'rubygems'
require 'bundler/setup' #so we can require gems in the Gemfile
require 'resque' #in Gemfile, allowed to require by bundler/setup
require 'app/workers/email_worker'
# Usage: <script> HOST PORT
# Fail fast with a usage message instead of letting TCPServer.new blow up
# on nil arguments with a confusing error.
abort("usage: #{$0} HOST PORT") if ARGV.length < 2
# number of worker processes the master keeps alive
@num_workers = 2
# longest time (seconds) the master sleeps between housekeeping passes
@sleep_time = 10
@logger = Logger.new("log/email_processor_server.log")
#file = ARGV[0]
#File.unlink(file) if File.exists?(file) && File.socket?(file)
# seconds a worker waits in IO.select after a transient accept error
TIMEOUT = 30 #seconds
# pid => {:tmp => heartbeat Tempfile, :num => worker number}
WORKERS = {}
#SERVERS = [UNIXServer.new(file)]
# listening sockets; every forked worker accepts on these
SERVERS = [TCPServer.new(ARGV[0], ARGV[1])]
# [read end, write end] of the self-pipe used to wake the master
# (filled in by init_self_pipe)
SELF_PIPE = []
# TERM/INT immediately and uncleanly shut down;
# QUIT shuts down gracefully and HUP will eventually reload config
ACCEPTED_SIGS = [:QUIT, :HUP, :TERM, :INT]
# signals piled up by the traps, waiting for the master loop to handle them
SIG_QUEUE = []
# Rename the current process by assigning the program name ($0), which is
# what tools like ps display on most platforms. Returns the new name.
def procname(name)
  $PROGRAM_NAME = name
end
# (Re)create the self-pipe the master uses to wake itself from signal
# handlers. Any previous pipe ends are closed first, then SELF_PIPE is
# replaced in place with a fresh [reader, writer] pair whose ends are both
# marked close-on-exec so they do not leak into exec'd programs.
def init_self_pipe
  SELF_PIPE.each { |io| io.close rescue nil } # best effort: may already be closed
  SELF_PIPE.replace(IO.pipe)
  SELF_PIPE.each do |io|
    # IO#close_on_exec= (Ruby 1.9+) needs no extra library; the fcntl
    # fallback only works when the 'fcntl' extension has been required.
    if io.respond_to?(:close_on_exec=)
      io.close_on_exec = true
    else
      io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
    end
  end
end
# Park the master for at most +time+ seconds, or until something is written
# to the self-pipe (see wake_master), whichever comes first. Waiting in
# IO.select instead of spinning keeps an idle master off the CPU.
#
# Returns the single byte consumed from the pipe, or nil on timeout.
def master_sleep(time)
  reader = SELF_PIPE.first
  ready = IO.select([reader], nil, nil, time)
  return nil unless ready
  reader.readpartial(1)
end
# Wake the master by writing one byte to the self-pipe's write end; the
# IO.select in master_sleep returns as soon as the read end is readable.
#
# This runs inside signal traps, so it must never block. If the pipe is
# full, the master already has unread wake-up bytes queued, and dropping
# this one loses nothing.
def wake_master
  begin
    SELF_PIPE[1].write_nonblock('.')
  rescue Errno::EAGAIN
    # pipe full: a wake-up is already pending for the master
  end
  @logger.debug("WOKE MASTER, SIG_QUEUE.size: #{SIG_QUEUE.size}")
end
# Main loop of the master process. Each pass does three things:
# reaps exited workers, handles at most one queued signal, and, when no
# signal is pending, tops the pool back up and then sleeps until woken.
#
# QUIT leaves the loop and stops workers gracefully. TERM/INT leave the loop
# and stop them immediately. HUP is accepted but not acted on yet.
def master_loop
  procname("mailinator master")
  graceful = true
  loop do
    begin
      # collect any children that have exited (zombies)
      reap_all_workers()
      case SIG_QUEUE.shift
      when nil
        # no pending signal: spawn replacement workers if needed, then sleep
        spawn_workers()
        master_sleep(@sleep_time)
      when :QUIT
        break
      when :TERM, :INT
        graceful = false
        break
      when :HUP
        # reserved: reload configs when/if we go this route
      end
    rescue => e
      @logger.error("rescued exception in master loop: #{e}")
      # The master hit an unexpected error, so ask every worker to quit.
      # A failure while doing so (e.g. a worker that already exited) is
      # logged rather than allowed to escape and take the master down.
      begin
        kill_all_workers(:QUIT)
      rescue => kill_error
        @logger.error("failed to stop workers after master error: #{kill_error}")
      end
    end
  end
  # single shutdown: QUIT -> graceful, TERM/INT -> immediate
  stop(graceful)
end
# Shut down every worker and wait for it to exit.
#
# graceful - true sends QUIT (finish up and exit), false sends TERM.
# timeout  - seconds to wait before any remaining workers are sent KILL.
#
# The worker pids are snapshotted before signalling and reaped directly.
# That way the wait still works when kill_worker drops WORKERS entries as
# soon as it signals. Waiting stops when every worker is gone OR the
# deadline passes, never later.
#
# Returns the pids that had to be KILLed (empty when all exited in time).
def stop(graceful = true, timeout = 30)
  deadline = Time.now + timeout
  pending = WORKERS.keys
  kill_all_workers(graceful ? :QUIT : :TERM)
  until pending.empty? || Time.now >= deadline
    sleep(0.1) # don't slam the CPU while waiting
    pending.reject! { |wpid| stop_reap_worker(wpid) }
  end
  # anything still alive after the deadline is killed with a quickness
  pending.each do |wpid|
    begin
      Process.kill(:KILL, wpid)
    rescue Errno::ESRCH
      # exited between the last check and now
    end
    stop_reap_worker(wpid, 0) # 0: block until the child is reaped
  end
end

# Helper for stop: check whether worker +wpid+ has exited. If it has, reap
# it, drop any WORKERS entry still held for it (closing its tempfile) and
# return true. Return false while it is still running. By default
# (WNOHANG) the check never blocks.
def stop_reap_worker(wpid, flags = Process::WNOHANG)
  begin
    return false unless Process.waitpid(wpid, flags)
  rescue Errno::ECHILD
    # already reaped elsewhere (e.g. by reap_all_workers)
  end
  worker = WORKERS.delete(wpid)
  begin
    worker[:tmp].close if worker
  rescue StandardError
    nil # best-effort close of the heartbeat tempfile
  end
  true
end
# Reap every worker that has already exited, without blocking. For each one,
# drop its WORKERS entry and close its tempfile. Returns nil once no exited
# child is left to collect, or when the process has no children at all.
def reap_all_workers
  loop do
    # -1: any child; WNOHANG: return nil instead of blocking if none exited
    wpid, _status = Process.waitpid2(-1, Process::WNOHANG)
    return if wpid.nil?
    begin
      worker = WORKERS.delete(wpid)
      worker[:tmp].close if worker
    rescue StandardError
      nil # best-effort cleanup; must not stop the reaping
    end
    # TODO: logging - status.success? happy, else error
  end
rescue Errno::ECHILD
  nil # no children left to wait for
end
# Send +sig+ to every known worker via kill_worker. The loop walks a
# snapshot of the pids because kill_worker removes entries from WORKERS
# while we iterate.
def kill_all_workers(sig)
  pids = WORKERS.keys
  pids.each do |wpid|
    kill_worker(sig, wpid)
  end
end
# Send +signal+ to worker +wpid+, then forget the worker: its WORKERS entry
# is dropped and its heartbeat tempfile closed.
#
# A worker that no longer exists (Errno::ESRCH) is logged, not raised, and
# is still forgotten. Otherwise one already-dead child would abort callers
# that signal the whole pool partway through.
def kill_worker(signal, wpid)
  @logger.info("Killing worker with pid: #{wpid}")
  begin
    Process.kill(signal, wpid)
  rescue Errno::ESRCH
    @logger.info("Worker #{wpid} had already exited")
  end
  worker = WORKERS.delete(wpid)
  begin
    worker[:tmp].close if worker
  rescue StandardError
    nil # best-effort close of the heartbeat tempfile
  end
end
# Read one email from an accepted connection and enqueue it to Resque.
# Workers call this for every connection they accept. The real processing is
# handed off to the EmailWorker Resque job, since Resque is aware of our
# Rails app, so a worker here is tied up only while the data is read.
#
# res - the accepted connection: either the socket itself (TCPServer#accept)
#       or a [socket, address] pair; only the socket is used.
# Returns the result of Resque.enqueue.
def handle_data(res)
  sock, _sockaddr = res
  begin
    data = sock.read # reads until the client closes its end
    @logger.debug("Accepted email: #{data}")
    Resque.enqueue(EmailWorker, data)
  ensure
    # close our end too, so no accepted connection leaks a file descriptor
    sock.close unless sock.closed?
  end
end
# Worker main loop, borrowed heavily from unicorn's philosophy.
#
# Heartbeat: each worker keeps a tempfile (worker[:tmp]). Before and after
# handling connections, it flips the file's mode between 0 and 1 with chmod,
# which updates the file's ctime. The intent is that the master can compare
# ctimes to spot a hung worker, since each request should be quick.
# NOTE(review): nothing in this script checks those ctimes yet.
#
# Signals: QUIT closes the listening sockets and makes the loop exit
# cleanly. TERM/INT exit the process immediately.
def worker_loop(worker)
  flipper = 0 # flips between 0/1
  alive = true
  # the traps live here so the QUIT handler can clear `alive`
  trap(:QUIT) { alive = false; SERVERS.each { |s| s.close rescue nil } }
  [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } }
  trap(:CHLD, 'DEFAULT')
  while alive
    begin
      worker[:tmp].chmod(flipper = 1 - flipper)
      SERVERS.each do |server|
        handle_data(server.accept)
      end
      # flip again afterwards, so the ctime brackets the time spent busy
      worker[:tmp].chmod(flipper = 1 - flipper)
    rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINTR
      break unless alive
      # transient accept failure: wait up to TIMEOUT seconds for activity
      IO.select(SERVERS, nil, SELF_PIPE, TIMEOUT)
      retry
    rescue IOError, Errno::EBADF
      # QUIT closed the listeners while we were blocked in accept: that is
      # the normal graceful-shutdown path, so leave the loop. Any other
      # closed-stream error while still alive is unexpected and re-raised.
      raise if alive
      break
    end
  end
end
# Fork workers until @num_workers of them are running.
#
# Each worker gets a number in 0...@num_workers, used in its process title
# and tempfile name, plus a fresh heartbeat tempfile. Numbers still held by
# living workers are skipped, so replacing a dead worker reuses the dead
# worker's number instead of duplicating a live one. Nothing is spawned when
# the pool is already at or above @num_workers, e.g. after the worker count
# has been lowered.
def spawn_workers
  @num_workers.times do |i|
    break if WORKERS.length >= @num_workers
    # a living worker already has this number: try the next one
    next if WORKERS.values.any? { |w| w[:num] == i }
    worker = {:tmp => Tempfile.new("worker[#{i}]"), :num => i}
    wpid = fork {
      procname("mailinator worker[#{i}]")
      # drop the master's signal handlers; worker_loop installs the
      # worker's own QUIT/TERM/INT traps
      ACCEPTED_SIGS.each { |sig| trap(sig, nil) }
      SIG_QUEUE.clear()
      #init_self_pipe() #replaces the pipe
      worker_loop(worker)
    }
    WORKERS[wpid] = worker
  end
end
# Create the self-pipe BEFORE installing any trap that writes to it.
# Otherwise a signal arriving in between would run wake_master while
# SELF_PIPE is still empty and raise NoMethodError inside the trap.
init_self_pipe()
# queue each accepted signal for the master loop and wake the master
ACCEPTED_SIGS.each { |sig| trap(sig) { SIG_QUEUE << sig; wake_master() } }
# trap signals from the children by just waking the master, so it can
# continue its loop. The loop reaps any child processes that are now
# zombied and then respawns new children if needed to maintain the worker
# count.
trap(:CHLD) { wake_master() }
spawn_workers()
master_loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment