
@arturo-c
Created August 26, 2016 19:45
Sample Python Lambda that posts DynamoDB items (replayed through a Kinesis stream) to Amazon Elasticsearch Service
import base64
import datetime
import json
import os
import time
import traceback
import urlparse  # Python 2 stdlib; this gist targets the python2.7 Lambda runtime
import botocore.auth
import botocore.awsrequest
import botocore.credentials
import botocore.endpoint
import botocore.session
import boto3.dynamodb.types
# The following parameters are required to configure the ES cluster
ES_ENDPOINT = 'search-bookshout-books-geltjfzt2bwmvmczlsx3qg7oo4.us-east-1.es.amazonaws.com'
# The following parameters can be optionally customized
DOC_TABLE_FORMAT = '{}' # Python formatter to generate index name from the DynamoDB table name
DOC_TYPE_FORMAT = '{}_type' # Python formatter to generate type name from the DynamoDB table name, default is to add '_type' suffix
ES_REGION = None # If not set, use the runtime lambda region
ES_MAX_RETRIES = 3 # Max number of retries for exponential backoff
DEBUG = True # Set verbose debugging information
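# Illustration of the formatters above: for a DynamoDB table named 'books',
# DOC_TABLE_FORMAT.format('books') yields index name 'books' and
# DOC_TYPE_FORMAT.format('books') yields type name 'books_type'.
# Note that the handler below hardcodes 'production-book'/'book' instead.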
class ES_Exception(Exception):
    '''Exception capturing status_code from Client Request'''
    status_code = 0
    payload = ''

    def __init__(self, status_code, payload):
        self.status_code = status_code
        self.payload = payload
        Exception.__init__(self, 'ES_Exception: status_code={}, payload={}'.format(status_code, payload))
# Low-level POST data to Amazon Elasticsearch Service generating a Sigv4 signed request
def post_data_to_es(payload, region, creds, host, path, method='POST', proto='https://'):
    '''Post data to ES endpoint with SigV4 signed http headers'''
    sigv4 = botocore.auth.SigV4Auth(creds, 'es', region)
    params = {'context': {},
              'method': method,
              'url': proto + host + path,
              'region': region,
              'headers': {'Host': host},
              'body': payload}
    req = botocore.awsrequest.create_request_object(params)
    sigv4.add_auth(req)
    prep_req = req.prepare()
    http_session = botocore.endpoint.PreserveAuthSession()
    res = http_session.send(prep_req)
    if 200 <= res.status_code <= 299:
        return res._content
    else:
        raise ES_Exception(res.status_code, res._content)
# High-level POST data to Amazon Elasticsearch Service with exponential backoff
# according to suggested algorithm: http://docs.aws.amazon.com/general/latest/gr/api-retries.html
def post_to_es(payload):
    '''Post data to ES cluster with exponential backoff'''
    # Get aws_region and credentials to post signed URL to ES
    es_region = ES_REGION or os.environ['AWS_REGION']
    session = botocore.session.Session({'region': es_region})
    creds = botocore.credentials.get_credentials(session)
    es_url = urlparse.urlparse(ES_ENDPOINT)
    es_endpoint = es_url.netloc or es_url.path  # Extract the domain name in ES_ENDPOINT
    # Post data with exponential backoff
    retries = 0
    while retries < ES_MAX_RETRIES:
        if retries > 0:
            seconds = (2 ** retries) * .100  # Exponential backoff: 0.2s, 0.4s, ...
            if DEBUG:
                print('DEBUG: Wait for {:.1f} seconds'.format(seconds))
            time.sleep(seconds)
        try:
            es_ret_str = post_data_to_es(payload, es_region, creds, es_endpoint, '/_bulk')
            if DEBUG:
                print('DEBUG: Return from ES: {}'.format(es_ret_str))
            es_ret = json.loads(es_ret_str)
            if es_ret['errors']:
                print('ERROR: ES post unsuccessful, errors present, took={}ms'.format(es_ret['took']))
                # Filter items with errors
                es_errors = [item for item in es_ret['items'] if item.get('index').get('error')]
                print('ERROR: List of items with errors: {}'.format(json.dumps(es_errors)))
            else:
                print('INFO: ES post successful, took={}ms'.format(es_ret['took']))
            break  # Sending to ES was ok, break retry loop
        except ES_Exception as e:
            if 500 <= e.status_code <= 599:
                retries += 1  # Candidate for retry
            else:
                raise  # Stop retrying, re-raise exception
# Compute a compound doc index from the key(s) of the object in lexicographic order: "k1=key_val1|k2=key_val2"
def compute_doc_index(keys_raw, deserializer):
    index = []
    for key in sorted(keys_raw):
        index.append('{}={}'.format(key, deserializer.deserialize(keys_raw[key])))
    return '|'.join(index)
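# Illustration (hypothetical table keyed on 'id' and 'sku'):
#   compute_doc_index({'id': {'N': '42'}, 'sku': {'S': 'abc'}},
#                     boto3.dynamodb.types.TypeDeserializer())
# returns 'id=42|sku=abc' -- keys are joined in lexicographic order.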
def _lambda_handler(event, context):
    if DEBUG:
        print('DEBUG: Event: {}'.format(event))
    records = event['Records']
    now = datetime.datetime.utcnow()
    es_actions = []  # Items to be added/updated/removed from ES - for bulk API
    cnt_insert = cnt_modify = cnt_remove = 0
    for record in records:
        # Handle both native DynamoDB Streams or Streams data from Kinesis (for manual replay)
        if DEBUG:
            print('DEBUG: Record: {}'.format(record))
        if record.get('eventSource') == 'aws:kinesis':
            ddb = json.loads(base64.b64decode(record['kinesis']['data']))
            doc_table = 'production-book'
            doc_type = 'book'
            doc_index = ddb['isbn']
        else:
            print('ERROR: Ignoring non-dynamodb event sources: {}'.format(record.get('eventSource')))
            continue
        if DEBUG:
            print('DEBUG: doc_table={}'.format(doc_table))
            print('DEBUG: doc_index={}'.format(doc_index))
        event_name = 'INSERT'  # Kinesis replay records are always treated as inserts here
        # Update counters
        if event_name == 'INSERT':
            cnt_insert += 1
        elif event_name == 'MODIFY':
            cnt_modify += 1
        elif event_name == 'REMOVE':
            cnt_remove += 1
        else:
            print('WARN: Unsupported event_name: {}'.format(event_name))
        doc_fields = ddb
        # Add metadata
        doc_fields['@timestamp'] = now.isoformat()
        if DEBUG:
            print('DEBUG: doc_fields: {}'.format(doc_fields))
        # Generate JSON payload
        doc_json = json.dumps(doc_fields)
        if DEBUG:
            print('DEBUG: doc_json: {}'.format(doc_json))
        # Generate ES payload for item
        action = {'index': {'_index': doc_table, '_type': doc_type, '_id': doc_index}}
        es_actions.append(json.dumps(action))  # Action line with 'index' directive
        es_actions.append(doc_json)  # Payload line
    # Prepare bulk payload
    es_actions.append('')  # Add one empty line to force final \n
    es_payload = '\n'.join(es_actions)
    if DEBUG:
        print('DEBUG: Payload: {}'.format(es_payload))
    print('INFO: Posting to ES: inserts={} updates={} deletes={}, total_lines={}, bytes_total={}'.format(
        cnt_insert, cnt_modify, cnt_remove, len(es_actions) - 1, len(es_payload)))
    post_to_es(es_payload)  # Post to ES with exponential backoff
# Global lambda handler - catches all exceptions to avoid dead letter in the DynamoDB Stream
def lambda_handler(event, context):
    try:
        return _lambda_handler(event, context)
    except Exception:
        print('ERROR: {}'.format(traceback.format_exc()))
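For local testing, a record shaped like the one below (hypothetical ISBN and payload) exercises the Kinesis branch of the handler. This is only a sketch: actually reaching ES requires the AWS_REGION environment variable and valid AWS credentials; without them, lambda_handler catches the failure and prints the traceback.

# Hypothetical test event -- 'data' is the base64-encoded JSON document.
if __name__ == '__main__':
    sample_event = {
        'Records': [{
            'eventSource': 'aws:kinesis',
            'kinesis': {
                'data': base64.b64encode(json.dumps({
                    'isbn': '9780000000000',
                    'title': 'Sample Book',
                })),
            },
        }],
    }
    lambda_handler(sample_event, None)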
@singhmaheshk

Hi Art,
Thanks for the code. It worked really well for me for two years, but since 20th Sept 2019 I have been getting the error below. I am not good with Python. Can you please help me understand what could be causing it?

('ERROR: ', 'Traceback (most recent call last):\n File "/var/task/lambda_function.py", line 276, in lambda_handler\n return _lambda_handler(event, context)\n File "/var/task/lambda_function.py", line 270, in _lambda_handler\n post_to_es(es_payload) # Post to ES with exponential backoff\n File "/var/task/lambda_function.py", line 71, in post_to_es\n session = botocore.session.Session({'region': es_region})\n File "/var/runtime/botocore/session.py", line 127, in init\n self.session_var_map.update(session_vars)\n File "/usr/lib64/python2.7/_abcoll.py", line 566, in update\n self[key] = other[key]\n File "/var/runtime/botocore/session.py", line 938, in setitem\n self._update_config_store_from_session_vars(key, value)\n File "/var/runtime/botocore/session.py", line 959, in _update_config_store_from_session_vars\n config_name, env_vars, default, typecast = config_options\nValueError: too many values to unpack\n')
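The unpack error points at the `session = botocore.session.Session({'region': es_region})` call in post_to_es(). Newer botocore releases appear to expect each value in the session_vars mapping to be a four-part tuple (config name, env var, default, conversion func), so assigning a plain region string makes botocore try to unpack the string itself. A minimal sketch of a likely fix, assuming the Lambda runtime's default region and credential resolution are acceptable, is to drop the override entirely:

# Likely fix (sketch, not tested against every botocore version): build the
# session without the session_vars override. The region is already passed
# explicitly to post_data_to_es(), so nothing else needs to change.
session = botocore.session.Session()
creds = session.get_credentials()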
