Skip to content

Instantly share code, notes, and snippets.

@evaneykelen evaneykelen/sqs.rb
Created May 3, 2019

Embed
What would you like to do?
require 'open3'
class Sqs
def self.process_messages
sqs = Aws::SQS::Client.new(region: ENV.fetch("AWS_REGION"),
access_key_id: ENV.fetch("AWS_ACCESS_KEY_ID"),
secret_access_key: ENV.fetch("AWS_SECRET_ACCESS_KEY"))
queue_url = sqs.get_queue_url(queue_name: ENV.fetch("AWS_SQS_QUEUE_NAME")).queue_url
poller = Aws::SQS::QueuePoller.new(queue_url, { client: sqs })
poller_stats = poller.poll({ max_number_of_messages: 10,
idle_timeout: 60 }) do |messages|
messages.each do |message|
json = JSON.parse(message.body)
Rails.logger.ap "Message contents: #{json}"
json["Records"]&.each do |record|
del_msg = false
if record["eventName"] =~ /ObjectCreated/i
bucket = record["s3"]["bucket"]["name"]
key = record["s3"]["object"]["key"]
if key =~ /json\z/i
del_msg = process_manifest(bucket, key)
else
del_msg = true # OK to delete all msgs about uploaded files other than JSON manifests
end
else
del_msg = true
end
if del_msg
sqs.delete_message({
queue_url: queue_url,
receipt_handle: message.receipt_handle
})
end
end
end
end
true
end
def self.process_manifest(bucket, key)
s3 = Aws::S3::Resource.new(region: ENV.fetch("AWS_REGION"),
access_key_id: ENV.fetch("AWS_ACCESS_KEY_ID"),
secret_access_key: ENV.fetch("AWS_SECRET_ACCESS_KEY"))
begin
obj = s3.bucket(bucket).object(key)
result = obj.get.body.read
json = JSON.parse(result)
Rails.logger.ap "S3 file contents: #{json}"
page = {
tenant_urn: json["tenant_urn"],
source_urn: json["source_urn"],
locale: json["locale"],
tags: json["tags"]&.join(","),
title: json["title"],
}
if json["content_body"].present?
page[:body] = json["content_body"]
else
text = extract_text(json["content_s3_region"],
json["content_s3_bucket"],
json["content_s3_key"])
if text.blank?
# Associated file not (yet) present
return false
else
page[:body] = text
end
end
if json["operation"] =~ /create|update/i
if existing_page = Page.find_by(tenant_urn: json["tenant_urn"],
source_urn: json["source_urn"])
existing_page.attributes = page
existing_page.save!
existing_page.index!
else
new_page = Page.create!(page)
end
else
# Delete operation
if page_to_delete = Page.find_by(tenant_urn: json["tenant_urn"],
source_urn: json["source_urn"])
page_to_delete.remove_from_index!
page_to_delete.destroy
end
end
rescue Aws::S3::Errors::NoSuchKey
Rails.logger.ap "Cannot find #{bucket} / #{key}"
end
true
end
def self.extract_text(region, bucket, key)
s3 = Aws::S3::Resource.new(region: region,
access_key_id: ENV.fetch("AWS_ACCESS_KEY_ID"),
secret_access_key: ENV.fetch("AWS_SECRET_ACCESS_KEY"))
begin
obj = s3.bucket(bucket).object(key)
url = obj.presigned_url(:get, expires_in: 600) # 600 seconds
result, errors, _ = Open3.capture3("java -jar lib/tika-app-1.19.1.jar --text '#{url}'")
Rails.logger.ap "Tika: #{result}"
return result&.strip
rescue Aws::S3::Errors::NoSuchKey
Rails.logger.ap "Cannot find #{bucket} / #{key}"
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.