Skip to content

Instantly share code, notes, and snippets.

@s-fujimoto
Last active March 6, 2021 04:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save s-fujimoto/a688e0648c52579f558a to your computer and use it in GitHub Desktop.
Save s-fujimoto/a688e0648c52579f558a to your computer and use it in GitHub Desktop.
Send requests to Amazon Elasticsearch Service signed with AWS Signature Version 4
from elasticsearch.connection.http_requests import RequestsHttpConnection
from botocore.awsrequest import AWSRequest
from botocore.auth import SigV4Auth
from botocore.credentials import Credentials
from botocore.utils import InstanceMetadataFetcher
from elasticsearch.compat import urlencode
import os
# `requests` is optional at import time: record whether it could be imported
# instead of failing the whole module when it is missing.
try:
    import requests
    REQUESTS_AVAILABLE = True
except ImportError:
    REQUESTS_AVAILABLE = False

# Base URL that the EC2 probe and the availability-zone lookup are issued against.
META_URL = "http://169.254.169.254/"
class AwsRequestsHttpConnection(RequestsHttpConnection):
    """Requests-based connection that adds AWS Signature Version 4 headers.

    Before delegating each call to ``RequestsHttpConnection``, the request is
    signed for the ``"es"`` service and the resulting ``X-Amz-Date``,
    ``Authorization`` and (when present) ``X-Amz-Security-Token`` headers are
    stored on the shared session.

    Credentials and region are resolved in this order: explicit constructor
    arguments, environment variables, then the metadata endpoint at
    ``META_URL`` when it is reachable.
    """

    def __init__(self, access_key=None, secret_key=None, region=None, token=None, **kwargs):
        """Create the connection and remember the signing parameters.

        :param access_key: AWS access key id, or None to resolve it per request.
        :param secret_key: AWS secret access key matching ``access_key``.
        :param region: region name used for signing, or None to auto-detect.
        :param token: optional session token for temporary credentials.
        :param kwargs: forwarded unchanged to ``RequestsHttpConnection``.
        """
        super(AwsRequestsHttpConnection, self).__init__(**kwargs)
        # Keep only the text before the last ":" of the base URL built by the
        # parent class (presumably this drops the ":<port>" suffix so the
        # signed host matches the one sent -- TODO confirm for URLs that
        # carry a prefix after the port).
        self.base_url = ":".join(self.base_url.split(":")[:-1])
        self.access_key = access_key
        self.secret_key = secret_key
        self.token = token
        self.region = self.get_region(region)

    def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=()):
        """Sign the request with SigV4, then delegate to the parent class.

        The arguments are passed through to
        ``RequestsHttpConnection.perform_request`` unchanged, and its return
        value is returned as-is.
        """
        aws_url = self.base_url + url
        if params:
            # The query string has to be part of the URL that gets signed.
            aws_url = '%s?%s' % (aws_url, urlencode(params))

        credentials = self.get_credentials(self.access_key, self.secret_key, self.token)
        request = AWSRequest(method=method, url=aws_url, data=body)
        SigV4Auth(credentials, "es", self.region).add_auth(request)

        session_headers = self.session.headers
        session_headers["X-Amz-Date"] = request.headers.get("X-Amz-Date")
        session_headers["Authorization"] = request.headers.get("Authorization")

        security_token = request.headers.get("X-Amz-Security-Token")
        if security_token:
            session_headers["X-Amz-Security-Token"] = security_token
        else:
            # Session headers persist between calls: remove a token left over
            # from an earlier request so it is not sent with a signature that
            # was computed without it.
            session_headers.pop("X-Amz-Security-Token", None)

        return super(AwsRequestsHttpConnection, self).perform_request(
            method, url, params, body, timeout, ignore)

    def get_credentials(self, access_key=None, secret_key=None, token=None):
        """Return a botocore ``Credentials`` object for signing.

        When ``access_key`` is None the values come from the
        ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY`` /
        ``AWS_SESSION_TOKEN`` environment variables if the first one is set,
        otherwise from ``InstanceMetadataFetcher`` when ``is_ec2instance()``
        is true. Explicit arguments are used untouched.

        :raises KeyError: if ``AWS_ACCESS_KEY_ID`` is set but
            ``AWS_SECRET_ACCESS_KEY`` is not, or if the fetched role
            credentials lack an expected key.
        """
        if access_key is None and os.environ.get("AWS_ACCESS_KEY_ID"):
            access_key = os.environ["AWS_ACCESS_KEY_ID"]
            secret_key = os.environ["AWS_SECRET_ACCESS_KEY"]
            # Long-term credentials have no session token, so a missing
            # variable must not raise.
            token = os.environ.get("AWS_SESSION_TOKEN")
        elif access_key is None and self.is_ec2instance():
            role_credentials = InstanceMetadataFetcher().retrieve_iam_role_credentials()
            access_key = role_credentials["access_key"]
            secret_key = role_credentials["secret_key"]
            token = role_credentials["token"]
        return Credentials(access_key, secret_key, token)

    def get_region(self, region=None):
        """Return the region name to sign for.

        An explicit ``region`` wins; otherwise ``AWS_REGION`` is used when
        set, and failing that the availability zone reported by the metadata
        endpoint with its last character removed. Returns None when none of
        these sources applies.
        """
        if region is None and os.environ.get("AWS_REGION"):
            region = os.environ["AWS_REGION"]
        elif region is None and self.is_ec2instance():
            # Bounded wait so an unresponsive endpoint cannot hang start-up.
            zone = requests.get(
                META_URL + "latest/meta-data/placement/availability-zone",
                timeout=1,
            ).text
            # Strip the final character of the zone name (e.g. the trailing
            # letter of "us-east-1a") to obtain the region.
            region = zone[:-1]
        return region

    def is_ec2instance(self):
        """Return True if a HEAD request to ``META_URL`` succeeds within 1s.

        Any ``Exception`` raised by the probe is treated as "not reachable"
        and yields False.
        """
        try:
            requests.head(META_URL, timeout=1)
        except Exception:
            # Best-effort probe: any failure means "not reachable", but
            # KeyboardInterrupt / SystemExit are no longer swallowed.
            return False
        return True
@sapiensantiquus
Copy link

This worked well. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment