Created
January 22, 2014 07:13
-
-
Save w00lf/8554671 to your computer and use it in GitHub Desktop.
Fills database tables with objects parsed from FIAS XML, using forked worker processes for insertion; can also generate a PostgreSQL dump file for fast bulk import.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace :fias do | |
namespace :parse_xml do | |
class DumpCreator
  # Writes one tab-separated row per parsed record in PostgreSQL COPY text
  # format, so the resulting file can be bulk-loaded with COPY/\copy.
  attr_reader :target_file, :current_attributes

  # Column order of the target table; rows are emitted in exactly this order.
  COLUMNS = %w[
    aoguid buildnum counter eststatus houseguid houseid housenum
    ifnsfl ifnsul normdoc okato oktmo postalcode statstatus strstatus
    strucnum terrifnsfl terrifnsul startdate enddate updatedate
    created_at updated_at
  ].freeze

  # COPY text-format marker for SQL NULL.
  NULL = '\N'.freeze

  # target_file: path of the dump file, opened in append mode.
  #
  # BUG FIX: the original assigned the opened handle to a LOCAL variable,
  # leaving @target_file as the path String — every later
  # `target_file.write` raised NoMethodError. Keep the handle in the ivar.
  def initialize(target_file)
    @target_file = File.open(target_file, 'a')
  end

  # Resets the row to all-NULL (timestamps excepted), then overlays the SAX
  # attribute pairs ([NAME, value]) using lowercased names as keys.
  def create_attributes(attrs)
    @current_attributes = COLUMNS.each_with_object({}) { |col, row| row[col] = NULL }
    @current_attributes['created_at'] = Time.now
    @current_attributes['updated_at'] = Time.now
    attrs.each do |name, value|
      @current_attributes[name.downcase] = value
    end
  end

  # Appends the current row as one tab-separated line in COLUMNS order.
  # (The original drained a local array with shift(); a plain map is
  # equivalent and non-destructive.)
  def write_attributes
    row = COLUMNS.map { |col| current_attributes[col] }
    target_file.write("#{row.join("\t")}\n")
  end

  # Writes the COPY end-of-data terminator and closes the dump file.
  def end_parse
    target_file.write('\.')
    target_file.close
  end
end
class ModelWriter
  # Creates one ActiveRecord row per parsed XML element, counting successes
  # and logging failures to a per-run log file.
  attr_reader :model, :current_attributes

  # model: an ActiveRecord class (e.g. FiasAddrobj); its attribute_names
  # list is used to filter incoming XML attributes down to real columns.
  def initialize(model)
    @model = model
    @logger = Logger.new("#{Rails.root}/log/#{model}_Update_#{Time.now.to_i}.log")
    @start_time = Time.now
    @created_count = 0
    @failed_count = 0
  end

  # Builds @current_attributes from SAX attribute pairs ([NAME, value]),
  # keeping only lowercased names that exist as columns on the model.
  def create_attributes(attrs)
    @current_attributes = {}
    attrs.each do |name, value|
      attr_name = name.downcase
      @current_attributes[attr_name] = value if model.attribute_names.include?(attr_name)
    end
  rescue PG::Error
    # NOTE(review): reconnects after a dropped PG connection. It is unclear
    # how PG::Error can surface while building a plain hash (perhaps
    # attribute_names hits the DB on a cold schema cache) — confirm whether
    # this rescue belongs around write_attributes instead.
    ActiveRecord::Base.connection.disconnect!
    config = Rails.application.config.database_configuration[Rails.env]
    ActiveRecord::Base.establish_connection(config)
  end

  # Persists the current attributes; failures are counted and logged, never
  # raised, so one bad record cannot abort the whole import.
  def write_attributes
    model.create(@current_attributes)
    @created_count += 1
  rescue StandardError => e
    # Was `rescue Exception`, which also swallows SignalException/SystemExit;
    # StandardError is the correct net for persistence errors.
    @failed_count += 1
    @logger.error("Cannot create entry from attributes: #{@current_attributes.inspect}, reason: #{e.message}")
  end

  # Writes the final success/failure summary to the log.
  def end_parse
    @logger.info("Finished, created: #{@created_count}, failed: #{@failed_count}")
  end
end
class ModelUpdater < ModelWriter
  # Batch create/update handler: records are queued, then written by forked
  # worker processes once a queue reaches BATCH_SIZE.
  BATCH_SIZE = 2500
  WORKERS = 5
  SLICE = BATCH_SIZE / WORKERS # records per worker per flush

  def initialize(model)
    super(model)
    # REE-era hint so forked children keep memory pages shared.
    GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)
    @current_primary_key = { FiasAddrobj => 'aoid', FiasHome => 'houseid' }[model]
    @updated_count = 0
    @created_count = 0
    @failed_count = 0
    @create_queue = []
    @update_queue = []
  end

  # Renders an ActiveModel `changes` hash as indented "attr: old => new" lines.
  def formated_changes(changes)
    changes.map { |attr, (old_value, new_value)| "\t#{attr}: #{old_value} => #{new_value}\n" }
  end

  # Forks WORKERS children, each yielding its own slice of the queue.
  # Each child must re-establish its own DB connection — a forked process
  # cannot share the parent's socket.
  #
  # NOTE(review): counters incremented inside children are lost when the
  # child exits (fork copies memory), so end_parse's totals only reflect the
  # parent — confirm whether accurate totals matter here.
  def fork_and_iterate(queue)
    WORKERS.times do |i|
      Process.fork do
        ActiveRecord::Base.connection.disconnect!
        config = Rails.application.config.database_configuration[Rails.env]
        ActiveRecord::Base.establish_connection(config)
        # BUG FIX: `|| []` guards the final, possibly short, batch — the
        # original's range slice returned nil past the end and crashed.
        (queue[i * SLICE, SLICE] || []).each { |attributes| yield(attributes) }
        exit
      end
    end
    Process.waitall
  end

  # Flushes one queue through the forked workers, then empties it.
  def fork_and_write(type)
    case type
    when :create
      fork_and_iterate(@create_queue) { |attributes| create_object(attributes) }
      @create_queue = []
    when :update
      fork_and_iterate(@update_queue) { |attributes| update_object(attributes) }
      @update_queue = []
    end
  end

  def create_object(attributes)
    model.create(attributes)
    @created_count += 1
    @logger.info("Created new #{model}: #{attributes[@current_primary_key]}")
  rescue StandardError => e
    # Was `rescue Exception` — too broad (traps signals and exit).
    @failed_count += 1
    @logger.error("Failed to create #{model}, #{attributes[@current_primary_key]}, #{e.message}")
    @logger.error("Failed to create #{model}, #{attributes[@current_primary_key]}, attributes: #{attributes.inspect}")
  end

  # BUG FIX: the original assigned @updating_object / @current_attributes —
  # instance state from the most recently PARSED record — instead of using
  # the queued `attributes` argument, so every record in a batch overwrote
  # the same row with the same values. Re-fetch the row per queued record.
  def update_object(attributes)
    record = model.unscoped.find(attributes[@current_primary_key])
    record.attributes = attributes
    changes = record.changes
    if record.save
      @updated_count += 1
      @logger.info("Updated #{model}, #{record.send(@current_primary_key)}, changes: #{formated_changes(changes).join}")
    else
      @failed_count += 1
      @logger.error("Failed to update #{model}, #{record.send(@current_primary_key)}, changes: #{formated_changes(changes).join}")
    end
  end

  # Routes the parsed record to the update queue (row exists) or the create
  # queue, flushing a full queue FIRST. BUG FIX: the original flushed
  # *instead of* pushing, silently dropping the current record every time a
  # queue hit capacity.
  def write_attributes
    if model.unscoped.exists?(@current_attributes[@current_primary_key])
      fork_and_write(:update) if @update_queue.length >= BATCH_SIZE
      @update_queue.push(@current_attributes)
    else
      fork_and_write(:create) if @create_queue.length >= BATCH_SIZE
      @create_queue.push(@current_attributes)
    end
  end

  # Flushes any partially filled queues, then logs the summary.
  # BUG FIX: the original never flushed here, discarding up to
  # BATCH_SIZE - 1 trailing records per queue at end of document.
  def end_parse
    fork_and_write(:update) unless @update_queue.empty?
    fork_and_write(:create) unless @create_queue.empty?
    @logger.info("Updated: #{@updated_count}, created: #{@created_count}, failed: #{@failed_count}")
  end
end
module AbstractParser
  # Shared SAX plumbing: forwards the attributes of each <obj_tag> element
  # to `handler` and tracks progress against the IO read position.
  # Host classes implement #start_parse to kick off their SAX engine.
  attr_reader :io, :progress, :logger, :handler, :obj_tag

  # io: the XML input (must respond to size/pos); handler: a sink with
  # create_attributes/write_attributes/end_parse; obj_tag: element name of
  # interest, e.g. 'Object' or 'House'.
  def initialize(io, handler, obj_tag)
    @obj_tag = obj_tag
    @handler = handler
    @io = io
    @progress = ProgressBar.new("Converting", io.size)
    start_parse
    @progress.finish
  end

  # Hook overridden by concrete parsers.
  def start_parse; end

  def start_element(name, attrs = [])
    return unless name.to_s.eql?(obj_tag)
    handler.create_attributes(attrs)
  end

  # BUG FIX: the original flushed attributes on EVERY closing tag —
  # including the document root — which wrote the last record a second
  # time. Only flush when the element we care about closes.
  def end_element(name)
    return unless name.to_s.eql?(obj_tag)
    handler.write_attributes
    progress.set(io.pos)
  end

  def end_document
    handler.end_parse
  end
end
class NokogiriParse < Nokogiri::XML::SAX::Document
  # SAX document wiring AbstractParser's callbacks into Nokogiri's engine.
  # The pass-through initialize the original carried was redundant — the
  # mixin's initialize is inherited as-is.
  include AbstractParser

  # Launches Nokogiri's SAX parser over the io; start_element/end_element/
  # end_document callbacks come from AbstractParser.
  def start_parse
    Nokogiri::XML::SAX::Parser.new(self).parse(io)
  end
end
# Asks the FIAS SOAP service for the download URL of the latest archive.
# options[:type] — :import for the full archive, :update for the delta.
# Returns the URL String, or nil for an unknown type.
#
# BUG FIX: the original ended with two modifier-if lines evaluated in
# sequence, so the method's value was the LAST expression — nil whenever
# type == :import. Select the URL explicitly.
def retrive_xml_link(options = {})
  client = Savon.client(wsdl: "http://fias.nalog.ru/WebServices/Public/DownloadService.asmx?WSDL")
  responce = client.call(:get_last_download_file_info).body[:get_last_download_file_info_response][:get_last_download_file_info_result]
  case options[:type]
  when :import then responce[:fias_complete_xml_url]
  when :update then responce[:fias_delta_xml_url]
  end
end
# Scans `dir` and yields a hash mapping each selector key to an opened File
# whose basename matches that selector's regexp, plus :total => all entry
# names (excluding '.' and '..'). When several entries match a selector,
# the last one scanned wins, matching the original behavior.
#
# Note: File handles are opened here and handed to the block; the block is
# responsible for closing them.
def take_from_directory(dir, selectors)
  entries = Dir.entries(dir) - ['.', '..']
  result = {}
  entries.each do |item|
    selectors.each do |key, pattern|
      result[key] = File.open(File.join(dir, item)) if item =~ pattern
    end
  end
  result[:total] = entries
  yield(result)
end
# Downloads the rar archive at `link` into a temporary directory, extracts
# it with the external `unrar` binary, then yields the same selector hash
# as take_from_directory (:total includes 'fias.rar' itself, matching the
# original behavior). The temp dir is removed when the block returns.
def download_and_extract(link, selectors)
  Dir.mktmpdir("fias_xml_#{Time.now.strftime('%d_%m_%y')}") do |dir|
    uri_obj = URI.parse(link)
    archive_path = File.join(dir, 'fias.rar')
    # File.open block form (not Kernel#open): guarantees the handle closes
    # even on error, and Kernel#open would treat a leading '|' as a shell
    # command — a needless risk around remote-derived input.
    File.open(archive_path, 'wb') do |file|
      Net::HTTP.start(uri_obj.host) do |http|
        http.request_get(uri_obj.request_uri) do |response|
          # Stream the body in segments to avoid holding the archive in RAM.
          response.read_body { |segment| file.write(segment) }
        end
      end
    end
    # SECURITY(review): dir and archive_path are interpolated into a shell
    # command. Both are generated locally here, but prefer
    # system('unrar', 'e', archive_path, dir) to bypass shell parsing.
    p(`unrar e #{archive_path} #{dir}`)
    extracted = Dir.entries(dir) - ['.', '..']
    result = {}
    extracted.each do |item|
      selectors.each do |key, pattern|
        result[key] = File.open(File.join(dir, item)) if item =~ pattern
      end
    end
    result[:total] = extracted
    yield(result)
  end
end
# Runs the SAX parse over the address-object file and then the house file,
# using `model_handler` (ModelWriter or ModelUpdater) as the record sink.
# Aborts the process if either expected XML file was not found.
def handle_model_parsing(parameters, model_handler)
  if parameters[:fias_addrobj].nil?
    p("Cannot find xml file for addrobjs, dir files list: #{parameters[:total].join("\n")}")
    exit
  end
  NokogiriParse.new(parameters[:fias_addrobj], model_handler.new(FiasAddrobj), 'Object')
  p "Parsed all addrobjs, see log/ directory for log file"
  sleep 10

  if parameters[:fias_homes].nil?
    p("Cannot find xml file for houses, dir files list: #{parameters[:total].join("\n")}")
    exit
  end
  NokogiriParse.new(parameters[:fias_homes], model_handler.new(FiasHome), 'House')
  p "Parsed all houses, see log/ directory for log file"
end
desc 'Retrieve last actual xml from official site and parse addrobjs and houses into db'
# Full import: either downloads the latest complete FIAS archive from
# nalog.ru, or (when a directory argument is given) parses already
# extracted XML files from that directory. Records are written one-by-one
# via ModelWriter.
task :import, [:dir] => :environment do |_task, args|
  selectors_hash = { fias_addrobj: /AS_ADDROBJ_/, fias_homes: /AS_HOUSE_/ }
  run_import = ->(parameters) { handle_model_parsing(parameters, ModelWriter) }

  if args[:dir].blank?
    p "Downloading last actual fias xml file from nalog.ru"
    xml_url = retrive_xml_link(type: :import)
    download_and_extract(xml_url, selectors_hash, &run_import)
  else
    p "Parsing files from #{args[:dir]}"
    take_from_directory(args[:dir], selectors_hash, &run_import)
  end
end
desc 'Retrieve delta xml from site, extract and update databases'
# Incremental update: either downloads the latest delta archive from
# nalog.ru, or (when a directory argument is given) parses already
# extracted XML files from that directory. Records are applied in forked
# batches via ModelUpdater.
task :update, [:dir] => :environment do |_task, args|
  selectors_hash = { fias_addrobj: /AS_ADDROBJ_/, fias_homes: /AS_HOUSE_/ }
  run_update = ->(parameters) { handle_model_parsing(parameters, ModelUpdater) }

  if args[:dir].blank?
    p "Downloading last update from nalog.ru"
    xml_url = retrive_xml_link(type: :update)
    download_and_extract(xml_url, selectors_hash, &run_update)
  else
    p "Updating models from directory: #{args[:dir]}"
    take_from_directory(args[:dir], selectors_hash, &run_update)
  end
end
# desc 'Creating psql dump file from fias xml' | |
# task :dump => :environment do | |
# read_path = File.join('/Users/guest/', 'Downloads', 'fias_xml', 'AS_HOUSE_20131123_05075930-b4eb-4278-a4f4-eef9cbf3d292.XML') | |
# NokogiriParse.new(File.open(read_path), DumpCreator.new(File.join(Rails.root, 'ruby_dump.sql')), 'House') | |
# end | |
end | |
desc 'Add full_address to existed fias_addrobjs'
# Backfills the denormalized full_address column by joining each record's
# ancestor chain ("shortname formalname, ..."), spreading work over forked
# child processes.
task :add_full_address => :environment do
# Integer division: any remainder beyond partial*10000 records is not
# covered by the outer loop — NOTE(review): confirm the tail is intended
# to be skipped.
partial = FiasAddrobj.count/10000
# REE-era GC hint so forked children keep memory pages shared.
if GC.respond_to?(:copy_on_write_friendly=)
GC.copy_on_write_friendly = true
end
partial.times do |k|
10.times do |i|
Process.fork do
p 'Forked proccess'
# A forked child cannot reuse the parent's DB socket; reconnect per child.
ActiveRecord::Base.connection.disconnect!
config = Rails.application.config.database_configuration[Rails.env]
ActiveRecord::Base.establish_connection(config)
# NOTE(review): find_each iterates from `offset` to the END of the table —
# there is no per-worker limit, so the 10 children process heavily
# overlapping ranges. Also `conditions:` is pre-Rails-4 finder syntax;
# verify against the app's Rails version.
FiasAddrobj.find_each(offset: ((k * 10000) + (i * 1000)), conditions: "full_address is NULL") do |obj|
full_addr = "#{obj.formalname}"
# Prefix with "shortname formalname" of every ancestor, outermost first.
full_addr = "#{obj.ancestors.map {|n| "#{n.shortname} #{n.formalname}" }.join(', ')}, #{obj.shortname} #{full_addr}" unless obj.ancestors.nil?
# Skip the write when the stored value is already up to date.
next if full_addr == obj.full_address
res = obj.update_attribute(:full_address, full_addr)
p "updated aoid: #{obj.aoid}" if res
end
exit
end
end
Process.waitall
end
end
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment