import datetime
import logging
from io import BytesIO, StringIO

import numpy as np
import pandas as pd
import twint
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from dateutil.relativedelta import relativedelta
from google.cloud import storage
from google.oauth2 import service_account
# Default task arguments shared by every operator in the DAG below.
# No retries and no e-mail notifications; tasks do not depend on the
# previous run's outcome.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime(2020, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}
def scrapeTwitter(start_date, end_date, bucket_name, project, credentials_path: str = None, **kwargs):
    """Scrape tweets about "coronavirus" for one date window and upload them to GCS as CSV.

    Twint is configured for verified, English-language tweets, capped at 100
    results. The resulting DataFrame is written to
    ``airflowTweet/tweet-coronavirus-<Mon>-<YYYY>.csv`` in ``bucket_name``. An
    ``updatedTime`` metadata entry is then patched onto the object.

    Args:
        start_date: Start of the window, as a datetime/date. It is formatted
            ``YYYY-MM-DD`` for twint's ``Since``, and its month and year name
            the output object.
        end_date: End of the window, as a datetime/date. It is formatted
            ``YYYY-MM-DD`` for twint's ``Until``.
        bucket_name: Name of the GCS bucket that receives the CSV.
        project: GCP project id passed to ``storage.Client``.
        credentials_path: Optional path to a service-account JSON key. When it
            is falsy, ``credentials=None`` is passed to ``storage.Client``.
        **kwargs: Extra keyword arguments (e.g. an Airflow context). They are
            accepted and ignored.
    """
    # Set up the Google Cloud Storage client.
    credentials = (
        service_account.Credentials.from_service_account_file(credentials_path)
        if credentials_path
        else None
    )
    storage_client = storage.Client(project=project, credentials=credentials)
    bucket = storage_client.bucket(bucket_name)

    # Configure the twint scraper.
    searchTerm = "coronavirus"
    tweetConfig = twint.Config()
    tweetConfig.Search = searchTerm
    tweetConfig.Since = start_date.strftime('%Y-%m-%d')
    tweetConfig.Until = end_date.strftime('%Y-%m-%d')
    tweetConfig.Lang = "en"
    tweetConfig.Verified = True
    # Collect results into a pandas DataFrame instead of printing them.
    tweetConfig.Pandas = True
    tweetConfig.Limit = 100
    tweetConfig.Stats = False
    tweetConfig.Hide_output = True

    # Run the search. With Pandas=True, twint exposes the results via
    # twint.storage.panda.Tweets_df.
    twint.run.Search(tweetConfig)
    Tweets_df = twint.storage.panda.Tweets_df

    # Include the year so that the same month in different years gets its own
    # object. Without the year, a later year would silently overwrite the earlier one.
    filename = f"tweet-{searchTerm}-{start_date.strftime('%b-%Y')}"
    blob = bucket.blob('{}/{}.csv'.format("airflowTweet", filename))
    blob.upload_from_string(Tweets_df.to_csv(), 'text/csv')

    # Reuse the uploaded blob handle rather than re-fetching it with get_blob.
    # patch() sends only the changed metadata.
    blob.metadata = {'updatedTime': datetime.datetime.now().isoformat()}
    blob.patch()
    logging.info('{}/{}.csv has been uploaded.'.format("airflowTweet", filename))
def createTwitterDag(start_date, dag_id, dag,
                     bucket_name='airflowexample',
                     project='trusty-charmer-276704',
                     credentials_path='/usr/local/airflow/dags/gcp.json'):
    """Build a PythonOperator that runs ``scrapeTwitter`` for one month.

    The scraping window is ``[start_date, start_date + 1 month]``.

    Args:
        start_date: First day of the month to scrape.
        dag_id: Used as the operator's ``task_id``. Despite its name, it
            identifies a task, not a DAG.
        dag: The DAG the operator is attached to.
        bucket_name: GCS bucket for the CSV output. Defaults to the value
            previously hard-coded here.
        project: GCP project id. Defaults to the value previously hard-coded.
        credentials_path: Service-account key path. Defaults to the value
            previously hard-coded.

    Returns:
        The configured ``PythonOperator``.
    """
    end_date = start_date + relativedelta(months=1)
    return PythonOperator(
        task_id=dag_id,
        python_callable=scrapeTwitter,
        op_kwargs={
            'start_date': start_date,
            'end_date': end_date,
            'bucket_name': bucket_name,
            'project': project,
            'credentials_path': credentials_path,
        },
        dag=dag,
    )
dag = DAG('blog2_example', default_args=default_args, catchup=False)

# One scraping task is generated per calendar month from start_date up to and
# including the month of end_date. All tasks fan out from a start dummy and
# converge on an end dummy.
start_date = datetime.datetime(2020, 1, 1)
# NOTE(review): the original right-hand side of this assignment was lost.
# datetime.datetime.now() is a reconstruction; confirm the intended end date.
end_date = datetime.datetime.now()

with dag:
    dummy_start_up = DummyOperator(task_id='dummy_start_up')
    dummy_shut_down = DummyOperator(task_id='dummy_shut_down')

    # Whole-month difference between the two dates. The +1 in range() also
    # schedules the month that contains end_date.
    num_months = (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month)
    # Use a separate cursor so the module-level start_date is not mutated.
    month_start = start_date
    for _ in range(num_months + 1):
        # The year keeps task_ids unique when the range spans more than 12
        # months. Airflow rejects or drops duplicate task_ids.
        dag_name = f"tweeter-{month_start.strftime('%B-%Y')}"
        dummy_start_up >> createTwitterDag(month_start, dag_name, dag) >> dummy_shut_down
        month_start = month_start + relativedelta(months=1)
