
@TiGaI
Created May 9, 2020 21:32
from airflow import DAG
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.operators.python_operator import PythonOperator
import pandas as pd
import numpy as np  # kept from the original gist; not used below
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1
}


def simpleNumpyToGCS(csv_name: str, folder_name: str,
                     bucket_name="airflowexample", **kwargs):
    # Uses the Airflow connection 'google_cloud_default' by default.
    hook = GoogleCloudStorageHook()

    # Build a small DataFrame and write it to a local CSV file.
    data = {'col1': [1, 2], 'col2': [3, 4]}
    df = pd.DataFrame(data=data)
    df.to_csv('example1.csv', index=False)

    # Upload the CSV to gs://<bucket_name>/<folder_name>/<csv_name>.csv
    hook.upload(bucket_name,
                object='{}/{}.csv'.format(folder_name, csv_name),
                filename='example1.csv',
                mime_type='text/csv')


dag = DAG('exampleDag',
          default_args=default_args,
          catchup=False)

with dag:
    simpleNumpyToGCS_task = PythonOperator(
        task_id='simpleNumpyToGCS',
        python_callable=simpleNumpyToGCS,
        provide_context=True,
        op_kwargs={'csv_name': 'example_airflow', 'folder_name': 'airflow'},
    )

    simpleNumpyToGCS_task
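
A quick way to smoke-test this (a hedged sketch, not part of the original gist: it assumes Airflow 1.10.x, the file sitting in your dags/ folder, and a working 'google_cloud_default' GCP connection):

# Run the task once in isolation with the Airflow 1.10 CLI:
#   airflow test exampleDag simpleNumpyToGCS 2020-01-01
#
# Or call the callable directly for a local check (still needs Airflow
# installed and GCP credentials/connection available):
if __name__ == '__main__':
    simpleNumpyToGCS(csv_name='example_airflow', folder_name='airflow')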
nrivasg commented Sep 11, 2020

Hi, I would appreciate it if you could help me. I think I have followed the previous steps correctly, but when I load this script I get the following error:
Broken DAG: [/usr/local/airflow/dags/simpleExample.py] No module named 'google'
Do you know what could be happening?

TiGaI (Author) commented Sep 11, 2020

It means your Airflow Docker container doesn't have the Google Cloud (GCS) packages installed. Double-check that your Docker image has the Google packages installed and that the new image is referenced in your Dockerfile and docker-compose file; a sketch of the kind of change I mean is below.
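
For reference, a minimal sketch of that kind of image change (assuming a pip-based Airflow 1.10 image; the base image tag and pinned versions are placeholders, not taken from your setup):

# Hypothetical Dockerfile: extend an Airflow 1.10 base image with the Google
# extras so GoogleCloudStorageHook can import the google packages.
FROM puckel/docker-airflow:1.10.9

# Pull in the GCP client libraries (google-cloud-storage and friends).
RUN pip install --user 'apache-airflow[gcp]==1.10.9' google-cloud-storage

# Then point the image/build entry of your docker-compose service at this
# Dockerfile so Airflow actually runs the rebuilt image.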
