Created
May 16, 2018 13:52
-
-
Save jaisontj/725f8bf6a038d805958efa1168672972 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import json | |
data_url = "https://data.hasura" | |
headers = { | |
"Content-Type": "application/json", | |
"X-Hasura-Role": "admin" | |
} | |
if (os.environ["env"] == "dev"): | |
# Replace <cluster-name> with the name of your cluster. | |
data_url = "https://data.<cluster_name>.hasura-app.io" | |
# Replace <auth_token> with the admin token. You can get it from the Api Console | |
headers["Authorization"] = "Bearer <auth_token>" | |
url = data_url + "/v1/query" | |
def getLastUpdatedTimestamp() : | |
tableBRequestPayload = { | |
"type": "select", | |
"args": { | |
"table": "tableB", | |
"columns": [ | |
"last_updated" | |
], | |
"order_by": ["-last_updated"], | |
"limit": "1" | |
} | |
} | |
lastUpdatedTableBData = requests.request("POST", url, data=json.dumps(tableBRequestPayload), headers=headers) | |
if (lastUpdatedTableBData.status_code != 200) : | |
print("Failed to fetch data from tableB") | |
return | |
jsonData = lastUpdatedTableBData.json() | |
print(jsonData) | |
lastUpdatedTimestamp = jsonData[0]['last_updated'] if len(jsonData) > 0 else None | |
return lastUpdatedTimestamp | |
def getAllDataFromTimestamp(lastUpdatedTimestamp): | |
requestPayload = { | |
"type": "select", | |
"args": { | |
"table": "tableA", | |
"columns": [ | |
"*" | |
] | |
} | |
} | |
if (lastUpdatedTimestamp is not None): | |
requestPayload['args']['where'] = { | |
"last_updated": { | |
"$gt": lastUpdatedTimestamp | |
} | |
} | |
response = requests.request("POST", url, data=json.dumps(requestPayload), headers=headers) | |
if (response.status_code != 200) : | |
print("Failed to fetch data from tableA") | |
print(response.content) | |
return | |
jsonData = response.json() | |
return jsonData | |
def upsertDataToTableB(newData): | |
# Perform operations on new data if required or sync as is | |
requestPayload = { | |
"type": "insert", | |
"args": { | |
"table": "tableB", | |
"objects": newData, | |
"on_conflict": { | |
"action": "update", | |
"constraint_on": [ | |
"id" | |
] | |
} | |
} | |
} | |
response = requests.request("POST", url, data=json.dumps(requestPayload), headers=headers) | |
if (response.status_code != 200) : | |
print("Failed to insert data into tableB") | |
print(response.content) | |
return | |
jsonData = response.json() | |
return jsonData | |
def syncTables(): | |
print("Running Sync") | |
lastUpdatedTimestamp = getLastUpdatedTimestamp() | |
print("Last Updated Timestamp: " + lastUpdatedTimestamp) | |
newData = getAllDataFromTimestamp(lastUpdatedTimestamp) | |
print("New Data: " + str(newData)) | |
if (len(newData) == 0): | |
print("Nothing to sync") | |
return | |
insertResponse = insertDataToTableB(newData) | |
print("Sync completed") | |
print("Synced: " + str(len(newData)) + " row(s)") | |
syncTables() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment