Skip to content

Instantly share code, notes, and snippets.

@jgautsch
Created August 22, 2014 02:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jgautsch/cf1d82bd8f348c121640 to your computer and use it in GitHub Desktop.
Save jgautsch/cf1d82bd8f348c121640 to your computer and use it in GitHub Desktop.
The map script for the NPI dataset MapReduce job
# ###########################################################
# Configuration
# ###########################################################
# Load all Gem and ENV dependencies
require 'rubygems'
require 'bundler/setup'
require 'dotenv'
Dotenv.load
require 'require_all'
require 'mongoid'
require 'geocoder'
require 'hashie'
require 'csv'
require 'thread'
require 'active_support/core_ext/integer/time'
require 'active_support/core_ext/numeric/time'
# Directory holding this script; the Mongoid config file and the class
# definitions are both located relative to it.
script_dir = File.dirname(__FILE__)
# Initialise Mongoid from mongoid.yml using its :production environment
Mongoid.load!(File.join(script_dir, 'mongoid.yml'), :production)
# Pull in the project's class definitions
require File.join(script_dir, 'classes.rb')
# Geocoder settings: use the :dstk lookup against the host below.
# NOTE(review): this host value carries a scheme and a trailing slash; some
# Geocoder versions expect a bare hostname for :host — confirm against the
# installed gem before changing it.
geocoder_settings = {
  lookup: :dstk,
  host: "http://dstk-lb-1122339968.us-east-1.elb.amazonaws.com/",
  timeout: 2
}
Geocoder.configure(**geocoder_settings)
# ###########################################################
# Map Script
# ###########################################################
# Streams rows from STDIN through a pool of worker threads. One producer
# thread reads lines and pushes them onto a bounded queue; `num_threads`
# consumer threads pop rows and hand each one to
# DataProcessing::Stream::RowObjectBuilder#objectify.
#
# Coordination relies only on the queue's blocking behaviour:
#   * SizedQueue#push blocks while the buffer is full, so the producer is
#     throttled without spinning.
#   * Queue#pop blocks while the buffer is empty, so consumers wait without
#     spinning and never race each other between an "empty?" check and the
#     pop itself (the previous check-then-pop could leave a consumer blocked
#     forever on the last row).
#   * After EOF the producer pushes one end-of-input marker per consumer;
#     each consumer stops when it pops a marker, so shutdown needs no shared
#     flags and no sleep.

# Lookup object passed to every RowObjectBuilder.
taxonomies = DataProcessing::TaxonomyCache.new
num_threads = 10
# Maximum number of lines buffered between the reader and the workers.
max_buffered = 100
input = SizedQueue.new(max_buffered)
threads = []
# Unique object used as the end-of-input marker. It is compared by identity,
# so no line read from STDIN can ever be mistaken for it.
end_of_input = Object.new

# Producer Thread
# Reads STDIN line by line into the bounded queue, then signals completion.
threads << Thread.new(input) do |ip|
  puts "Starting producer loop"
  begin
    while (line = STDIN.gets)
      ip.push(line)
    end
  ensure
    # Always release the consumers, even if reading STDIN raised.
    num_threads.times { ip.push(end_of_input) }
  end
end

# Consumer Threads
num_threads.times do |i|
  # Each worker processes rows until it receives its end-of-input marker.
  threads << Thread.new(input) do |ip|
    puts "Starting consumer loop #{i}"
    loop do
      row = ip.pop
      break if row.equal?(end_of_input)

      row_object = DataProcessing::Stream::RowObjectBuilder.new(row, taxonomies)
      row_object.objectify
    end
  end
end

# Wait for the producer and every consumer to finish.
threads.each(&:join)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment