Query Cloud SQL from Cloud Composer (Apache Airflow) task on GCP

If you need to do a proof of concept with Cloud Composer and Cloud SQL, here are the steps for running a SELECT query against a Cloud SQL database from a Cloud Composer instance - the Apache Airflow managed service on Google Cloud Platform.

Heads up

The following steps will incur costs depending on how long each instance you create (both Cloud SQL and Composer) stays running.

1. Create a Cloud Composer cluster (environment)

Before you start, make sure you have created and selected a GCP project to work with.

To create your Composer cluster, follow official instructions here.
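If you prefer the command line, the creation step looks roughly like this (my-composer-env and us-central1 are placeholder values; adjust to your project):

gcloud composer environments create my-composer-env --location us-central1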

This step can take several minutes, so I suggest you start it first while we complete the initial steps. We will get back to it once the service is up and running.

Behind the scenes, your Cloud Composer is running on a GKE cluster - to which you will connect in later steps.

2. Create a Cloud SQL (MySQL) Instance

To create a Cloud SQL (MySQL) instance, follow official instructions here.
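As with Composer, this can also be done from the command line; a minimal sketch, assuming a small MySQL 5.7 instance named my-mysql-instance:

gcloud sql instances create my-mysql-instance --database-version=MYSQL_5_7 --tier=db-f1-micro --region=us-central1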

3. Setup a Cloud SQL Proxy to access your SQL instance

Cloud SQL Proxy allows your applications to connect securely to a Cloud SQL instance running on GCP without having to manually configure security measures such as IP whitelisting or SSL.

It simply runs a small service close to your application, exposes a local IP and port to which your app connects, and automatically and securely forwards that connection to your Cloud SQL instance on GCP.

To test it and connect to our MySQL instance for the first time, we will install and run the Cloud SQL Proxy on your local machine (this can also be done in Google Cloud Shell).

To run Cloud SQL Proxy locally, follow the official steps here. You can also follow this video tutorial.
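For reference, once downloaded, a local run looks something like this (my-project:us-central1:my-mysql-instance is a placeholder for your own connection name):

./cloud_sql_proxy -instances=my-project:us-central1:my-mysql-instance=tcp:3306

With the proxy running, you can then connect with the standard MySQL client, e.g. mysql -u root -p --host 127.0.0.1 --port 3306.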

4. Create an example Database and Table

Once you connect to your Cloud SQL instance using the Proxy, you should be able to run SQL commands in your terminal.

Let's create an example database and table, and insert some example data for us to query from Composer later.

Inside your Cloud SQL instance, type the following commands at the mysql> prompt.

Create a database named "books"

CREATE DATABASE books;

Let's use the "books" database

USE books;

Create the first table for that database, named "author"

CREATE TABLE author (
    id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(50) NOT NULL
);

Insert some example data

INSERT INTO author (name) VALUES ("Joe Doe"), ("Mary Smith"), ("Greg Jensen");

Query the table to make sure the rows were inserted

SELECT * from author;

If the result shows the names we inserted, we are good to go.

5. Service Account and credentials yaml

From now on you will be using the command line to call GCP's commands. You can either use the Cloud Shell online or install the gcloud SDK on your local machine.

You need a service-account token with "Project Editor" privileges, and we will assume the file is in $HOME/credentials.json. Warning: A service account with Project Editor privileges is very permissive, so make sure you handle that credentials file safely.

Refer to the documentation to get the JSON credential file. In Cloud Shell, you can simply copy the contents of the downloaded file into a file named credentials.json.
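If you do not have a service account yet, the rough command-line flow is as follows (composer-sql-proxy and my-project are placeholder names):

# Create the service account
gcloud iam service-accounts create composer-sql-proxy

# Grant it the (very permissive) Project Editor role - see the warning above
gcloud projects add-iam-policy-binding my-project --member="serviceAccount:composer-sql-proxy@my-project.iam.gserviceaccount.com" --role="roles/editor"

# Download the key file to the expected location
gcloud iam service-accounts keys create $HOME/credentials.json --iam-account=composer-sql-proxy@my-project.iam.gserviceaccount.com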

6. Create GKE Deployment / Service yaml files for Cloud SQL Proxy

We need to create Deployment and Service YAML files so the Cloud SQL Proxy can run in your Cloud Composer GKE cluster.

Create two files: sqlproxy-deployment.yaml and sqlproxy-service.yaml

Prerequisites:

You will need your Cloud SQL instance connection name (in the format PROJECT-ID:REGION:INSTANCE-ID) to replace the placeholder [YOUR_CLOUD_SQL_CONNECTION_STRING] in the file below. You can find your connection name on your Cloud SQL instance details page.
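You can also look it up from the command line; a quick sketch, assuming your instance is named my-mysql-instance:

gcloud sql instances describe my-mysql-instance --format='value(connectionName)'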

Here are the contents of the two files.

sqlproxy-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: cloudsqlproxy
spec:
  replicas: 1
  selector:
    matchLabels:
      app: cloudsqlproxy
  template:
    metadata:
      labels:
        app: cloudsqlproxy
    spec:
      containers:
       # Make sure to specify image tag in production
       # Check out the newest version in release page
       # https://github.com/GoogleCloudPlatform/cloudsql-proxy/releases
      - image: gcr.io/cloudsql-docker/gce-proxy:latest
       # 'Always' if imageTag is 'latest', else set to 'IfNotPresent'
        imagePullPolicy: Always
        name: cloudsqlproxy
        command:
        - /cloud_sql_proxy
        - -dir=/cloudsql
        # Make sure you have the right Cloud SQL connection name here
        - -instances=[YOUR_CLOUD_SQL_CONNECTION_STRING]=tcp:0.0.0.0:3306
        - -credential_file=/credentials/credentials.json
        # set term_timeout if you require graceful handling of shutdown
        # NOTE: the proxy will stop accepting new connections and only wait on existing ones
        - -term_timeout=10s
        lifecycle:
          preStop:
            exec:
              # (optional) add a preStop hook so that termination is delayed
              # this is required if your server still requires new connections (e.g., connection pools)
              command: ['sleep', '10']
        ports:
        - name: port-database1
          containerPort: 3306
        volumeMounts:
        - mountPath: /cloudsql
          name: cloudsql
        - mountPath: /credentials
          name: service-account-token
      volumes:
      - name: cloudsql
        emptyDir: {}
      - name: service-account-token
        secret:
          secretName: service-account-token

sqlproxy-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: sqlproxyservice
spec:
  ports:
  - port: 3306
    targetPort: port-database1
  selector:
    app: cloudsqlproxy

7. Connect to Cloud Composer GKE cluster and select appropriate namespace

By now you should have your Cloud Composer up and running - make sure you see your Composer environment on your GCP console.

As mentioned earlier, your Cloud Composer environment actually runs on a GKE cluster, and we need access to it to install the Cloud SQL Proxy.

Go to the GCP Kubernetes overview page and click on your Cloud Composer GKE cluster (its name will most likely contain your Composer instance name) to view its details.

Click the Connect button to connect to your cluster so you can run kubectl against it.


Copy the command string that looks something like (replace values in brackets with your own cluster details):

gcloud container clusters get-credentials [your_composer_gke_cluster_name] --zone [zone_of_your_cluster] --project [your_project]

And run it on your terminal or Cloud Shell.

On the GKE console, click on "Workloads" in the left menu to view Composer's Deployments and identify the namespace your Composer scheduler Deployment is running in - refer to the image below.

(screenshot: GKE Workloads view showing the Composer scheduler Deployment and its namespace)

You can also see it by running the kubectl get namespaces command.

Copy the namespace of your Composer scheduler pod so we can run the remaining commands in that specific namespace (instead of the default one). Once copied, set the new namespace by running the following:

kubectl config set-context --current --namespace=<insert-namespace-name-here>

To check your current context namespace, run

kubectl config view | grep namespace:

and you should see the name of your Composer scheduler namespace listed.

8. Run the Cloud SQL Proxy Deployment & Service

Create Secret

From the same directory as your credentials.json, let's create a GKE secret to store your credentials file securely in the cluster. Run:

kubectl create secret generic service-account-token --from-file=credentials.json=./credentials.json

Apply deployment & service yaml files

To apply the deployment, run:

kubectl apply -f ./sqlproxy-deployment.yaml

Wait a few seconds and if you run kubectl get deployment you should see your "cloudsqlproxy" deployment listed as deployed.

To apply the service, run:

kubectl apply -f ./sqlproxy-service.yaml

Wait a few seconds and if you run kubectl get service you should see your "sqlproxyservice" service listed as running.

Verify your Cloud SQL Proxy is running on GKE

Let's quickly connect to the Composer scheduler pod (where your DAGs run), check that you can access the SQL Proxy service from it, and run a SQL query against the Cloud SQL database we created earlier.

Identify the Pod's name:

kubectl get pod | grep sched

Copy the name of the pod that contains the word "scheduler", then run:

kubectl exec -it [name-of-the-pod] -- /bin/bash

Once you are at the pod's command prompt, you can log in to your MySQL instance (the connection is routed automatically through the sqlproxyservice on port 3306 we deployed earlier):

mysql --host=sqlproxyservice --port=3306 -u [your_username] -p

If you see the mysql> prompt, that is a great sign: it means you connected successfully.

Now, at the mysql> prompt, query the "author" table from the "books" database you created earlier in your Cloud SQL instance. Type:

USE books;

Query the table to make sure the rows are there

SELECT * from author;

If the result shows the names of the authors we inserted, such as Joe Doe and Mary Smith, we are ready to do the same from our DAG within Cloud Composer.

9. Connect from Composer DAG to Cloud SQL

At this point, your Cloud SQL Proxy is running and you can access it from the Composer scheduler pod, where your DAGs run. There are several ways to connect your Composer DAG code to Cloud SQL, but as long as you provide sqlproxyservice as your database host and supply your database credentials (user and password), you will be able to connect.
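Whichever method you choose, the connection conceptually boils down to Airflow's standard connection URI format; all values below are placeholders:

mysql://[your_username]:[your_password]@sqlproxyservice:3306/books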

Connect to Cloud SQL from the Composer Web UI

In this tutorial, to keep the proof of concept simple, we will provide our database credentials from within the Composer UI.

Access the Composer page on GCP console and click on your Cloud Composer instance to view its details.

Under the "Environment Configuration" tab, look for the "Airflow web UI" which will provide the URL of the Composer web admin page.


On the top menu bar, click on Admin > Connections. This will open the connections page (read more about connections).


Click on "Create" and add your connection details (select MySQL for Conn Type ), including your database host sqlproxyservice, username, and password and click Save.


Getting the IP address of your SQL Proxy host instead

If, instead of the sqlproxyservice host alias of the service (which connects you to the Cloud SQL instance), you prefer to use the actual IP address, you can type

kubectl get service

in your terminal or Cloud Shell and use the IP address shown for the sqlproxyservice service as your database host.

10. Create the DAG Python file

Now that we have everything set up, you can create and test your DAG in a Python file.

Here is an example where we use the mysql_to_gcs Airflow operator (see docs) and pass it the connection we created in the Airflow UI (in this case, my_connection).

from airflow import DAG

from datetime import datetime, timedelta
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 30),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

# Note: schedule_interval must be set on the DAG itself; Airflow ignores it inside default_args
dag = DAG('mysql-demo', catchup=False, schedule_interval='*/1 * * * *', default_args=default_args)

cloud_storage_bucket_name = 'composerdemo-bucket'

t1 = MySqlToGoogleCloudStorageOperator(
    task_id='import_mysql',
    mysql_conn_id='my_connection',
    google_cloud_storage_conn_id='google_cloud_storage_default',
    sql=""" SELECT * FROM books.author """,
    bucket=cloud_storage_bucket_name,
    filename='composer_output_file',
    dag=dag)


t1
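To deploy the DAG, save it to a file (mysql_demo_dag.py is just an example name) and upload it to your environment's DAGs folder, for example:

gcloud composer environments storage dags import --environment my-composer-env --location us-central1 --source mysql_demo_dag.py

After a minute or two the DAG should show up in the Airflow web UI, and each run should write composer_output_file to the composerdemo-bucket GCS bucket referenced in the code (assuming that bucket exists).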