Last active
August 29, 2015 14:25
-
-
Save brixen/bc9e2a88338439bee855 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Requirements: | |
# | |
# * AWS credentials set in AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY | |
# * A local PostgreSQL database | |
# | |
# ## Gems | |
# | |
# gem install sequel progress_bar pg aws-sdk oga | |
# | |
# ## PostgreSQL | |
# | |
# By default this script connects to a PostgreSQL database called "test" using | |
# user "root" running at localhost. You can change these settings using | |
# environment variables (see below) if needed. You can create the DB as
# follows:
# | |
# createdb -U your-postgres-super-user -E UTF8 -O postgres-user-for-this-script test | |
# | |
# For example: | |
# | |
# createdb -U root -E UTF8 -O root test | |
# | |
# ## Running | |
# | |
# Once the database has been set up simply run this script. It might take a few
# tries to get it to crash, but it will crash eventually. Increasing the amount
# of dummy data records can increase the workload thus making it crash faster
# (hopefully).
# | |
require 'thread' | |
require 'sequel' | |
require 'progress_bar' | |
require 'aws-sdk' | |
require 'json' | |
require 'oga' | |
require 'securerandom' | |
# An exception raised inside any thread should take the whole process down
# instead of silently killing just that thread.
Thread.abort_on_exception = true

puts 'Connecting...'

# Connection settings, each overridable through the environment.
db_host = ENV.fetch('DB_HOST', 'localhost')
db_name = ENV.fetch('DB_NAME', 'test')
db_user = ENV.fetch('DB_USER') { ENV.fetch('USER', 'root') }
db_pass = ENV['DB_PASSWORD']

db_options = {:test => true, :max_connections => 60, :encoding => 'UTF-8'}

# JRuby goes through JDBC; every other runtime uses the postgres:// scheme.
connection_url =
  if defined?(JRuby)
    "jdbc:postgresql://#{db_host}/#{db_name}?user=#{db_user}&password=#{db_pass}"
  else
    "postgres://#{db_user}:#{db_pass}@#{db_host}/#{db_name}"
  end

DB = Sequel.connect(connection_url, db_options)
Sequel.extension(:migration)

puts 'Creating table...'

# Start from a clean slate on every run.
DB.drop_table?(:reviews)

DB.create_table(:reviews) do
  primary_key :id
  column :reviewer_origin, String
  column :travel_type, String
  column :travel_composition, String
end

puts 'Preparing dummy data...'

dummy_count = 50_000

# One bulk insert per column: every generated row fills exactly one of the
# three text columns with a random hex string and leaves the others NULL.
[:reviewer_origin, :travel_type, :travel_composition].each do |column_name|
  rows = Array.new(dummy_count) { {column_name => SecureRandom.hex} }
  DB[:reviews].multi_insert(rows)
end
# Create (or look up) the SQS queue the workers send their batches to.
queue_url = Aws::SQS::Client.new
  .create_queue(:queue_name => 'example')
  .queue_url

# Number of rows that have at least one populated column. The filter is
# wrapped in Sequel.lit because Sequel 5 no longer accepts a plain String as
# literal SQL in #where.
reviews = DB[:reviews]
  .where(Sequel.lit('reviewer_origin IS NOT NULL OR travel_type IS NOT NULL or travel_composition IS NOT NULL'))
  .count

# Paging state used by the filler thread.
last_id = 0
iterated = 0

# queue:   slices of review rows waiting to be sent by a worker
# output:  number of entries each worker has sent
# control: signal from the filler thread that it has finished
queue = Queue.new
output = Queue.new
control = Queue.new

progress = ProgressBar.new(reviews)
puts 'Starting threads...'

# Reports a worker failure. Rubinius::Logger only exists on Rubinius, so on
# other runtimes (the script also has a JRuby code path) fall back to stderr
# instead of raising NameError from inside the rescue handler.
log_failure = lambda do |label, error|
  if defined?(Rubinius::Logger)
    Rubinius::Logger.log_exception label, error
  else
    warn "#{label}: #{error.class}: #{error.message}"
  end
end

# Ten workers, each with its own SQS client. A worker pops a slice of review
# rows from `queue`, sends it as one SQS batch, and reports the number of
# entries sent on `output`.
threads = 10.times.map do
  Thread.new do
    thread_sqs = Aws::SQS::Client.new

    loop do
      entries = queue.pop.map do |review|
        {:id => SecureRandom.hex, :message_body => JSON.dump(review)}
      end

      # Keep retrying the same batch until the send succeeds.
      loop do
        begin
          thread_sqs.send_message_batch(:queue_url => queue_url, :entries => entries)
          break
        rescue Aws::SQS::Errors::SignatureDoesNotMatch => error
          # Stupid AWS signature errors, lets just ignore them so we can
          # actually run this script.
          log_failure.call('AWS error', error)
        rescue StandardError => error
          # Only StandardError is retried; anything more severe propagates
          # out of the thread.
          log_failure.call('general processing error', error)
        end
      end

      output << entries.length
    end
  end
end
# Producer: pages through the reviews table in id order (100 rows per query)
# and hands the rows to the workers in slices of 10.
filler = Thread.new do
  while iterated < reviews
    # Sequel.lit is required because Sequel 5 no longer treats a plain String
    # passed to #where as literal SQL.
    reviews_page = DB[:reviews]
      .select(:id, :reviewer_origin, :travel_type, :travel_composition)
      .where(Sequel.lit('reviewer_origin IS NOT NULL OR travel_type IS NOT NULL or travel_composition IS NOT NULL'))
      .where { id > last_id }
      .order(:id)
      .limit(100)
      .to_a

    break if reviews_page.empty?

    reviews_page.each_slice(10) { |slice| queue << slice }

    # Track how many rows were handed out so the loop condition above can end
    # the paging and the main loop knows how many results to wait for.
    iterated += reviews_page.length
    last_id = reviews_page.last[:id]
  end

  control << :terminate
  # Wake the main loop in case it is already blocked on `output` with nothing
  # left in flight; a zero count does not affect the totals.
  output << 0
end

# Consumer: advance the progress bar as workers report sent entries. The
# loop ends once the filler has finished AND every row it enqueued has been
# reported back, so in-flight batches are never dropped and the final
# `output.pop` cannot block forever.
processed = 0
filler_done = false

loop do
  # Latch the signal: it is popped from `control` once and must be remembered.
  filler_done ||= !control.empty? && control.pop == :terminate
  break if filler_done && processed >= iterated

  sent = output.pop
  processed += sent
  progress.increment!(sent) if sent > 0
end

filler.join
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
source 'https://rubygems.org'

# Database access: Sequel plus the PostgreSQL driver gem.
gem 'sequel'
gem 'pg'

# Console progress bar.
gem 'progress_bar'

# AWS client library (the script uses Aws::SQS::Client).
gem 'aws-sdk'

# Loaded by the script via `require 'oga'`.
gem 'oga'
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Creates the PostgreSQL database used by the script.
#
# The owner/login user comes from DB_USER, then USER, then "postgres"; the
# database name comes from DB_NAME and defaults to "test". These are the same
# environment variables the script reads when it connects.
desc "Create the PostgreSQL database used by the script"
task :createdb do
  user = ENV["DB_USER"] || ENV["USER"] || "postgres"
  name = ENV["DB_NAME"] || "test"

  # Multi-argument form: arguments are passed straight to createdb without
  # going through a shell, so unusual characters in the values are safe.
  sh "createdb", "-U", user, "-E", "UTF8", "-O", user, name
end

task :default => :createdb
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment