Skip to content

Instantly share code, notes, and snippets.

@w00lf
Created January 22, 2014 07:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save w00lf/8554671 to your computer and use it in GitHub Desktop.
Fills database tables with objects parsed from FIAS XML, using forked worker processes for insertion; can also create a PostgreSQL dump file for fast bulk import.
namespace :fias do
namespace :parse_xml do
# Streams parsed rows into a PostgreSQL COPY-format dump file
# (tab-separated values, '\N' for NULL, terminated by '\.'),
# suitable for fast bulk loading via `COPY ... FROM`.
class DumpCreator
  attr_reader :target_file, :current_attributes

  # Column order matching the target table definition; rows are
  # emitted in exactly this order.
  COLUMN_ORDER = %w[
    aoguid buildnum counter eststatus houseguid houseid housenum
    ifnsfl ifnsul normdoc okato oktmo postalcode statstatus strstatus
    strucnum terrifnsfl terrifnsul startdate enddate updatedate
    created_at updated_at
  ].freeze

  # target_file: path of the dump file to append to.
  def initialize(target_file)
    # BUG FIX: the original assigned the opened handle to a *local*
    # variable, leaving @target_file as the path string, so every
    # subsequent target_file.write would raise NoMethodError.
    @target_file = File.open(target_file, 'a')
  end

  # Builds the attribute hash for one row from SAX attributes
  # (array of [NAME, value] pairs). Missing columns default to the
  # COPY NULL marker '\N'; timestamps are filled with Time.now.
  def create_attributes(attrs)
    @current_attributes = COLUMN_ORDER.each_with_object({}) { |col, acc| acc[col] = '\N' }
    @current_attributes['created_at'] = Time.now
    @current_attributes['updated_at'] = Time.now
    attrs.each do |pair|
      current_attributes[pair[0].downcase] = pair[1]
    end
  end

  # Writes the current row as one tab-separated line.
  def write_attributes
    # Non-destructive lookup; the original consumed the order array
    # with shift() inside a times loop.
    row = COLUMN_ORDER.map { |col| current_attributes[col] }
    target_file.write("#{row.join("\t")}\n")
  end

  # Writes the COPY end-of-data marker and closes the file.
  def end_parse
    target_file.write('\.')
    target_file.close
  end
end
# Persists each parsed XML record as a new ActiveRecord row of the
# given model, logging failures to a per-run log file.
class ModelWriter
  attr_reader :model, :current_attributes

  # model: an ActiveRecord class (e.g. FiasAddrobj, FiasHome).
  def initialize(model)
    @model = model
    @logger = Logger.new("#{Rails.root}/log/#{model}_Update_#{Time.now.to_i}.log")
    @start_time = Time.now
    @created_count = 0
    @failed_count = 0
  end

  # Builds @current_attributes from SAX attrs ([NAME, value] pairs),
  # keeping only names that are actual columns of the model.
  def create_attributes(attrs)
    @current_attributes = {}
    attrs.each do |pair|
      attr_name = pair[0].downcase
      next unless model.attribute_names.include?(attr_name)
      @current_attributes[attr_name] = pair[1]
    end
  rescue PG::Error
    # A stale PG connection (e.g. inherited across a fork) is replaced
    # with a fresh one so the run can continue.
    ActiveRecord::Base.connection.disconnect!
    config = Rails.application.config.database_configuration[Rails.env]
    ActiveRecord::Base.establish_connection(config)
  end

  # Creates one row from the current attribute set; failures are
  # counted and logged, never raised to the parser.
  def write_attributes
    model.create(@current_attributes)
    @created_count += 1
  rescue StandardError => e
    # was `rescue Exception`, which would also swallow SignalException/SystemExit
    @failed_count += 1
    @logger.error("Cannot create entry from attributes: #{@current_attributes.inspect}, reason: #{e.message}")
  end

  # Called once by the parser when the document ends.
  def end_parse
    @logger.info("Finished, created: #{@created_count}, failed: #{@failed_count}")
  end
end
# Like ModelWriter, but updates existing rows (matched by a model-specific
# primary key) and batches work into queues that are drained by forked
# worker processes.
class ModelUpdater < ModelWriter
  WORKER_COUNT = 5
  QUEUE_LIMIT = 2500

  def initialize(model)
    super(model)
    if GC.respond_to?(:copy_on_write_friendly=)
      GC.copy_on_write_friendly = true
    end
    # Natural key per model; used to look up the row being updated.
    @current_primary_key = { FiasAddrobj => 'aoid', FiasHome => 'houseid' }[model]
    @updated_count = 0
    @created_count = 0
    @failed_count = 0
    @create_queue = []
    @update_queue = []
  end

  # Formats an ActiveModel changes hash as indented "attr: old => new" lines.
  def formated_changes(changes)
    changes.map { |attr, (from, to)| "\t#{attr}: #{from} => #{to}\n" }
  end

  # Splits the queue across WORKER_COUNT forked children; each child gets a
  # fresh DB connection (PG connections must not be shared across forks).
  def fork_and_itearte(queue, &block)
    slice_size = QUEUE_LIMIT / WORKER_COUNT
    WORKER_COUNT.times do |i|
      slice = queue[i * slice_size, slice_size]
      # BUG FIX: guard against nil/empty slices when the queue is only
      # partially full (e.g. the final flush in end_parse).
      next if slice.nil? || slice.empty?
      Process.fork do
        ActiveRecord::Base.connection.disconnect!
        config = Rails.application.config.database_configuration[Rails.env]
        ActiveRecord::Base.establish_connection(config)
        slice.each { |obj| yield(obj) }
        exit
      end
    end
    Process.waitall
  end

  # Drains one queue (:create or :update) through the forked workers.
  def fork_and_write(type)
    case type
    when :create
      fork_and_itearte(@create_queue) { |attributes| create_object(attributes) }
      @create_queue = []
    when :update
      fork_and_itearte(@update_queue) { |attributes| update_object(attributes) }
      @update_queue = []
    end
  end

  # Creates a new row from a queued attribute hash (runs inside a child).
  def create_object(attributes)
    model.create(attributes)
    @created_count += 1
    @logger.info("Created new #{model}: #{attributes[@current_primary_key]}")
  rescue StandardError => e
    @failed_count += 1
    @logger.error("Failed to create #{model}, #{attributes[@current_primary_key]}, reason: #{e.message}, attributes: #{attributes.inspect}")
  end

  # Updates the existing row matching the queued attribute hash
  # (runs inside a child).
  def update_object(attributes)
    # BUG FIX: look the record up from the queued attributes. The original
    # assigned attributes from @current_attributes / @updating_object, which
    # in a forked child hold whatever record the parent parsed *last*, not
    # the queued one.
    updating_object = model.unscoped.find(attributes[@current_primary_key])
    updating_object.attributes = attributes
    changes = updating_object.changes
    if updating_object.save
      @updated_count += 1
      @logger.info("Updated #{model}, #{updating_object.send(@current_primary_key)}, changes: #{formated_changes(changes).join}")
    else
      @failed_count += 1
      @logger.error("Failed to update #{model}, #{updating_object.send(@current_primary_key)}, changes: #{formated_changes(changes).join}")
    end
  end

  # Routes the current record to the create or update queue, flushing a
  # queue through the workers when it reaches QUEUE_LIMIT.
  def write_attributes
    exists = begin
      model.unscoped.find(@current_attributes[@current_primary_key])
      true
    rescue StandardError
      false
    end
    if exists
      # BUG FIX: push *before* flushing — the original dropped the record
      # that triggered the flush.
      @update_queue.push(@current_attributes)
      fork_and_write(:update) if @update_queue.length >= QUEUE_LIMIT
    else
      @create_queue.push(@current_attributes)
      fork_and_write(:create) if @create_queue.length >= QUEUE_LIMIT
    end
  end

  def end_parse
    # BUG FIX: flush whatever is still queued; the original silently
    # discarded up to QUEUE_LIMIT-1 trailing records per queue.
    fork_and_write(:update) unless @update_queue.empty?
    fork_and_write(:create) unless @create_queue.empty?
    # NOTE(review): counters incremented inside forked children are not
    # visible in the parent, so these totals undercount — confirm whether
    # accurate totals are required.
    @logger.info("Updated: #{@updated_count}, created: #{@created_count}, failed: #{@failed_count}")
  end
end
# Shared SAX-callback plumbing: collects attributes when the element of
# interest (obj_tag) opens, hands them to the handler when it closes,
# and drives a progress bar from the stream position. Including classes
# implement start_parse to kick off their concrete parser.
module AbstractParser
  attr_reader :io, :progress, :logger, :handler, :obj_tag

  # io: the XML input stream; handler: an object responding to
  # create_attributes / write_attributes / end_parse; obj_tag: element
  # name to extract (e.g. 'Object', 'House').
  def initialize(io, handler, obj_tag)
    @obj_tag = obj_tag
    @handler = handler
    @io = io
    @progress = ProgressBar.new("Converting", io.size)
    @tags_count = 0
    @writed_count = 0
    start_parse
    @progress.finish
  end

  # Hook for including classes; no-op by default.
  def start_parse; end

  def start_element(name, attrs = [])
    return unless name.to_s.eql?(obj_tag)
    handler.create_attributes(attrs)
  end

  def end_element(name)
    # BUG FIX: only write when the tag of interest closes. Previously
    # *every* closing tag — including the document root — triggered a
    # write, duplicating the last record at end of document.
    return unless name.to_s.eql?(obj_tag)
    handler.write_attributes
    progress.set(io.pos)
  end

  def end_document
    handler.end_parse
  end
end
# SAX document that plugs the AbstractParser lifecycle into Nokogiri's
# streaming XML parser.
class NokogiriParse < Nokogiri::XML::SAX::Document
  include AbstractParser

  def initialize(io, target_model, obj_tag)
    super
  end

  # Invoked by AbstractParser#initialize; streams the io through Nokogiri,
  # which calls back into start_element/end_element/end_document.
  def start_parse
    sax_parser = Nokogiri::XML::SAX::Parser.new(self)
    sax_parser.parse(io)
  end
end
# Asks the official FIAS SOAP service for the latest download links and
# returns the URL matching options[:type] (:import for the complete
# archive, :update for the delta archive); nil for any other type.
def retrive_xml_link(options = {})
  client = Savon.client(wsdl: "http://fias.nalog.ru/WebServices/Public/DownloadService.asmx?WSDL")
  response = client.call(:get_last_download_file_info).body[:get_last_download_file_info_response][:get_last_download_file_info_result]
  # BUG FIX: the original ended with two modifier-ifs, so the :import
  # branch's value was discarded and the method always returned nil
  # for type :import.
  case options[:type]
  when :import then response[:fias_complete_xml_url]
  when :update then response[:fias_delta_xml_url]
  end
end
# Scans dir, opening every file whose name matches a selector pattern.
# Yields a hash of selector-key => File plus :total => all entry names.
# Opened files are closed after the block returns.
#
# dir: directory path; selectors: { key => Regexp } map.
def take_from_directory(dir, selectors, &block)
  dir_files = []
  result = {}
  opened = []
  Dir.foreach(dir) do |item|
    next if item == '.' or item == '..'
    selectors.each do |index, pattern|
      if item =~ pattern
        io = File.open(File.join(dir, item))
        opened << io
        result[index] = io
      end
    end
    dir_files.push(item)
  end
  result[:total] = dir_files
  yield(result)
ensure
  # BUG FIX: the original leaked every opened File handle.
  opened.each { |io| io.close unless io.closed? } if opened
end
# Downloads the archive at link into a temp directory, extracts it with
# `unrar`, opens the extracted files matching the selector patterns, and
# yields the same result hash shape as take_from_directory. The temp
# directory (and everything in it) is removed when the mktmpdir block ends.
def download_and_extract(link, selectors, &block)
  Dir.mktmpdir("fias_xml_#{Time.now.strftime('%d_%m_%y')}") do |dir|
    uri_obj = URI.parse(link)
    archive_path = File.join(dir, 'fias.rar')
    # Block-form File.open replaces the original Kernel#open + manual
    # ensure/rewind/close, guaranteeing the handle is closed even on error.
    File.open(archive_path, 'wb') do |file|
      Net::HTTP.start(uri_obj.host) do |http|
        http.request_get(uri_obj.request_uri) do |response|
          # Stream the body in segments so large archives are never held
          # fully in memory.
          response.read_body do |segment|
            file.write(segment)
          end
        end
      end
    end
    p(`unrar e #{archive_path} #{dir}`)
    temp_dir_files = []
    result = {}
    Dir.foreach(dir) do |item|
      next if item == '.' or item == '..'
      selectors.each do |index, val|
        result[index] = File.open(File.join(dir, item)) if item =~ val
      end
      temp_dir_files.push(item)
    end
    result[:total] = temp_dir_files
    yield(result)
  end
end
# Runs the two-stage parse: address objects first, then houses, each fed
# into a fresh handler (ModelWriter or ModelUpdater). Aborts with a
# message if either expected XML file is missing from parameters.
def handle_model_parsing(parameters, model_handler)
  if parameters[:fias_addrobj].nil?
    p("Cannot find xml file for addrobjs, dir files list: #{parameters[:total].join("\n")}")
    exit
  end
  NokogiriParse.new(parameters[:fias_addrobj], model_handler.new(FiasAddrobj), 'Object')
  p "Parsed all addrobjs, see log/ directory for log file"
  sleep 10
  if parameters[:fias_homes].nil?
    p("Cannot find xml file for houses, dir files list: #{parameters[:total].join("\n")}")
    exit
  end
  NokogiriParse.new(parameters[:fias_homes], model_handler.new(FiasHome), 'House')
  p "Parsed all houses, see log/ directory for log file"
end
desc 'Retrieve last actual xml from official site and parse addrobjs and houses into db'
# Full import: either download the complete FIAS archive or read XML
# files from a user-supplied directory, then insert via ModelWriter.
task :import, [:dir] => :environment do |task, args|
  selectors_hash = { fias_addrobj: /AS_ADDROBJ_/, fias_homes: /AS_HOUSE_/ }
  run_import = ->(parameters) { handle_model_parsing(parameters, ModelWriter) }
  if args[:dir].blank?
    p "Downloading last actual fias xml file from nalog.ru"
    xml_url = retrive_xml_link(type: :import)
    download_and_extract(xml_url, selectors_hash, &run_import)
  else
    p "Parsing files from #{args[:dir]}"
    take_from_directory(args[:dir], selectors_hash, &run_import)
  end
end
desc 'Retrieve delta xml from site, extract and update databases'
# Incremental update: same flow as :import but fetches the delta archive
# and applies changes via ModelUpdater.
task :update, [:dir] => :environment do |task, args|
  selectors_hash = { fias_addrobj: /AS_ADDROBJ_/, fias_homes: /AS_HOUSE_/ }
  run_update = ->(parameters) { handle_model_parsing(parameters, ModelUpdater) }
  if args[:dir].blank?
    p "Downloading last update from nalog.ru"
    xml_url = retrive_xml_link(type: :update)
    download_and_extract(xml_url, selectors_hash, &run_update)
  else
    p "Updating models from directory: #{args[:dir]}"
    take_from_directory(args[:dir], selectors_hash, &run_update)
  end
end
# desc 'Creating psql dump file from fias xml'
# task :dump => :environment do
# read_path = File.join('/Users/guest/', 'Downloads', 'fias_xml', 'AS_HOUSE_20131123_05075930-b4eb-4278-a4f4-eef9cbf3d292.XML')
# NokogiriParse.new(File.open(read_path), DumpCreator.new(File.join(Rails.root, 'ruby_dump.sql')), 'House')
# end
end
desc 'Add full_address to existed fias_addrobjs'
# Backfills the denormalized full_address column on FiasAddrobj by joining
# each record's ancestor chain ("shortname formalname, ...") with its own
# short/formal name. Work is split across forked worker processes.
task :add_full_address => :environment do
# One outer pass per ~10_000 records (integer division).
partial = FiasAddrobj.count/10000
if GC.respond_to?(:copy_on_write_friendly=)
GC.copy_on_write_friendly = true
end
partial.times do |k|
10.times do |i|
Process.fork do
p 'Forked proccess'
# Forked children must not share the parent's DB connection; reconnect.
ActiveRecord::Base.connection.disconnect!
config = Rails.application.config.database_configuration[Rails.env]
ActiveRecord::Base.establish_connection(config)
# NOTE(review): find_each is given :offset with no :limit, so each of the
# 10 forks appears to scan from its offset to the end of the table —
# confirm the intended batching; also :conditions is pre-Rails-4 syntax
# (later versions expect a where-scope instead).
FiasAddrobj.find_each(offset: ((k * 10000) + (i * 1000)), conditions: "full_address is NULL") do |obj|
full_addr = "#{obj.formalname}"
# Prefix with "shortname formalname" of every ancestor, comma-separated,
# then this record's own shortname.
full_addr = "#{obj.ancestors.map {|n| "#{n.shortname} #{n.formalname}" }.join(', ')}, #{obj.shortname} #{full_addr}" unless obj.ancestors.nil?
# Skip rows already up to date to avoid pointless writes.
next if full_addr == obj.full_address
res = obj.update_attribute(:full_address, full_addr)
p "updated aoid: #{obj.aoid}" if res
end
exit
end
end
Process.waitall
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment