Skip to content

Instantly share code, notes, and snippets.

@leobessa
Last active August 29, 2015 14:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save leobessa/7516a966513076b3d61c to your computer and use it in GitHub Desktop.
Save leobessa/7516a966513076b3d61c to your computer and use it in GitHub Desktop.
Sidekiq DynamoDB parallel scan
module Sidekiq
module Apns
  # Sidekiq job on the :apns_dts queue. DynamodbParallelScan::Worker enqueues
  # one of these per device token, passing a hash with 'id' and 'token'.
  # The body is currently a stub: arguments are accepted and ignored.
  class Worker
    include Sidekiq::Worker
    sidekiq_options queue: :apns_dts

    def perform(*_args)
      # Intentionally a no-op for now.
    end
  end
end
module DynamodbParallelScan
  # Walks one segment of a DynamoDB parallel scan, one page per job, and fans
  # out a Sidekiq::Apns::Worker job for every token in each item's 'dts'
  # attribute.
  #
  # A Sidekiq::Batch wraps the jobs for a page. Its :success callback
  # (#on_success) enqueues the job for the next page. A segment is therefore
  # read sequentially, while separate segments can be run as separate chains.
  #
  # Job argument (string keys, as Sidekiq delivers them after JSON):
  #   { 'scan_args' => {
  #       'table_name'          => table to scan (required),
  #       'attributes_to_get'   => attributes to fetch (default %w(id dts)),
  #       'total_segments'      => number of parallel segments (default 1),
  #       'segment'             => segment handled by this chain (default 0),
  #       'exclusive_start_key' => key to resume from, {attr => {type => value}} (optional)
  #   } }
  class Worker
    include Sidekiq::Worker
    sidekiq_options :queue => :dynamodb_scan

    # Scans one page of the configured segment and enqueues one APNs job per
    # token found on it. The next page is scheduled from the batch callback,
    # or directly when this page has no items.
    #
    # @param opts [Hash] job arguments; must contain 'scan_args' (see class doc).
    def perform(opts)
      request = build_scan_request(opts.fetch('scan_args'))
      response = dynamodb.scan(request)
      last_evaluated_key = response.last_evaluated_key && item_to_hash(response.last_evaluated_key)
      puts "last_evaluated_key %s " % [last_evaluated_key]
      puts "consumed_capacity %s " % [response.consumed_capacity]

      # Everything the next page's job needs. String keys keep the payload the
      # same shape before and after Sidekiq's JSON round trip.
      next_scan_args = {
        'table_name' => request[:table_name],
        'attributes_to_get' => request[:attributes_to_get],
        'total_segments' => request[:total_segments],
        'segment' => request[:segment],
        'last_evaluated_key' => last_evaluated_key
      }

      items = Array(response.items)
      if items.empty?
        # A filtered page can match nothing while the segment still has more
        # data. Continue directly instead of relying on callbacks from a batch
        # that contains no jobs.
        # NOTE(review): confirm how the Sidekiq Pro version in use treats empty batches.
        enqueue_next_page(next_scan_args)
        return
      end

      batch = Sidekiq::Batch.new
      batch.on(:success, self.class.name, 'scan_args' => next_scan_args)
      batch.jobs do
        items.each do |item|
          h = item_to_value(item)
          Array(h['dts']).each { |token| Sidekiq::Apns::Worker.perform_async('id' => h['id'], 'token' => token) }
        end
      end
    end

    # Sidekiq::Batch :success callback: enqueues the next page of the segment
    # when the previous scan reported a 'last_evaluated_key'.
    #
    # @param status batch status (unused)
    # @param options [Hash] callback options; 'scan_args' is the continuation
    #   payload built by #perform.
    def on_success(status, options)
      enqueue_next_page(options.fetch('scan_args'))
    end

    # Returns a new DynamoDB client (one per call).
    def dynamodb
      Aws::DynamoDB.new
    end

    # Maps each attribute of an item to a single-entry {type => value} hash,
    # e.g. {'id' => {s: 'abc'}}. Used to serialize the last evaluated key.
    def item_to_hash(item)
      Hash[item.map { |k, v| [k, extract_hash(v)] }]
    end

    # Maps each attribute of an item to its plain Ruby value (see #extract_value).
    def item_to_value(item)
      Hash[item.map { |k, v| [k, extract_value(v)] }]
    end

    # Returns the value of the attribute's set member. Number (:n) and number
    # set (:ns) values are converted with to_i; other types are returned as-is.
    def extract_value(struct)
      key, value = extract_hash(struct).first
      case key
      when :n
        value.to_i
      when :ns
        value.map(&:to_i)
      else
        value
      end
    end

    # Returns {member => value} for the first member of the attribute struct
    # that is set, or nil when none is.
    def extract_hash(struct)
      # Skip only nil: `false` is a legitimate value (e.g. a BOOL false).
      type, value = struct.enum_for(:each_pair).find { |_, v| !v.nil? }
      type && { type => value }
    end

    private

    # Builds the DynamoDB Scan request from the (string-keyed) job input.
    def build_scan_request(input)
      scan_filter = Hash[%w(dts).map { |name| [name, { comparison_operator: 'NOT_NULL' }] }]
      request = {
        table_name: input.fetch('table_name'),
        attributes_to_get: input.fetch('attributes_to_get') { %w(id dts) },
        select: 'SPECIFIC_ATTRIBUTES',
        scan_filter: scan_filter,
        return_consumed_capacity: 'TOTAL',
        # Read from the job input itself. These were previously looked up on
        # the request hash, so every job scanned segment 0 of 1.
        total_segments: input.fetch('total_segments', 1),
        segment: input.fetch('segment', 0)
      }
      request[:conditional_operator] = 'OR' if scan_filter.size > 1
      start_key = decode_start_key(input['exclusive_start_key'])
      request[:exclusive_start_key] = start_key if start_key
      request
    end

    # Converts a JSON-round-tripped key ({'id' => {'s' => 'abc'}}) back to the
    # symbol-typed form passed to the scan ({'id' => {s: 'abc'}}); nil stays nil.
    def decode_start_key(key)
      return nil unless key
      Hash[key.map { |name, typed_value|
        type, value = typed_value.first
        [name, { type.to_sym => value }]
      }]
    end

    # Enqueues the job for the next page when scan_args carries a
    # 'last_evaluated_key'; does nothing at the end of the segment.
    def enqueue_next_page(scan_args)
      next_key = scan_args['last_evaluated_key']
      return unless next_key
      self.class.perform_async('scan_args' => scan_args.merge('exclusive_start_key' => next_key))
    end
  end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment