@cdw
Created April 28, 2015 22:36
Manage an AWS cluster from user-data and SQS
#!/usr/bin/env python
# encoding: utf-8
"""
sqs_control_userdata_script.py - control an AWS instance from an SQS queue
Run this guy on startup as a user-data script and he will connect to
S3 to download code to a directory, then run commands in it that are
provided by an SQS queue, one job at a time per core
Created by Dave Williams on 2011-02-08
"""
## Import present packages
import os
import sys
import time
import traceback
import ConfigParser
import subprocess as subp
import multiprocessing as mp
## Handle logging and thrown fatal errors
def log_it(log_message):
    print log_message
    with open('/dev/console', 'a') as console:
        console.write(log_message + '\n')

def fatal_error(error_log_message, feed_me="differently"):
    log_it("ERROR: " + error_log_message)
    log_it("SHUTTING DOWN: feed me " + feed_me + " next time")
    os.system("shutdown now -h")
## Configure extra software on the node
out = subp.call("aptitude -ry install python-scipy python-boto p7zip-full",
                shell=True)
log_it("Installed scipy, boto, p7zip with result: "+str(out))
## Initialize working directory
os.chdir('/root')
HOMEDIR = os.getcwd()+'/'
## Configure AWS control parameters
ACCESS_KEY = 'insert own'
SECRET_KEY = 'insert own'
CODE_BUCKET = 'insert own'
JOB_QUEUE = 'insert own'
## Write out boto configuration
boto_config = ConfigParser.SafeConfigParser()
boto_config.add_section('Credentials')
boto_config.set('Credentials', 'aws_access_key_id', ACCESS_KEY)
boto_config.set('Credentials', 'aws_secret_access_key', SECRET_KEY)
with open('/etc/boto.cfg', 'wb') as config_file:
    boto_config.write(config_file)
## Connect to aws with boto
try:
    log_it("Connecting to boto")
    import boto  # had to wait until /etc/boto.cfg was written
    S3 = boto.connect_s3()
    SQS = boto.connect_sqs()
    SQS.get_all_queues()  # call to test if our keys were accepted
except (boto.exception.NoAuthHandlerFound, boto.exception.SQSError), e:
    log_it(str(e))
    fatal_error("Probably gave bad aws keys", "valid credentials")
## Download files from passed bucket
try:
    log_it("Downloading from code bucket")
    code_bucket = S3.get_bucket(CODE_BUCKET)
    for key in code_bucket.get_all_keys():
        if '/' in key.name:
            try:
                os.makedirs(HOMEDIR + key.name[:key.name.rfind('/')])
            except OSError:
                pass  # Already made that directory. No harm, no foul.
        key.get_contents_to_filename(HOMEDIR + key.name)
        os.chmod(HOMEDIR + key.name, 0776)
except boto.exception.S3ResponseError:
    fatal_error("No bucket with given code_bucket name", "a valid bucket")
except IOError:
    fatal_error("Couldn't write code_bucket contents locally")
## Connect to command queue
try:
    log_it("Connecting to job queue")
    job_queue = SQS.get_queue(JOB_QUEUE)
    if job_queue is None:
        raise KeyError("Provided job_queue name not found")
except KeyError:
    fatal_error("Given job_queue does not exist", "a different queue name")
## Set up processes that will run our jobs
proc_num = mp.cpu_count()
# One worker slot per core; start and join the placeholder processes right
# away so every slot reads as not-alive and can be handed a real job below
processes = [mp.Process() for i in range(proc_num)]
jobs = [None for i in range(proc_num)]
[p.start() for p in processes]
[p.join() for p in processes]
## Turn control over to the job queue
try:
    next_job = job_queue.read()
    while next_job != None:
        for i in range(proc_num):
            if (processes[i].is_alive() is False) and (next_job != None):
                processes[i].join()
                if jobs[i] is not None:
                    job_queue.delete_message(jobs[i])
                jobs[i] = next_job
                log_it("Gonna run " + jobs[i].get_body())
                processes[i] = mp.Process(target=subp.call,
                                          args=(jobs[i].get_body(),),
                                          kwargs={'shell': True})
                processes[i].start()
                next_job = job_queue.read()
        time.sleep(.5)
    else:
        log_it("Empty " + job_queue.name + ", waiting out current runs")
        [p.join() for p in processes]
        [job_queue.delete_message(job) for job in jobs if job is not None]
        log_it("All done")
except Exception as instance:
    log_it("### An error occurred while running jobs")
    log_it("Exception of type " + str(type(instance)) + ": " + str(instance))
    exc_type, exc_value, exc_traceback = sys.exc_info()
    log_it(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))
    log_it("Going no further, shutting down now")
finally:
    os.system('shutdown now -h')
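
For reference, enqueuing work for this script and booting a worker might look something like the sketch below. This is a rough, untested companion using the same boto 2 era API as the script above; the queue name, job commands, AMI ID, and instance type are placeholders rather than values taken from the gist.

#!/usr/bin/env python
# launch_sketch.py - hypothetical companion to the script above
import boto
from boto.sqs.message import Message

# Each message body is a shell command the worker will run from /root
sqs = boto.connect_sqs()
queue = sqs.create_queue('insert own')  # must match JOB_QUEUE above
for cmd in ['python code_dir/run.py --trial 1',
            'python code_dir/run.py --trial 2']:
    queue.write(Message(body=cmd))

# Boot a worker with the control script passed as user-data
ec2 = boto.connect_ec2()
ec2.run_instances('ami-00000000',  # placeholder AMI
                  instance_type='c1.xlarge',
                  user_data=open('sqs_control_userdata_script.py').read())

Once the queue empties and the running jobs finish, each worker shuts itself down, so the queue doubles as the cluster's on/off switch.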