Skip to content

Instantly share code, notes, and snippets.

@nicholasf
Created May 17, 2011 06:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nicholasf/976043 to your computer and use it in GitHub Desktop.
Save nicholasf/976043 to your computer and use it in GitHub Desktop.
require 'rubygems'
require 'mutexer'
require 'ruote/engine'
require 'nesstar/config'
require 'ruote/storage/base'
require 'ruote/storage/fs_storage'
require 'ruote/worker'
require 'ruote/participant'
require 'ruote'
require 'fileutils'
require 'yaml'
require 'nesstar/rdf/parser'
#these classes should be loaded by the rails environment via rake
#but multithreading seems to require we explicitly require them
require 'study'
require 'archive_study'
require 'study_field'
# require 'study_related_material'
# require 'variable'
# require 'variable_field'
require 'ruby-debug'
module Nesstar
class Integration
include Config
#call this from the client to run the integration.
def self.run
@storage = Ruote::FsStorage.new("/tmp/nesstar/ruote/")
@worker = Ruote::Worker.new(@storage)
@engine = Ruote::Engine.new(@worker)
register_workflow_participants(@engine)
dataset_process_def = Ruote.process_definition :name => 'convert_datasets' do
sequence do
subprocess :ref => 'initialize_directories'
participant :ref => 'load_study_integrations'
cancel_process :if => '${f:study_ids.size} == 0'
concurrent_iterator :on_field => 'studies_to_download', :to_f => "ddi_id" do
participant :ref => 'download_study'
end
concurrent_iterator :on_field => 'studies_to_download', :to_f => "ddi_id" do
participant :ref => 'download_related_materials'
end
concurrent_iterator :on_field => 'studies_to_download', :to_f => "ddi_id" do
participant :ref => 'download_variables'
end
participant :ref => 'convert_related_materials'
participant :ref => 'convert_variables'
participant :ref => 'ada_archive_contains_all_studies'
participant :ref => 'log_run'
end
process_definition :name => 'initialize_directories' do
sequence do
participant :ref => 'initialize_directory', :dir => $nesstar_dir
participant :ref => 'mkdir', :dir => $xml_dir
participant :ref => 'mkdir', :dir => $studies_xml_dir
participant :ref => 'mkdir', :dir => $related_xml_dir
participant :ref => 'mkdir', :dir => $variables_xml_dir
end
end
end
ARGV << "-d"
wfid = @engine.launch(dataset_process_def)
@engine.wait_for(wfid)
end
# registers the workflow logic participants with ruote
def self.register_workflow_participants(engine)
engine.register_participant 'initialize_directory' do |workitem|
rm_rf(workitem.fields['params']['dir'])
mkdir(workitem.fields["params"]['dir']) unless File.exists?(workitem.fields["params"]['dir'])
end
engine.register_participant 'recreate_xml_directory' do |workitem|
rm_rf(workitem.fields['params']['dir'])
mkdir(workitem.fields['params']['dir'])
end
engine.register_participant 'mkdir' do |workitem|
mkdir(workitem.fields['params']['dir'])
end
## load_study_ids
engine.register_participant 'load_study_integrations' do |workitem|
dataset_urls = []
workitem.fields['studies_to_download'] ||= []
queries = ArchiveStudyQuery.all
for query in queries
query.save!
query_response_file = "#{$xml_dir}query_response_#{Time.now.to_i}.xml"
`curl -o #{query_response_file} --compressed "#{query.query}"`
handler = Nesstar::QueryResponseParser.new(query_response_file)
for url in handler.datasets
ddi_id = url.split(".").last
#the validations on the object ensure we don't duplicate the object (archive + query must be unique, url can repeat)
pre_existing = ArchiveStudyIntegration.find_by_archive_id_and_ddi_id(query.archive.id, ddi_id)
unless pre_existing
ArchiveStudyIntegration.create!(:ddi_id => ddi_id, :archive => query.archive,
:archive_study_query => query, :user_id => query.id)
end
end
end
workitem.fields['studies_to_download'] = Set.new
ArchiveStudyIntegration.all.each do |integration|
workitem.fields['studies_to_download'] << integration.ddi_id
end
end
engine.register_participant 'download_study' do |workitem|
workitem.fields['fetch_errors'] ||= []
workitem.fields['downloaded_files'] ||= []
workitem.fields['study_ids'] ||= Set.new
ddi_id = workitem.fields['ddi_id']
file_name = "#{ddi_id}.xml"
mutex = Mutexer.wait_for_mutex(2)
begin
mutex.synchronize do
# puts "\\n\n study download: downloading: #{$nesstar_server}/obj/fStudy/au.edu.anu.ada.ddi.#{ddi_id}"
http_headers = `curl -i --compressed "#{$nesstar_server}/obj/fStudy/au.edu.anu.ada.ddi.#{ddi_id}"`
http_headers = http_headers.split("\n")
if http_headers.first =~ /500/
workitem.fields['fetch_errors'] << "Error while downloading #{ddi_id}: #{http_headers.first} \n"
Inkling::Log.create!(:category => "integration", :text => "HTTP 500 error downloading: #{$nesstar_server}/obj/fStudy/au.edu.anu.assda.ddi.#{ddi_id}")
next
end
begin
# puts " #{$nesstar_server}/obj/fStudy/au.edu.anu.ada.ddi.#{ddi_id}"
`curl -o #{$studies_xml_dir}#{file_name} --compressed "#{$nesstar_server}/obj/fStudy/au.edu.anu.ada.ddi.#{ddi_id}"`
workitem.fields['downloaded_files'] << file_name
rescue StandardError => boom
puts "#{boom}.to_s"
workitem.fields['fetch_errors'] << "Error while downloading #{ddi_id}: #{boom} \n"
end
study_hash = Nesstar::RDF::Parser.parse("#{$studies_xml_dir}#{ddi_id}.xml")
study = Study.store_with_fields(study_hash)
DdiMapping.batch_create(study_hash) #create mappings entries for any DDI elements/attributes we have not yet noticed
#find archive study integrations which need to be linked to the new study
integrations = ArchiveStudyIntegration.find_all_by_ddi_id_and_study_id(ddi_id, nil)
for integration in integrations
integration.study_id = study.id
integration.save!
end
end
ensure
ActiveRecord::Base.connection_pool.release_connection
end
end
engine.register_participant 'download_related_materials' do |workitem|
mutex = Mutexer.wait_for_mutex(2)
begin
mutex.synchronize do
ddi_id = workitem.fields['ddi_id']
study = Study.find_by_ddi_id(ddi_id)
# puts "related materials for #{study.label}"
#we looks for a study which records the URL of a related materials document
related_materials_entry = study.related_materials_attribute
unless related_materials_entry.nil?
document_name = related_materials_entry.value.split(".").last + ".xml"
`curl -o #{$related_xml_dir}#{document_name} --compressed "#{related_materials_entry.value}"`
end
end
ensure
ActiveRecord::Base.connection_pool.release_connection
end
end
engine.register_participant 'download_variables' do |workitem|
mutex = Mutexer.wait_for_mutex(2)
begin
mutex.synchronize do
ddi_id = workitem.fields['ddi_id']
study = Study.find_by_ddi_id(ddi_id)
#we looks for a study's variables
variable_url = study.variables_attribute.value
var_file_name = variable_url.split(".").last
`curl -o #{$variables_xml_dir}#{var_file_name} --compressed "#{variable_url}"`
end
ensure
ActiveRecord::Base.connection_pool.release_connection
end
end
engine.register_participant 'convert_related_materials' do |workitem|
Dir.entries($related_xml_dir).each do |file_name|
next if file_name == "." or file_name == ".."
begin
related_materials_list = RDF::Parser.parse_related_materials_document("#{$related_xml_dir}#{file_name}")
rescue StandardError => boom
puts "#{$related_xml_dir}#{file_name}: #{boom}"
end
if related_materials_list.nil?
Inkling::Log.create!(:category => "integration", :text => "Empty document: #{$related_xml_dir}#{file_name}")
next
end
related_materials_list.each do |related|
study = Study.find_by_about(related[:study_resource])
next if study.nil?
pre_existing = StudyRelatedMaterial.find_by_study_id_and_uri(study.id, related[:uri], related[:label])
next if pre_existing
related_material = StudyRelatedMaterial.new(:study_id => study.id, :uri => related[:uri],
:comment => related[:comment], :creation_date => related[:creationDate], :complete => related[:complete],
:resource => related[:study_resource])
related_material.save!
end
end
end
engine.register_participant 'convert_variables' do |workitem|
Dir.entries($variables_xml_dir).each do |file_name|
next if file_name == "." or file_name == ".."
variables_list = RDF::Parser.parse_variables("#{$variables_xml_dir }/#{file_name}")
variables_list.each {|var_hash| variable = Variable.store_with_fields(var_hash)}
end
end
engine.register_participant 'ada_archive_contains_all_studies' do |workitem|
for study in Study.all
unless Archive.ada.studies.index(study)
ArchiveStudy.create!(:archive => Archive.ada, :study => study)
end
end
end
engine.register_participant 'log_run' do |workitem|
workitem.fields['downloads'] ||= {}
workitem.fields['fetch_errors'] ||= {}
Inkling::Log.create!(:category => "integration", :text => "Downloaded #{workitem.fields['downloads'].size} studies. Encountered #{workitem.fields['fetch_errors'].size} errors. There are now #{Study.all.size} studies in ADA.")
end
end
end
end
@nicholasf
Copy link
Author

andromeda:ada-cms nicholas$ rake nesstar --trace
(in /Users/nicholas/code/src/clients/anu/ADA-CMS)
** Invoke nesstar (first_time)
** Invoke environment (first_time)
** Execute environment
** Execute nesstar
rake aborted!
[:call, [:const, :File]] is excluded
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/rufus-treechecker-1.0.4/lib/rufus/treechecker.rb:256:in block in check' /Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/rufus-treechecker-1.0.4/lib/rufus/treechecker.rb:255:ineach'
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/rufus-treechecker-1.0.4/lib/rufus/treechecker.rb:255:in check' /Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/rufus-treechecker-1.0.4/lib/rufus/treechecker.rb:504:indo_check'
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/rufus-treechecker-1.0.4/lib/rufus/treechecker.rb:510:in block in do_check' /Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/rufus-treechecker-1.0.4/lib/rufus/treechecker.rb:510:ineach'
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/rufus-treechecker-1.0.4/lib/rufus/treechecker.rb:510:in do_check' /Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/rufus-treechecker-1.0.4/lib/rufus/treechecker.rb:510:inblock in do_check'
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/rufus-treechecker-1.0.4/lib/rufus/treechecker.rb:510:in each' /Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/rufus-treechecker-1.0.4/lib/rufus/treechecker.rb:510:indo_check'
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/rufus-treechecker-1.0.4/lib/rufus/treechecker.rb:510:in block in do_check' /Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/rufus-treechecker-1.0.4/lib/rufus/treechecker.rb:510:ineach'
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/rufus-treechecker-1.0.4/lib/rufus/treechecker.rb:510:in do_check' /Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/rufus-treechecker-1.0.4/lib/rufus/treechecker.rb:172:incheck'
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/ruote-2.2.0/lib/ruote/svc/treechecker.rb:120:in block_check' /Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/ruote-2.2.0/lib/ruote/svc/participant_list.rb:64:inregister'
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/ruote-2.2.0/lib/ruote/engine.rb:524:in register_participant' /Users/nicholas/code/src/clients/anu/ADA-CMS/lib/nesstar/integration.rb:80:inregister_workflow_participants'
/Users/nicholas/code/src/clients/anu/ADA-CMS/lib/nesstar/integration.rb:35:in run' /Users/nicholas/code/src/clients/anu/ADA-CMS/lib/tasks/nesstar.rake:2:inblock in <top (required)>'

@nicholasf
Copy link
Author

andromeda:ada-cms nicholas$ rake nesstar --trace
(in /Users/nicholas/code/src/clients/anu/ADA-CMS)
** Invoke nesstar (first_time)
** Invoke environment (first_time)
** Execute environment
** Execute nesstar
rake aborted!
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/ruote-2.2.0/lib/ruote/part/block_participant.rb:121: syntax error, unexpected keyword_ensure, expecting keyword_end
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/ruote-2.2.0/lib/ruote/part/block_participant.rb:124: syntax error, unexpected $end, expecting keyword_end
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/ruote-2.2.0/lib/ruote/part/block_participant.rb:90:in eval' /Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/ruote-2.2.0/lib/ruote/part/block_participant.rb:90:inconsume'
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/ruote-2.2.0/lib/ruote/svc/dispatch_pool.rb:94:in do_dispatch' /Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/ruote-2.2.0/lib/ruote/svc/dispatch_pool.rb:121:inblock in do_threaded_dispatch'

@nicholasf
Copy link
Author

** Invoke nesstar (first_time)
** Invoke environment (first_time)
** Execute environment
** Execute nesstar

proc do |workitem|
workitem.fields["fetch_errors"] ||= []
workitem.fields["downloaded_files"] ||= []
workitem.fields["study_ids"] ||= Set.new
ddi_id = workitem.fields["ddi_id"]
file_name = "#{ddi_id}.xml"
mutex = Mutexer.wait_for_mutex(2)
begin
mutex.synchronize do
http_headers = curl -i --compressed "#{$nesstar_server}/obj/fStudy/au.edu.anu.ada.ddi.#{ddi_id}"
http_headers = http_headers.split("\n")
if http_headers.first =~ /500/ then
(workitem.fields["fetch_errors"] << "Error while downloading #{ddi_id}: #{http_headers.first} \n")
Inkling::Log.create!(:category => "integration", :text => ("HTTP 500 error downloading: #{$nesstar_server}/obj/fStudy/au.edu.anu.assda.ddi.#{ddi_id}"))
next
end
begin
(curl -o #{$studies_xml_dir}#{file_name} --compressed "#{$nesstar_server}/obj/fStudy/au.edu.anu.ada.ddi.#{ddi_id}"
(workitem.fields["downloaded_files"] << file_name))
rescue StandardError => boom
puts("#{boom}.to_s")
(workitem.fields["fetch_errors"] << "Error while downloading #{ddi_id}: #{boom} \n")
end
study_hash = Nesstar::RDF::Parser.parse("#{$studies_xml_dir}#{ddi_id}.xml")
study = Study.store_with_fields(study_hash)
DdiMapping.batch_create(study_hash)
integrations = ArchiveStudyIntegration.find_all_by_ddi_id_and_study_id(ddi_id, nil)
for integration in integrations do
(integration.study_id = study.id
integration.save!)
end
ensure
ActiveRecord::Base.connection_pool.release_connection
end
end
rake aborted!
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/ruote-2.2.0/lib/ruote/part/block_participant.rb:122: syntax error, unexpected keyword_ensure, expecting keyword_end
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/ruote-2.2.0/lib/ruote/part/block_participant.rb:125: syntax error, unexpected $end, expecting keyword_end
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/ruote-2.2.0/lib/ruote/part/block_participant.rb:91:in eval' /Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/ruote-2.2.0/lib/ruote/part/block_participant.rb:91:inconsume'
/Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/ruote-2.2.0/lib/ruote/svc/dispatch_pool.rb:94:in do_dispatch' /Users/nicholas/.rvm/gems/ruby-1.9.2-p0/gems/ruote-2.2.0/lib/ruote/svc/dispatch_pool.rb:121:inblock in do_threaded_dispatch'
andromeda:ada-cms nicholas$

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment