Created April 4, 2012 21:40
-
-
Save n3bulous/2305909 to your computer and use it in GitHub Desktop.
Processing MapReduce Output Line by Line (first draft)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require 'slop' | |
require 'aws/s3' | |
# Streams a single local file line by line.
class FileProcessor
  # @param file_path [String] path of the file to read
  def initialize(file_path)
    @file_path = file_path
  end

  # Yields each line of the file, line terminator included, to the block.
  #
  # Lines are read lazily with IO#each_line instead of readlines so large
  # part files never need to fit in memory. The block form of File.open
  # closes the handle even if the caller's block raises or breaks early;
  # the previous version never closed it and leaked one descriptor per file.
  #
  # @yieldparam line [String]
  def process(&block)
    puts "Processing File: #{@file_path}"
    File.open(@file_path, 'r') do |file|
      file.each_line { |line| yield line }
    end
  end
end
# Streams every MapReduce output file in a local directory, line by line.
class DirectoryProcessor
  # @param dir_path [String] directory holding the job output
  def initialize(dir_path)
    @dir_path = dir_path
  end

  # Yields each line of every regular file in the directory whose name
  # matches +pattern+ (by default the Hadoop-style "part-*" files).
  #
  # Files are visited in sorted name order: Dir.glob only sorts its result
  # by default from Ruby 3.0 on, and part-00000, part-00001, ... should be
  # consumed in a reproducible order. Matches that are not regular files
  # (e.g. a subdirectory) are skipped rather than crashing File.open.
  #
  # @param pattern [String] glob pattern relative to the directory
  # @yieldparam line [String]
  def process(pattern = 'part-*', &block)
    puts "Processing Directory: #{@dir_path}"
    Dir.glob(File.join(@dir_path, pattern)).sort.each do |path|
      next unless File.file?(path)

      FileProcessor.new(path).process(&block)
    end
  end
end
# Streams MapReduce output objects stored in S3, line by line.
#
# Credentials are read from the AWS_ACCESS_KEY and AWS_SECRET_KEY environment
# variables; the process exits with status 1 if the connection fails.
class S3Processor
  # @param bucket [String] S3 bucket name
  # @param path [String, nil] key prefix of the job output inside the bucket;
  #   nil or empty means the bucket root
  # @param options [Hash] +:streaming+ is stored but not used yet
  def initialize(bucket, path, options={})
    @bucket = bucket
    @path = path
    @streaming = options[:streaming] || false
    authenticate
  end

  # Opens the aws-s3 connection with credentials taken from ENV and exits
  # with status 1 if it could not be established.
  def authenticate
    AWS::S3::Base.establish_connection!(:access_key_id => ENV['AWS_ACCESS_KEY'], :secret_access_key => ENV['AWS_SECRET_KEY'])
    return if AWS::S3::Base.connected?

    $stderr.puts "Could not connect to S3. Have you defined AWS_ACCESS_KEY and AWS_SECRET_KEY in your environment?"
    exit(1)
  end

  # Yields every line of every object whose key starts with
  # "<path>/<pattern>" (or just <pattern> when no path was given).
  #
  # @param pattern [String] key-name prefix of the output objects
  # @yieldparam line [String] one line, terminator included when present
  def process(pattern='part-', &block)
    puts "Processing S3: #{@bucket}/#{@path}"
    bucket = AWS::S3::Bucket.find(@bucket)
    bucket.objects(:prefix => key_prefix(pattern)).each do |object|
      puts "Processing S3 File: #{object.key}"
      each_streamed_line(object.key, &block)
    end
  end

  private

  # Listing prefix for +pattern+; avoids a leading "/" for bucket-root input.
  def key_prefix(pattern)
    @path.nil? || @path.empty? ? pattern : File.join(@path, pattern)
  end

  # Reassembles complete lines from the streamed chunks of one object.
  #
  # Chunk boundaries can fall mid-line, so pieces are accumulated until one
  # ends in "\n". Whatever remains after the last chunk is a final line with
  # no terminator and is still yielded (the same way File#each_line treats
  # it) instead of being silently dropped.
  def each_streamed_line(key)
    buffer = ''.dup
    AWS::S3::S3Object.stream(key, @bucket) do |chunk|
      chunk.each_line do |piece|
        buffer << piece
        next unless buffer.end_with?("\n")

        yield buffer
        buffer = ''.dup
      end
    end
    yield buffer unless buffer.empty?
  end
end
# Chooses the processor that knows how to read a given --input value.
class ProcessorFactory
  # @param input [String, nil] "s3://bucket/path", a directory, or a file
  # @return [S3Processor, DirectoryProcessor, FileProcessor, nil] nil when
  #   the input is missing/empty or names nothing that exists locally
  def self.create(input)
    source = input.to_s
    return nil if source.empty?

    if (match = source.match(%r{\As3://(.+)}))
      bucket, path = match[1].split('/', 2)
      S3Processor.new(bucket, path)
    # exist? (not the exists? aliases, which Ruby 3.2 removed)
    elsif Dir.exist?(source)
      DirectoryProcessor.new(source)
    elsif File.exist?(source)
      FileProcessor.new(source)
    end
  end
end
# Command-line driver: reads MapReduce output from --input (file, directory
# or s3://bucket/path) and writes the result to --output.
class MapReduce2MysqlImport
  # Parses --input/--output from ARGV with Slop and selects a processor.
  def initialize
    opts = Slop.parse(:help => true) do
      on :output=, "Where the output should be written"
      on :input=, "The source data, can be a file, directory or s3://bucket/path"
    end
    @output_file = opts['output']
    @processor = ProcessorFactory.create(opts['input'])
  end

  # Streams every input line through the (still unimplemented) per-line
  # handler. Exits with status 1 when the input or output is unusable.
  def run
    if @processor.nil?
      $stderr.puts "Input file or directory doesn't exist."
      exit(1)
    end
    if @output_file.to_s.empty?
      $stderr.puts "No output file given (use --output)."
      exit(1)
    end
    # Block form flushes and closes the output even if processing raises.
    File.open(@output_file, 'w') do |output|
      @processor.process do |line|
        # TODO: transform +line+ and write the result to +output+.
      end
    end
  end
end
# Run only when executed directly, so the classes above can be required
# (e.g. from tests) without parsing ARGV or touching the filesystem.
if __FILE__ == $PROGRAM_NAME
  runner = MapReduce2MysqlImport.new
  runner.run
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.