Created
July 21, 2015 21:48
-
-
Save myfashionhub/612dafd6496d088c6bee to your computer and use it in GitHub Desktop.
Subscription initiation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'time' | |
class SubscriptionInitiationWorker < Boxxspring::Worker::TaskBase | |
def process_payload(payload) | |
if payload['create_subscription_tasks'] | |
properties_and_sources.each do |property_id, sources| | |
sources.each_with_index do |source, index| | |
if source.last_polled_at.nil? || Time.now.utc >= Time.parse(source.last_polled_at) + source.time_to_live | |
self.logger.info("Source (id: #{source.id}) was last polled more than (or exactly) #{source.time_to_live} seconds ago.") | |
last_task = last_task(property_id, source) | |
unless last_task.present? && ['idle', 'running'].include?(last_task.state) | |
write = task_write_state(subscription_ingestion_task(property_id, source), 'idle', | |
self.logger.info("Creating SubscriptionIngestionTask.")) | |
self.logger.info("The SubscriptionIngestionTask (id: #{write.id}) has been successfully created.") unless write.is_a?(Boxxspring::Error) | |
update_last_polled_at(property_id, source) | |
else | |
self.logger.info("The last task (id: #{last_task.id}) for property #{property_id} is running/idle.") | |
end | |
end | |
end | |
end | |
else | |
update_properties_and_sources(payload) | |
self.logger.info("The sources in memory have been updated to reflect a new source (id: #{payload['$this']['ids'].first}).") | |
end | |
end | |
private | |
def properties_and_sources | |
@properties_and_sources ||= begin | |
result = {} | |
properties.each do |property| | |
sources_to_add = sources(property) | |
result[property.id] = sources_to_add unless sources_to_add.blank? | |
end | |
result | |
end | |
end | |
def properties | |
operation("/properties").query | |
end | |
def sources(property) | |
operation("/properties/#{property.id}/sources").where( | |
sort_by: 'updated_at', sort_direction: 'asc', count: 250 | |
).query | |
end | |
def source(property_id, source_id) | |
operation("/properties/#{property_id}/sources/#{source_id}").read | |
end | |
def update_last_polled_at(property_id, source) | |
source.last_polled_at = Time.now.utc.iso8601 | |
self.logger.info("The last_polled_at has been updated for Source (id:#{source.id}).") | |
end | |
def update_properties_and_sources(payload) | |
property_id = payload['sources'].first['property_id'] | |
source_id = payload['sources'].first['id'] | |
properties_and_sources[property_id].each_with_index do |source, index| | |
if source.id == source_id | |
return properties_and_sources[property_id][index] = source(property_id, source_id) | |
end | |
end | |
properties_and_sources[property_id] += [source(property_id, source_id)] | |
end | |
def last_task(property_id, source) | |
operation("/properties/#{property_id}/tasks").where(type_name: 'subscription_ingestion_task', | |
subject_id: source.id, | |
sort_by: 'created_at', | |
order: 'desc', | |
count: '1').read | |
end | |
def subscription_ingestion_task(property_id, source) | |
Boxxspring::SubscriptionIngestionTask.new( | |
state: 'idle', | |
subject_type_name: 'Source', | |
subject_id: source.id, | |
provider: source.provider, | |
property_id: property_id | |
) | |
end | |
def operation(path) | |
Boxxspring::Operation.new(path, Boxxspring::Worker.configuration.api_credentials.to_hash) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment