Created
June 8, 2019 13:01
-
-
Save hsuyuming/8e6d000efd5e600c51d3e0de8528d7f7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#import snowflake connector module | |
import snowflake.connector | |
import sys | |
import os | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives.asymmetric import rsa | |
from cryptography.hazmat.primitives.asymmetric import dsa | |
from cryptography.hazmat.primitives import serialization | |
# Verify that the required command-line arguments were supplied.
# Sample: python select_data.py <account> <user> <role>
if len(sys.argv) < 4:
    # Diagnostics go to stderr so they never mix with the query results that
    # this script prints on stdout.
    print("ERROR: Please pass the following command-line parameters in order:",
          file=sys.stderr)
    print("account,user,role.", file=sys.stderr)
    sys.exit(-1)

# Positional arguments, in order: account, user, role (later passed to
# snowflake.connector.connect). Arguments beyond the third are ignored.
ACCOUNT, USER, ROLE = sys.argv[1:4]
# Load the PEM private key and re-serialize it as unencrypted PKCS#8 DER
# bytes (`pkb`), which the script hands to the connector as `private_key`.
#
# The key location can be overridden with the PRIVATE_KEY_PATH environment
# variable; the previously hard-coded location remains the default.
key_path = os.environ.get(
    "PRIVATE_KEY_PATH",
    "/Users/abehsu/Documents/Snowflake/Snowpipe_poc/rsa_key.p8",
)

# An unset PRIVATE_KEY_PASSPHRASE is treated as "the key file is not
# encrypted": load_pem_private_key() expects password=None in that case.
passphrase = os.environ.get("PRIVATE_KEY_PASSPHRASE")

with open(key_path, "rb") as key:
    p_key = serialization.load_pem_private_key(
        key.read(),
        password=passphrase.encode() if passphrase is not None else None,
        backend=default_backend(),
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)
# Open the Snowflake connection using the command-line identity
# (account / user / role) and the DER-encoded private key bytes.
_connect_kwargs = {
    "account": ACCOUNT,
    "user": USER,
    "role": ROLE,
    "private_key": pkb,
}
con = snowflake.connector.connect(**_connect_kwargs)
# Select the warehouse and schema for this session. One cursor is reused for
# both statements and closed afterwards, instead of creating two cursors that
# are never released.
session_cur = con.cursor()
try:
    session_cur.execute("\nUSE WAREHOUSE tiny_warehouse_mg;\n")
    session_cur.execute("\nUSE SCHEMA testdb_mg.testschema_mg;\n")
finally:
    session_cur.close()
# # Traditional approach
# cur = con.cursor() | |
# try: | |
# cur.execute(""" | |
# SELECT COL1,COL2 FROM test_table ORDER BY COL1 | |
# """) | |
# for (col1,col2) in cur: | |
# print('%s,%s' %(col1,col2)) | |
# finally: | |
# cur.close() | |
# # A more convenient syntax
# for (col1,col2) in con.cursor().execute("SELECT COL1,COL2 FROM test_table ORDER BY COL1"): | |
# print('%s,%s' %(col1,col2)) | |
# # If you only want a single row, you can use fetchone()
# col1, col2 = con.cursor().execute("SELECT COL1, COL2 FROM TEST_TABLE ORDER BY COL1").fetchone() | |
# print('%s,%s' %(col1,col2)) | |
# # To fetch a specific number of rows at a time, use fetchmany()
# cur = con.cursor().execute("SELECT COL1, COL2 FROM TEST_TABLE ORDER BY COL1") | |
# ret = cur.fetchmany(3) | |
# print(ret) | |
# while len(ret) > 0 : | |
# ret = cur.fetchmany(3) | |
# print(ret) | |
# Fetch every row of the result set at once and print the two columns.
try:
    cur = con.cursor()
    try:
        results = cur.execute("SELECT col1, col2 FROM test_table").fetchall()
        for rec in results:
            print('%s, %s' % (rec[0], rec[1]))
    finally:
        # Release the cursor even if the query or the printing fails.
        cur.close()
finally:
    # This is the last use of `con` visible in the script, and nothing else
    # closes it, so close the connection here.
    con.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment