GTFS importer snippets
Created October 8, 2014 by @thatandyrose
require 'csv'
require 'open-uri'
require 'time' # Time.parse is used by CalendarDateImporter

# The importers below are separate snippets: BaseImporter (defined last here)
# must be loaded before its subclasses. They also assume a Rails/ActiveSupport
# environment (constantize, DateTime#to_s(:short)) and Mongo-backed models
# that expose their collection via .col.

module GTFS
  class AgencyImporter < BaseImporter
    # Imports agency.txt. With no arguments every agency is imported;
    # otherwise only rows whose agency_id is in the given list.
    def import(agency_ids = [])
      options = {
        document_build: ->(row) {
          {
            agency_id: row[:agency_id],
            agency_timezone: row[:agency_timezone],
            agency_name: row[:agency_name]
          }
        }
      }
      if agency_ids.empty?
        log("Importing all Agencies.")
      else
        log("Importing Agencies for ids: #{agency_ids}.")
        options[:condition_array] = agency_ids.map(&:to_s)
        options[:condition_field] = :agency_id
      end
      generic_import(options)
    end
  end
end
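# A minimal usage sketch, assuming a reachable agency.txt URI. The URL and the
# ids below are placeholders, not part of the original gist:
#
#   importer = GTFS::AgencyImporter.new('http://example.com/gtfs/agency.txt')
#   importer.import                # all agencies
#   importer.import(%w[MTA BART])  # or only these agency_ids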
module GTFS
  class ShapeImporter < BaseImporter
    # Imports shapes.txt, keeping only shapes referenced by already-imported
    # trips (see enforce_condition_delete in BaseImporter#import_by_iterating).
    def import
      generic_import({
        condition_array: Trip.col.distinct(:shape_id),
        enforce_condition_delete: true,
        condition_field: :shape_id,
        document_build: ->(row) {
          {
            shape_id: row[:shape_id],
            shape_pt_lat: row[:shape_pt_lat].to_f,
            shape_pt_lon: row[:shape_pt_lon].to_f,
            shape_pt_sequence: row[:shape_pt_sequence].to_i,
            # shape_dist_traveled is a float in the GTFS spec, so .to_f
            # rather than the original .to_i, which truncated it
            shape_dist_traveled: row[:shape_dist_traveled].to_f
          }
        }
      })
    end
  end
end
module GTFS
  class CalendarDateImporter < BaseImporter
    # Imports calendar_dates.txt for service_ids referenced by trips.
    # GTFS dates are YYYYMMDD strings, which Time.parse handles.
    def import
      generic_import({
        condition_array: Trip.col.distinct(:service_id),
        condition_field: :service_id,
        document_build: ->(row) {
          {
            service_id: row[:service_id],
            effective_on: Time.parse(row[:date]),
            exception_type: row[:exception_type].to_i
          }
        }
      })
    end
  end
end
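# Both importers above derive their condition arrays from the Trip collection,
# so trips must be imported first (a TripImporter is not part of this gist).
# A sketch, with placeholder feed URLs:
#
#   GTFS::ShapeImporter.new('http://example.com/gtfs/shapes.txt').import
#   GTFS::CalendarDateImporter.new('http://example.com/gtfs/calendar_dates.txt').import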
module GTFS
  class BaseImporter
    def initialize(file_uri, options = {})
      @file_uri = file_uri
      @log_lines_threshold = options[:log_lines_threshold] || 10_000
      # Derive the model from the importer's class name,
      # e.g. GTFS::AgencyImporter -> Agency (constantize is ActiveSupport).
      collection_name = self.class.to_s.gsub('GTFS::', '').gsub('Importer', '')
      if collection_name != 'Base'
        @model_connection = collection_name.constantize
        @mongo_col = @model_connection.col
      end
    end

    def open_file
      log("Opening file: #{@file_uri}.")
      # Kernel#open is patched by open-uri, so @file_uri may be a URL or a path.
      open(@file_uri)
    end
    def iterate_csv(&block)
      file = open_file
      log("Iterating file. Size: #{file.size / 1024 / 1024} MB")
      row_count = 0
      import_count = 0
      header = nil
      # NOTE: parsing line by line means quoted fields containing embedded
      # newlines will break; GTFS feeds rarely contain them.
      file.each_line do |line|
        row = line.parse_csv
        row = sanitize_csv_row_values(row)
        if row_count > 0 # we don't want to import the header row
          import_count += 1 if block.call(create_row_hash(header, row))
        else
          header = row
          log("First line read. Next log will be in #{@log_lines_threshold} lines.")
        end
        row_count += 1
        log("#{row_count} row(s) iterated. #{import_count} imported.") if is_a_nth(row_count, @log_lines_threshold)
      end
      log("#{import_count} row(s) out of #{row_count - 1} imported.")
    end
    def generic_import(o = {})
      log("Importing for #{@model_connection.col_name}.")
      condition_block =
        if o[:condition_array]
          ->(row) { o[:condition_array].include?(row[o[:condition_field]]) }
        else
          ->(row) { true }
        end
      log("Deleting documents for #{@model_connection.col_name} before we start.")
      @mongo_col.drop
      if !o[:condition_array] || o[:condition_array].any?
        log("Importer will iterate CSV.")
        import_by_iterating(o.merge(condition_block: condition_block))
      else
        log("No imports for #{@model_connection.col_name} as condition array was empty.")
      end
    end
    def sanitize_csv_row_values(row_array)
      row_array.map { |val| val.to_s.strip }
    end

    def create_row_hash(header_array, values_array)
      new_hash = {}
      header_array.each_with_index do |key, i|
        # Strip a stray leading character (typically the UTF-8 BOM on the
        # first header key) so keys become clean symbols.
        key.slice!(0) unless key[0].match(/^[a-zA-Z0-9_-]+$/)
        new_hash[key.to_sym] = values_array[i]
      end
      new_hash
    end

    # True when count is a multiple of n; used to throttle progress logging.
    def is_a_nth(count, n)
      (count % n).zero?
    end

    def log(msg)
      # to_s(:short) is an ActiveSupport extension.
      puts "#{DateTime.now.to_s(:short)}: #{msg}"
    end
    def import_by_iterating(options)
      document_build_block = options[:document_build]
      enforce_condition_delete = options[:enforce_condition_delete]
      condition_field = options[:condition_field]
      condition_array = options[:condition_array]
      condition_block = options[:condition_block]
      batch = []
      batch_size = 30_000
      inserts = 0
      iterate_csv do |row|
        # With enforce_condition_delete we insert every row and prune
        # afterwards with a single $nin query, instead of checking each row
        # against a potentially large condition array.
        if enforce_condition_delete || condition_block.call(row)
          batch << document_build_block.call(row)
          if batch.size >= batch_size
            inserts += batch.size
            @mongo_col.insert(batch) # legacy Ruby driver: insert accepts an array
            batch = []
          end
          true
        end
      end
      @mongo_col.insert(batch) if batch.size > 0
      inserts += batch.size
      batch = nil
      actual_count = @mongo_col.count
      log("Checking the reported count was inserted. reported:#{inserts}, actual:#{actual_count}")
      if enforce_condition_delete
        log("enforce_condition_delete is true. Let's delete some data.")
        @mongo_col.remove(condition_field => { '$nin' => condition_array })
        removed_count = actual_count - @mongo_col.count
        log("#{removed_count} documents removed from #{@model_connection.col_name}")
      end
    end
  end
end
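# A sketch of how a further importer could follow the same pattern. The
# StopImporter below is hypothetical, not part of the original gist; the
# field names come from the GTFS stops.txt spec.
module GTFS
  class StopImporter < BaseImporter
    def import
      generic_import({
        document_build: ->(row) {
          {
            stop_id: row[:stop_id],
            stop_name: row[:stop_name],
            stop_lat: row[:stop_lat].to_f,
            stop_lon: row[:stop_lon].to_f
          }
        }
      })
    end
  end
end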