-
-
Save yu-iskw/42f9f0aa6f2ff0a2a375d43881e13b49 to your computer and use it in GitHub Desktop.
#!/bin/bash
# Run the `add_gcp_connection_python` task of the `add_gcp_connection` DAG
# once, for the execution date 2001-01-01 (the DAG's start_date).
airflow run add_gcp_connection add_gcp_connection_python 2001-01-01
import json | |
from airflow import DAG, settings | |
from airflow.models import Connection | |
from airflow.operators.python_operator import PythonOperator | |
from datetime import datetime | |
from common.utils import get_default_google_cloud_connection_id | |
# Arguments handed to the DAG below via its `default_args` parameter.
default_args = {
    'owner': 'airflow',
    'email': ['airflow@example.com'],
    'depends_on_past': False,
    # Plain integer literals: `01` is a SyntaxError on Python 3 (and an
    # octal literal on Python 2), so the month/day are written as `1`.
    'start_date': datetime(2001, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'priority_weight': 1000,
}
def add_gcp_connection(ds, **kwargs):
    """Add an Airflow connection for GCP if it does not exist yet.

    Builds a ``google_cloud_platform`` connection whose id comes from
    ``get_default_google_cloud_connection_id()`` and stores it through the
    Airflow metadata session. If a connection with the same ``conn_id`` is
    already present, nothing is written and a notice is printed instead.

    Args:
        ds: Unused by this function; accepted because the task is declared
            with ``provide_context=True``.
        **kwargs: Unused; absorbs the remaining context passed by the
            operator.

    Raises:
        Exception: Any error raised while querying or committing is
            re-raised after the session has been rolled back.
    """
    new_conn = Connection(
        conn_id=get_default_google_cloud_connection_id(),
        conn_type='google_cloud_platform',
    )
    scopes = [
        "https://www.googleapis.com/auth/pubsub",
        "https://www.googleapis.com/auth/datastore",
        "https://www.googleapis.com/auth/bigquery",
        "https://www.googleapis.com/auth/devstorage.read_write",
        "https://www.googleapis.com/auth/logging.write",
        "https://www.googleapis.com/auth/cloud-platform",
    ]
    # The connection's "extra" field is stored as a JSON string.
    conn_extra = {
        "extra__google_cloud_platform__scope": ",".join(scopes),
        "extra__google_cloud_platform__project": "your-gcp-project",
        "extra__google_cloud_platform__key_path": '/var/local/google_cloud_default.json'
    }
    conn_extra_json = json.dumps(conn_extra)
    new_conn.set_extra(conn_extra_json)

    session = settings.Session()
    try:
        existing = (
            session.query(Connection)
            .filter(Connection.conn_id == new_conn.conn_id)
            .first()
        )
        if not existing:
            session.add(new_conn)
            session.commit()
        else:
            msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
            msg = msg.format(conn_id=new_conn.conn_id)
            print(msg)
    except Exception:
        # Leave the session in a clean state before propagating the error.
        session.rollback()
        raise
    finally:
        # Always release the session; the original never closed it.
        session.close()
# DAG definition: scheduled with "@once" and configured from `default_args`.
dag = DAG(
    dag_id='add_gcp_connection',
    schedule_interval="@once",
    default_args=default_args,
)
# The DAG's only task: calls `add_gcp_connection` through a PythonOperator,
# with the task context passed to the callable.
t1 = PythonOperator(
    task_id='add_gcp_connection_python',
    python_callable=add_gcp_connection,
    provide_context=True,
    dag=dag,
)
@yu-iskw
Many thanks for this example.
@Salad-King
You don't have to use the get_default_google_cloud_connection_id() function; just replace it with a string like "get_default_google_cloud_connection_id".
That string will be the value of google_cloud_conn_id in Airflow.
I was using the DAG from this example for some time with a Postgres backend DB and everything worked perfectly well. Then we switched to a Cloud SQL database, and now running the add_gcp_connection DAG does not insert anything into the connection table. I am pretty new to Airflow and I would appreciate any suggestions as to what the reason could be and where I could look for an answer.
Hi Team,
When I try to execute a script similar to the one above, I get the error below.
AttributeError: 'sts' object has no attribute 'update'.
Kindly help me with this.
Is there any way to connect to GCP without providing /var/local/google_cloud_default.json?
Help: I'm getting this error when I run the script: ImportError: No module named common.utils