Last active
March 6, 2021 04:53
-
-
Save s-fujimoto/a688e0648c52579f558a to your computer and use it in GitHub Desktop.
Send requests to Amazon Elasticsearch Service signed with AWS Signature Version 4
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from elasticsearch.connection.http_requests import RequestsHttpConnection | |
from botocore.awsrequest import AWSRequest | |
from botocore.auth import SigV4Auth | |
from botocore.credentials import Credentials | |
from botocore.utils import InstanceMetadataFetcher | |
from elasticsearch.compat import urlencode | |
import os | |
# Record whether the optional ``requests`` package could be imported.
try:
    import requests
except ImportError:
    REQUESTS_AVAILABLE = False
else:
    REQUESTS_AVAILABLE = True

# Base URL that the metadata lookups below are issued against.
META_URL = "http://169.254.169.254/"
class AwsRequestsHttpConnection(RequestsHttpConnection):
    """Requests-based connection that signs every call with AWS SigV4.

    Each request is signed for the ``"es"`` service and the resulting
    ``X-Amz-Date``, ``Authorization`` and (when present)
    ``X-Amz-Security-Token`` headers are copied onto the shared
    ``requests`` session before the parent class sends the request.

    Credentials and region are taken, in order of preference, from the
    constructor arguments, from environment variables, and finally from
    the metadata endpoint at ``META_URL``.
    """

    def __init__(self, access_key=None, secret_key=None, region=None, token=None, **kwargs):
        """Create the connection and remember the signing parameters.

        Args:
            access_key: AWS access key id, or ``None`` to resolve it lazily.
            secret_key: AWS secret access key matching ``access_key``.
            region: AWS region name, or ``None`` to auto-detect it.
            token: Optional session token for temporary credentials.
            **kwargs: Passed through unchanged to ``RequestsHttpConnection``.
        """
        super(AwsRequestsHttpConnection, self).__init__(**kwargs)
        # Drop everything after the last ":" of the base URL (the port).
        # Presumably this keeps the signed host identical to the Host header
        # that is actually sent -- TODO confirm.
        # NOTE(review): if the base URL carries a path after the port (a URL
        # prefix), that path is discarded as well.
        self.base_url = ":".join(self.base_url.split(":")[:-1])
        self.access_key = access_key
        self.secret_key = secret_key
        self.token = token
        self.region = self.get_region(region)

    def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=()):
        """Sign the request with SigV4 and delegate to the parent class.

        The arguments and the return value are those of
        ``RequestsHttpConnection.perform_request``.
        """
        aws_url = self.base_url + url
        if params:
            aws_url = '%s?%s' % (aws_url, urlencode(params))

        credentials = self.get_credentials(self.access_key, self.secret_key, self.token)
        request = AWSRequest(method=method, url=aws_url, data=body)
        SigV4Auth(credentials, "es", self.region).add_auth(request)

        # The signature headers are placed on the session so that the parent
        # implementation sends them along with the request.
        self.session.headers["X-Amz-Date"] = request.headers.get("X-Amz-Date")
        self.session.headers["Authorization"] = request.headers.get("Authorization")

        security_token = request.headers.get("X-Amz-Security-Token")
        if security_token:
            self.session.headers["X-Amz-Security-Token"] = security_token
        else:
            # Do not keep sending a token left over from an earlier request
            # that was signed with different (temporary) credentials.
            self.session.headers.pop("X-Amz-Security-Token", None)

        return super(AwsRequestsHttpConnection, self).perform_request(method, url, params, body, timeout, ignore)

    def get_credentials(self, access_key=None, secret_key=None, token=None):
        """Return the botocore ``Credentials`` used to sign a request.

        When ``access_key`` is ``None`` the credentials are read from the
        ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY`` /
        ``AWS_SESSION_TOKEN`` environment variables, or failing that from
        the instance metadata fetcher. Otherwise the arguments are used
        as given.

        Raises:
            KeyError: if ``AWS_ACCESS_KEY_ID`` is set but
                ``AWS_SECRET_ACCESS_KEY`` is not.
        """
        if access_key is None and os.environ.get("AWS_ACCESS_KEY_ID"):
            access_key = os.environ["AWS_ACCESS_KEY_ID"]
            secret_key = os.environ["AWS_SECRET_ACCESS_KEY"]
            # The session token only exists for temporary credentials, so its
            # absence must not be an error.
            token = os.environ.get("AWS_SESSION_TOKEN")
        elif access_key is None and self.is_ec2instance():
            c = InstanceMetadataFetcher().retrieve_iam_role_credentials()
            access_key = c["access_key"]
            secret_key = c["secret_key"]
            token = c["token"]
        return Credentials(access_key, secret_key, token)

    def get_region(self, region=None):
        """Return the region name to sign requests for.

        An explicit ``region`` wins. Otherwise the ``AWS_REGION``
        environment variable is used, and failing that the region is
        derived from the availability zone reported by the metadata
        endpoint by dropping its last character. Returns ``None`` when
        no source yields a region.
        """
        if region is None and os.environ.get("AWS_REGION"):
            region = os.environ["AWS_REGION"]
        elif region is None and self.is_ec2instance():
            # Same bounded wait as the reachability probe, so a stalled
            # endpoint cannot block construction forever.
            response = requests.get(
                META_URL + "latest/meta-data/placement/availability-zone",
                timeout=1,
            )
            region = response.text[:-1]
        return region

    def is_ec2instance(self):
        """Return ``True`` if the metadata endpoint answers within one second."""
        if not REQUESTS_AVAILABLE:
            # Without the requests package the endpoint cannot be probed.
            return False
        try:
            requests.head(META_URL, timeout=1)
        except requests.RequestException:
            return False
        return True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This worked well. Thanks!