Skip to content

Instantly share code, notes, and snippets.

@amacal
Created November 2, 2021 08:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amacal/ea0aea2d2e2895c927c0180be92a0858 to your computer and use it in GitHub Desktop.
Save amacal/ea0aea2d2e2895c927c0180be92a0858 to your computer and use it in GitHub Desktop.
import json
import time
import uuid
import logging
import requests
class Connector:
def __init__(self, host, token, expires):
self.host = host
self.token = token
self.sequence = 1
self.expires = expires
self.logger = logging.getLogger("connector")
def next(self):
self.sequence += 1
return self.sequence
def timestamp(self):
return round(time.time() * 1000)
def expired(self):
return time.time() > self.expires
def execute(self, sql, binds=[]):
def binding(value):
return {'type': 'ANY', 'value': json.dumps(value)}
query_request = requests.post(
url=f"{self.host}/queries/v1/query-request?requestId={uuid.uuid4()}",
headers={
'content-type': 'application/json',
'accept': 'application/json',
'authorization': f"Snowflake Token=\"{self.token}\""
},
data=json.dumps({
'sqlText': sql,
'asyncExec': True,
'sequenceId': self.next(),
'querySubmissionTime': self.timestamp(),
'bindings': {str(index+1): binding(item) for index, item in enumerate(binds)}
})
)
query_request.raise_for_status()
query_response = query_request.json()
return {
'success': query_response['success'],
'query_id': query_response['data']['queryId']
}
def status(self, queryId):
status_request = requests.get(
url=f"{self.host}/monitoring/queries/{queryId}",
headers={
'content-type': 'application/json',
'accept': 'application/json',
'authorization': f"Snowflake Token=\"{self.token}\""
}
)
status_request.raise_for_status()
status_response = status_request.json()
try:
return {
'query_id': queryId,
'status': status_response['data']['queries'][0]['status'].lower(),
'success': status_response['data']['queries'][0]['status'].lower() == 'success',
'failed': status_response['data']['queries'][0]['status'].lower() == 'failed_with_error'
}
except IndexError:
self.logger.warning(f"Status endpoint returned unexpected response: '{json.dumps(status_response)}'")
raise
def result(self, queryId):
result_request = requests.post(
url=f"{self.host}/queries/v1/query-request?requestId={uuid.uuid4()}",
headers={
'content-type': 'application/json',
'accept': 'application/snowflake',
'authorization': f"Snowflake Token=\"{self.token}\""
},
data=json.dumps({
'sqlText': f"select * from table(result_scan('{queryId}'))",
'asyncExec': False,
'sequenceId': self.next(),
'querySubmissionTime': self.timestamp()
})
)
result_request.raise_for_status()
result_response = result_request.json()
data = {
'query_id': queryId,
'columns': [item['name'].lower() for item in result_response['data']['rowtype']]
}
if 'rowset' in result_response['data']:
data['rowset'] = result_response['data']['rowset']
if 'chunks' in result_response['data']:
data['headers'] = result_response['data']['chunkHeaders']
data['chunk'] = result_response['data']['chunks']
return data
@staticmethod
def open(account, region, warehouse, username, password):
host = f"https://{account}.{region}.snowflakecomputing.com"
login_request = requests.post(
url=f"{host}/session/v1/login-request?warehouse={warehouse}",
headers={'content-type': 'application/json', 'accept': 'application/json'},
data=json.dumps({
'data': {
"CLIENT_APP_ID": "connector",
"CLIENT_APP_VERSION": "0.0.1",
"ACCOUNT_NAME": account,
"LOGIN_NAME": username,
"PASSWORD": password
}
})
)
login_request.raise_for_status()
login_response = login_request.json()
validity = login_response['data']['validityInSeconds']
expires = time.time() + int(validity) - 300
return Connector(host, login_response['data']['token'], expires)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment