Skip to content

Instantly share code, notes, and snippets.

@lrowe
Created July 20, 2015 07:03
Show Gist options
  • Save lrowe/189625bdb076dbb16731 to your computer and use it in GitHub Desktop.
Save lrowe/189625bdb076dbb16731 to your computer and use it in GitHub Desktop.
SQS worker
# ami-5189a661 14.04
# ec2 role backfill-instance
# NOTE(review): this reads like cloud-init user data whose YAML nesting
# (indentation) is not visible here — confirm the structure before use.
# System packages installed for the virtualenv created in runcmd below.
packages:
- python3-dev
- python-virtualenv
# Requests a power-off; presumably applied once the commands below have
# finished — verify against the cloud-init power_state documentation.
power_state:
mode: poweroff
# Commands that build the virtualenv under /opt/backfill and start the worker.
runcmd:
# Stop at the first failing command.
- set -e
# The later steps run as user ubuntu, so hand it ownership of the directory.
- chown -R ubuntu:ubuntu /opt/backfill
# Create a Python 3.4 virtualenv in place.
- sudo -u ubuntu virtualenv --python=python3.4 /opt/backfill
# Pin boto, the only third-party module the worker imports.
- sudo -u ubuntu /opt/backfill/bin/pip install boto==2.38.0
# Run the worker script written by write_files below.
- sudo -u ubuntu /opt/backfill/bin/python3 /opt/backfill/backfill_worker.py
# Files to create; the worker script source follows as the content value.
write_files:
- path: /opt/backfill/backfill_worker.py
content: |
from shlex import quote
import subprocess
import sys
import boto.sqs
import multiprocessing
# Credentials embedded in the download URL built by content_md5sum().
# The '...' values are presumably placeholders to be filled in — confirm.
username = '...'
password = '...'
# SQS connection and queues, created once at module import time.
# NOTE(review): these objects exist before multiprocessing.Pool starts its
# workers — confirm that sharing them across worker processes is safe.
conn = boto.sqs.connect_to_region('us-west-2')
# Queue that process() reads paths from.
inq = conn.get_queue('backfill-content-md5-in')
# Queue that process() writes "<path> <result>" messages to.
outq = conn.get_queue('backfill-content-md5-out')
def content_md5sum(path):
    """Return the MD5 hex digest of the gunzipped download at *path*.

    The file is fetched from www.encodeproject.org using the module-level
    ``username`` and ``password`` and piped through
    ``curl | gunzip | md5sum`` in a bash subprocess.

    Args:
        path: Site-relative path of the file; ``@@download`` is appended.

    Returns:
        The first 32 characters of the pipeline output (the hex digest) on
        success.  If the pipeline exits non-zero, the string ``'error '``
        followed by its combined stdout/stderr with newlines replaced by
        tabs.
    """
    url = "https://{username}:{password}@www.encodeproject.org{path}@@download".format(
        username=username, password=password, path=path)
    # pipefail makes a curl or gunzip failure fail the whole pipeline rather
    # than being masked by md5sum's exit status; quote() keeps the URL a
    # single shell word.
    command = 'set -o pipefail; curl -s -S -L %s | gunzip | md5sum' % quote(url)
    try:
        output = subprocess.check_output(
            ['/bin/bash', '-c', command], stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        # Tool output is not guaranteed to be ASCII; replace undecodable
        # bytes so a failure is reported instead of raising
        # UnicodeDecodeError and taking the worker down.
        message = e.output.decode('ascii', errors='replace')
        return 'error ' + message.replace('\n', '\t')
    # md5sum prints "<32 hex chars>  -"; keep only the digest.
    return output[:32].decode('ascii', errors='replace')
def process(ignore):
    """Consume paths from the input queue until it is empty.

    For each message the body is treated as a path, its content MD5 is
    computed with ``content_md5sum``, and ``"<path> <result>"`` is printed
    and written to the output queue.  The input message is deleted only
    after the result has been written.

    Args:
        ignore: Unused; present so the function can be called through
            ``Pool.map``.
    """
    while True:
        # Long-poll: a short poll may return nothing while messages are
        # still queued, which would make this worker stop early.
        message = inq.read(wait_time_seconds=20)
        if message is None:
            break
        path = message.get_body()
        print(path)
        result = content_md5sum(path)
        line = path + ' ' + result
        print(line)
        # Flush so progress is visible promptly in captured output.
        sys.stdout.flush()
        # Batch entries are (id, body, delay_seconds) tuples.
        outq.write_batch([('msg', line, 0)])
        message.delete()
if __name__ == '__main__':
    # Use roughly two thirds of the available CPUs, but never fewer than one.
    worker_count = max(1, int(multiprocessing.cpu_count() / 1.5))
    workers = multiprocessing.Pool(worker_count)
    # One process() call per worker; the range values are only placeholders.
    workers.map(process, range(worker_count))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment