Skip to content

Instantly share code, notes, and snippets.

@jasonprado
Created May 5, 2021 17:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jasonprado/0c71196bf21398d3b79123b9172987dd to your computer and use it in GitHub Desktop.
Save jasonprado/0c71196bf21398d3b79123b9172987dd to your computer and use it in GitHub Desktop.
from typing import Any
from airtable import Airtable
import pandas as pd
from prefect import Task
from prefect.utilities.tasks import defaults_from_attrs
class ReadAirtableViewToDataframeTask(Task):
    """
    Task to read an entire Airtable table or view into a pandas DataFrame.

    Every argument may be given at construction time and/or when calling
    `run`; `defaults_from_attrs` falls back to the attribute stored at init
    for any argument not supplied to `run`.

    Args:
        base_key: key of the Airtable base to read from
        table_name: name of the table to read
        view_name: view to read; passed to `Airtable.get_all` as `view`
        api_key: Airtable API key
        **kwargs: forwarded to the `Task` constructor
    """

    def __init__(
        self,
        base_key: str = None,
        table_name: str = None,
        view_name: str = None,
        api_key: str = None,
        **kwargs: Any,
    ):
        self.base_key = base_key
        self.table_name = table_name
        self.api_key = api_key
        self.view_name = view_name
        super().__init__(**kwargs)

    @defaults_from_attrs("base_key", "table_name", "view_name", "api_key")
    def run(
        self,
        base_key: str = None,
        table_name: str = None,
        view_name: str = None,
        api_key: str = None,
    ):
        """
        Fetch all records and return them as a DataFrame indexed by record ID.

        Args:
            base_key: key of the Airtable base to read from
            table_name: name of the table to read
            view_name: view to read; passed to `Airtable.get_all` as `view`
            api_key: Airtable API key

        Returns:
            pd.DataFrame: one row per record, indexed by the record's 'id',
                with one column per key found in the records' 'fields'.
                An empty table/view yields an empty DataFrame whose index is
                named 'id'.
        """
        table = Airtable(base_key, table_name, api_key)
        records = table.get_all(view=view_name)

        # The record ID is written last, so it wins over any field that
        # happens to be called 'id'.
        rows = [{**r['fields'], 'id': r['id']} for r in records]

        if not rows:
            # With no rows there is no 'id' column for from_records to promote
            # to the index, and it raises instead of returning an empty frame.
            return pd.DataFrame(index=pd.Index([], name='id'))

        return pd.DataFrame.from_records(rows, index='id')
class ApplyDataframeUpdatesToAirtableTask(Task):
    """
    Task that pushes the rows of a DataFrame to Airtable as record updates.

    Connection arguments may be given at construction time and/or when
    calling `run`.

    Args:
        base_key: key of the Airtable base to update
        table_name: name of the table to update
        api_key: Airtable API key
        **kwargs: forwarded to the `Task` constructor
    """

    def __init__(
        self,
        base_key: str = None,
        table_name: str = None,
        api_key: str = None,
        **kwargs: Any,
    ):
        self.base_key = base_key
        self.table_name = table_name
        self.api_key = api_key
        super().__init__(**kwargs)

    @defaults_from_attrs("base_key", "table_name", "api_key")
    def run(
        self,
        updates: pd.DataFrame,
        base_key: str = None,
        table_name: str = None,
        api_key: str = None,
    ):
        """
        Send one update per DataFrame row via `Airtable.batch_update`.

        Args:
            updates: DataFrame whose index values are used as record IDs and
                whose columns are used as the field names to write
            base_key: key of the Airtable base to update
            table_name: name of the table to update
            api_key: Airtable API key
        """
        table = Airtable(base_key, table_name, api_key)

        # orient='index' maps each index value to a {column: value} dict,
        # which lines up with the {'id': ..., 'fields': {...}} payload shape.
        # NOTE(review): cell values (including NaN) are forwarded unchanged;
        # confirm the caller strips anything that cannot be serialized.
        fields_by_id = updates.to_dict(orient='index')
        payload = [
            {'id': record_id, 'fields': fields}
            for record_id, fields in fields_by_id.items()
        ]
        table.batch_update(payload)
class InsertRecordsIntoAirtable(Task):
    """
    Task that inserts new records into an Airtable table.

    Connection arguments may be given at construction time and/or when
    calling `run`.

    Args:
        base_key: key of the Airtable base to insert into
        table_name: name of the table to insert into
        api_key: Airtable API key
        **kwargs: forwarded to the `Task` constructor
    """

    def __init__(
        self,
        base_key: str = None,
        table_name: str = None,
        api_key: str = None,
        **kwargs: Any,
    ):
        self.api_key = api_key
        self.table_name = table_name
        self.base_key = base_key
        super().__init__(**kwargs)

    @defaults_from_attrs("base_key", "table_name", "api_key")
    def run(
        self,
        records,
        base_key: str = None,
        table_name: str = None,
        api_key: str = None,
    ):
        """
        Insert `records` with a single `Airtable.batch_insert` call.

        Args:
            records: handed to `Airtable.batch_insert` unchanged; presumably
                a list of field dicts -- confirm against the caller
            base_key: key of the Airtable base to insert into
            table_name: name of the table to insert into
            api_key: Airtable API key
        """
        Airtable(base_key, table_name, api_key).batch_insert(records)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment