Skip to content

Instantly share code, notes, and snippets.

@myfashionhub
Created July 21, 2015 21:48
Show Gist options
  • Save myfashionhub/612dafd6496d088c6bee to your computer and use it in GitHub Desktop.
Save myfashionhub/612dafd6496d088c6bee to your computer and use it in GitHub Desktop.
Subscription initiation
require 'time'
# Worker that creates SubscriptionIngestionTasks for sources that are due to be
# polled, and keeps a per-process, in-memory map of property id => sources.
#
# Two kinds of payload are handled by #process_payload:
# * payloads with a truthy 'create_subscription_tasks' key trigger a pass over
#   every cached source;
# * any other payload is treated as a notification about a single source, which
#   is re-read from the API and stored in the cache.
class SubscriptionInitiationWorker < Boxxspring::Worker::TaskBase
  # Entry point for an incoming payload.
  #
  # @param payload [Hash] string-keyed hash. Keys read here:
  #   'create_subscription_tasks' (truthy => polling pass), otherwise
  #   'sources' (first element supplies 'property_id' and 'id') and
  #   '$this' => { 'ids' => [...] } (used only for the log line).
  def process_payload(payload)
    if payload['create_subscription_tasks']
      properties_and_sources.each do |property_id, sources|
        sources.each do |source|
          enqueue_ingestion_task(property_id, source) if due_for_polling?(source)
        end
      end
    else
      update_properties_and_sources(payload)
      logger.info("The sources in memory have been updated to reflect a new source (id: #{payload['$this']['ids'].first}).")
    end
  end

  private

  # True when the source has never been polled, or when at least
  # source.time_to_live seconds have passed since source.last_polled_at.
  #
  # @param source [#last_polled_at, #time_to_live] last_polled_at is parsed
  #   with Time.parse, so it is expected to be a time string (or nil).
  # @return [Boolean]
  def due_for_polling?(source)
    return true if source.last_polled_at.nil?

    Time.now.utc >= Time.parse(source.last_polled_at) + source.time_to_live
  end

  # Creates a SubscriptionIngestionTask for +source+ unless the most recent
  # task returned by #last_task is still idle or running.
  def enqueue_ingestion_task(property_id, source)
    logger.info("Source (id: #{source.id}) was last polled more than (or exactly) #{source.time_to_live} seconds ago.")

    previous_task = last_task(property_id, source)
    if previous_task.present? && %w[idle running].include?(previous_task.state)
      logger.info("The last task (id: #{previous_task.id}) for property #{property_id} is running/idle.")
      return
    end

    # NOTE(review): the third argument is the *return value* of logger.info,
    # not the message text. Kept as-is because task_write_state is defined
    # outside this file; confirm what it expects there.
    write = task_write_state(
      subscription_ingestion_task(property_id, source),
      'idle',
      logger.info('Creating SubscriptionIngestionTask.')
    )
    logger.info("The SubscriptionIngestionTask (id: #{write.id}) has been successfully created.") unless write.is_a?(Boxxspring::Error)

    # NOTE(review): this also runs when the write returned a Boxxspring::Error,
    # so a failed creation is not retried until the TTL elapses again; confirm
    # that this is intended.
    update_last_polled_at(property_id, source)
  end

  # Memoized map of property id => sources, built on first use. Properties
  # whose source list is blank are left out of the map.
  #
  # @return [Hash]
  def properties_and_sources
    @properties_and_sources ||= properties.each_with_object({}) do |property, result|
      found = sources(property)
      result[property.id] = found unless found.blank?
    end
  end

  # Queries the "/properties" endpoint.
  def properties
    operation('/properties').query
  end

  # Queries the sources of +property+, sorted by updated_at ascending and
  # limited to 250 results by the count parameter.
  def sources(property)
    operation("/properties/#{property.id}/sources").where(
      sort_by: 'updated_at', sort_direction: 'asc', count: 250
    ).query
  end

  # Reads a single source of a property from the API.
  def source(property_id, source_id)
    operation("/properties/#{property_id}/sources/#{source_id}").read
  end

  # Stamps the given source object with the current UTC time (ISO 8601).
  # Only the object passed in is modified; nothing is written back through
  # the API here, and +property_id+ is currently unused.
  def update_last_polled_at(property_id, source)
    source.last_polled_at = Time.now.utc.iso8601
    logger.info("The last_polled_at has been updated for Source (id:#{source.id}).")
  end

  # Re-reads the source named by the first element of payload['sources'] and
  # stores it in the cache, replacing the cached copy with the same id or
  # appending it when no such copy exists.
  #
  # A property that has no cache entry yet (it is new, or it had no sources
  # when the cache was built) is treated as having an empty source list
  # instead of raising NoMethodError on nil.
  def update_properties_and_sources(payload)
    changed = payload['sources'].first
    property_id = changed['property_id']
    source_id = changed['id']

    cache = properties_and_sources
    cached_sources = cache[property_id] || []
    position = cached_sources.find_index { |candidate| candidate.id == source_id }
    refreshed = source(property_id, source_id)

    if position
      cached_sources[position] = refreshed
    else
      cache[property_id] = cached_sources + [refreshed]
    end
  end

  # Reads the task list of the property filtered to subscription ingestion
  # tasks for +source+, asking for a single result.
  #
  # NOTE(review): #sources passes `sort_direction:` while this query passes
  # `order:`; if the API only honours `sort_direction`, the result may not be
  # the newest task. Left unchanged; verify against the API documentation.
  def last_task(property_id, source)
    operation("/properties/#{property_id}/tasks").where(
      type_name: 'subscription_ingestion_task',
      subject_id: source.id,
      sort_by: 'created_at',
      order: 'desc',
      count: '1'
    ).read
  end

  # Builds (without saving) an idle SubscriptionIngestionTask for +source+.
  def subscription_ingestion_task(property_id, source)
    Boxxspring::SubscriptionIngestionTask.new(
      state: 'idle',
      subject_type_name: 'Source',
      subject_id: source.id,
      provider: source.provider,
      property_id: property_id
    )
  end

  # Builds an API operation for +path+ using the worker's configured credentials.
  def operation(path)
    Boxxspring::Operation.new(path, Boxxspring::Worker.configuration.api_credentials.to_hash)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment