Skip to content

Instantly share code, notes, and snippets.

@hsuyuming
Created June 8, 2019 13:01
Show Gist options
  • Save hsuyuming/8e6d000efd5e600c51d3e0de8528d7f7 to your computer and use it in GitHub Desktop.
Save hsuyuming/8e6d000efd5e600c51d3e0de8528d7f7 to your computer and use it in GitHub Desktop.
#import snowflake connector module
import snowflake.connector
import sys
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization
# 確定輸入的參數是否給定正確
# Sample: python select_data.py <account> <user> <role>
if len(sys.argv) < 4 :
print("ERROR: Please pass the following command-line parameters in order:",end='\n')
print("account,user,role.")
sys.exit(-1)
else:
ACCOUNT = sys.argv[1]
USER = sys.argv[2]
ROLE = sys.argv[3]
with open("/Users/abehsu/Documents/Snowflake/Snowpipe_poc/rsa_key.p8", "rb") as key:
p_key = serialization.load_pem_private_key(
key.read(),
password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
backend=default_backend()
)
pkb = p_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
con = snowflake.connector.connect(
account=ACCOUNT,
user=USER,
role=ROLE,
private_key=pkb
)
con.cursor().execute("""
USE WAREHOUSE tiny_warehouse_mg;
""")
con.cursor().execute("""
USE SCHEMA testdb_mg.testschema_mg;
""")
# # 傳統方式
# cur = con.cursor()
# try:
# cur.execute("""
# SELECT COL1,COL2 FROM test_table ORDER BY COL1
# """)
# for (col1,col2) in cur:
# print('%s,%s' %(col1,col2))
# finally:
# cur.close()
# # 比較便利的語法
# for (col1,col2) in con.cursor().execute("SELECT COL1,COL2 FROM test_table ORDER BY COL1"):
# print('%s,%s' %(col1,col2))
# # 如果只想得到一筆結果的話,可以使用fetchone()
# col1, col2 = con.cursor().execute("SELECT COL1, COL2 FROM TEST_TABLE ORDER BY COL1").fetchone()
# print('%s,%s' %(col1,col2))
# # 如果想要一次取得特定的筆數,可以使用fetchmany()
# cur = con.cursor().execute("SELECT COL1, COL2 FROM TEST_TABLE ORDER BY COL1")
# ret = cur.fetchmany(3)
# print(ret)
# while len(ret) > 0 :
# ret = cur.fetchmany(3)
# print(ret)
# 如果想要一次拿到所有結果
results = con.cursor().execute("SELECT col1, col2 FROM test_table").fetchall()
for rec in results:
print('%s, %s' % (rec[0], rec[1]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment