Skip to content

Instantly share code, notes, and snippets.

@darvid
Forked from jarpy/requirements.txt
Last active August 29, 2017 17:29
Show Gist options
  • Save darvid/3b6fa2c14e730b1b10b0c43005f4d68b to your computer and use it in GitHub Desktop.
Save darvid/3b6fa2c14e730b1b10b0c43005f4d68b to your computer and use it in GitHub Desktop.
Serverless Elasticsearch Curator for AWS Lambda
boto3>=1.4.6,<2.0.0
certifi>=2017.7.27.1,<2018.0.0.0
elasticsearch-curator>=5.1.2,<6.0.0
PyYAML>=3.12,<4.0
from __future__ import print_function
import datetime
import boto3
import certifi
import curator
import yaml
from curator.exceptions import NoIndices
from elasticsearch import Elasticsearch
# Username used when building the Elasticsearch endpoint URL (substituted
# into the `endpoint` template from serverless-curator.yml by `handler`).
curator_username = 'curator'
# SSM Parameter Store key holding the curator user's password; fetched
# (with decryption) via `get_ssm_param` when connecting to a cluster.
curator_password_key = '/elasticsearch/sg/roles/curator'
def get_ssm_param(name, decrypt=True):
    """Fetch a parameter value from AWS SSM Parameter Store.

    :param name: full parameter name/path, e.g. '/elasticsearch/sg/roles/curator'.
    :param decrypt: request decryption of SecureString parameters.
    :returns: the parameter's string value, or None if it does not exist
        (or the response unexpectedly lacks a value).
    :raises botocore.exceptions.ClientError: for any AWS error other than
        a missing parameter.
    """
    client = boto3.client('ssm')
    try:
        response = client.get_parameter(Name=name, WithDecryption=decrypt)
        return response['Parameter']['Value']
    except KeyError:
        # Malformed/unexpected response shape; treat as absent.
        return None
    # BUG FIX: the original caught `botocore.exceptions.ClientError`, but
    # `botocore` was never imported, so a missing parameter raised NameError.
    # boto3 clients expose modeled exceptions; catch the specific one.
    except client.exceptions.ParameterNotFound:
        return None
def handler(event, context):
    """Lambda entry point: prune (and optionally snapshot) old time-series
    indices on every Elasticsearch cluster defined in serverless-curator.yml.

    :param event: Lambda event payload (unused).
    :param context: Lambda context object (unused).
    :returns: dict of the form {'deleted': {cluster_name: [index, ...]}}.
    """
    with open('serverless-curator.yml') as config_file:
        # safe_load: the config is plain data; yaml.load without an explicit
        # Loader is deprecated and can execute arbitrary constructors.
        config = yaml.safe_load(config_file)
    # Create a place to track any indices that are deleted.
    deleted_indices = {}
    # We can define multiple Elasticsearch clusters to manage, so we'll have
    # an outer loop for working through them.
    for cluster_config in config:
        cluster_name = cluster_config['name']
        deleted_indices[cluster_name] = []
        # Create a connection to the cluster. We're using managed clusters in
        # Elastic Cloud for this example, so we can enable SSL security.
        es = Elasticsearch(
            cluster_config['endpoint'].format(
                curator_username=curator_username,
                curator_password=get_ssm_param(curator_password_key),
            ),
            use_ssl=True,
            verify_certs=True, ca_certs=certifi.where())
        repo_name = None
        if 'repository' in cluster_config:
            repo_name = cluster_config['repository'].pop('name')
            # BUG FIX: original passed `repo_name=repository`, an undefined
            # name (NameError). The extracted name is `repo_name`.
            curator.create_repository(
                es,
                repo_name=repo_name,
                **cluster_config['repository'])
        # Now we'll work through each set of time-series indices defined in
        # our config for this cluster.
        for index in cluster_config['indices']:
            prefix = index['prefix']
            print('Checking "{}" indices on {} cluster.'.format(
                prefix, cluster_name))
            # Fetch all the index names.
            index_list = curator.IndexList(es)
            try:
                # Reduce the list to those that match the prefix.
                index_list.filter_by_regex(kind='prefix', value=prefix)
                # Reduce again, by age.
                index_list.filter_by_age(source='name', direction='older',
                                         timestring='%Y.%m.%d', unit='days',
                                         unit_count=index['days'])
                # Snapshot before deleting, if a repository was configured.
                if repo_name is not None:
                    curator.Snapshot(
                        index_list,
                        repo_name,
                        datetime.datetime.utcnow().strftime('%Y-%m-%d'),
                    ).do_action()
                curator.DeleteIndices(index_list).do_action()
            # If nothing is left in the list, we'll get a NoIndices exception.
            # That's OK.
            except NoIndices:
                pass
            # Record the names of any indices we removed.
            deleted_indices[cluster_name].extend(index_list.working_list())
    lambda_response = {'deleted': deleted_indices}
    print(lambda_response)
    return lambda_response
---
# Define Elasticsearch Clusters and indices here, to have them periodically
# pruned by Curator.
- name: example logging cluster
endpoint: https://{curator_username}:{curator_password}@elasticsearch:9200/
indices:
- prefix: logstash-
days: 365
repository:
name: logstash
bucket: logstash.snapshots.example.com
repo_type: s3
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment