Skip to content

Instantly share code, notes, and snippets.

@funny-falcon
Created March 12, 2012 23:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save funny-falcon/2025370 to your computer and use it in GitHub Desktop.
Save funny-falcon/2025370 to your computer and use it in GitHub Desktop.
Fixed EM Mutexes
require 'eventmachine'
require 'fiber'
module EM
module Synchrony
class Mutex
def initialize
@waiters = []
end
def lock
current = Fiber.current
raise FiberError if @waiters.include?(current)
@waiters << current
Fiber.yield unless @waiters.first == current
true
end
def locked?
!@waiters.empty?
end
def sleep(timeout = nil)
unlock
if timeout
current = Fiber.current
timer = EM.add_timer(timeout) do
current.resume
end
res = Fiber.yield
EM.cancel_timer timer # if we resumes not via timer
res
else
Fiber.yield
end
lock
end
def try_lock
lock unless locked?
end
def unlock
raise FiberError unless @waiters.first == Fiber.current
@waiters.shift
unless @waiters.empty?
EM.next_tick{ @waiters.first.resume }
end
self
end
def synchronize(&blk)
lock
yield
ensure
unlock
end
end
class ConditionVariable
#
# Creates a new ConditionVariable
#
def initialize
@waiters = []
end
#
# Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
#
# If +timeout+ is given, this method returns after +timeout+ seconds passed,
# even if no other thread doesn't signal.
#
def wait(mutex, timeout=nil)
current = Fiber.current
begin
@waiters << current
mutex.sleep timeout
ensure
@waiters.delete current
end
self
end
#
# Wakes up the first thread in line waiting for this lock.
#
def signal
if f = @waiters.shift
EM.next_tick{
begin
f.resume
#rescue FiberError
end
}
end
self
end
#
# Wakes up all threads waiting for this lock.
#
def broadcast
# TODO: imcomplete
waiters0 = nil
waiters0 = @waiters.dup
@waiters.clear
waiters0.each do |f|
EM.next_tick{
begin
f.resume
#rescue FiberError
end
}
end
self
end
end
module MonitorMixin
class ConditionVariable
class Timeout < Exception; end
def initialize(monitor)
@monitor = monitor
@cond = EM::Synchrony::ConditionVariable.new
end
def wait(timeout = nil)
@monitor.__send__(:_mon_check_owner)
count = @monitor.__send__(:_mon_exit_for_cond)
begin
@cond.wait(@monitor.instance_variable_get("@mon_mutex"), timeout)
return true
ensure
@monitor.__send__(:_mon_enter_for_cond, count)
end
end
def wait_while
while yield
wait
end
end
def wait_until
until yield
wait
end
end
def signal
@monitor.__send__(:_mon_check_owner)
@cond.signal
end
def broadcast
@monitor.__send__(:_mon_check_owner)
@cond.broadcast
end
end
def self.extend_object(obj)
super(obj)
obj.__send__(:_mon_initialize)
end
def initialize(*args)
super
_mon_initialize
end
def mon_try_enter
if @mon_owner != Fiber.current
unless @mon_mutex.try_lock
return false
end
@mon_owner = Fiber.current
end
@mon_count += 1
return true
end
def mon_enter
if @mon_owner != Fiber.current
@mon_mutex.lock
@mon_owner = Fiber.current
end
@mon_count += 1
end
#
# Leaves exclusive section.
#
def mon_exit
_mon_check_owner
@mon_count -=1
if @mon_count == 0
@mon_owner = nil
@mon_mutex.unlock
end
end
def mon_synchronize
mon_enter
begin
yield
rescue => e
require 'pp'
puts e
pp e.backtrace
ensure
mon_exit
end
end
alias synchronize mon_synchronize
def new_cond
return ConditionVariable.new self
end
def _mon_initialize
@mon_owner = nil
@mon_count = 0
@mon_mutex = EM::Synchrony::Mutex.new
end
def _mon_check_owner
if @mon_owner != Fiber.current
raise FiberError, "current fiber not owner"
end
end
def _mon_enter_for_cond(count)
@mon_owner = Fiber.current
@mon_count = count
end
def _mon_exit_for_cond
count = @mon_count
@mon_owner = nil
@mon_count = 0
return count
end
end
class Monitor
include MonitorMixin
alias try_enter mon_try_enter
alias enter mon_enter
alias exit mon_exit
end
end
end
f1 = nil
f2 = nil
f3 = nil
m = EM::Synchrony::Monitor.new
cond = m.new_cond
f1 = Fiber.new do
m.synchronize do
puts "WAIT!"
cond.wait
puts "WAITED!"
end
EM.stop
end
f2 = Fiber.new do
m.synchronize do
puts "Before signal"
cond.signal
puts "After signal"
end
end
f3 = Fiber.new do
m.synchronize do
Fiber.yield
end
end
EM.run do
# забиваем монитор на f3
f3.resume
# f1 ставится в ожидание
f1.resume
# f2 ставится в ожидание
f2.resume
# а теперь запускаем всю эту хуйню
f3.resume
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment