This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| logger = logging.getLogger(__name__) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from airflow import DAG | |
| from datetime import datetime, timedelta | |
| from airflow.operators.python import PythonOperator | |
| from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook | |
| from airflow.providers.microsoft.azure.hooks.wasb import WasbHook | |
| import pandas as pd | |
| import logging | |
| import tempfile |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Default arguments handed to the DAG below and inherited by each of its tasks.
default_args = {
    'owner': 'TheOwnerName',
    # NOTE(review): datetime.today() is evaluated at module import, i.e. every
    # time Airflow parses this DAG file, so start_date keeps moving forward.
    # Airflow's best-practice docs recommend a static start_date
    # (e.g. datetime(2024, 1, 1)). Confirm the DAG's schedule/catchup settings
    # before changing it: a static date in the past can trigger backfill runs.
    'start_date': datetime.today(),
    # No automatic retries; retry_delay below is only used when retries > 0.
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}
| with DAG( | |
| default_args=default_args, | |
| dag_id="azure_upload_five", |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def az_upload(): | |
| AZURE_CONN_ID = "adls-blob" # The connection you've created in Airflow Connections | |
| az_hook = WasbHook(wasb_conn_id=AZURE_CONN_ID) | |
| logging.info("Exporting SQL data to Azure Blob Storage") | |
| with tempfile.TemporaryDirectory() as tmpdirname: | |
| df = get_data_from_sql() | |
| logger.info("TempFile") | |
| df.to_parquet(f"{tmpdirname}/TableName.parquet") | |
| logger.info("Writing data into the Azure Blob Storage / Data Lake") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Reading data from on-premise MSSQL Server database and saving it into a Pandas dataframe | |
| def get_data_from_sql(): | |
| mssql_hook = MsSqlHook(mssql_conn_id='MSSQL_CONNECTION') # The connectino you've created in Airflow Connections | |
| connection = mssql_hook.get_conn() | |
| cursor = connection.cursor() | |
| logger.info("Querying data.") | |
| # Execute SQL query | |
| cursor.execute('SELECT TOP(3000000) * FROM DataBase.Schema.Table') | |
| #print(cursor.description) |