Created
August 5, 2008 22:32
-
-
Save mojombo/4130 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- driver.rb-orig 2008-06-26 17:06:27.000000000 -0700 | |
+++ driver.rb 2008-06-26 18:22:58.000000000 -0700 | |
@@ -1,47 +1,171 @@ | |
module God | |
- | |
- class DriverEvent | |
- attr_accessor :condition, :at | |
+ class TimedEvent | |
+ include Comparable | |
+ | |
+ attr_accessor :at | |
- # Instantiate a new TimerEvent that will be triggered after the specified delay | |
- # +condition+ is the Condition | |
+ # Instantiate a new TimedEvent that will be triggered after the specified delay | |
# +delay+ is the number of seconds from now at which to trigger | |
# | |
- # Returns TimerEvent | |
- def initialize(condition, delay) | |
- self.condition = condition | |
+ # Returns TimedEvent | |
+ def initialize(delay = 0) | |
self.at = Time.now + delay | |
end | |
- | |
+ | |
def due? | |
Time.now >= self.at | |
end | |
+ | |
+ def <=>(other) | |
+ self.at <=> other.at | |
+ end | |
end # DriverEvent | |
+ | |
+ class DriverEvent < TimedEvent | |
+ attr_accessor :condition, :task | |
+ | |
+ def initialize(delay, task, condition) | |
+ super delay | |
+ self.task = task | |
+ self.condition = condition | |
+ end | |
+ | |
+ def handle_event | |
+ @task.handle_poll(@condition) | |
+ end | |
+ end # DriverEvent | |
+ | |
+ class DriverOperation < TimedEvent | |
+ attr_accessor :task, :name, :args | |
+ | |
+ def initialize(task, name, args) | |
+ super(0) | |
+ self.task = task | |
+ self.name = name | |
+ self.args = args | |
+ end | |
+ | |
+ # Handle the next queued operation that was issued asynchronously | |
+ # | |
+ # Returns nothing | |
+ def handle_event | |
+ @task.send(@name, *@args) | |
+ end | |
+ end | |
+ class DriverEventQueue | |
+ def initialize | |
+ @shutdown = false | |
+ @waiting = [] | |
+ @events = [] | |
+ @waiting.taint | |
+ @events.taint | |
+ self.taint | |
+ end | |
+ | |
+ # | |
+ # Wake any sleeping threads after setting the sentinel | |
+ # | |
+ def shutdown | |
+ @shutdown = true | |
+ begin | |
+ Thread.critical = true | |
+ @waiting.each do |t| | |
+ t.run | |
+ end | |
+ ensure | |
+ Thread.critical = false | |
+ end | |
+ end | |
+ | |
+ # | |
+ # Sleep until the queue has something due | |
+ # | |
+ def pop | |
+ begin | |
+ while (Thread.critical = true; @events.empty? or !@events.first.due?) | |
+ @waiting.push Thread.current | |
+ if @events.empty? | |
+ raise ThreadError, "queue empty" if @shutdown | |
+ Thread.stop | |
+ else | |
+ Thread.critical = false | |
+ sleep @events.first.at - Time.now | |
+ Thread.critical = true | |
+ end | |
+ end | |
+ @events.shift | |
+ ensure | |
+ Thread.critical = false | |
+ end | |
+ end | |
+ | |
+ alias shift pop | |
+ alias deq pop | |
+ | |
+ # | |
+ # Add an event to the queue, wake any waiters if what we added needs to | |
+ # happen sooner than the next pending event | |
+ # | |
+ def push(event) | |
+ Thread.critical = true | |
+ @events << event | |
+ @events.sort! | |
+ begin | |
+ t = @waiting.shift if @events.first == event | |
+ t.wakeup if t | |
+ rescue ThreadError | |
+ retry | |
+ ensure | |
+ Thread.critical = false | |
+ end | |
+ begin | |
+ t.run if t | |
+ rescue ThreadError | |
+ end | |
+ end | |
+ | |
+ alias << push | |
+ alias enq push | |
+ | |
+ def empty? | |
+ @events.empty? | |
+ end | |
+ | |
+ def clear | |
+ @events.clear | |
+ end | |
+ | |
+ def length | |
+ @events.length | |
+ end | |
+ | |
+ alias size length | |
+ | |
+ def num_waiting | |
+ @waiting.size | |
+ end | |
+ end | |
+ | |
+ | |
class Driver | |
attr_reader :thread | |
- INTERVAL = 0.25 | |
- | |
# Instantiate a new Driver and start the scheduler loop to handle events | |
# +task+ is the Task this Driver belongs to | |
# | |
# Returns Driver | |
def initialize(task) | |
@task = task | |
- @events = [] | |
- @ops = Queue.new | |
+ @events = God::DriverEventQueue.new | |
@thread = Thread.new do | |
loop do | |
begin | |
- if !@ops.empty? | |
- self.handle_op | |
- elsif !@events.empty? | |
- self.handle_event | |
- else | |
- sleep INTERVAL | |
- end | |
+ @events.pop.handle_event | |
+ rescue ThreadError => e | |
+ # queue is empty | |
+ break | |
rescue Exception => e | |
message = format("Unhandled exception in driver loop - (%s): %s\n%s", | |
e.class, e.message, e.backtrace.join("\n")) | |
@@ -51,31 +175,8 @@ | |
end | |
end | |
- # Handle the next queued operation that was issued asynchronously | |
- # | |
- # Returns nothing | |
- def handle_op | |
- command = @ops.pop | |
- @task.send(command[0], *command[1]) | |
- end | |
- | |
- # Handle the next event (poll condition) that is due | |
- # | |
- # Returns nothing | |
- def handle_event | |
- if @events.first.due? | |
- event = @events.shift | |
- @task.handle_poll(event.condition) | |
- end | |
- | |
- # don't sleep if there is a pending event and it is due | |
- unless @events.first && @events.first.due? | |
- sleep INTERVAL | |
- end | |
- end | |
- | |
# Clear all events for this Driver | |
- # | |
+ # | |
# Returns nothing | |
def clear_events | |
@events.clear | |
@@ -87,7 +188,7 @@ | |
# | |
# Returns nothing | |
def message(name, args = []) | |
- @ops.push([name, args]) | |
+ @events.push(DriverOperation.new(@task, name, args)) | |
end | |
# Create and schedule a new DriverEvent | |
@@ -98,11 +199,8 @@ | |
def schedule(condition, delay = condition.interval) | |
applog(nil, :debug, "driver schedule #{condition} in #{delay} seconds") | |
- @events.concat([DriverEvent.new(condition, delay)]) | |
- | |
- # sort events | |
- @events.sort! { |x, y| x.at <=> y.at } | |
+ @events.push(DriverEvent.new(delay, @task, condition)) | |
end | |
end # Driver | |
-end # God | |
\ No newline at end of file | |
+end # God |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment