Skip to content

Instantly share code, notes, and snippets.

@TiGaI
Last active May 19, 2020 02:16
Show Gist options
  • Save TiGaI/ccd32e73935f08ca42434bd3ee324b7a to your computer and use it in GitHub Desktop.
Save TiGaI/ccd32e73935f08ca42434bd3ee324b7a to your computer and use it in GitHub Desktop.
def checkingYesterdayTweet(bucket_name, project, credentials_path, **kwargs):
    """Choose the branch to follow based on whether yesterday's tweet file exists.

    Looks in the Google Cloud Storage bucket ``bucket_name`` for a blob named
    ``tweet-coronavirus-<YYYY-MM-DD>``, where the date is yesterday according
    to the local clock of the machine running the task.

    Args:
        bucket_name: Name of the bucket to inspect. When empty or ``None`` the
            historical default ``"airflowexample"`` is used.
        project: GCP project id handed to ``storage.Client``.
        credentials_path: Path to a service-account JSON key file. When falsy,
            no explicit credentials are passed to the client.
        **kwargs: Extra keyword arguments (for example the Airflow context
            when the operator is created with ``provide_context=True``).
            They are accepted but not used.

    Returns:
        ``"All_jobs_end"`` when the blob exists, otherwise
        ``"tweeter-today-scraper"``.
    """
    credentials = (
        service_account.Credentials.from_service_account_file(credentials_path)
        if credentials_path
        else None
    )
    storage_client = storage.Client(project=project, credentials=credentials)

    # Honour the caller-supplied bucket; previously the argument was always
    # overwritten by a hard-coded name. The old literal is kept as a fallback.
    bucket_name = bucket_name or "airflowexample"
    bucket = storage_client.get_bucket(bucket_name)

    yesterday = datetime.datetime.today() - datetime.timedelta(days=1)
    searchTerm = "coronavirus"
    filename = f"tweet-{searchTerm}-{yesterday.strftime('%Y-%m-%d')}"

    # NOTE(review): the log messages mention "airflowTweet/<filename>.csv" but
    # the blob checked is just <filename> (no prefix, no extension) -- confirm
    # which object name the upstream scraper actually writes.
    if bucket.blob(filename).exists():
        logging.info('this file exist: {}/{}.csv'.format("airflowTweet", filename))
        return "All_jobs_end"

    logging.info('this file does not exist: {}/{}.csv'.format("airflowTweet", filename))
    return "tweeter-today-scraper"
with dag:
    # Branching task: runs checkingYesterdayTweet and follows whichever
    # task id that callable returns.
    checkingTodayTweet = BranchPythonOperator(
        task_id='branching',
        python_callable=checkingYesterdayTweet,
        # Keyword arguments forwarded to the callable.
        op_kwargs=dict(
            bucket_name='airflowexample',
            project='trusty-charmer-276704',
            credentials_path='/usr/local/airflow/dags/gcp.json',
        ),
        # Also pass the Airflow context; the callable absorbs it via **kwargs.
        provide_context=True,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment