-
-
Save jim3ma/305cf4718509f25bbe016ffd31cb754f to your computer and use it in GitHub Desktop.
Operate ElasticSearch with AWS sigv4 in python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import datetime | |
import json | |
import os | |
import time | |
import traceback | |
import urlparse | |
import botocore.auth | |
import botocore.awsrequest | |
import botocore.credentials | |
import botocore.endpoint | |
import botocore.session | |
import boto3.dynamodb.types | |
# --- ES cluster configuration (required) ---
ES_ENDPOINT = 'search-bookshout-books-geltjfzt2bwmvmczlsx3qg7oo4.us-east-1.es.amazonaws.com'

# --- Optional tuning knobs ---
DOC_TABLE_FORMAT = '{}'        # Formatter: DynamoDB table name -> ES index name
DOC_TYPE_FORMAT = '{}_type'    # Formatter: DynamoDB table name -> ES type name ('_type' suffix)
ES_REGION = None               # Falls back to the Lambda runtime region when None
ES_MAX_RETRIES = 3             # Upper bound on exponential-backoff retries
DEBUG = True                   # Emit verbose debugging output
class ES_Exception(Exception):
    '''Raised when the ES cluster answers a request with a non-2xx status.'''

    status_code = 0  # HTTP status code returned by the failed request
    payload = ''     # Raw response body returned by the failed request

    def __init__(self, status_code, payload):
        message = 'ES_Exception: status_code={}, payload={}'.format(status_code, payload)
        Exception.__init__(self, message)
        self.status_code = status_code
        self.payload = payload
# Low-level POST data to Amazon Elasticsearch Service generating a Sigv4 signed request
def post_data_to_es(payload, region, creds, host, path, method='POST', proto='https://'):
    '''Send one SigV4-signed HTTP request to the ES endpoint.

    Args:
        payload: request body (string) to send.
        region:  AWS region used for signing.
        creds:   botocore credentials object.
        host:    ES domain name (no scheme).
        path:    URL path, e.g. '/_bulk'.
        method:  HTTP verb, defaults to POST.
        proto:   URL scheme prefix, defaults to 'https://'.

    Returns:
        The raw response body on any 2xx status.

    Raises:
        ES_Exception: for every non-2xx response, carrying status and body.
    '''
    signer = botocore.auth.SigV4Auth(creds, 'es', region)
    request = botocore.awsrequest.create_request_object({
        'context': {},
        'method': method,
        'url': proto + host + path,
        'region': region,
        'headers': {'Host': host},
        'body': payload,
    })
    signer.add_auth(request)
    prepared = request.prepare()
    response = botocore.endpoint.PreserveAuthSession().send(prepared)
    if 200 <= response.status_code <= 299:
        return response._content
    raise ES_Exception(response.status_code, response._content)
# High-level POST data to Amazon Elasticsearch Service with exponential backoff
# according to suggested algorithm: http://docs.aws.amazon.com/general/latest/gr/api-retries.html
def post_to_es(payload):
    '''Post a bulk payload to the ES cluster with exponential backoff.

    Args:
        payload: newline-delimited JSON string for the ES _bulk API.

    Raises:
        ES_Exception: re-raised for non-retryable (non-5xx) HTTP errors.
    '''
    # Get aws_region and credentials to post signed URL to ES
    es_region = ES_REGION or os.environ['AWS_REGION']
    # BUGFIX: Session() must be created WITHOUT the positional session_vars
    # argument. Newer botocore expects session_vars entries to be 4-tuples,
    # so passing {'region': es_region} raises
    # "ValueError: too many values to unpack". The region is already passed
    # explicitly to the signer inside post_data_to_es(), so nothing is lost.
    session = botocore.session.Session()
    creds = botocore.credentials.get_credentials(session)
    es_url = urlparse.urlparse(ES_ENDPOINT)
    es_endpoint = es_url.netloc or es_url.path  # Extract the domain name in ES_ENDPOINT
    # Post data with exponential backoff
    retries = 0
    while retries < ES_MAX_RETRIES:
        if retries > 0:
            # 2**retries * 0.1 -> 0.2s, 0.4s, 0.8s, ... (value is seconds,
            # not milliseconds as the old name 'millis' implied)
            seconds = 2 ** retries * 0.100
            if DEBUG:
                print('DEBUG: Wait for {:.1f} seconds'.format(seconds))
            time.sleep(seconds)
        try:
            es_ret_str = post_data_to_es(payload, es_region, creds, es_endpoint, '/_bulk')
            if DEBUG:
                print('DEBUG: Return from ES: {}'.format(es_ret_str))
            es_ret = json.loads(es_ret_str)
            if es_ret['errors']:
                print('ERROR: ES post unsuccessful, errors present, took={}ms'.format(es_ret['took']))
                # Keep only the items that actually carry an error; use a
                # default {} so items without an 'index' key cannot crash
                # the error report with AttributeError on None.
                es_errors = [item for item in es_ret['items'] if item.get('index', {}).get('error')]
                print('ERROR: List of items with errors: {}'.format(json.dumps(es_errors)))
            else:
                print('INFO: ES post successful, took={}ms'.format(es_ret['took']))
            break  # Sending to ES was ok, break retry loop
        except ES_Exception as e:
            if 500 <= e.status_code <= 599:
                retries += 1  # Server-side error: candidate for retry
            else:
                raise  # Client-side error: stop retrying, re-raise exception
# Compute a compound doc index from the key(s) of the object in lexicographic order: "k1=key_val1|k2=key_val2"
def compute_doc_index(keys_raw, deserializer):
    '''Build a deterministic compound document id from the item's key attributes.

    Args:
        keys_raw:     mapping of key-attribute name -> raw (serialized) value.
        deserializer: object exposing deserialize(raw_value) for each value.

    Returns:
        String of the form "k1=v1|k2=v2" with keys in lexicographic order.
    '''
    parts = [
        '{}={}'.format(name, deserializer.deserialize(keys_raw[name]))
        for name in sorted(keys_raw)
    ]
    return '|'.join(parts)
def _lambda_handler(event, context):
    '''Convert incoming Kinesis-wrapped stream records into one ES _bulk
    payload and post it to the cluster.'''
    if DEBUG:
        print('DEBUG: Event: {}'.format(event))
    records = event['Records']
    now = datetime.datetime.utcnow()

    es_actions = []  # Items to be added/updated/removed from ES - for bulk API
    cnt_insert = cnt_modify = cnt_remove = 0

    for record in records:
        # Handle both native DynamoDB Streams or Streams data from Kinesis (for manual replay)
        if DEBUG:
            print('DEBUG: Record: {}'.format(record))

        # Guard: only Kinesis-wrapped records are processed; skip the rest.
        if record.get('eventSource') != 'aws:kinesis':
            print('ERROR: Ignoring non dynamodb event sources: {}'.format(record.get('eventSource')))
            continue

        ddb = json.loads(base64.b64decode(record['kinesis']['data']))
        doc_table = 'production-book'
        doc_type = 'book'
        doc_index = ddb['isbn']

        if DEBUG:
            print('DEBUG: doc_table={}'.format(doc_table))
            print('DEBUG: doc_index={}'.format(doc_index))

        # NOTE(review): event_name is hardcoded, so every record is counted
        # as an insert and the MODIFY/REMOVE branches below are unreachable.
        event_name = 'INSERT'
        if event_name == 'INSERT':
            cnt_insert += 1
        elif event_name == 'MODIFY':
            cnt_modify += 1
        elif event_name == 'REMOVE':
            cnt_remove += 1
        else:
            print('WARN: Unsupported event_name: {}'.format(event_name))

        doc_fields = ddb
        # Add metadata
        doc_fields['@timestamp'] = now.isoformat()
        if DEBUG:
            print('DEBUG: doc_fields: {}'.format(doc_fields))

        # Generate JSON payload
        doc_json = json.dumps(doc_fields)
        if DEBUG:
            print('DEBUG: doc_json: {}'.format(doc_json))

        # Generate ES payload for item
        action = {'index': {'_index': doc_table, '_type': doc_type, '_id': doc_index}}
        es_actions.append(json.dumps(action))  # Action line with 'index' directive
        es_actions.append(doc_json)            # Payload line

    # Prepare bulk payload: the _bulk API requires a terminating newline.
    es_actions.append('')
    es_payload = '\n'.join(es_actions)
    if DEBUG:
        print('DEBUG: Payload:', es_payload)
    print('INFO: Posting to ES: inserts={} updates={} deletes={}, total_lines={}, bytes_total={}'.format(
        cnt_insert, cnt_modify, cnt_remove, len(es_actions)-1, len(es_payload)))
    post_to_es(es_payload)  # Post to ES with exponential backoff
# Global lambda handler - catches all exceptions to avoid dead letter in the DynamoDB Stream
def lambda_handler(event, context):
    '''Lambda entry point: delegate to _lambda_handler, logging (and
    swallowing) any exception so the stream batch is never dead-lettered.'''
    try:
        return _lambda_handler(event, context)
    except Exception:
        print('ERROR: ', traceback.format_exc())
Try botocore.session.Session(region_name = es_region)
I also got the same ERROR.
region_name = es_region
didn't help
Hi Guys,
So far I have the following two working solutions for the issue.
- Use AWS lambda layer with old code to work - aws-amplify/amplify-cli#2353
- Use the updated code from this code sample - https://github.com/TomPengxiangWang/Python/blob/master/dynamo-to-elastic.py
the solution is to completely remove parameters from Session (just call Session() )
Taken actually from that code:
https://github.com/TomPengxiangWang/Python/blob/master/dynamo-to-elastic.py
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi Jim,
Thanks for the code. It worked really well for me for 2 years. From 20th Sept, 2019 I started getting the error below in the code. I am not good with Python. Can you please help me understand what could be the reason for the error below?
('ERROR: ', 'Traceback (most recent call last):\n File "/var/task/lambda_function.py", line 276, in lambda_handler\n return _lambda_handler(event, context)\n File "/var/task/lambda_function.py", line 270, in _lambda_handler\n post_to_es(es_payload) # Post to ES with exponential backoff\n File "/var/task/lambda_function.py", line 71, in post_to_es\n session = botocore.session.Session({'region': es_region})\n File "/var/runtime/botocore/session.py", line 127, in init\n self.session_var_map.update(session_vars)\n File "/usr/lib64/python2.7/_abcoll.py", line 566, in update\n self[key] = other[key]\n File "/var/runtime/botocore/session.py", line 938, in setitem\n self._update_config_store_from_session_vars(key, value)\n File "/var/runtime/botocore/session.py", line 959, in _update_config_store_from_session_vars\n config_name, env_vars, default, typecast = config_options\nValueError: too many values to unpack\n')