Created
January 6, 2016 16:20
-
-
Save bf4/159e9e4e673085af5975 to your computer and use it in GitHub Desktop.
exceptionless timeouts using a selectable queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require_relative "selectable_queue" | |
# Entry point of the demo: starts the background producer, then sits in the
# event loop until something throws :timeout, and finally prints the value
# that was thrown.
def run
  queue = SelectableQueue.new
  # Callable whose result the background thread pushes onto the queue.
  object = -> { rand(5) }

  # catch returns the value passed to `throw :timeout, value`.
  result = catch(:timeout) do
    puts "Starting background"
    run_background_job(object, queue)
    puts "Starting event loop"
    run_event_loop(queue)
  end

  puts "got result '#{result}'"
end
# Starts a thread that, in an endless loop, sleeps for +interval+ seconds and
# then pushes the result of +object.call+ onto +queue+.
#
# @param object [#call] callable invoked once per iteration
# @param queue [#<<] destination that receives each produced value
# @param interval [Numeric] seconds to sleep before each push; defaults to 30,
#   the delay that used to be hard-coded
# @return [Thread] the producer thread (the loop has no exit condition, so the
#   caller must kill it or let it die with the process)
def run_background_job(object, queue, interval: 30)
  Thread.new do
    loop do
      sleep(interval)
      queue << object.call
    end
  end
end
# Waits (without a timeout) for stdin or +queue+ to become readable and
# dispatches to the matching handler. The loop has no break, so it only ends
# through a non-local exit such as a throw raised inside a handler.
#
# @param queue [#to_io] object that can be handed to select
def run_event_loop(queue)
  loop do
    sources = [$stdin, queue]
    # select returns [readable, writable, errored]; only the first is needed.
    ready = IO.select(sources).first
    handle_stdin if ready.include?($stdin)
    handle_timeline(queue) if ready.include?(queue)
  end
end
# Reads a single character from stdin and reports it on stdout.
def handle_stdin
  char = $stdin.getc
  puts "Got stdin: '#{char}'"
end
# Takes the next element off +queue+ and unwinds to the enclosing
# catch(:timeout) with a message describing that element.
#
# @param queue [#pop] queue to take the element from
def handle_timeline(queue)
  throw :timeout, "got element #{queue.pop}"
end
# On SIGINT (Ctrl-C), print a farewell and exit with status 1.
Signal.trap("INT") do
  puts 'bye bye'
  exit(1)
end

# Start the demo.
run
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Usable in a system #select call because | |
# it has a fake IO object inside | |
# https://gist.github.com/garybernhardt/2963229 | |
# A queue that you can pass to IO.select. | |
# | |
# NOT THREAD SAFE: Only one thread should write; only one thread should read. | |
# | |
# Purpose: | |
# Allow easy integration of data-producing threads into event loops. The | |
# queue will be readable from select's perspective as long as there are | |
# objects in the queue. | |
# | |
# Implementation: | |
# The queue maintains a pipe. The pipe contains a number of bytes equal to | |
# the queue size. | |
# | |
# Example use: | |
# queue = SelectableQueue.new | |
# readable, _, _ = IO.select([queue, $stdin]) | |
#   print "got #{queue.pop}" if readable.include?(queue)
# | |
require "thread" | |
# A queue that can be handed to IO.select next to real IO objects.
#
# Every pushed object is paired with one byte written to an internal pipe, and
# every pop consumes one byte, so the pipe's read end is readable exactly
# while the queue holds objects. Call #close when finished to release the
# pipe's file descriptors.
class SelectableQueue
  # Creates an empty queue together with the pipe that mirrors its size.
  def initialize
    @queue = Queue.new
    @read_io, @write_io = IO.pipe
  end

  # Enqueues +o+ and signals readability by writing one byte to the pipe.
  #
  # @param o [Object] the object to enqueue
  # @return [SelectableQueue] self, so pushes can be chained
  def <<(o)
    @queue << o
    # It doesn't matter what we write into the pipe, as long as it's one byte.
    @write_io << '.'
    self
  end

  # Removes and returns the next object, consuming its paired pipe byte.
  #
  # @param nonblock [Boolean] passed through to Queue#pop; when true an empty
  #   queue raises instead of blocking
  # @return [Object] the dequeued object
  # @raise [ThreadError] if +nonblock+ is true and the queue is empty
  def pop(nonblock = false)
    o = @queue.pop(nonblock)
    # Only reached when an object was dequeued, keeping bytes and objects 1:1.
    @read_io.read(1)
    o
  end

  # @return [IO] the pipe's read end; this is what lets select accept the queue
  def to_io
    @read_io
  end

  # Closes both ends of the internal pipe. Safe to call more than once. The
  # queue must not be pushed to or selected on afterwards.
  #
  # @return [nil]
  def close
    [@read_io, @write_io].each { |io| io.close unless io.closed? }
    nil
  end

  # @return [Boolean] true once both pipe ends have been closed
  def closed?
    @read_io.closed? && @write_io.closed?
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment