xhr (owner)

Revisions

gist: 221821 Download_button fork
public
Description:
Un mini-scheduler scris in Ruby pentru a rula task-uri in paralel.
Public Clone URL: git://gist.github.com/221821.git
Embed All Files: show embed
scheduler.rb #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
require 'logger'
require 'thread'

## Runs blocks of Ruby code on a bounded number of threads.
#
# Usage:
#
# runner = Scheduler.new :max_threads => 20
# ary.each { |item| runner.queue { stuff_to_do_in_parallel } }
# runner.start
#
# Options accepted by Scheduler.new:
#   :max_threads - maximum number of tasks running at once (default MAX_THREADS)
#   :interval    - seconds to wait before re-checking for a free slot when all
#                  slots are busy (default FREQUENCY)
#
# Note: the original author marked this code as completely untested.

class Scheduler
  # Default cap on concurrently running tasks.
  MAX_THREADS = 15
  # Default polling interval, in seconds, used while every slot is busy.
  FREQUENCY = 1

  # Builds an idle scheduler; nothing runs until #start is called.
  #
  # opt - Hash of options (:max_threads, :interval), both optional.
  #
  # Raises ArgumentError if :max_threads is below 1, because #start could
  # then never obtain a slot and would poll forever.
  def initialize(opt = {})
    @max_threads = opt[:max_threads] || MAX_THREADS
    @frequency = opt[:interval] || FREQUENCY
    raise ArgumentError, "max_threads must be at least 1" if @max_threads < 1

    @running_threads = 0
    @working_queue = Queue.new

    # Guards @running_threads, which is touched by the thread calling #start
    # and by every worker thread.
    @lock = Mutex.new

    @logger = Logger.new(STDOUT)
    @logger.datetime_format = "%H:%M:%S"
  end

  # Enqueues a block to be executed by #start.
  #
  # Raises ArgumentError when called without a block.
  def queue(&block)
    raise ArgumentError, "a block is required" unless block

    @working_queue.push block
  end

  # Runs every queued block, keeping at most @max_threads of them running at
  # once, and returns after all threads spawned by this call have finished.
  def start
    workers = []

    until @working_queue.empty?
      if @lock.synchronize { @running_threads < @max_threads }
        workers << spawn(:worker => @working_queue.pop)
      else
        # Every slot is busy: wait @frequency seconds, then check again.
        sleep @frequency
      end
    end

    # Join only the threads this scheduler created. Joining Thread.list would
    # also wait on unrelated threads owned by the rest of the process.
    workers.each(&:join)
  end

  private
    # Starts one worker thread for args[:worker] and returns the Thread.
    def spawn(args = {})
      task = args[:worker]

      # Increment under the same lock the workers use for the decrement, so
      # concurrent updates of the counter cannot be lost.
      count = @lock.synchronize { @running_threads += 1 }
      logger "Running threads: #{count}, queue size: #{@working_queue.size}"

      Thread.new(task) do |job|
        begin
          job.call
        rescue StandardError => e
          # A failing task is reported but must not stop the remaining tasks.
          @logger.error("Task failed: #{e.class}: #{e.message}")
        ensure
          # Always release the slot, even when the task raised; otherwise
          # #start would wait forever for a slot that never frees up.
          @lock.synchronize { @running_threads -= 1 }
        end
      end
    end

    # Writes an info-level message to the scheduler's logger.
    def logger(msg)
      @logger.info(msg)
    end
end