Skip to content

Instantly share code, notes, and snippets.

@wwalker
Created May 29, 2009 20:09
Show Gist options
  • Save wwalker/120178 to your computer and use it in GitHub Desktop.
Save wwalker/120178 to your computer and use it in GitHub Desktop.
# set time zone as early as possible
ENV['TZ']='UTC'
require 'rubygems'
require 'activesupport'
# set the time zone for Active Support (it monkey patches Time)
Time.zone = 'Etc/UTC'
require 'logger'
# define an application wide log object (doesn't exist until DeliveryHandler.new() has run)
def dh_logger
DeliveryHandler::dh_log
end
# define an application wide config lookup tool (doesn't work until DeliveryHandler.new() has run DeliveryHandler::init_activerecord)
def dh_config(key, mod = 'delivery_handler')
config = SystemConfiguration.find_by_module_and_setting(mod, key)
return nil if ! config
config.value
end
def dh_status(key)
status = CurrentState.find_by_key(key)
return nil if ! status
status.value
end
def dh_status_increment(key)
status = CurrentState.find_by_key(key)
if ! status
status = CurrentState.create(:key => key, :value => 0)
end
CurrentState.increment_counter(:value, status.id)
end
def dh_status_decrement(key)
status = CurrentState.find_by_key(key)
if ! status
status = CurrentState.create(:key => key, :value => 1)
end
CurrentState.decrement_counter(:value, status.id)
end
def dh_status_set(key, value)
status = CurrentState.find_by_key(key)
if ! status
status = CurrentState.create(:key => key, :value => value)
else
status.value = value
status.save
end
end
class DeliveryHandler
attr_reader :thread
attr_reader :run
def dh_log
@@dh_logger
end
def initialize
#FIXME
# init_pid_file # should be handled in calling app, not here in the library.
@run = true
@@delivery_handlers = {}
init_activerecord
init_logging
zero_states
end
def zero_states
# FIXME
dh_status_set('current_6000', 0)
dh_status_set('current_6001', 0)
dh_status_set('current_6002', 0)
end
def run
Signal.trap("USR1") do
dh_log.info "Received and USR1 signal, shutting down"
@run = false
end
dh_log.debug "Singal handler for USR1 installed (probably won't work due to jruby interception of the signal)"
dh_log.debug "Our PID - #{$$}"
while @run # let signal handlers shut us down cleanly
messages_to_deliver.each do |mdr|
dh_log.info "Handling mdr # #{mdr.id}"
if mdr.job.abort?
mdr.current_state = MessageDeliveryResult::DELIVERY_RESULT_CURRENT_STATE_JOB_ABORTED
mdr.save
dh_log.info "job id #{mdr.job.id} was marked for abort after activation, aborting individual delivery attempt"
next
else
case mdr.message.media_type
when Message::MEDIA_TYPE_VOICE
dh = VoiceDeliveryHandler.new(mdr)
when Message::MEDIA_TYPE_EMAIL
dh = EmailDeliveryHandler.new(mdr)
when Message::MEDIA_TYPE_SMS
dh = SmsDeliveryHandler.new(mdr)
end
dh.deliver
@@delivery_handlers["#{Time.now} #{mdr.id}"] = dh
end
end
sleep 1
end
end
def reap_completed_handlers
@@delivery_handlers.keys.each do |key|
dh = @@delivery_handlers[key]
delivery_handler_thread = dh.thread
next if delivery_handler_thread.alive?
case @mdr.message.media_type
when Message::MEDIA_TYPE_EMAIL
decrement_used_capacity('email_capacity')
when Message::MEDIA_TYPE_SMS
decrement_used_capacity('sms_capacity')
end
@mdr.current_state = delivery_handler_thread[:current_state]
@mdr.last_state_at = (delivery_handler_thread[:last_state_at] || Time.now)
@mdr.save
# update the MDR
#if update_mdr
# MessageDeliveryResult.increment_counter(:delivery_attempts, mdr.id)
# mdr.provider_response = delivery_handler_thread['provider_response']
# mdr.current_state = delivery_handler_thread['current_state']
# mdr.attempted_at = (delivery_handler_thread['attempted_at'] || Time.now)
# mdr.last_state_at = (delivery_handler_thread['last_state_at'] || Time.now)
# mdr.delivered_at = (delivery_handler_thread['delivered_at'] || Time.now)
#end
end
end
def init_activerecord
# Read in the rails model
# FIXME hard coded to development
ENV['RAILS_ENV'] = 'development'
# FIXME hard coded path (probably keep, should never change)
# require '../../../rn_website/trunk/rapid_notify/config/environment.rb'
require '/home/pairprogramming/rapid_notify/rn_website/trunk/rapid_notify/config/environment.rb'
ActiveRecord::Base.logger = Logger.new('/tmp/job_scheduler_active_record.log')
ActiveRecord::Base.colorize_logging = false
end
def init_logging
dh_log = Logger.new(STDERR)
dh_log.level = Logger::DEBUG
dh_log.formatter = Logger::Formatter.new
dh_log.datetime_format = "%Y-%m-%d %H:%M:%S"
@@dh_logger = dh_log
end
def messages_to_deliver
messages = MessageDeliveryResult.find(:all, :conditions => "current_state = #{MessageDeliveryResult::DELIVERY_RESULT_CURRENT_STATE_PENDING} OR current_state IS NULL OR current_state = #{MessageDeliveryResult::DELIVERY_RESULT_CURRENT_STATE_QUEUED} OR current_state = #{MessageDeliveryResult::DELIVERY_RESULT_CURRENT_STATE_AWAITING_RETRY}")
#FIXME
# messages = messages.sort(priority)
dh_log.debug("Found #{messages.length} messages to do") if (messages.length > 0)
return messages
end
def decrement_used_capacity(key)
dh_status_decrement("current_#{key}")
end
def increment_used_capacity(key)
loop do
max_capacity = dh_config("max_#{key}").to_i
current_capacity = dh_config("current_#{key}").to_i
puts max_capacity
puts current_capacity
break if (current_capacity < max_capacity)
sleep 0.1 if ! reap_completed_handlers
end
dh_status_increment("current_#{key}")
end
def deliver
increment_used_capacity(@mdr.message.media_type)
@mdr.current_state = MessageDeliveryResult::DELIVERY_RESULT_CURRENT_STATE_RUNNING
@mdr.attempted_at = Time.now
@mdr.save
run
end
end
class VoiceDeliveryHandler < DeliveryHandler
def initialize(mdr)
@mdr = mdr
end
def run
@thread = Thread.new(@mdr.contact.phone_number, @mdr.id,@mdr.get_callerid) do |phone_number,mdr_id,callerid|
me = Thread.current
me[:current_state] = 0
me[:current_state] = make_call(phone_number,mdr_id,callerid)
end
end
def make_call(phone_number,mdr_id, callerid)
prefix = dh_config('sip_prefix')
peer = dh_config('sip_peer')
context = dh_config('target_context')
begin
adhearsion = DRbObject.new_with_uri 'druby://127.0.0.1:9050'
adhearsion.async_call_into_context(
"#{prefix}#{phone_number}@#{peer}",
context,
:callerid => callerid,
:async => '1',
:variables => {:mdr_id => mdr_id}
)
return MessageDeliveryResult::DELIVERY_RESULT_CURRENT_STATE_RUNNING
rescue Exception => err
dh_logger.error "begin exception from async_call_into_context"
dh_logger.error "#{err.message}\n#{err.backtrace.join("\n")}"
dh_logger.error "end exception from async_call_into_context"
end
end
end
class EmailDeliveryHandler < DeliveryHandler
def initialize(mdr)
@mdr = mdr
end
def run
#FIXME hard coded from address
puts __LINE__
@thread = Thread.new(@mdr.contact.sms, @mdr.id,@mdr.message.subject, @mdr.message.body,'support@rapidnotify.com') do |email,mdr_id,subject, body, sender|
puts __LINE__
me = Thread.current
me[:current_state] = 0
puts __LINE__
#me[:current_state] = send_email(email,mdr_id,subject, body, sender)
send_email(email,mdr_id,subject, body, sender)
puts __LINE__
end
end
def send_email
puts __LINE__
from_address = 'support@rapidnotify.com'
return_path_address = 'support@rapidnotify.com'
reply_to_address = sender
# FIXME is this done elsewhere?
msgstr = "To: #{email}
From: #{from_address}
Reply-To: #{reply_to_address}
Return-Path: #{return_path_address}
Subject: #{subject}
#{body}"
puts __LINE__
begin
puts __LINE__
Net::SMTP.start('127.0.0.1', 25) do |smtp|
smtp.send_message msgstr, from_address, email
end
puts __LINE__
@thread[:provider_response] = MessageDeliveryResult::EMAIL_RESULT_ACCEPTED
puts __LINE__
rescue Object => err
puts __LINE__
ae_logger.error "Mail to #{@mdr.contact.email} threw an exception: #{err}"
ae_logger.error "#{err.message}\n#{err.backtrace.join("\n")}"
puts __LINE__
@thread[:provider_response] = MessageDeliveryResult::EMAIL_RESULT_SERVICE_UNAVAILABLE
puts __LINE__
end
end
end
class SmsDeliveryHandler < DeliveryHandler
def initialize(mdr)
@mdr = mdr
end
def run
@thread = Thread.new(@mdr.contact.sms, @mdr.id,@mdr.message.subject, @mdr.message.body) do |sms,mdr_id,subject, body|
me = Thread.current
me[:current_state] = 0
me[:current_state] = send_sms(sms,mdr_id,subject, body)
end
end
def send_sms(sms,mdr_id,subject, message_content)
message_content = body
if !valid_nanp?(sms)
@thread[:provider_response] = MessageDeliveryResult::SMS_RESULT_BAD_NUMBER
return
end
xml_content = File.read('sms_text.xml')
xml_content.gsub!('PHONE_NUMBER', sms)
xml_content.gsub!('MESSAGE_CONTENT', message_content)
begin
# FIXME hard coded hostname
http = Net::HTTP.new('gateway.ivisionmobile.com', 443)
http.use_ssl = true
# FIXME should we use a more secure option here? Probably not, but review.
http.verify_mode = OpenSSL::SSL::VERIFY_NONE
resp,data = http.post('/ivisionwebpost.aspx', "xmlstring=#{xml_content}")
case resp
when Net::HTTPSuccess
@thread[:provider_response] = MessageDeliveryResult::SMS_RESULT_ACCEPTED
return
else
@thread[:provider_response] = MessageDeliveryResult::SMS_RESULT_OTHER_FAILURE
return
end
rescue Object => err
ae_logger.error "Unexpected exception thrown at end of sms delivery"
ae_logger.error "#{err.message}\n#{err.backtrace.join("\n")}"
@thread[:provider_response] = MessageDeliveryResult::SMS_RESULT_SERVICE_UNAVAILABLE
return
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment