omega|ml snowflake datasets plugin

omx_snowflake.py
from omegaml.backends.basedata import BaseDataBackend
from sqlalchemy import create_engine
import pandas as pd

# version of this plugin
version = '0.1.3'


class SnowflakeDataBackend(BaseDataBackend):
    """
    Snowflake plugin for omegaml

    Installation:
        copy/paste the above into a cell, execute, then run this to register

        Alternatively install via getgist

        !pip install getgist
        !getgist -y omegaml omx_snowflake.py

    Pre-Requisites:
        make sure you have the following packages installed

        !pip install -U snowflake-sqlalchemy==1.2.1

    Usage:
        # define your snowflake connection
        snowflake_constr = f'snowflake://{user}:{password}@{account}/'

        # store in any of three ways

        # -- just the connection
        om.datasets.put(snowflake_constr, 'mysnowflake')
        om.datasets.get('mysnowflake')
        => the sql connection object

        # -- store the connection with a predefined sql
        om.datasets.put(snowflake_constr, 'mysnowflake', sql='select ...')
        om.datasets.get('mysnowflake')
        => returns a pandas dataframe. specify chunksize to return an iterable
           of dataframes

        # -- copy the result of the snowflake query to omegaml
        om.datasets.put(snowflake_constr, 'mysnowflake', sql='select ...', copy=True)
        om.datasets.get('mysnowflake')
        => returns a pandas dataframe (without executing any additional queries)
        => can also use om.datasets.getl('mysnowflake') to return an MDataFrame

    Advanced:
        om.datasets.put() supports the following additional keyword arguments

        chunksize=int             number of rows to read from snowflake in one
                                  chunk, defaults to 10000
        parse_dates=['col', ...]  list of column names to parse for date, time
                                  or datetime. see pd.read_sql for details
        transform=callable        a callable that is passed the DataFrame of each
                                  chunk before it is inserted into the database.
                                  use to provide custom transformations.
                                  only works with copy=True

        as well as other kwargs supported by pd.read_sql
    """
    KIND = 'snowflake.conx'

    @classmethod
    def supports(cls, obj, name, *args, **kwargs):
        return isinstance(obj, str) and obj.startswith('snowflake')

    def get(self, name, sql=None, chunksize=None, *args, **kwargs):
        meta = self.data_store.metadata(name)
        connection_str = meta.kind_meta.get('snowflake_connection')
        sql = sql or meta.kind_meta.get('sql')
        chunksize = chunksize or meta.kind_meta.get('chunksize')
        if connection_str:
            connection = self.get_connection(connection_str)
        else:
            raise ValueError('no connection string')
        if sql:
            return pd.read_sql(sql, connection, chunksize=chunksize)
        return connection

    def put(self, obj, name, sql=None, copy=False, append=True, chunksize=None,
            transform=None, *args, **kwargs):
        attributes = kwargs.pop('attributes', None) or {}
        kind_meta = {
            'snowflake_connection': str(obj),
            'sql': sql,
            'chunksize': chunksize,
        }
        if copy:
            if not sql:
                raise ValueError('a valid SQL statement is required with copy=True')
            metadata = self.copy_from_sql(sql, obj, name, chunksize=chunksize,
                                          append=append, transform=transform,
                                          **kwargs)
        else:
            metadata = self.data_store.metadata(name)
            if metadata is not None:
                metadata.kind_meta.update(kind_meta)
            else:
                metadata = self.data_store.make_metadata(name, self.KIND,
                                                         kind_meta=kind_meta,
                                                         attributes=attributes)
        metadata.attributes.update(attributes)
        return metadata.save()

    def get_connection(self, connection_str):
        connection = None
        try:
            engine = create_engine(connection_str)
            connection = engine.connect()
            # smoke test the connection before handing it out
            connection.execute('select current_version()').fetchone()
        except Exception:
            if connection is not None:
                connection.close()
            raise
        return connection
    def copy_from_sql(self, sql, connstr, name, chunksize=10000,
                      append=False, transform=None, **kwargs):
        connection = self.get_connection(connstr)
        chunksize = chunksize or 10000  # avoid None
        pditer = pd.read_sql(sql, connection, chunksize=chunksize, **kwargs)
        try:
            import tqdm
        except ImportError:
            meta = self._chunked_insert(pditer, name, append=append,
                                        transform=transform)
        else:
            with tqdm.tqdm(unit='rows') as pbar:
                meta = self._chunked_insert(pditer, name, append=append,
                                            transform=transform, pbar=pbar)
        return meta

    def _chunked_insert(self, pditer, name, append=True, transform=None, pbar=None):
        meta = None
        for i, df in enumerate(pditer):
            if pbar is not None:
                pbar.update(len(df))
            # respect append only for the first chunk; subsequent chunks must
            # always append, otherwise each chunk would overwrite the previous
            should_append = (i > 0) or append
            if transform:
                df = transform(df)
            try:
                meta = self.data_store.put(df, name, append=should_append)
            except Exception as e:
                rows = df.iloc[0:10].to_dict()
                raise ValueError("{e}: {rows}".format(e=e, rows=rows))
        return meta
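
# Example (illustrative, not part of the plugin): a hypothetical transform
# callable, as accepted by put(..., copy=True, transform=...); the dataset
# and column names are made up.
#
#   def lowercase_columns(df):
#       # normalize column names before each chunk is inserted
#       df.columns = [c.lower() for c in df.columns]
#       return df
#
#   om.datasets.put(snowflake_constr, 'mysales',
#                   sql='select * from sales',
#                   copy=True, transform=lowercase_columns)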
"""
omegaml patch to fast_insert
version: 0.10.*, 0.11.*
"""
import os
import math
from multiprocessing import Pool
from omegaml.store.fastinsert import dfchunker, repeat, insert_chunk
# single instance multiprocessing pool
pool = None
def fast_insert(df, omstore, name, chunk_size=int(1e4)):
"""
fast insert of dataframe to mongodb
Depending on size use single-process or multiprocessing. Typically
multiprocessing is faster on datasets with > 10'000 data elements
(rows x columns). Note this may max out your CPU and may use
processor count * chunksize of additional memory. The chunksize is
set to 10'000. The processor count is the default used by multiprocessing,
typically the number of CPUs reported by the operating system.
:param df: dataframe
:param omstore: the OmegaStore to use. will be used to get the mongo_url
:param name: the dataset name in OmegaStore to use. will be used to get the
collection name from the omstore
"""
    global pool
    if len(df) * len(df.columns) > chunk_size:
        # we crossed the upper limit of single-threaded processing, use a Pool
        # (cached across calls)
        mongo_url = omstore.mongo_url
        collection_name = omstore.collection(name).name
        cores = max(1, math.ceil(os.cpu_count() / 2))
        pool = pool or Pool(processes=cores)
        jobs = zip(dfchunker(df, size=chunk_size),
                   repeat(mongo_url), repeat(collection_name))
        pool.map(insert_chunk, (job for job in jobs))
    else:
        # still within bounds for single-threaded inserts
        omstore.collection(name).insert_many(df.to_dict(orient='records'))
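
# Worked example of the threshold above: a 2,000-row x 6-column frame has
# 12,000 elements (> 10,000), so it is chunked and inserted via the
# multiprocessing pool; a 500-row x 6-column frame (3,000 elements) stays
# on the single-threaded insert_many path.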

# apply the fix
from omegaml import version as omversion

if any(omversion.startswith(v) for v in ('0.10', '0.11')):
    print(f"*** applying fast_insert patch to omegaml-{omversion}")
    from omegaml.store import base
    base.fast_insert = fast_insert

# register the sqlalchemy snowflake dialect
# source: https://stackoverflow.com/a/60726909/890242
from sqlalchemy.dialects import registry
registry.register('snowflake', 'snowflake.sqlalchemy', 'dialect')

print(f"snowflake plugin {version}: to install execute the following line of code")
print("> om.datasets.register_backend(SnowflakeDataBackend.KIND, SnowflakeDataBackend)")
Example notebook
## omega|ml - snowflake plugin

This plugin enables working with snowflake data sources directly in omega|ml

* store & retrieve connections to snowflake for dynamic queries using SQL (dynamic: at runtime)
* store & retrieve views to snowflake (storing connection & static SQL)
* copy data from snowflake to omega|ml for further processing

Installation

1. install dependencies: `pip install -U snowflake-sqlalchemy==1.2.1`
2. install the plugin using getgist: `getgist -y omegaml omx_snowflake.py`
3. load the plugin: `import omx_snowflake`
4. register the plugin: `om.datasets.register_backend(SnowflakeDataBackend.KIND, SnowflakeDataBackend)`

For details, see below.

Usage

`om.datasets.put('snowflake://user:password@account', 'omega-dataset-name', sql='select ...', copy=True)`

For details, see `help(omx_snowflake)` and the end-to-end sketch below.

Version history

- 0.1.0 - initial version (without support for copying data)
- 0.1.1 - support copying of data
- 0.1.2 - provide more robustness in parallel inserts on copy
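
As a quick orientation, a minimal end-to-end sketch (it assumes a configured omega|ml session `om` with the plugin installed; the table and dataset names are illustrative):

    import omegaml as om
    from omx_snowflake import SnowflakeDataBackend

    # once per session: register the backend
    om.datasets.register_backend(SnowflakeDataBackend.KIND, SnowflakeDataBackend)

    # copy the result of a query into omega|ml's own storage
    constr = 'snowflake://USER:PASSWORD@ACCOUNT/'  # fill in real credentials
    om.datasets.put(constr, 'mydata', sql='select * from mytable', copy=True)
    df = om.datasets.get('mydata')  # a pandas DataFrame, no further snowflake queries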
In [5]:
# install dependencies
!pip install -q -U snowflake-sqlalchemy==1.2.1
In [2]:
# install the plugin
!pip install -q getgist
!rm -f *snowflake.py && getgist -y omegaml omx_snowflake.py

  Fetching https://api.github.com/users/omegaml/gists
  Reading https://gist.githubusercontent.com/omegaml/8979e42667803c5a938e7bdbe31bfb85/raw/1371141c2ba53a3a963ad3a7c1c58f53cf9878e5/omx_snowflake.py
  Saving omx_snowflake.py
  Done!
In [1]:
# load the plugin
import omx_snowflake
from omx_snowflake import SnowflakeDataBackend
om.datasets.register_backend(SnowflakeDataBackend.KIND, SnowflakeDataBackend)

snowflake plugin 0.1.3: to install execute the following line of code
> om.datasets.register_backend(SnowflakeDataBackend.KIND, SnowflakeDataBackend)

Out[1]:
OmegaStore(bucket=omegaml, prefix=data/)
In [4]:
# get more information
help(omx_snowflake)

Help on module omx_snowflake:

NAME
    omx_snowflake

CLASSES
    omegaml.backends.basedata.BaseDataBackend(builtins.object)
        SnowflakeDataBackend

    class SnowflakeDataBackend(omegaml.backends.basedata.BaseDataBackend)
     |  Snowflake plugin for omegaml
     |
     |  (class docstring as in omx_snowflake.py above)
     |
     |  Method resolution order:
     |      SnowflakeDataBackend
     |      omegaml.backends.basedata.BaseDataBackend
     |      builtins.object
     |
     |  Methods defined here:
     |
     |  copy_from_sql(self, sql, connstr, name, chunksize=10000, append=False, transform=None, **kwargs)
     |
     |  get(self, name, sql=None, chunksize=None, *args, **kwargs)
     |      get an obj
     |
     |      :param name: the name of the object (str)
     |      :return: the object as it was originally stored
     |
     |  get_connection(self, connection_str)
     |
     |  put(self, obj, name, sql=None, copy=False, append=True, chunksize=None, transform=None, *args, **kwargs)
     |      put an obj
     |
     |      :param obj: the object to store (object)
     |      :param name: the name of the object (str)
     |      :param attributes: the attributes dict (dict, optional)
     |      :param kwargs: other kwargs to be passed to the Metadata object
     |      :return: the Metadata object
     |
     |  ----------------------------------------------------------------------
     |  Class methods defined here:
     |
     |  supports(obj, name, *args, **kwargs) from builtins.type
     |      test if this backend supports this obj
     |
     |  ----------------------------------------------------------------------
     |  Data and other attributes defined here:
     |
     |  KIND = 'snowflake.conx'
     |
     |  ----------------------------------------------------------------------
     |  Methods inherited from omegaml.backends.basedata.BaseDataBackend:
     |
     |  __init__(self, model_store=None, data_store=None, **kwargs)
     |      Initialize self. See help(type(self)) for accurate signature.
     |
     |  getl(self, *args, **kwargs)
     |      get a lazy implementation to access the obj
     |
     |      A lazy implementation is a proxy to the object that can be
     |      evaluated using the :code:`.value` property. The proxy should
     |      ensure that any operations applied on the object are delayed until
     |      the .value property is accessed. Typically this is to ensure that
     |      the actual computation is executed on the cluster, not on the local
     |      machine.
     |
     |      :param name: the name of the object (str)
     |      :return: the proxy to the object as it was originally stored
     |
     |  ----------------------------------------------------------------------
     |  Data descriptors inherited from omegaml.backends.basedata.BaseDataBackend:
     |
     |  __dict__
     |      dictionary for instance variables (if defined)
     |
     |  __weakref__
     |      list of weak references to the object (if defined)

FUNCTIONS
    fast_insert(df, omstore, name, chunk_size=10000)
        fast insert of dataframe to mongodb
        (docstring as in omx_snowflake.py above)

DATA
    omversion = '0.12.1'
    pool = None
    version = '0.1.3'

FILE
    /tmp/user20/.jupyter/omx_snowflake.py
In [15]:
# store the snowflake credentials as an omega|ml dataset
secrets = {
    'snowflake': {
        'user': 'USER',
        'password': 'PASSWORD',
        'account': 'ACCOUNT',
    }
}
om.datasets.put(secrets, 'secrets', append=False)

Out[15]:
<Metadata: Metadata(name=secrets,bucket=omegaml,prefix=data/,kind=python.data,created=2020-03-28 13:41:14.823000)>
In [9]:
# retrieve the credentials (python.data datasets are returned as a list)
secrets = om.datasets.get('secrets')[0]
In [12]:
# build the connection string
from getpass import getpass
# interactive alternative:
# user = input('snowflake user name> ')
# password = getpass('snowflake password> ')
# account = input('snowflake account (without .snowflakecomputing.com)> ')
snowflake_cxstr = 'snowflake://{user}:{password}@{account}/'.format(**secrets['snowflake'])
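
If the password contains URL-special characters such as `@`, `:` or `/`, they need to be escaped before building the SQLAlchemy URL; a minimal sketch using only the standard library:

    from urllib.parse import quote_plus

    creds = secrets['snowflake']
    snowflake_cxstr = 'snowflake://{}:{}@{}/'.format(
        creds['user'],
        quote_plus(creds['password']),  # escape special characters
        creds['account'])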
In [13]:
# store just the connection
om.datasets.drop('mysnowflake', force=True)
om.datasets.put(snowflake_cxstr, 'mysnowflake')
om.datasets.get('mysnowflake')

Out[13]:
<sqlalchemy.engine.base.Connection at 0x7ffb755cbe80>
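
Since `get()` returns a live SQLAlchemy `Connection`, it can be queried directly; a sketch (the query mirrors the plugin's own connection smoke test):

    conn = om.datasets.get('mysnowflake')
    print(conn.execute('select current_version()').fetchone()[0])
    conn.close()  # release the connection when done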
In [14]:
# store a connection reference with sql
om.datasets.drop('mysnowflake', force=True)
om.datasets.put(snowflake_cxstr, 'mysnowflake',
                sql='select count(*) from snowflake_sample_data.tpch_sf1.lineitem')
om.datasets.get('mysnowflake')

Out[14]:
   COUNT(*)
0   6001215
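
With a stored SQL, `get()` also accepts `chunksize` to return an iterator of DataFrames instead of a single frame (per the plugin docstring); a sketch with an illustrative chunk size:

    for chunk in om.datasets.get('mysnowflake', chunksize=1000):
        print(len(chunk))  # each chunk is a pandas DataFrame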
In [5]:
# query the connection with a specific sql, returning a pandas dataframe
om.datasets.drop('mysnowflake', force=True)
om.datasets.put(snowflake_cxstr, 'mysnowflake')
om.datasets.get('mysnowflake',
                sql='select count(*) from snowflake_sample_data.tpch_sf1.lineitem')

Out[5]:
   COUNT(*)
0   6001215
In [6]:
# copy the query result to a native omegaml dataset
om.datasets.put(snowflake_cxstr,
                'mysnowflake',
                sql='select count(*) from snowflake_sample_data.tpch_sf1.lineitem',
                copy=True)
om.datasets.get('mysnowflake')

1rows [00:00, 7.71rows/s]

Out[6]:
   COUNT(*)
0   6001215
In [7]:
# copy a larger result set, parsing dates and chunking the transfer
om.datasets.drop('mysnowflake', force=True)
om.datasets.put(snowflake_cxstr,
                'mysnowflake',
                sql='select * from snowflake_sample_data.tpch_sf1.lineitem limit 100000',
                parse_dates=['l_shipdate', 'l_receiptdate', 'l_commitdate'],
                chunksize=50000,
                append=False,
                copy=True)
len(om.datasets.getl('mysnowflake'))

100000rows [00:09, 14153.82rows/s]

Out[7]:
100000
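
The copied dataset now lives in omega|ml's own storage; `getl()` returns an MDataFrame proxy that delays evaluation until its `.value` property is accessed (per the `getl` docstring above); a sketch:

    mdf = om.datasets.getl('mysnowflake')
    print(len(mdf))  # row count, computed without loading the data
    df = mdf.value   # materialize the full pandas DataFrame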