Skip to content

Instantly share code, notes, and snippets.

@TiGaI
Last active May 15, 2020 03:08
Show Gist options
  • Save TiGaI/73a25590d6ed9ed6ff7ae7f2f562cbbc to your computer and use it in GitHub Desktop.
Save TiGaI/73a25590d6ed9ed6ff7ae7f2f562cbbc to your computer and use it in GitHub Desktop.
from airflow import DAG
from google.cloud import storage
from google.oauth2 import service_account
from airflow.operators.python_operator import PythonOperator
from io import BytesIO, StringIO
import pandas as pd
import numpy as np
from datetime import datetime
import logging
import twint
# Task-level defaults handed to the DAG constructor via ``default_args``.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2020, 1, 1),
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
)
def scrapeTwitter(bucket_name, project, credentials_path: str=None, **kwargs):
    """Scrape tweets with twint and upload them as a CSV to Cloud Storage.

    Runs a twint search for the hard-coded term and date window below,
    reads the result from twint's pandas storage and uploads it to
    ``airflowTweet/tweet-<term>-<month>.csv`` in the given bucket.

    Args:
        bucket_name: Name of the destination Cloud Storage bucket.
        project: Project id passed to ``storage.Client``.
        credentials_path: Optional path to a service-account JSON key
            file. When falsy, ``credentials=None`` is passed to the client.
        **kwargs: Extra keyword arguments supplied by the caller (the
            operator in this file sets ``provide_context=True``); unused.
    """
    # Set up the Google Cloud Storage client and target bucket.
    credentials = (
        service_account.Credentials.from_service_account_file(credentials_path)
        if credentials_path
        else None
    )
    storage_client = storage.Client(project=project, credentials=credentials)
    bucket = storage_client.bucket(bucket_name)

    # Search parameters, kept in one place so the uploaded file name can be
    # derived from the same values that drive the scrape.
    search_term = "coronavirus"
    since = "2020-05-01"
    until = "2020-05-05"

    # Set up the twitter scraper.
    tweet_config = twint.Config()
    tweet_config.Search = search_term
    tweet_config.Since = since
    tweet_config.Until = until
    tweet_config.Lang = "en"
    tweet_config.Verified = True
    # Store the result in twint's pandas dataframe storage.
    tweet_config.Pandas = True
    twint.run.Search(tweet_config)
    tweets_df = twint.storage.panda.Tweets_df

    # ``datetime`` is the class (``from datetime import datetime``), so it is
    # used directly; the month label comes from the start of the search
    # window rather than an unrelated hard-coded date.
    date_start = datetime.strptime(since, "%Y-%m-%d")
    filename = f"tweet-{search_term}-{date_start.strftime('%B')}"
    blob_path = '{}/{}.csv'.format("airflowTweet", filename)

    bucket.blob(blob_path).upload_from_string(tweets_df.to_csv(), 'text/csv')
    # Log the path that was actually written.
    logging.info('%s has been uploaded.', blob_path)
    logging.info(tweets_df.head(5))
# Single-task DAG: one PythonOperator that invokes scrapeTwitter with the
# bucket, project and credentials path given in op_kwargs.
dag = DAG('blog2_example1', default_args=default_args, catchup=False)

with dag:
    # Bound to its own name so the operator does not rebind (shadow) the
    # module-level ``scrapeTwitter`` callable it wraps.
    scrape_twitter_task = PythonOperator(
        task_id='scrapeTwitter1',
        python_callable=scrapeTwitter,
        provide_context=True,
        op_kwargs={
            'bucket_name': 'airflowexample',
            'project': 'trusty-charmer-276704',
            'credentials_path': '/usr/local/airflow/dags/gcp.json',
        },
    )
    # Only one task, so there are no dependencies to declare.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment