Created
August 20, 2019 18:23
-
-
Save salrashid123/fec7339e245e118654948da3abb8b685 to your computer and use it in GitHub Desktop.
AWS boto to GCS python snippet (not working; do not use!!!)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import boto3 | |
import boto3.session | |
from boto3.session import Session | |
from botocore.credentials import CredentialProvider, RefreshableCredentials | |
from botocore.session import get_session | |
from botocore.signers import RequestSigner | |
import botocore.vendored.requests | |
from botocore.awsrequest import AWSRequest | |
import logging | |
import httplib2 | |
import sys | |
import datetime | |
import pytz | |
import time | |
from datetime import datetime, timedelta | |
from google.oauth2 import service_account | |
import json | |
import google.auth.transport.requests | |
import requests | |
boto3.set_stream_logger(name='botocore') | |
class GCPRequestSigner(RequestSigner): | |
def __init__(self, *args, **kwargs): | |
#super().__init__(*args, **kwargs) | |
return | |
class GCPCredentialProvider(CredentialProvider): | |
def __init__(self): | |
return | |
def load(self): | |
return self | |
def get_frozen_credentials(self): | |
target_scopes = [ | |
'https://www.googleapis.com/auth/devstorage.read_only'] | |
cc = service_account.Credentials.from_service_account_file( | |
'/path/to/svc.json', scopes=target_scopes) | |
request = google.auth.transport.requests.Request() | |
cc.refresh(request) | |
d_with_tz = datetime.now(pytz.utc) + timedelta(seconds=3600) | |
session_credentials = RefreshableCredentials( | |
access_key="", secret_key="", token="", expiry_time=d_with_tz, refresh_using=None, method=None) | |
return session_credentials | |
def _get_session_token(self, config): | |
"" | |
def inject_headers(params, **kwargs): | |
params['headers']['Authorization'] = 'Bearer ya29.c.ffoooo-ehXggNXHHFV78' | |
params['headers']['CustomHeader'] = 'customHeaderValue' | |
def check_headers(*args, **kwargs): | |
print kwargs['request'].headers | |
def before_call_handler(*args, **kwargs): | |
print("before-call event handler " + str(args)) | |
print("before-call. event handler: kwargs: " + str(kwargs)) | |
def after_call_handler(*args, **kwargs): | |
print("after-call. event handler: args: " + str(args)) | |
print("after-call. event handler: kwargs: " + str(kwargs)) | |
def request_created_handler(*args, **kwargs): | |
print("request-created. event handler: args: " + str(args)) | |
print("request-created. event handler: kwargs: " + str(kwargs)) | |
req = kwargs.get('request') | |
req.headers['Authorization'] = 'Bearer bbbb' | |
print(req) | |
def before_sign_handler(*args, **kwargs): | |
print("before-sign. event handler: args: " + str(args)) | |
print("before-sign. event handler: kwargs: " + str(kwargs)) | |
req = kwargs.get('request') | |
req.headers['Authorization'] = 'Bearer foo' | |
print(req) | |
# https://botocore.amazonaws.com/v1/documentation/api/latest/reference/awsrequest.html#botocore-awsrequest | |
hooks = botocore.hooks.HierarchicalEmitter() | |
hooks.register(event_name="before-call.*", handler=before_call_handler) | |
hooks.register(event_name="after-call.*", handler=after_call_handler) | |
hooks.register(event_name="request-created.*", handler=request_created_handler) | |
hooks.register(event_name="before-sign.*", handler=before_sign_handler) | |
bc_session = botocore.session.Session(event_hooks=hooks) | |
cred_provider = bc_session.get_component('credential_provider') | |
cred_provider.insert_before('env', GCPCredentialProvider()) | |
boto3_session = Session(botocore_session=bc_session) | |
#request_signer | |
client = boto3_session.client( | |
"s3", region_name="auto", endpoint_url="https://storage.googleapis.com") | |
client.meta.events.register('before-call', inject_headers) | |
client.meta.events.register('before-sign', check_headers) | |
response = client.list_objects(Bucket='mineral-minutia-802-binauth') | |
print("Objects:") | |
for blob in response["Contents"]: | |
print(blob["Key"]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment