Parallel DynamoDB Scan
batch_size = 100 # Items per request
parallel_requests = 4
total_items_to_request = 4000
iterations = (total_items_to_request / (parallel_requests * batch_size)) # => 10

# Fork one worker per segment; DynamoDB splits the scan across the segments
parallel_requests.times do |seg|
  Process.fork do
    c = AWS::DynamoDB::Client.new(
      :access_key_id     => "...",
      :secret_access_key => "...",
      :api_version       => '2012-08-10'
    )

    result = nil
    iterations.times do |i|
      req = {
        table_name: "my_table",
        limit: batch_size,
        total_segments: parallel_requests,
        segment: seg
      }

      # Resume this segment from where the previous page left off
      if result && result[:last_evaluated_key]
        req[:exclusive_start_key] = result[:last_evaluated_key]
      end

      result = c.scan(req)
      # result[:member] is an array of batch_size items to process
    end
  end
end

# Wait for all forked workers to exit
Process.waitall
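
For comparison, here is a hedged sketch of the same segmented scan written against the newer aws-sdk v2+ Ruby namespace (Aws::DynamoDB::Client), paging each segment until its last_evaluated_key runs out instead of assuming a fixed number of iterations. The region, table name, and batch size are placeholders.

require 'aws-sdk-dynamodb' # provides Aws::DynamoDB::Client

SEGMENTS   = 4
BATCH_SIZE = 100

SEGMENTS.times do |seg|
  Process.fork do
    client    = Aws::DynamoDB::Client.new(region: 'us-east-1')
    start_key = nil
    scanned   = 0

    loop do
      req = {
        table_name: 'my_table', # placeholder table name
        limit: BATCH_SIZE,
        segment: seg,
        total_segments: SEGMENTS
      }
      req[:exclusive_start_key] = start_key if start_key

      resp = client.scan(req)
      scanned += resp.items.size # resp.items is an array of item hashes

      start_key = resp.last_evaluated_key
      break unless start_key # nil means this segment is fully scanned
    end

    puts "segment #{seg}: #{scanned} items"
  end
end

Process.waitall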

phstc commented Jan 3, 2015

😄

A thread-safe implementation:

require 'aws-sdk'

class MyTable
  SCAN_WORKERS = 4

  class << self
    def client
      @dynamodb ||= Aws::DynamoDB::Client.new(
        region:             'us-east-1',
        access_key_id:      '...',
        secret_access_key:  '...'
      )
    end

    def scan(filter_expression, expression_attribute_values)
      items = Queue.new # Queue is thread-safe, so workers can push results concurrently

      SCAN_WORKERS.times.map do |seg|
        Thread.new do
          # Each thread scans its own segment of the table
          resp = client.scan(
            table_name: 'mytable',
            limit: 1,
            total_segments: SCAN_WORKERS,
            segment: seg,
            expression_attribute_values: expression_attribute_values,
            filter_expression: filter_expression
          )

          items.push resp.items
        end
      end.each(&:join)

      items
    end
  end
end

items = MyTable.scan('value > :s', ':s' => 0)
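
MyTable.scan returns the Queue itself (with all worker threads already joined), so a minimal sketch of consuming the result could look like the following. Note that with limit: 1 and no exclusive_start_key paging, each segment contributes at most one matching item; raising the limit and looping on last_evaluated_key (as in the forked example above) would cover the whole table.

all_items = []
all_items.concat(items.pop) until items.empty?

all_items.each do |item|
  # each item is a plain Hash of attribute name => value
  puts item.inspect
end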
