Skip to content

Instantly share code, notes, and snippets.

View TiGaI's full-sized avatar
🏠
Working from home

Junjie TiGaI

🏠
Working from home
View GitHub Profile
from airflow import DAG, settings
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
from airflow.models import Connection
import json
def add_gcp_connection(**kwargs):
new_conn = Connection(
conn_id="google_cloud_default",
from airflow import DAG
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.operators.python_operator import PythonOperator
import pandas as pd
import numpy as np
from datetime import datetime
default_args = {
'owner': 'airflow',
from airflow import DAG
from google.cloud import storage
from google.oauth2 import service_account
from airflow.operators.python_operator import PythonOperator
from io import BytesIO, StringIO
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from airflow import DAG
from google.cloud import storage
from google.oauth2 import service_account
from airflow.operators.python_operator import PythonOperator
from io import BytesIO, StringIO
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from airflow import DAG
from google.cloud import storage
from google.oauth2 import service_account
from airflow.operators.python_operator import PythonOperator
from io import BytesIO, StringIO
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from google.cloud import bigquery
from google.cloud import storage
from google.oauth2 import service_account
import datetime
class GoogleCloudStorageBigQueryUpdateSensor(BaseSensorOperator):
"""
Check whether the BigQuery dataset's last-modified time is earlier than the last-modified time of the GCS files.
from airflow import DAG
from google.cloud import storage
from google.oauth2 import service_account
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from io import BytesIO, StringIO
import pandas as pd
import numpy as np
def checkingYesterdayTweet(bucket_name, project, credentials_path, **kwargs):
    """Connect to GCS and build the expected object name for yesterday's tweet dump.

    Args:
        bucket_name: Name of the GCS bucket holding the tweet dumps.
            (Bug fix: the original clobbered this parameter with a
            hard-coded "airflowexample"; the caller's value is now honored.)
        project: GCP project id used to construct the storage client.
        credentials_path: Path to a service-account JSON key file, or a
            falsy value to fall back to application-default credentials.
        **kwargs: Airflow task context. ``search_term`` may be supplied to
            override the default "coronavirus" term (backward compatible).

    Returns:
        The expected object name, e.g. ``tweet-coronavirus-2020-04-01``.
    """
    credentials = (
        service_account.Credentials.from_service_account_file(credentials_path)
        if credentials_path
        else None
    )
    storage_client = storage.Client(project=project, credentials=credentials)
    # get_bucket raises NotFound if the bucket does not exist, failing the
    # task early; the bucket handle itself is not otherwise used here.
    bucket = storage_client.get_bucket(bucket_name)
    yesterday = datetime.datetime.today() - datetime.timedelta(days=1)
    # Search term was hard-coded in the original; keep the same default but
    # allow the DAG to override it through the task context.
    search_term = kwargs.get("search_term", "coronavirus")
    filename = f"tweet-{search_term}-{yesterday.strftime('%Y-%m-%d')}"
    return filename
class CheckBQDuplication(BaseOperator):
"""
Check if a specific table in BigQuery contains duplicated data after the load
"""
@apply_defaults
def __init__(
self,
dataset_name,
bigquery_table_name,
bigquery_table_key,
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
import logging
import datetime
import twint
# Direct import (uncomment to use the sensor from the local sensors module):
# from sensors.gcs_bq_custom_sensor import GoogleCloudStorageBigQueryUpdateSensor