Skip to content

Instantly share code, notes, and snippets.

@jaisontj
Created May 16, 2018 13:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jaisontj/725f8bf6a038d805958efa1168672972 to your computer and use it in GitHub Desktop.
Save jaisontj/725f8bf6a038d805958efa1168672972 to your computer and use it in GitHub Desktop.
import requests
import json
data_url = "https://data.hasura"
headers = {
"Content-Type": "application/json",
"X-Hasura-Role": "admin"
}
if (os.environ["env"] == "dev"):
# Replace <cluster-name> with the name of your cluster.
data_url = "https://data.<cluster_name>.hasura-app.io"
# Replace <auth_token> with the admin token. You can get it from the Api Console
headers["Authorization"] = "Bearer <auth_token>"
url = data_url + "/v1/query"
def getLastUpdatedTimestamp() :
tableBRequestPayload = {
"type": "select",
"args": {
"table": "tableB",
"columns": [
"last_updated"
],
"order_by": ["-last_updated"],
"limit": "1"
}
}
lastUpdatedTableBData = requests.request("POST", url, data=json.dumps(tableBRequestPayload), headers=headers)
if (lastUpdatedTableBData.status_code != 200) :
print("Failed to fetch data from tableB")
return
jsonData = lastUpdatedTableBData.json()
print(jsonData)
lastUpdatedTimestamp = jsonData[0]['last_updated'] if len(jsonData) > 0 else None
return lastUpdatedTimestamp
def getAllDataFromTimestamp(lastUpdatedTimestamp):
requestPayload = {
"type": "select",
"args": {
"table": "tableA",
"columns": [
"*"
]
}
}
if (lastUpdatedTimestamp is not None):
requestPayload['args']['where'] = {
"last_updated": {
"$gt": lastUpdatedTimestamp
}
}
response = requests.request("POST", url, data=json.dumps(requestPayload), headers=headers)
if (response.status_code != 200) :
print("Failed to fetch data from tableA")
print(response.content)
return
jsonData = response.json()
return jsonData
def upsertDataToTableB(newData):
# Perform operations on new data if required or sync as is
requestPayload = {
"type": "insert",
"args": {
"table": "tableB",
"objects": newData,
"on_conflict": {
"action": "update",
"constraint_on": [
"id"
]
}
}
}
response = requests.request("POST", url, data=json.dumps(requestPayload), headers=headers)
if (response.status_code != 200) :
print("Failed to insert data into tableB")
print(response.content)
return
jsonData = response.json()
return jsonData
def syncTables():
print("Running Sync")
lastUpdatedTimestamp = getLastUpdatedTimestamp()
print("Last Updated Timestamp: " + lastUpdatedTimestamp)
newData = getAllDataFromTimestamp(lastUpdatedTimestamp)
print("New Data: " + str(newData))
if (len(newData) == 0):
print("Nothing to sync")
return
insertResponse = insertDataToTableB(newData)
print("Sync completed")
print("Synced: " + str(len(newData)) + " row(s)")
syncTables()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment