Skip to content

Instantly share code, notes, and snippets.

@DerPauli
Created April 11, 2020 15:23
Show Gist options
  • Save DerPauli/bba00e550444b992133c4cf55c49aa4d to your computer and use it in GitHub Desktop.
Save DerPauli/bba00e550444b992133c4cf55c49aa4d to your computer and use it in GitHub Desktop.
Lambda function to change a imageDetails.json file into a imagedefinitions.json file
import json
import boto3
import zipfile
import tempfile
code_pipeline = boto3.client('codepipeline')
s3 = boto3.client('s3')
def lambda_handler(event, context):
job_id = event['CodePipeline.job']['id']
bucketN = event['CodePipeline.job']['data']['inputArtifacts'][0]['location']['s3Location']['bucketName']
objectK = event['CodePipeline.job']['data']['inputArtifacts'][0]['location']['s3Location']['objectKey']
outputN = event['CodePipeline.job']['data']['outputArtifacts'][0]['location']['s3Location']['bucketName']
outputK = event['CodePipeline.job']['data']['outputArtifacts'][0]['location']['s3Location']['objectKey']
tmp_file = tempfile.NamedTemporaryFile()
with tempfile.NamedTemporaryFile() as tmp_file:
s3.download_file(bucketN, objectK, tmp_file.name)
with zipfile.ZipFile(tmp_file.name, 'r') as zip:
json_data = zip.read('imageDetail.json')
obj = json.loads(json_data)
image_uri = obj['ImageURI'].split('@')[0]
image_tag = obj['ImageTags'][0]
repo_name = obj['RepositoryName']
image = f"{image_uri}:{image_tag}"
print(image)
definition = [{
'name': repo_name,
'imageUri': image
}]
with open('/tmp/imagedefinitions.json', 'w') as outfile:
json.dump(definition, outfile)
# with open('/tmp/imagedefinitions.json') as json_file:
# data = json.load(json_file)
# print('data["name"] = ' + data['name'])
# print('data["imageUri"] = ' + data['imageUri'])
# if(data['name'] != repo_name || data['imageUri'] != image_uri)
# code_pipeline.put_job_failure_result(jobId=job_id)
zipfile.ZipFile('/tmp/imagedefinitions.zip', mode='w').write("/tmp/imagedefinitions.json")
response = s3.upload_file('/tmp/imagedefinitions.zip', outputN, outputK)
code_pipeline.put_job_success_result(jobId=job_id)
return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment