How to execute code in Elastic Beanstalk only if you're the leader
# We are not using this file to run actual cron jobs. This is because the | |
# built-in Elastic Beanstalk cron feature actually puts the jobs at the end | |
# of the SQS queue. Instead we will run them manually. However, we need to | |
# have a non-empty cron.yaml or the SQS daemon won't even run and leader | |
# election won't work. | |
version: 1 | |
cron: | |
- name: "DoNothingJob" | |
url: "/periodic_tasks" | |
schedule: "0 0 31 2 *" |
# Class that runs some code, only if the current instance is the leader. | |
class LeaderRunner | |
def initialize | |
@instance_id = `wget -q -O - http://169.254.169.254/latest/meta-data/instance-id` | |
@dynamo_client = Aws::DynamoDB::Client.new | |
@ec2_client = Aws::EC2::Client.new | |
yaml = YAML.load_file('/etc/aws-sqsd.d/default.yaml') | |
@table = yaml['registry_table'] | |
end | |
# @return [String] | |
def get_leader | |
leader_id = @dynamo_client.get_item( | |
key: { 'id': 'leader-election-record'}, | |
table_name: @table | |
).item['leader_id'] | |
leader_id.split('.')[0] | |
end | |
# @param instance_id [String] | |
# @return [String] | |
def instance_state(instance_id) | |
@ec2_client.describe_instances( | |
instance_ids: [instance_id] | |
)[0][0].instances[0][:state][:name] | |
end | |
# @param msg [String] | |
def log_message(msg) | |
Rails.logger.info(msg) | |
end | |
# Actually run the code. | |
def execute | |
raise NotImplementedError | |
end | |
def run | |
leader_id = get_leader | |
leader_state = instance_state(leader_id) | |
if leader_id == @instance_id && leader_state != 'running' | |
log_message("We are the leader but we're not in a good state (#{leader_state}). Exiting now.") | |
return | |
end | |
while leader_id != @instance_id && leader_state != 'running' | |
# Current leader is in a bad state, e.g. shutting down. | |
# Wait for leader-election to kick in and check again in 2 seconds. | |
# At that point we may be the leader. | |
log_message("Not the leader but the leader is not in a good state (#{leader_state}). Checking in 2 seconds...") | |
sleep(2) | |
leader_id = get_leader | |
leader_state = instance_state(leader_id) | |
end | |
if leader_id != @instance_id | |
log_message("Leader is #{leader_id}, not us, exiting") | |
return | |
end | |
execute | |
end | |
end | |
# An example which runs an ActiveJob. | |
class JobRunner < LeaderRunner | |
# @param job_class [String] | |
def initialize(job_class) | |
super() | |
@job_class = job_class | |
end | |
def execute | |
log_message("Executing job #{@job_class}") | |
@job_class.constantize.perform_now | |
log_message("Done executing #{@job_class}") | |
end | |
end | |
if ARGV[0].present? | |
JobRunner.new(ARGV[0]).run | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment