Skip to content

Instantly share code, notes, and snippets.

@mojombo
Created August 5, 2008 22:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mojombo/4130 to your computer and use it in GitHub Desktop.
Save mojombo/4130 to your computer and use it in GitHub Desktop.
--- driver.rb-orig 2008-06-26 17:06:27.000000000 -0700
+++ driver.rb 2008-06-26 18:22:58.000000000 -0700
@@ -1,47 +1,171 @@
module God
-
- class DriverEvent
- attr_accessor :condition, :at
+ class TimedEvent
+ include Comparable
+
+ attr_accessor :at
- # Instantiate a new TimerEvent that will be triggered after the specified delay
- # +condition+ is the Condition
+ # Instantiate a new TimedEvent that will be triggered after the specified delay
# +delay+ is the number of seconds from now at which to trigger
#
- # Returns TimerEvent
- def initialize(condition, delay)
- self.condition = condition
+ # Returns TimedEvent
+ def initialize(delay = 0)
self.at = Time.now + delay
end
-
+
def due?
Time.now >= self.at
end
+
+ def <=>(other)
+ self.at <=> other.at
+ end
- end # DriverEvent
+ end # TimedEvent
+
+ class DriverEvent < TimedEvent
+ attr_accessor :condition, :task
+
+ def initialize(delay, task, condition)
+ super delay
+ self.task = task
+ self.condition = condition
+ end
+
+ def handle_event
+ @task.handle_poll(@condition)
+ end
+ end # DriverEvent
+
+ class DriverOperation < TimedEvent
+ attr_accessor :task, :name, :args
+
+ def initialize(task, name, args)
+ super(0)
+ self.task = task
+ self.name = name
+ self.args = args
+ end
+
+ # Handle the next queued operation that was issued asynchronously
+ #
+ # Returns nothing
+ def handle_event
+ @task.send(@name, *@args)
+ end
+ end
+ class DriverEventQueue
+ def initialize
+ @shutdown = false
+ @waiting = []
+ @events = []
+ @waiting.taint
+ @events.taint
+ self.taint
+ end
+
+ #
+ # Wake any sleeping threads after setting the sentinel
+ #
+ def shutdown
+ @shutdown = true
+ begin
+ Thread.critical = true
+ @waiting.each do |t|
+ t.run
+ end
+ ensure
+ Thread.critical = false
+ end
+ end
+
+ # Block until the earliest event is due, then remove and return it.
+ # Raises ThreadError ("queue empty") once shutdown has been requested
+ # and no events remain.
+ def pop
+ begin
+ while (Thread.critical = true; @events.empty? or !@events.first.due?)
+ @waiting.push Thread.current unless @waiting.include?(Thread.current) # register once; re-adding on every pass grew @waiting without bound
+ if @events.empty?
+ raise ThreadError, "queue empty" if @shutdown
+ Thread.stop
+ else
+ delay = [@events.first.at - Time.now, 0].max # read the head while still critical; never a negative interval
+ Thread.critical = false
+ sleep delay # the while condition sets Thread.critical = true again before re-checking
+ end
+ end
+ @events.shift
+ ensure
+ Thread.critical = false
+ end
+ end
+
+ alias shift pop
+ alias deq pop
+
+ #
+ # Add an event to the queue, wake any waiters if what we added needs to
+ # happen sooner than the next pending event
+ #
+ def push(event)
+ Thread.critical = true
+ @events << event
+ @events.sort!
+ begin
+ t = @waiting.shift if @events.first == event
+ t.wakeup if t
+ rescue ThreadError
+ retry
+ ensure
+ Thread.critical = false
+ end
+ begin
+ t.run if t
+ rescue ThreadError
+ end
+ end
+
+ alias << push
+ alias enq push
+
+ def empty? # true when no events are queued
+ @events.empty?
+ end
+
+ def clear
+ @events.clear
+ end
+
+ def length
+ @events.length
+ end
+
+ alias size length
+
+ def num_waiting
+ @waiting.size
+ end
+ end
+
+
class Driver
attr_reader :thread
- INTERVAL = 0.25
-
# Instantiate a new Driver and start the scheduler loop to handle events
# +task+ is the Task this Driver belongs to
#
# Returns Driver
def initialize(task)
@task = task
- @events = []
- @ops = Queue.new
+ @events = God::DriverEventQueue.new
@thread = Thread.new do
loop do
begin
- if !@ops.empty?
- self.handle_op
- elsif !@events.empty?
- self.handle_event
- else
- sleep INTERVAL
- end
+ @events.pop.handle_event
+ rescue ThreadError => e
+ # queue is empty
+ break
rescue Exception => e
message = format("Unhandled exception in driver loop - (%s): %s\n%s",
e.class, e.message, e.backtrace.join("\n"))
@@ -51,31 +175,8 @@
end
end
- # Handle the next queued operation that was issued asynchronously
- #
- # Returns nothing
- def handle_op
- command = @ops.pop
- @task.send(command[0], *command[1])
- end
-
- # Handle the next event (poll condition) that is due
- #
- # Returns nothing
- def handle_event
- if @events.first.due?
- event = @events.shift
- @task.handle_poll(event.condition)
- end
-
- # don't sleep if there is a pending event and it is due
- unless @events.first && @events.first.due?
- sleep INTERVAL
- end
- end
-
# Clear all events for this Driver
- #
+ #
# Returns nothing
def clear_events
@events.clear
@@ -87,7 +188,7 @@
#
# Returns nothing
def message(name, args = [])
- @ops.push([name, args])
+ @events.push(DriverOperation.new(@task, name, args))
end
# Create and schedule a new DriverEvent
@@ -98,11 +199,8 @@
def schedule(condition, delay = condition.interval)
applog(nil, :debug, "driver schedule #{condition} in #{delay} seconds")
- @events.concat([DriverEvent.new(condition, delay)])
-
- # sort events
- @events.sort! { |x, y| x.at <=> y.at }
+ @events.push(DriverEvent.new(delay, @task, condition))
end
end # Driver
-end # God
\ No newline at end of file
+end # God
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment