If you need to do a proof of concept with Cloud Composer and Cloud SQL, here are the steps to run a SELECT query against a Cloud SQL database from a Cloud Composer instance - Google Cloud Platform's managed Apache Airflow service.
The following steps will incur costs depending on how long each instance (both Cloud SQL and Composer) is running.
Before you start, make sure you have created and selected a GCP project to work with.
To create your Composer cluster, follow official instructions here.
This step can take several minutes, so I suggest you do it first, while we complete the initial steps. We will get back to it once the service is up and running.
Behind the scenes, your Cloud Composer environment runs on a GKE cluster, to which you will connect in later steps.
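If you prefer the command line over the console, environment creation can be sketched as below. The environment name and location are hypothetical placeholders; the command is composed into a string and echoed so you can verify it before running it yourself.

```shell
# Hypothetical environment name and location -- replace with your own.
ENVIRONMENT="demo-composer-env"
LOCATION="us-central1"

# Compose the create command; run it once the values are correct.
CMD="gcloud composer environments create ${ENVIRONMENT} --location ${LOCATION}"
echo "${CMD}"
```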
To create a Cloud SQL (MySQL) instance, follow official instructions here.
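This step can also be done from the command line. The sketch below uses a hypothetical instance name, a small machine tier, and MySQL 5.7; it composes and echoes the command so you can check it before running it.

```shell
# Hypothetical instance name and region -- replace with your own.
INSTANCE_ID="demo-mysql"
REGION="us-central1"

# Compose the create command for a small MySQL 5.7 instance; run it once verified.
CMD="gcloud sql instances create ${INSTANCE_ID} --database-version=MYSQL_5_7 --tier=db-f1-micro --region=${REGION}"
echo "${CMD}"
```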
Cloud SQL Proxy allows your applications to connect securely to a Cloud SQL instance running on GCP without having to manually configure security measures such as whitelisting IPs or setting up SSL.
It runs as a small service close to your application, exposes a local IP and port for your app to connect to, and automatically and securely forwards that connection to your Cloud SQL instance on GCP.
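Concretely, the proxy identifies your instance by its connection name, built from project, region, and instance ID. The values below are hypothetical placeholders; the actual proxy and mysql invocations are shown as comments since they require the proxy binary and gcloud authentication.

```shell
# Hypothetical values -- substitute your own project, region, and instance.
PROJECT_ID="my-project"
REGION="us-central1"
INSTANCE_ID="demo-mysql"

# Cloud SQL Proxy identifies the instance by its connection name:
CONNECTION_NAME="${PROJECT_ID}:${REGION}:${INSTANCE_ID}"
echo "${CONNECTION_NAME}"

# With the proxy binary downloaded and gcloud authenticated, you would run:
# ./cloud_sql_proxy -instances="${CONNECTION_NAME}"=tcp:3306
# and then connect locally:
# mysql -u root -p --host 127.0.0.1 --port 3306
```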
To test it and connect to our MySQL instance for the first time, we will install and run Cloud SQL Proxy on your local machine (this can also be done in Google Cloud Shell).
To run Cloud SQL Proxy locally, follow the official steps here. You can also follow this video tutorial.
Once you connect to your Cloud SQL instance using the Proxy, you should be able to run SQL commands in your terminal.
Let's create an example database and table, and insert some example data for us to query from Composer later.
Inside your Cloud SQL instance, at your mysql>
command prompt, type the following commands.
Create a database named "books"
CREATE DATABASE books;
Let's use the "books" database
USE books;
Create the first table for that database, named "author"
CREATE TABLE author (
id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(50) NOT NULL
);
Insert some example data
INSERT INTO author (name) VALUES ("Joe Doe"), ("Mary Smith"), ("Greg Jensen");
Query the table to make sure results are inserted
SELECT * from author;
If you see the names we inserted as a result, we are good.
From now on you will be using the command line to run GCP commands. You can either use Cloud Shell online or install the gcloud SDK on your local machine.
You need a service account key with "Project Editor" privileges, and we will assume the file is at $HOME/credentials.json. Warning: a service account with Project Editor privileges is very permissive, so make sure you handle that credentials file safely.
Refer to the documentation to get the JSON credentials file. In Cloud Shell, you can simply copy the contents of the file you downloaded into a file named credentials.json.
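If you already have a service account, the key can be created from the command line. The service account name below is a hypothetical placeholder; the command is composed and echoed so you can verify it before running it.

```shell
# Hypothetical project and service account -- replace with your own.
PROJECT_ID="my-project"
SA_EMAIL="composer-demo@${PROJECT_ID}.iam.gserviceaccount.com"

# Compose the key-creation command; run it once the values are correct.
CMD="gcloud iam service-accounts keys create $HOME/credentials.json --iam-account=${SA_EMAIL}"
echo "${CMD}"
```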
We need to create Deployment and Service YAML files so the Cloud SQL Proxy can run in your Cloud Composer GKE cluster.
Create two files: sqlproxy-deployment.yaml
and sqlproxy-service.yaml
Pre-requisites:
You will need your Cloud SQL instance connection name (in the format PROJECT-ID:REGION:INSTANCE-ID) to replace the placeholder [YOUR_CLOUD_SQL_CONNECTION_STRING]
in the file below. You can find the connection name on your Cloud SQL instance details page.
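You can also fetch the connection name from the command line. The instance name below is a hypothetical placeholder; the command is composed and echoed so you can verify it before running it (note the quotes around the format string, which the shell needs because of the parentheses).

```shell
# Hypothetical instance name -- replace with your own.
INSTANCE_ID="demo-mysql"

# Compose the describe command that prints only the connection name.
CMD="gcloud sql instances describe ${INSTANCE_ID} --format='value(connectionName)'"
echo "${CMD}"
```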
Here are the contents of the two files.
sqlproxy-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: cloudsqlproxy
spec:
  replicas: 1
  selector:
    matchLabels:
      app: cloudsqlproxy
  template:
    metadata:
      labels:
        app: cloudsqlproxy
    spec:
      containers:
        # Make sure to specify an image tag in production.
        # Check the newest version on the releases page:
        # https://github.com/GoogleCloudPlatform/cloudsql-proxy/releases
        - image: gcr.io/cloudsql-docker/gce-proxy:latest
          # 'Always' if the image tag is 'latest', else set to 'IfNotPresent'
          imagePullPolicy: Always
          name: cloudsqlproxy
          command:
            - /cloud_sql_proxy
            - -dir=/cloudsql
            # Make sure you have the right Cloud SQL connection name here
            - -instances=[YOUR_CLOUD_SQL_CONNECTION_STRING]=tcp:0.0.0.0:3306
            - -credential_file=/credentials/credentials.json
            # Set term_timeout if you require graceful handling of shutdown.
            # NOTE: the proxy will stop accepting new connections and only wait on existing ones.
            - -term_timeout=10s
          lifecycle:
            preStop:
              exec:
                # (optional) add a preStop hook so that termination is delayed;
                # this is useful if your server still requires new connections (e.g., connection pools)
                command: ['sleep', '10']
          ports:
            - name: port-database1
              containerPort: 3306
          volumeMounts:
            - mountPath: /cloudsql
              name: cloudsql
            - mountPath: /credentials
              name: service-account-token
      volumes:
        - name: cloudsql
          emptyDir: {}
        - name: service-account-token
          secret:
            secretName: service-account-token
sqlproxy-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: sqlproxyservice
spec:
  ports:
    - port: 3306
      targetPort: port-database1
  selector:
    app: cloudsqlproxy
By now you should have your Cloud Composer up and running - make sure you see your Composer environment on your GCP console.
As mentioned earlier, your Cloud Composer environment actually runs on a GKE cluster, and we need access to that cluster to install the Cloud SQL Proxy.
Go to your GCP Kubernetes overview page and, based on the name (which will most likely have your composer instance name) click on your Cloud Composer GKE cluster to view its details.
Click the Connect button to connect to your cluster so you can run kubectl
commands in it.
Copy the command string that looks something like (replace values in brackets with your own cluster details):
gcloud container clusters get-credentials [your_composer_gke_cluster_name] --zone [zone_of_your_cluster] --project [your_project]
And run it on your terminal or Cloud Shell.
On the GKE console, click "Workloads" in the left menu to view Composer's Deployments and identify the namespace your Composer scheduler Deployment is running in.
You can also see it by running the kubectl get namespaces
command.
Copy the namespace of your Composer scheduler pod so we can run the remaining commands in that specific namespace (instead of the default one). Once you have copied it, set the new namespace by running the following:
kubectl config set-context --current --namespace=<insert-namespace-name-here>
To check your current context namespace, run
kubectl config view | grep namespace:
and you should see the name of your Composer scheduler namespace listed.
In the same directory as your credentials.json,
let's create a GKE secret to store your credentials file securely in the cluster. Run:
kubectl create secret generic service-account-token --from-file=credentials.json=./credentials.json
To apply the deployment, run:
kubectl apply -f ./sqlproxy-deployment.yaml
Wait a few seconds and if you run kubectl get deployment
you should see your "cloudsqlproxy" deployment listed as deployed.
To apply the service, run:
kubectl apply -f ./sqlproxy-service.yaml
Wait a few seconds and if you run kubectl get service
you should see your "sqlproxyservice" service listed.
Let's quickly connect to the Composer scheduler pod (where your DAGs run) and see if you can reach your SQL Proxy pod from it and run a SQL query against the Cloud SQL database we created earlier.
Identify the Pod's name:
kubectl get pod | grep sched
Copy the name of the pod containing the word "scheduler", then run:
kubectl exec -it [name-of-the-pod] -- /bin/bash
Once you are at the pod's command prompt, you can log in to your MySQL instance (the connection is routed automatically through the sqlproxyservice
on port 3306 we deployed earlier):
mysql --host=sqlproxyservice --port=3306 -u [your_username] -p
If you see your mysql>
prompt, that is a great sign; it means you connected successfully.
Now, within your mysql>
prompt, try to query your "author" table from the "books" database you created in your Cloud SQL instance previously, type:
USE books;
Query the table to make sure results are inserted
SELECT * from author;
If you see the names of the authors we inserted, such as Joe Doe and Mary Smith, we are ready to do the same from our DAG within Cloud Composer.
At this point, your Cloud SQL Proxy is running and you are able to access it from within the Composer scheduler pod, where your DAG environment runs. There are several ways to connect your Composer DAG code to Cloud SQL, but as long as you provide sqlproxyservice
as your database host and supply your database credentials (user and password), you will be able to connect.
In this tutorial, for the simplicity of a proof of concept, we will provide our database credentials through the Composer UI.
Access the Composer page on GCP console and click on your Cloud Composer instance to view its details.
Under the "Environment Configuration" tab, look for the "Airflow web UI" which will provide the URL of the Composer web admin page.
On the top menu bar, click on Admin > Connections. This will open the connections page (read more about connections).
Click "Create" and add your connection details (select MySQL for Conn Type
), including your database host sqlproxyservice
, username, and password, then click Save.
If, instead of the host alias sqlproxyservice
of the Service (which connects you to the Cloud SQL instance), you prefer to use the actual IP address, you can run
kubectl get service
in your cloud console and use the IP address shown for the sqlproxyservice
service as your database host.
Now that we have everything set up, you can create and test your DAG in your Python file.
Here is an example where we use the mysql_to_gcs
Airflow operator (see docs) and pass it the connection we created in the Airflow UI (in this case my_connection
).
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 30),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

# schedule_interval belongs on the DAG itself, not in default_args
dag = DAG(
    'mysql-demo',
    catchup=False,
    default_args=default_args,
    schedule_interval='*/1 * * * *',
)

cloud_storage_bucket_name = 'composerdemo-bucket'

t1 = MySqlToGoogleCloudStorageOperator(
    task_id='import_mysql',
    mysql_conn_id='my_connection',
    google_cloud_storage_conn_id='google_cloud_storage_default',
    sql="""SELECT * FROM books.author""",
    bucket=cloud_storage_bucket_name,
    filename='composer_output_file',
    dag=dag,
)
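Once the DAG file is saved, you can upload it to your environment's DAG folder from the command line. The environment name, location, and file name below are hypothetical placeholders; the command is composed and echoed so you can verify it before running it.

```shell
# Hypothetical environment details -- replace with your own.
ENVIRONMENT="demo-composer-env"
LOCATION="us-central1"
DAG_FILE="mysql_demo_dag.py"

# Compose the upload command; run it once the values are correct.
CMD="gcloud composer environments storage dags import --environment ${ENVIRONMENT} --location ${LOCATION} --source ${DAG_FILE}"
echo "${CMD}"
```

After the upload, the DAG should appear in the Airflow web UI within a few minutes.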