@sisyphushappy
Created May 24, 2022 04:14
Python lambda handler (update_github_remote_tag_file_handler) to update a tag file in a remote GitHub repository with a value
import base64
import boto3
from botocore.exceptions import ClientError
from git import Actor, Repo
import json
import os
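def handler(event, context):
    """Lambda entry point: write TAG_VALUE into a tag file and push it to GitHub.

    Configuration is read from environment variables (AUTHOR_NAME, AUTHOR_EMAIL,
    TAG_FILE_NAME, TAG_VALUE, GITHUB_ACCOUNT_NAME, GITHUB_REPO_NAME,
    GITHUB_USERNAME, GITHUB_ACCESS_TOKEN_SECRET_ARN, REGION). The GitHub access
    token is fetched through get_secret() and used as the HTTPS password for
    the clone and the push.

    Args:
        event: Lambda event; only logged.
        context: Lambda context; unused.

    Raises:
        ValueError: if a required environment variable is not set.
        Any error raised by get_secret(), the clone, the commit or the push
        propagates unchanged.
    """
    import shutil
    from urllib.parse import quote

    # NOTE(review): the whole event is written to the function's log output;
    # confirm the triggering events never carry secrets.
    print('request: {}'.format(json.dumps(event)))

    # set environment variables
    author_name = os.getenv('AUTHOR_NAME')
    author_email = os.getenv('AUTHOR_EMAIL')
    tag_file_name = os.getenv('TAG_FILE_NAME')
    tag_value = os.getenv('TAG_VALUE')
    github_account_name = os.getenv('GITHUB_ACCOUNT_NAME')
    github_repo_name = os.getenv('GITHUB_REPO_NAME')
    github_username = os.getenv('GITHUB_USERNAME')
    github_access_token_secret_arn = os.getenv('GITHUB_ACCESS_TOKEN_SECRET_ARN')
    region_name = os.getenv('REGION')

    # Fail before touching the secret or the network: an unset variable would
    # otherwise be formatted as the literal text "None" into the path or URL.
    required = {
        'TAG_FILE_NAME': tag_file_name,
        'TAG_VALUE': tag_value,
        'GITHUB_ACCOUNT_NAME': github_account_name,
        'GITHUB_REPO_NAME': github_repo_name,
        'GITHUB_USERNAME': github_username,
        'GITHUB_ACCESS_TOKEN_SECRET_ARN': github_access_token_secret_arn,
    }
    missing = sorted(name for name, value in required.items() if value is None)
    if missing:
        raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

    github_access_token = get_secret(github_access_token_secret_arn, region_name)
    # get_secret() returns bytes for a binary secret; formatting bytes into the
    # URL would embed the "b'...'" repr instead of the token.
    if isinstance(github_access_token, bytes):
        github_access_token = github_access_token.decode("utf-8")

    # set local filepath variables
    repo_local_path = "/tmp/repo"
    tag_file_local_path = f"{repo_local_path}/{tag_file_name}"

    # Percent-encode the userinfo part so characters such as '@', ':' or '/'
    # in the username or token cannot change how the URL is parsed.
    # The token is part of the remote URL, so git stores it in the clone's
    # .git/config; the clone is deleted in the `finally` below. Depending on
    # the GitPython version, git error text may also contain the URL
    # - TODO confirm before forwarding exceptions to logs.
    remote = (
        f"https://{quote(github_username, safe='')}:{quote(github_access_token, safe='')}"
        f"@github.com/{github_account_name}/{github_repo_name}.git"
    )

    # A reused execution environment may still hold the previous clone, and
    # cloning into a non-empty directory fails.
    shutil.rmtree(repo_local_path, ignore_errors=True)
    try:
        # git clone the remote repository
        repo = Repo.clone_from(remote, repo_local_path)

        # update tag file with new value
        with open(tag_file_local_path, "w", encoding="utf-8") as f:
            f.write(tag_value)

        # create robot actor
        author = Actor(author_name, author_email)

        # git add and commit tag file
        repo.index.add([tag_file_local_path])
        repo.index.commit(f"Update hash of tag file {tag_file_name}", author=author)

        # push to remote origin
        origin = repo.remotes[0]
        origin.push().raise_if_error()
    finally:
        # Do not leave a working copy with the credential-bearing remote URL
        # behind for later invocations.
        shutil.rmtree(repo_local_path, ignore_errors=True)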
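def get_secret(secret_arn, region_name):
    """Fetch a secret value from AWS Secrets Manager.

    Args:
        secret_arn: Identifier passed as ``SecretId`` to ``get_secret_value``.
        region_name: Region used to create the Secrets Manager client.

    Returns:
        The ``SecretString`` value when the response contains one, otherwise
        the base64-decoded ``SecretBinary`` value (bytes). The return value is
        the secret itself and should not be logged.

    Raises:
        botocore.exceptions.ClientError: for every Secrets Manager API error.
    """
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name,
    )

    # Every ClientError propagates to the caller. The earlier if/elif chain
    # re-raised only five named error codes; any other code (an access-denied
    # error, for example) fell through and the function returned None, which
    # the caller would then use as the credential.
    get_secret_value_response = client.get_secret_value(SecretId=secret_arn)

    # Depending on whether the secret is a string or binary, one of these
    # fields will be populated.
    if 'SecretString' in get_secret_value_response:
        return get_secret_value_response['SecretString']
    # NOTE(review): confirm whether the installed botocore already returns
    # SecretBinary as decoded bytes; if it does, this second decode is wrong.
    return base64.b64decode(get_secret_value_response['SecretBinary'])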