Skip to content

Instantly share code, notes, and snippets.

@n3bulous
Created April 4, 2012 21:40
Show Gist options
  • Save n3bulous/2305909 to your computer and use it in GitHub Desktop.
Save n3bulous/2305909 to your computer and use it in GitHub Desktop.
Processing Map Reduce Output by Line (first draft)
#!/usr/bin/env ruby
require 'slop'
require 'aws/s3'
# Reads a single local file and hands each of its lines to the caller.
class FileProcessor
  # @param file_path [String] path of the file to read
  def initialize(file_path)
    @file_path = file_path
  end

  # Announces the file on stdout, then passes every line of the file (line
  # terminator included, when present) to the given block.
  #
  # @yieldparam line [String] one line of the file
  def process(&block)
    puts "Processing File: #{@file_path}"
    # Block form of File.open closes the handle when iteration finishes or
    # raises, so no descriptor is leaked per processed file.
    File.open(@file_path, 'r') do |file|
      file.each_line(&block)
    end
  end
end
# Feeds every line of each matching file inside a local directory to the
# caller, delegating the per-file work to FileProcessor.
class DirectoryProcessor
  # @param dir_path [String] directory that holds the files to read
  def initialize(dir_path)
    @dir_path = dir_path
  end

  # Announces the directory on stdout, then processes each file whose name
  # matches +pattern+.
  #
  # @param pattern [String] glob, relative to the directory, selecting files
  # @yieldparam line [String] one line from one of the matched files
  def process(pattern = 'part-*', &block)
    puts "Processing Directory: #{@dir_path}"
    matches = Dir.glob(File.join(@dir_path, pattern))
    # Keep only regular files (a matching sub-directory cannot be read line by
    # line) and sort so the processing order does not depend on the platform.
    matches.select { |path| File.file?(path) }.sort.each do |path|
      FileProcessor.new(path).process(&block)
    end
  end
end
# Streams the objects stored under a bucket/prefix on S3 and hands their
# contents to the caller one line at a time. Uses the AWS::S3 API that the
# file loads with `require 'aws/s3'`.
class S3Processor
  # Remembers the location and authenticates right away.
  #
  # @param bucket [String] bucket name
  # @param path [String] key prefix inside the bucket
  # @param options [Hash] only :streaming is read; it is stored but #process
  #   does not consult it
  def initialize(bucket, path, options = {})
    @bucket = bucket
    @path = path
    @streaming = options[:streaming] || false
    authenticate
  end

  # Opens the S3 connection with the credentials found in the AWS_ACCESS_KEY
  # and AWS_SECRET_KEY environment variables. Prints a hint on stderr and
  # exits with status 1 when no connection is reported afterwards.
  def authenticate
    AWS::S3::Base.establish_connection!(
      :access_key_id => ENV['AWS_ACCESS_KEY'],
      :secret_access_key => ENV['AWS_SECRET_KEY']
    )
    unless AWS::S3::Base.connected?
      $stderr.puts "Could not connect to S3. Have you defined AWS_ACCESS_KEY and AWS_SECRET_KEY in your environment?"
      exit(1)
    end
  end

  # Announces the location on stdout, then yields every line of every object
  # whose key starts with "<path>/<pattern>".
  #
  # @param pattern [String] key fragment appended to the path to form the prefix
  # @yieldparam line [String] one complete line of an object
  def process(pattern = 'part-', &block)
    puts "Processing S3: #{@bucket}/#{@path}"
    bucket = AWS::S3::Bucket.find(@bucket)
    bucket.objects(:prefix => "#{@path}/#{pattern}").each do |file|
      puts "Processing S3 File: #{file.key}"
      stream_lines(file.key, &block)
    end
  end

  private

  # Reassembles the lines of one object from the chunks the stream delivers.
  def stream_lines(key)
    pending = String.new
    AWS::S3::S3Object.stream(key, @bucket) do |chunk|
      chunk.each_line do |line|
        pending << line
        # A fragment without a newline is the tail of the chunk; hold it until
        # the next chunk supplies the rest of the line.
        next unless line.end_with?("\n")

        yield pending
        pending = String.new
      end
    end
    # An object that does not end with a newline leaves its final line in the
    # buffer; emit it instead of silently dropping it.
    yield pending unless pending.empty?
  end
end
# Chooses the processor that matches the kind of input location given.
class ProcessorFactory
  # Builds an S3Processor for "s3://bucket/path" locations, a
  # DirectoryProcessor for an existing directory and a FileProcessor for an
  # existing file.
  #
  # @param input [String, nil] s3:// URL or local path
  # @return [S3Processor, DirectoryProcessor, FileProcessor, nil] nil when no
  #   input was given or the local path does not exist
  def self.create(input)
    return if input.nil?

    # \A anchors at the start of the whole string; ^ would also accept an
    # "s3://" that merely starts a later line of the input.
    if (s3_location = input.match(%r{\As3://(.*)}))
      bucket, path = s3_location[1].split('/', 2)
      S3Processor.new(bucket, path)
    elsif Dir.exist?(input) # `exists?` was removed in Ruby 3.2
      DirectoryProcessor.new(input)
    elsif File.exist?(input)
      FileProcessor.new(input)
    end
  end
end
# Command-line driver: parses --input/--output and runs the selected processor
# over the input, line by line.
class MapReduce2MysqlImport
  # Parses the command line with Slop, remembers the output path and asks
  # ProcessorFactory for a processor matching the input location.
  def initialize
    opts = Slop.parse(:help => true) do
      on :output=, "Where the output should be written"
      on :input=, "The source data, can be a file, directory or s3://bucket/path"
    end
    @output_file = opts['output']
    @processor = ProcessorFactory.create(opts['input'])
  end

  # Opens the output file for writing and feeds every input line to the
  # per-line hook below. Prints a message on stderr and exits with status 1
  # when the input could not be resolved or no output path was given.
  def run
    if @processor.nil?
      $stderr.puts "Input file or directory doesn't exist."
      exit(1)
    end
    if @output_file.nil?
      # File.open(nil) would otherwise fail with an unhelpful TypeError.
      $stderr.puts "Output file not specified."
      exit(1)
    end
    # Block form closes (and flushes) the output even if processing raises.
    File.open(@output_file, 'w') do |output|
      @processor.process do |line|
        # Do something with a line (write results to `output`)
      end
    end
  end
end
# Script entry point: build the importer from the command line and run it.
MapReduce2MysqlImport.new.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment