Skip to content

Instantly share code, notes, and snippets.

@cjuroz
Forked from jarpy/requirements.txt
Last active February 4, 2022 12:08
Show Gist options
  • Star 13 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cjuroz/d45f4d73e74f068892c5e4f3d1c7fa7c to your computer and use it in GitHub Desktop.
Save cjuroz/d45f4d73e74f068892c5e4f3d1c7fa7c to your computer and use it in GitHub Desktop.
Serverless Elasticsearch Curator for AWS Lambda using requests-aws4auth to sign requests with AWS ES
# http://docs.aws.amazon.com/lambda/latest/dg/lambda-python-how-to-create-deployment-package.html
pip install elasticsearch-curator -t /path/to/project-dir
pip install requests-aws4auth -t /path/to/project-dir
# Run Elasticsearch Curator from AWS Lambda.
#
# Edit serverless-curator.yaml to define which indices should be purged.
from __future__ import print_function
import os
import certifi
import curator
import yaml
import time
from curator.exceptions import NoIndices
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
# This is the entry point where Lambda will start execution.
def handler(event, context):
    """Snapshot and then purge indices on every cluster in the config file.

    Reads ``serverless-curator.yaml`` from the working directory. For each
    cluster entry it first takes the configured snapshots (waiting for each
    one to finish) and then deletes time-series indices older than the
    configured number of days.

    Args:
        event: Lambda invocation event. Unused, but required by the
            Lambda handler signature.
        context: Lambda context object. Unused, but required by the
            Lambda handler signature.

    Returns:
        dict: ``{'backup': {cluster_name: [...]}, 'deleted': {cluster_name:
        [index_name, ...]}}``. The ``backup`` lists are currently always
        empty because the snapshot workaround does not enumerate indices.
    """
    with open('serverless-curator.yaml') as config_file:
        # safe_load only builds plain Python data; yaml.load() without an
        # explicit Loader can construct arbitrary objects and is rejected by
        # recent PyYAML releases. An empty file yields None, hence the `or`.
        config = yaml.safe_load(config_file) or []

    # Track deleted indices and backed-up indices per cluster name.
    deleted_indices = {}
    backup_indices = {}

    # Multiple Elasticsearch clusters can be configured, so loop over them.
    for cluster_config in config:
        cluster_name = cluster_config['name']
        deleted_indices[cluster_name] = []
        backup_indices[cluster_name] = []

        # Sign requests for the 'es' service with the credentials found in
        # the environment.
        awsauth = AWS4Auth(
            os.getenv('AWS_ACCESS_KEY_ID'),
            os.getenv('AWS_SECRET_ACCESS_KEY'),
            cluster_config['region'],
            'es',
            session_token=os.getenv('AWS_SESSION_TOKEN')
        )
        # Create a connection to the cluster using the signed-request auth.
        es = Elasticsearch(
            cluster_config['endpoint'],
            http_auth=awsauth,
            connection_class=RequestsHttpConnection
        )

        # Snapshots first, so data is backed up before anything is deleted.
        # A cluster entry may omit 'snapshots' entirely.
        for snapshot in cluster_config.get('snapshots') or []:
            _create_snapshot(es, snapshot)

        # Then work through each set of time-series indices for this cluster.
        for index in cluster_config.get('indices') or []:
            removed = _purge_old_indices(es, index, cluster_name)
            deleted_indices[cluster_name].extend(removed)

    lambda_response = {'backup': backup_indices, 'deleted': deleted_indices}
    print(lambda_response)
    return lambda_response


def _create_snapshot(es, snapshot):
    """Start one configured snapshot and block until the repository is idle.

    Args:
        es: Elasticsearch client for the target cluster.
        snapshot: Config mapping with 'repository', 'prefix' and 'name'.
    """
    repository = snapshot['repository']
    # Timestamp suffix gives every run its own snapshot name.
    name = snapshot['name'] + '-' + time.strftime('%Y%m%d%H%M%S')

    # Workaround for an AWS ES 5.3 bug
    # (https://forums.aws.amazon.com/thread.jspa?threadID=257549): call the
    # snapshot API directly instead of going through curator.Snapshot.
    # Passing a dict lets the client serialise the body, so a prefix that
    # contains quotes or backslashes still produces valid JSON.
    es.snapshot.create(
        repository=repository,
        snapshot=name,
        body={'indices': snapshot['prefix']}
    )

    # Poll every 5s until the status call reports no snapshots for the
    # repository.
    while True:
        resp = es.snapshot.status(repository=repository)
        if not resp['snapshots']:
            break
        print('Snapshot %s is still running...' % name)
        time.sleep(5)


def _purge_old_indices(es, index, cluster_name):
    """Delete indices matching a prefix that are older than the configured age.

    Args:
        es: Elasticsearch client for the target cluster.
        index: Config mapping with 'prefix' and 'days'.
        cluster_name: Cluster name, used only in the progress message.

    Returns:
        list: The index list's working list after filtering, i.e. the names
        of the indices that were removed (empty when nothing matched).
    """
    prefix = index['prefix']
    print('Checking "%s" indices on %s cluster.' % (prefix, cluster_name))

    # Fetch all the index names.
    index_list = curator.IndexList(es)
    try:
        # Reduce the list to those that match the prefix.
        index_list.filter_by_regex(kind='prefix', value=prefix)
        # Reduce again, by the date embedded in the index name.
        index_list.filter_by_age(
            source='name',
            direction='older',
            timestring='%Y.%m.%d',
            unit='days',
            unit_count=index['days']
        )
        curator.DeleteIndices(index_list).do_action()
    except NoIndices:
        # Nothing left to delete after filtering. That's OK.
        pass
    return index_list.working_list()
---
# Define Elasticsearch Clusters and indices here, to have them periodically
# pruned by Curator.
#
# The file is a list of cluster entries. The handler reads these keys:
#   name      - label used for the cluster in log messages and in the
#               handler's 'backup' / 'deleted' response.
#   endpoint  - URL passed to the Elasticsearch client.
#   region    - AWS region used when signing requests for the 'es' service.
#   snapshots - list of snapshots to take before purging; each entry has
#               'repository', 'prefix' (sent as the snapshot's "indices"
#               value) and 'name' (a timestamp is appended at run time).
#   indices   - list of index families to purge; each entry has 'prefix'
#               and 'days' (indices whose name carries a %Y.%m.%d date older
#               than this many days are deleted).
#
# NOTE(review): the handler authenticates with AWS request signing, but the
# example endpoints below embed basic-auth credentials and do not look like
# AWS ES hostnames - replace them with your own domain endpoint.
# NOTE(review): the example hostnames mention us-west-1 while 'region' is
# us-west-2 - confirm the region matches the cluster being signed for.
- name: example logging cluster
endpoint: https://curator:PASSWORD@LOG-CLUSTER.us-west-1.aws.found.io:9243
region: us-west-2
snapshots:
- repository: index-backup-stage
prefix: logstash-stage-*
name: curator-stage
- repository: kibana-backup
prefix: .kibana
name: curator-kibana
indices:
- prefix: logstash-
days: 365
# This cluster defines no 'snapshots' list, only indices to purge.
- name: example metrics cluster
endpoint: https://curator:PASSWORD@METRIC-CLUSTER.us-west-1.aws.found.io:9243
region: us-west-2
indices:
- prefix: metricbeat-
days: 14
- prefix: packetbeat-
days: 14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment