rbranson (owner)

Revisions

gist: 178302 Download_button fork
public
Public Clone URL: git://gist.github.com/178302.git
Embed All Files: show embed
twittersex.rb #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#!/usr/bin/ruby
#
# This benchmark implements a simple threaded message-queue writer backed by
# an SQLite3 database. It is modeled after a Twitter-like queue-per-destination
# message system with N actors, each of which has its own queue. The idea is
# to demonstrate a safe, disk-backed queueing system with a large number of
# queues. Each queue is stored in its own table within a single database file.
#
# In 2009, by Rick Branson, Released into Public Domain.
#
 
# Length of the random text pool that user names and message bodies are
# sliced out of.
RANDOM_TEXT_BODY_SIZE = 100_000
# Length of every generated user name; user names double as queue (table) names.
SOURCE_TEXT_MAX = 16
# Upper bound used when picking the length of a generated message body.
MSG_TEXT_MAX = 140
 
# Once more than this many messages have been queued since the last nudge,
# the producer wakes the background writer instead of waiting for its timer.
WRITE_QUEUE_BACKLOG = 250
 
# Seconds the background writer sleeps between passes over the queue, i.e.
# the longest a message waits for a commit when nobody nudges the writer.
WRITE_QUEUE_SLEEP_TIME = 1
 
# Largest number of messages written inside a single transaction before the
# writer commits and opens a new one.
WRITE_QUEUE_MSG_DISK_BACKLOG = 25_000
 
require "rubygems"
require "sqlite3"
require "benchmark"
require "thread"
 
# Command line: <database> <user count> <msg count>
#
# A malformed invocation is reported on stderr with a non-zero exit status
# (Kernel#abort) so that shell scripts can detect the failure.
usage = "#{__FILE__} <database> <user count> <msg count>"
abort usage if ARGV.size < 3

# The values are kept as strings in these globals; the rest of the script
# converts them with to_i / to_f where needed.
$dbfile, $tcount, $mcount = ARGV

# Reject counts the benchmark cannot work with up front: a zero or
# non-numeric user count would otherwise yield nil queue names later on.
unless $tcount =~ /\A\d+\z/ && $tcount.to_i > 0 && $mcount =~ /\A\d+\z/
  abort "#{usage}\n<user count> must be a positive integer, <msg count> a non-negative integer"
end

# Start every run from an empty database. A missing file is the normal case;
# any other failure is reported (but not fatal) instead of being silently
# swallowed, because a stale database would skew the benchmark.
begin
  File.unlink($dbfile)
rescue Errno::ENOENT
  # no previous database file, nothing to remove
rescue SystemCallError => e
  warn "warning: could not remove #{$dbfile}: #{e.message}"
end
 
# Queues messages in memory and persists them to an SQLite3 database from a
# background thread, using one table per destination.
#
# Producers call #write, which only appends to an in-memory Queue. The
# background thread started by #start drains that queue inside transactions
# every WRITE_QUEUE_SLEEP_TIME seconds, or sooner when a producer wakes it.
# Call #finish once to stop the thread and flush whatever is still queued.
class BackgroundMessageDatabase
  # Opens the database at +file+, applies the durability pragmas and starts
  # the background writer thread.
  #
  # file:: path handed to SQLite3::Database.new
  def initialize(file)
    @db = SQLite3::Database.new(file)
    @db.execute("PRAGMA synchronous = FULL;")
    @db.execute("PRAGMA journal_mode = TRUNCATE;")
    @queue = Queue.new
    @backlog = 0
    # Assigned up front so #write and the writer thread never read an
    # unassigned flag.
    @finishing = false

    start
  end

  # Enqueues one message for the background writer.
  #
  # dst:: destination name; used as the table the message is stored in
  # src:: source name, stored in the "source" column
  # msg:: message text, stored in the "msg" column
  def write(dst, src, msg)
    @queue << [dst, src, msg]
    @backlog += 1

    # We could check Queue#size, but that would hit a mutex and be slow.
    # @backlog is only a cheap hint used to prod the background thread; it is
    # not meant to be an accurate count of the messages waiting to be written.
    return if @finishing || @backlog <= WRITE_QUEUE_BACKLOG

    wake_writer
    @backlog = 0
  end

  # Drains the in-memory queue into the database.
  #
  # WARNING: this usually runs inside the background thread; #finish also
  # calls it from the caller's thread after the background thread has stopped.
  #
  # Messages are written inside a transaction that is committed every
  # WRITE_QUEUE_MSG_DISK_BACKLOG messages and once more when the queue is
  # empty. An error is printed rather than raised; whatever was written before
  # the error is still committed, and messages left in the queue are picked up
  # by the next pass.
  def work_queue
    return if @queue.empty?

    puts "background: queue has things."
    begin
      in_flight = 0
      @db.transaction

      until @queue.empty?
        dst, src, bod = @queue.pop
        insert_message(dst, src, bod)
        in_flight += 1

        next if in_flight < WRITE_QUEUE_MSG_DISK_BACKLOG

        puts "background: WRITE_QUEUE_MSG_DISK_BACKLOG reached, writing #{in_flight} messages..."
        @db.commit
        @db.transaction
        in_flight = 0
      end
    rescue StandardError => e
      # StandardError only: signals, SystemExit and the like must propagate.
      puts e.inspect
    ensure
      # If opening the transaction failed there is nothing to commit, and
      # committing anyway would raise a second error out of this method.
      @db.commit if @db.transaction_active?
    end
  end

  # Starts the background thread: drain the queue, sleep for
  # WRITE_QUEUE_SLEEP_TIME seconds, repeat. The loop ends after the first
  # drain that follows #finish setting the finishing flag.
  def start
    @thread = Thread.new do
      loop do
        work_queue
        break if @finishing

        puts "background: zZzZ..."
        sleep(WRITE_QUEUE_SLEEP_TIME)
      end
    end
  end

  # Stops the background thread and flushes the remaining messages.
  #
  # The thread is woken and joined rather than killed, so it always leaves its
  # loop between transactions. (Thread#status reports a sleeping thread as
  # "sleep", so the old comparison against "sleeping" never matched and
  # wake-and-join is what always ran in practice.)
  def finish
    puts "writer: finishing..."
    @finishing = true

    if @thread.alive?
      puts "writer: waiting for background thread to stop..."
      wake_writer
      @thread.join
    end

    puts "writer: flushing remainder of queue..."
    work_queue
  end

  private

  # Inserts one message into the table for +dst+, creating that table the
  # first time the destination is seen.
  def insert_message(dst, src, bod)
    table = quote_identifier(dst)
    insert_sql = "INSERT INTO #{table} VALUES(?, ?)"

    begin
      @db.execute(insert_sql, [src, bod])
    rescue SQLite3::SQLException
      # Most likely the table does not exist yet. IF NOT EXISTS keeps this
      # step harmless when the insert failed for some other reason, so the
      # retry below surfaces the real error instead of "table already exists".
      @db.execute("CREATE TABLE IF NOT EXISTS #{table} (source varchar, msg varchar)")
      @db.execute(insert_sql, [src, bod])
    end
  end

  # Returns +name+ as a double-quoted SQL identifier with embedded quotes
  # doubled, so an arbitrary destination name cannot alter the statement.
  def quote_identifier(name)
    %("#{name.to_s.gsub('"', '""')}")
  end

  # Wakes the background thread if it is still running.
  def wake_writer
    @thread.run if @thread.alive?
  rescue ThreadError
    # The thread exited between the alive? check and the wake-up.
  end
end
 
# Benchmark driver: builds random users and messages, pushes the messages
# through the background writer and reports the achieved throughput.

# Parse the counts once instead of on every loop iteration.
user_count = $tcount.to_i
msg_count = $mcount.to_i

# With no users, rand(0) returns a Float and every queue name would be nil.
abort "stress: user count must be at least 1" if user_count < 1

bmdb = BackgroundMessageDatabase.new($dbfile)
 
##########################################
 
puts "stress: generating #{RANDOM_TEXT_BODY_SIZE} byte body of text..."
 
# Pool of ASCII letters; user names and message bodies are slices of one
# large random string built from it.
randsrc = ('a'..'z').to_a + ('A'..'Z').to_a
randstring = Array.new(RANDOM_TEXT_BODY_SIZE) { randsrc[rand(randsrc.length)] }.join
 
##########################################
 
puts "stress: building user list..."
# Each user name is a SOURCE_TEXT_MAX-character slice. The start offset is
# drawn so that the slice always fits inside the random text.
userlist = Array.new(user_count) do
  randstring[rand(RANDOM_TEXT_BODY_SIZE - SOURCE_TEXT_MAX), SOURCE_TEXT_MAX]
end
 
#########################################
 
#########################################
puts "stress: writing messages..."
bm = Benchmark.measure do
  # Write exactly msg_count messages; the reported rate divides by that same
  # number (the old inclusive range 0..count wrote one extra message).
  msg_count.times do
    dst = userlist[rand(user_count)]
    src = userlist[rand(user_count)]
    # Body length is 1..MSG_TEXT_MAX characters inclusive.
    bod = randstring[rand(RANDOM_TEXT_BODY_SIZE - MSG_TEXT_MAX), 1 + rand(MSG_TEXT_MAX)]
    bmdb.write(dst, src, bod)
  end
  puts "stress: done writing messages, waiting for write queue to catch up..."
  # Flushing is part of the measured time: a message only counts once it has
  # been handed to the database.
  bmdb.finish
end
 
puts "performance: #{msg_count.to_f / bm.real} msg/sec"
 
#########################################