Skip to content

Instantly share code, notes, and snippets.

@we4tech
Created June 12, 2014 08:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save we4tech/d8384e27ff657d2b8819 to your computer and use it in GitHub Desktop.
Save we4tech/d8384e27ff657d2b8819 to your computer and use it in GitHub Desktop.
Converts data from MongoDB to CSV. This is example code that uses multiple threads and a queue in Ruby.
require 'rubygems'
require 'bundler/setup'
require 'csv'
require 'pp'
# Configure and load dependencies.
# NOTE(review): `env` is a Symbol while the only key in `config` is the
# String 'development' — confirm MongoMapper.setup normalizes the name.
env = :development
config = {
  'development' => { 'uri' => 'mongodb://localhost:27017/some_database' }
}

# Load every gem in the Gemfile's default group, then connect MongoMapper.
Bundler.require(:default)
MongoMapper.setup(config, env)
# Define models
# Shared schema and helpers for the models in this script. Including it turns
# the host class into a MongoMapper document with `freebase_id` and `name` keys.
module BasicFields
  extend ActiveSupport::Concern

  included do
    include MongoMapper::Document

    key :freebase_id, String
    key :name, Hash, default: {}
  end

  # Formats each entry of the `name` hash as "value (key)" and joins the
  # entries with ", ".
  #
  # @return [String] empty string when `name` has no entries
  def all_names
    labels = name.map { |label_key, label| "#{label} (#{label_key})" }
    labels.join(', ')
  end
end
# Film document: the shared BasicFields keys, a release year, and one id array
# per related collection, each backing a `many ... in:` association.
class Film
  include BasicFields

  key :initial_release_year, Integer

  # association name => key holding the related ids
  related = {
    genres: :genre_ids,
    directors: :director_ids,
    countries: :country_ids,
    cast_members: :cast_member_ids,
    music_composers: :music_composer_ids
  }

  # Declare every id key first, then the associations, in the listed order.
  related.each_value { |ids_key| key ids_key, Array }
  related.each { |association, ids_key| many association, in: ids_key }
end
# Genre document; it declares nothing beyond what BasicFields provides.
class Genre
  include BasicFields
end
# Director document; it declares nothing beyond what BasicFields provides.
class Director
  include BasicFields
end
# Country document; it declares nothing beyond what BasicFields provides.
class Country
  include BasicFields
end
# CastMember document; it declares nothing beyond what BasicFields provides.
class CastMember
  include BasicFields
end
# MusicComposer document; it declares nothing beyond what BasicFields provides.
class MusicComposer
  include BasicFields
end
# Presents a single film as one CSV row (an Array of column values).
class FilmCsvPresenter
  # Column names, in the same order as the values produced by #to_csv.
  CSV_HEADERS = %w[freebase_id name year genres directors countries casts composers].freeze
  # Placeholder written in place of a blank column.
  EMPTY = 'Empty'.freeze

  # @param film [#freebase_id, #all_names, #initial_release_year, #genres,
  #   #directors, #countries, #cast_members, #music_composers]
  def initialize(film)
    @film = film
  end

  # Builds the row for the film.
  #
  # Every column except `freebase_id` falls back to EMPTY when blank. The
  # fallback is applied to the value *before* it is collected: the previous
  # `cols << value.presence || EMPTY` parsed as `(cols << value.presence) || EMPTY`,
  # so EMPTY was never used and blank columns were stored as nil.
  #
  # @return [Array] eight values ordered like CSV_HEADERS
  def to_csv
    [
      @film.freebase_id,
      @film.all_names.presence || EMPTY,
      @film.initial_release_year.presence || EMPTY,
      joined_names(@film.genres),
      joined_names(@film.directors),
      joined_names(@film.countries),
      joined_names(@film.cast_members),
      joined_names(@film.music_composers)
    ]
  end

  private

  # Joins the `all_names` of each record with ", "; EMPTY when nothing is left.
  def joined_names(records)
    records.map(&:all_names).join(', ').presence || EMPTY
  end
end
class FilmsExporter
def initialize
@queue = Queue.new
@files = []
@splits = 4
@worker_threads = []
@all_jobs_generated = false
end
def store_as_csv!(file)
out "====> Store CSV into #{file}".colorize(:green)
started_at = Time.now
generate_jobs
create_workers(file)
merge_results!(file)
out "Total spent: #{(Time.now.to_f - started_at.to_f)} secs.".colorize(:green)
end
def merge_results!(final_file)
Thread.new do
while !@all_jobs_generated
sleep 0.1
end
end.join
benchmark 'MERGE' do
@files.each do |worker_file|
out "----> Merging #{worker_file} into #{final_file}".colorize(:red)
`cat #{worker_file} | tee -a #{final_file}`
`rm #{worker_file}`
end
out '----> Merging completed.'.colorize(:green)
end
end
def generate_jobs
Thread.new do
benchmark 'GEN_JOBS' do
idx = 0
Film.find_each do |film|
idx += 1
out "----> Adding [#{idx}] #{film.freebase_id}...".colorize(:yellow)
@queue << FilmCsvPresenter.new(film).to_csv
end
@all_jobs_generated = true
until @worker_threads.map(&:alive?).none?
@queue << :eof
end
end
end
end
def create_workers(file)
@splits.times { |i| create_worker i, file }
end
def create_worker(index, file)
worker_file = "#{file}_#{index}"
@files << worker_file
@worker_threads << Thread.new do
benchmark "Worker[#{index}]" do
out "----> Started worker [#{index}]".colorize(:green)
CSV.open(worker_file, 'w') do |csv|
while (data = @queue.pop)
if data == :eof
out "====> Worker [#{index}] is closing.".colorize(:green)
break
end
out '<---- Writing to file'.colorize(:light_white)
csv << data
end
end
out "====> Worker: #{worker_file} is completed.".colorize(:green)
end
end
end
def benchmark(label)
out "====> Starting #{label}".colorize(:green)
started = Time.now
yield
took = (Time.now.to_f - started.to_f)
out "====> Ended #{label}".colorize(:green)
took_str = "====> #{label} Took: #{took} secs.".colorize(:green)
puts took_str
`echo #{took_str} | tee -a jobs_stat`
end
def out(msg)
@out_on ||= 'true' == ENV['OUT']
puts msg if @out_on
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment