Last active
June 13, 2020 05:05
-
-
Save evaneykelen/8161c46be3aa2804379bea0d6dd8ec8a to your computer and use it in GitHub Desktop.
sqs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'json'
require 'open3'
require 'uri'
# Consumes S3 event notifications from an SQS queue and keeps Page records in
# sync with the JSON manifests those events point at.
#
# Manifest fields read here: "operation", "tenant_urn", "source_urn", "locale",
# "tags", "title", and either "content_body" or the location of a content file
# ("content_s3_region", "content_s3_bucket", "content_s3_key") whose text is
# extracted with the Tika app jar.
#
# AWS settings come from the AWS_REGION, AWS_ACCESS_KEY_ID,
# AWS_SECRET_ACCESS_KEY and AWS_SQS_QUEUE_NAME environment variables.
class Sqs
  # Tika app jar, resolved relative to the process's working directory.
  TIKA_JAR = "lib/tika-app-1.19.1.jar".freeze

  # Polls the queue (up to 10 messages per receive) until it has been idle for
  # 60 seconds.
  #
  # A message is deleted only when every S3 event record in it was handled;
  # otherwise it stays on the queue and becomes visible again after the queue's
  # visibility timeout (e.g. a manifest whose content file is not uploaded yet).
  #
  # @return [true]
  def self.process_messages
    sqs = Aws::SQS::Client.new(region: ENV.fetch("AWS_REGION"),
                               access_key_id: ENV.fetch("AWS_ACCESS_KEY_ID"),
                               secret_access_key: ENV.fetch("AWS_SECRET_ACCESS_KEY"))
    queue_url = sqs.get_queue_url(queue_name: ENV.fetch("AWS_SQS_QUEUE_NAME")).queue_url
    poller = Aws::SQS::QueuePoller.new(queue_url, { client: sqs })
    # skip_delete: without it QueuePoller deletes the whole batch when the block
    # returns, which would also drop the messages we deliberately keep for a retry.
    poller.poll({ max_number_of_messages: 10, idle_timeout: 60, skip_delete: true }) do |messages|
      messages.each do |message|
        next unless message_handled?(message.body)

        sqs.delete_message(queue_url: queue_url, receipt_handle: message.receipt_handle)
      end
    end
    true
  end

  # Fetches a JSON manifest from S3 and applies it to the matching Page
  # (looked up by tenant_urn + source_urn).
  #
  # An "operation" matching /create|update/i upserts the Page; any other value
  # (including a missing one) deletes it.
  #
  # @param bucket [String] bucket holding the manifest
  # @param key [String] decoded object key of the manifest
  # @return [Boolean] false when a create/update manifest has no body text yet
  #   (the caller keeps the message for a retry); true otherwise, including
  #   when the manifest object does not exist.
  def self.process_manifest(bucket, key)
    json = JSON.parse(s3_resource(ENV.fetch("AWS_REGION")).bucket(bucket).object(key).get.body.read)
    Rails.logger.ap "S3 file contents: #{json}"
    if json["operation"].to_s.match?(/create|update/i)
      upsert_page(json)
    else
      # Deletes never need the content file, which may already be gone.
      delete_page(json)
      true
    end
  rescue Aws::S3::Errors::NoSuchKey
    Rails.logger.ap "Cannot find #{bucket} / #{key}"
    true
  end

  # Extracts plain text from an S3 object by running the Tika app jar against a
  # presigned GET URL valid for 600 seconds.
  #
  # @param region [String] region of the content bucket
  # @param bucket [String] content bucket
  # @param key [String] content object key
  # @return [String, nil] the stripped text (possibly empty), or nil when the
  #   bucket/key is missing, Tika exits unsuccessfully, or S3 reports NoSuchKey.
  def self.extract_text(region, bucket, key)
    if bucket.to_s.empty? || key.to_s.empty?
      Rails.logger.ap "Manifest has no content location (#{bucket.inspect} / #{key.inspect})"
      return nil
    end

    url = s3_resource(region).bucket(bucket).object(key).presigned_url(:get, expires_in: 600) # 600 seconds
    # Argument-vector form runs java without a shell, so quotes or other special
    # characters in the bucket/key (and thus the URL) cannot inject commands.
    result, errors, status = Open3.capture3("java", "-jar", TIKA_JAR, "--text", url)
    Rails.logger.ap "Tika: #{result}"
    unless status.success?
      Rails.logger.ap "Tika failed (#{status}): #{errors}"
      return nil
    end
    result.strip
  rescue Aws::S3::Errors::NoSuchKey
    Rails.logger.ap "Cannot find #{bucket} / #{key}"
    nil
  end

  # Parses one SQS message body (an S3 event notification) and handles each of
  # its records.
  #
  # @param body [String] raw message body
  # @return [Boolean] true when the message can be deleted
  def self.message_handled?(body)
    json = JSON.parse(body)
    Rails.logger.ap "Message contents: #{json}"
    # map, not all?: every record is processed even if an earlier one must be
    # retried. A message without "Records" (e.g. S3's s3:TestEvent) has nothing
    # to process and is deletable.
    (json["Records"] || []).map { |record| record_handled?(record) }.all?
  end

  # Handles one entry of an S3 event's "Records" array.
  #
  # @param record [Hash] parsed S3 event record
  # @return [Boolean] false only when a JSON manifest must be retried later
  def self.record_handled?(record)
    # Only ObjectCreated events need work; other event types are acknowledged.
    return true unless record["eventName"].to_s.match?(/ObjectCreated/i)

    bucket = record.dig("s3", "bucket", "name")
    # Object keys in S3 event notifications are URL-encoded ("a b.json" arrives
    # as "a+b.json"); decode before fetching the object.
    key = URI.decode_www_form_component(record.dig("s3", "object", "key").to_s)
    # OK to delete msgs about uploaded files other than JSON manifests
    return true unless key.match?(/json\z/i)

    process_manifest(bucket, key)
  end

  # Creates or updates the Page described by a create/update manifest, taking
  # the body from "content_body" or, failing that, from the content file.
  #
  # @param json [Hash] parsed manifest
  # @return [Boolean] false when no body text is available yet, true otherwise
  def self.upsert_page(json)
    body = if json["content_body"].present?
             json["content_body"]
           else
             extract_text(json["content_s3_region"],
                          json["content_s3_bucket"],
                          json["content_s3_key"])
           end
    # Associated file not (yet) present
    return false if body.blank?

    page = {
      tenant_urn: json["tenant_urn"],
      source_urn: json["source_urn"],
      locale: json["locale"],
      tags: json["tags"]&.join(","),
      title: json["title"],
      body: body,
    }
    existing_page = Page.find_by(tenant_urn: json["tenant_urn"], source_urn: json["source_urn"])
    if existing_page
      existing_page.attributes = page
      existing_page.save!
      existing_page.index!
    else
      Page.create!(page)
    end
    true
  end

  # Removes the Page named by a delete manifest from the index
  # (remove_from_index!) and destroys it; does nothing if no such Page exists.
  #
  # @param json [Hash] parsed manifest
  def self.delete_page(json)
    page_to_delete = Page.find_by(tenant_urn: json["tenant_urn"], source_urn: json["source_urn"])
    return unless page_to_delete

    page_to_delete.remove_from_index!
    page_to_delete.destroy
  end

  # Builds an S3 resource for +region+ using the access keys from ENV.
  def self.s3_resource(region)
    Aws::S3::Resource.new(region: region,
                          access_key_id: ENV.fetch("AWS_ACCESS_KEY_ID"),
                          secret_access_key: ENV.fetch("AWS_SECRET_ACCESS_KEY"))
  end

  private_class_method :message_handled?, :record_handled?, :upsert_page, :delete_page, :s3_resource
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment