Created
November 2, 2021 08:24
-
-
Save amacal/ea0aea2d2e2895c927c0180be92a0858 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import time | |
import uuid | |
import logging | |
import requests | |
class Connector:
    """Thin HTTP client for submitting SQL and fetching results via REST.

    Instances hold the service host, a session token and a client-side
    expiry timestamp. Use :meth:`open` to log in and obtain an instance.
    All HTTP calls go through ``requests`` and raise
    ``requests.HTTPError`` (via ``raise_for_status``) on non-2xx responses.
    """

    def __init__(self, host, token, expires):
        """Store connection state.

        Args:
            host: Base URL of the service, without a trailing slash.
            token: Session token sent in the ``authorization`` header.
            expires: Epoch time in seconds after which :meth:`expired`
                reports ``True``.
        """
        self.host = host
        self.token = token
        # next() increments before returning, so the first id handed out is 2.
        self.sequence = 1
        self.expires = expires
        self.logger = logging.getLogger("connector")

    def next(self):
        """Advance and return the per-connection request sequence number."""
        self.sequence += 1
        return self.sequence

    def timestamp(self):
        """Return the current wall-clock time in whole milliseconds."""
        return round(time.time() * 1000)

    def expired(self):
        """Return ``True`` once the current time is past ``self.expires``."""
        return time.time() > self.expires

    def _headers(self, accept='application/json'):
        """Build the headers shared by every authenticated request."""
        return {
            'content-type': 'application/json',
            'accept': accept,
            'authorization': f"Snowflake Token=\"{self.token}\"",
        }

    @staticmethod
    def _bindings(binds):
        """Map positional bind values to the 1-based ``bindings`` payload.

        Each value is JSON-encoded and tagged with type ``ANY``; ``None``
        is treated as "no bindings".
        """
        values = () if binds is None else binds
        return {
            str(position): {'type': 'ANY', 'value': json.dumps(value)}
            for position, value in enumerate(values, start=1)
        }

    def execute(self, sql, binds=None):
        """Submit ``sql`` for asynchronous execution.

        Args:
            sql: SQL text to run.
            binds: Optional sequence of positional bind values. Defaults to
                no bindings (``None`` replaces the former shared ``[]``
                default; passing a list still works as before).

        Returns:
            dict with ``success`` and ``query_id`` taken from the response.

        Raises:
            requests.HTTPError: If the HTTP status is not successful.
            KeyError: If the response lacks the expected fields.
        """
        query_request = requests.post(
            url=f"{self.host}/queries/v1/query-request?requestId={uuid.uuid4()}",
            headers=self._headers(),
            data=json.dumps({
                'sqlText': sql,
                'asyncExec': True,
                'sequenceId': self.next(),
                'querySubmissionTime': self.timestamp(),
                'bindings': self._bindings(binds),
            }),
        )
        query_request.raise_for_status()
        query_response = query_request.json()

        return {
            'success': query_response['success'],
            'query_id': query_response['data']['queryId'],
        }

    def status(self, queryId):
        """Fetch the monitoring status of a previously submitted query.

        Args:
            queryId: Identifier returned as ``query_id`` by :meth:`execute`.

        Returns:
            dict with ``query_id``, the lower-cased ``status`` string, and
            the booleans ``success`` / ``failed``.

        Raises:
            requests.HTTPError: If the HTTP status is not successful.
            IndexError, KeyError, TypeError: If the response does not have
                the expected shape; the payload is logged before re-raising.
        """
        status_request = requests.get(
            url=f"{self.host}/monitoring/queries/{queryId}",
            headers=self._headers(),
        )
        status_request.raise_for_status()
        status_response = status_request.json()

        try:
            # Look the status up once instead of three times.
            status = status_response['data']['queries'][0]['status'].lower()
        except (IndexError, KeyError, TypeError):
            # Log the raw payload for any malformed shape, not just an
            # empty ``queries`` list, then let the caller see the error.
            self.logger.warning(
                "Status endpoint returned unexpected response: '%s'",
                json.dumps(status_response),
            )
            raise

        return {
            'query_id': queryId,
            'status': status,
            'success': status == 'success',
            'failed': status == 'failed_with_error',
        }

    def result(self, queryId):
        """Fetch the result set of a finished query via ``result_scan``.

        Args:
            queryId: Identifier of the query whose results are wanted.

        Returns:
            dict with ``query_id`` and lower-cased ``columns``; plus
            ``rowset`` when the response carries inline rows, and
            ``headers`` / ``chunk`` when it carries chunk references.

        Raises:
            requests.HTTPError: If the HTTP status is not successful.
            KeyError: If the response lacks the expected fields.
        """
        # NOTE(review): queryId is interpolated directly into the SQL text;
        # only pass identifiers obtained from execute(), never user input.
        result_request = requests.post(
            url=f"{self.host}/queries/v1/query-request?requestId={uuid.uuid4()}",
            headers=self._headers(accept='application/snowflake'),
            data=json.dumps({
                'sqlText': f"select * from table(result_scan('{queryId}'))",
                'asyncExec': False,
                'sequenceId': self.next(),
                'querySubmissionTime': self.timestamp(),
            }),
        )
        result_request.raise_for_status()
        payload = result_request.json()['data']

        data = {
            'query_id': queryId,
            'columns': [item['name'].lower() for item in payload['rowtype']],
        }

        if 'rowset' in payload:
            data['rowset'] = payload['rowset']

        if 'chunks' in payload:
            data['headers'] = payload['chunkHeaders']
            data['chunk'] = payload['chunks']

        return data

    @staticmethod
    def open(account, region, warehouse, username, password):
        """Log in with username/password and return a ready ``Connector``.

        Args:
            account: Account name; also the first label of the host name.
            region: Region label used as the second label of the host name.
            warehouse: Warehouse passed as a query parameter on login.
            username: Login name.
            password: Login password.

        Returns:
            A ``Connector`` whose ``expires`` is set 300 seconds before the
            validity reported by the login response.

        Raises:
            requests.HTTPError: If the HTTP status is not successful.
            KeyError: If the response lacks the expected fields.
        """
        host = f"https://{account}.{region}.snowflakecomputing.com"

        login_request = requests.post(
            url=f"{host}/session/v1/login-request?warehouse={warehouse}",
            headers={'content-type': 'application/json', 'accept': 'application/json'},
            data=json.dumps({
                'data': {
                    "CLIENT_APP_ID": "connector",
                    "CLIENT_APP_VERSION": "0.0.1",
                    "ACCOUNT_NAME": account,
                    "LOGIN_NAME": username,
                    "PASSWORD": password,
                }
            }),
        )
        login_request.raise_for_status()
        login_response = login_request.json()

        validity = login_response['data']['validityInSeconds']
        # Treat the token as expired five minutes early.
        expires = time.time() + int(validity) - 300

        return Connector(host, login_response['data']['token'], expires)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment