Skip to content

Instantly share code, notes, and snippets.

@nulconaux
Created July 7, 2022 07:14
Show Gist options
  • Save nulconaux/8630345450ca9691633220243a3fbe35 to your computer and use it in GitHub Desktop.
Save nulconaux/8630345450ca9691633220243a3fbe35 to your computer and use it in GitHub Desktop.
ETL script example
# https://dev.to/aws/orchestrating-hybrid-workflows-using-amazon-managed-workflows-for-apache-airflow-mwaa-2boc
from copy import copy
from mysql.connector import MySQLConnection, Error
from python_mysql_dbconfig import read_db_config
import sys
import csv
import boto3
import json
import socket
def _read_db_secret(secret, region):
    """Fetch and decode the JSON database credentials stored in AWS Secrets Manager.

    Args:
        secret: Name/ID of the secret, passed as ``SecretId``.
        region: AWS region used for the Secrets Manager client.

    Returns:
        The dict parsed from the secret's ``SecretString``.
    """
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region
    )
    response = client.get_secret_value(SecretId=secret)
    return json.loads(response['SecretString'])


def _export_records(records, output_path):
    """Write *records* to *output_path* as CSV, then echo every row to stdout."""
    # newline='' is what the csv module requires (avoids blank lines on
    # Windows); the ``with`` block guarantees the file is flushed and closed
    # before anything tries to upload it.
    with open(output_path, "w", newline="", encoding="utf-8") as csv_file:
        csv.writer(csv_file).writerows(records)
    print("Records exported:")
    for row in records:
        # Same " , "-separated layout as printing each column with ",", but
        # works for any number of columns instead of assuming exactly eight.
        print(*row, sep=" , ")


def query_with_fetchone(query2run, secret, region, output_path="temp.csv"):
    """Run *query2run* against MySQL and export every returned row to a CSV file.

    Connection settings are read from AWS Secrets Manager. Despite the name,
    all rows are fetched (``cursor.fetchall()``).

    Args:
        query2run: SQL statement, executed verbatim. NOTE(review): it is taken
            from the command line and not parameterized — only run trusted SQL.
        secret: Secrets Manager secret whose JSON holds ``username``,
            ``password``, ``host`` and ``database``.
        region: AWS region of the secret.
        output_path: Local CSV file to write; defaults to ``temp.csv``.

    Returns:
        True when the query ran and the CSV was written; False when a
        ``mysql.connector`` ``Error`` occurred (the error is printed).

    Other exceptions (boto3/Secrets Manager errors, DNS lookup failures,
    missing secret keys, file errors) propagate to the caller.
    """
    conn = None
    cursor = None
    try:
        # Grab MySQL connection and database settings. We are using AWS Secrets
        # Manager, but you could use another service like Hashicorp Vault.
        # We cannot use Apache Airflow to store these as this script runs stand alone.
        info = _read_db_secret(secret, region)
        pw = info['password']
        un = info['username']
        hs = info['host']
        db = info['database']
        # Output to the log so we can see and confirm WHERE we are running and
        # WHAT we are connecting to.
        print("Connecting to ", str(hs), " database ", str(db), " as user ", str(un))
        print("Database host IP is :", socket.gethostbyname(hs))
        # NOTE: socket.gethostname() yields this machine's host name, not an IP.
        print("Source IP is ", socket.gethostname())
        conn = MySQLConnection(user=un, password=pw, host=hs, database=db)
        cursor = conn.cursor()
        print("Query is", str(query2run))
        cursor.execute(query2run)
        records = cursor.fetchall()
        _export_records(records, output_path)
        return True
    except Error as e:
        print(e)
        return False
    finally:
        # Either handle is still None if the failure happened before it was
        # created; closing unconditionally would raise and hide the real error.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
def upload_to_s3(s3bucket, s3folder, region, local_file='temp.csv'):
    """Upload the local export file to ``s3://<s3bucket>/<s3folder>``.

    Args:
        s3bucket: Target S3 bucket name.
        s3folder: Target object key (folder/filename) inside the bucket.
        region: AWS region used for the S3 client.
        local_file: Local file to upload; defaults to ``temp.csv``.

    Returns:
        True on success; False if the local file is missing or boto3 reports
        an upload failure (the reason is printed).
    """
    try:
        s3 = boto3.client('s3', region_name=region)
        s3.upload_file(local_file, s3bucket, s3folder)
    except FileNotFoundError:
        print("The file was not found")
        return False
    except boto3.exceptions.Boto3Error as e:
        # upload_file reports S3 failures as S3UploadFailedError (a Boto3Error).
        # mysql.connector's Error, caught here previously, is never raised by S3.
        print(e)
        return False
    return True
if __name__ == '__main__':
    # The script needs the following arguments to run
    # 1. Target S3 bucket where the output of the SQL script will be copied
    # 2. Target S3 folder/filename
    # 3. The query to execute
    # 4. The parameter store (we use AWS Secrets) which holds the values on where to find the MySQL database
    # 5. The AWS region
    # Check the full argument count up front; probing only argv[2] let runs
    # with 2-4 arguments crash later with a bare IndexError.
    if len(sys.argv) < 6:
        raise SystemExit(f"Usage: {sys.argv[0]} <s3 bucket><s3 file><query><secret><region>")
    s3bucket, s3folder, query2run, secret, region = sys.argv[1:6]
    # Stop with a non-zero exit status when a step reports failure by returning
    # False, so a failed export does not go on to upload a stale temp.csv.
    # ``is False`` (rather than ``not``) keeps a None return meaning "continue".
    if query_with_fetchone(query2run, secret, region) is False:
        sys.exit(1)
    if upload_to_s3(s3bucket, s3folder, region) is False:
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment