Last active
September 28, 2023 16:55
-
-
Save van-william/fa02096f0baf3ab096ddc49b2a5c3c40 to your computer and use it in GitHub Desktop.
lambda function to query tulip tables and write to fivetran
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from tulip_api import TulipAPI,TulipTable, CachedTulipTable | |
import pandas as pd | |
import numpy as np | |
from datetime import datetime | |
import os | |
def lambda_handler(event, context):
    """Query a Tulip table and return its rows as a Fivetran function response.

    The Tulip instance URL, API key and API secret are read from the
    ``instance``, ``api_key`` and ``api_secret`` environment variables.

    Args:
        event: Lambda invocation payload; not read by this handler.
        context: Lambda runtime context; not read by this handler.

    Returns:
        dict: Response with the keys ``state``, ``insert``, ``delete``,
        ``schema`` and ``hasMore``. Every record streamed from the table is
        placed under ``insert['tulip_table']``.
    """
    # Instance url, api key, and api secret are stored as environment variables.
    instance_url = os.getenv('instance')
    api_key = os.getenv('api_key')
    api_secret = os.getenv('api_secret')
    table_id = "aKzvoscgHCyd2CRu3_DEFAULT"

    # Query the Tulip table using the Tulip Community API.
    api = TulipAPI(
        instance_url,
        api_key=api_key,
        api_key_secret=api_secret,
    )
    tulip_table = TulipTable(api, table_id)
    df = pd.DataFrame(tulip_table.stream_records())

    # Clean up the column names: the listed system columns are kept verbatim,
    # every other name has its first six characters dropped (presumably the
    # Tulip field-ID prefix -- TODO confirm against the table definition).
    system_columns = {'_createdAt', '_sequenceNumber', '_updatedAt', 'id'}
    df.columns = [
        column if column in system_columns else column[6:]
        for column in df.columns
    ]
    # Missing values are replaced with 0 before the rows are serialised.
    df = df.fillna(0)
    records = df.to_dict(orient='records')

    # NOTE(review): the schema below is keyed 'table' while the records are
    # inserted under 'tulip_table'. If the schema key is meant to name the
    # inserted table, the primary key is not being applied to it -- confirm
    # before changing, since that would alter the destination table.
    #
    # The original code ended with `return response`, an undefined name that
    # raised NameError on every invocation; the assembled dict is returned
    # directly instead.
    return {
        # Updated state: cursor is the current local timestamp as a string.
        'state': {'transactionsCursor': str(datetime.now())},
        # All the records to be inserted.
        'insert': {'tulip_table': records},
        # Records to be marked as deleted (none).
        'delete': {},
        # Schema definition.
        'schema': {'table': {'primary_key': ['id']}},
        # No further pages: the whole table is returned in one response.
        'hasMore': False,
    }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment