Last active
April 2, 2020 12:57
-
-
Save omegaml/8979e42667803c5a938e7bdbe31bfb85 to your computer and use it in GitHub Desktop.
omega|ml snowflake datasets plugin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from omegaml.backends.basedata import BaseDataBackend | |
from base64 import b64encode, b64decode | |
from sqlalchemy import create_engine | |
import pandas as pd | |
# version of this plugin (reported on load; bump when the plugin changes)
version = '0.1.3'
class SnowflakeDataBackend(BaseDataBackend):
    """
    Snowflake plugin for omegaml

    Installation:
        copy/paste above into a cell, execute, then run this to register

        Alternatively install getgist

        !pip install getgist
        !getgist

    Pre-Requisites:
        make sure you have the following packages installed

        !pip install -U snowflake-sqlalchemy==1.2.1

    Usage:
        # define your snowflake connection
        snowflake_constr = f'snowflake://{user}:{password}@{account}/'

        # store in any of three ways

        # -- just the connection
        om.datasets.put(snowflake_constr, 'mysnowflake')
        om.datasets.get('mysnowflake')
        => the sql connection object

        # -- store connection with a predefined sql
        om.datasets.put(snowflake_constr, 'mysnowflake', sql='select ....')
        om.datasets.get('mysnowflake')
        => will return a pandas dataframe. specify chunksize to return an
           iterable of dataframes

        # -- copy the result of the snowflake query to omegaml
        om.datasets.put(snowflake_constr, 'mysnowflake', sql='select ...', copy=True)
        om.datasets.get('mysnowflake')
        => will return a pandas dataframe (without executing any additional queries)
        => can also use with om.datasets.getl('mysnowflake') to return a MDataFrame

    Advanced:
        om.datasets.put() supports the following additional keyword arguments

        chunksize=int       specify the number of rows to read from snowflake
                            in one chunk. defaults to 10000
        parse_dates=['col', ...]
                            list of column names to parse for date, time or
                            datetime. see pd.read_sql for details
        transform=callable  a callable, is passed the DataFrame of each chunk
                            before it is inserted into the database. use to
                            provide custom transformations. only works on
                            copy=True

        as well as other kwargs supported by pd.read_sql
    """
    KIND = 'snowflake.conx'

    @classmethod
    def supports(cls, obj, name, *args, **kwargs):
        # a snowflake source is identified by its sqlalchemy-style URL prefix
        return isinstance(obj, str) and obj.startswith('snowflake')

    def get(self, name, sql=None, chunksize=None, *args, **kwargs):
        """ return the stored connection, or the result of executing sql on it

        :param name: the name of the dataset (str)
        :param sql: optional, sql to execute; defaults to the sql stored
           on put(), if any
        :param chunksize: optional, if set pd.read_sql returns an iterator
           of DataFrames of this many rows; defaults to the chunksize
           stored on put()
        :return: a sqlalchemy connection if no sql is given, else the
           DataFrame (or iterator of DataFrames) returned by pd.read_sql
        :raises ValueError: if no connection string is stored in metadata
        """
        meta = self.data_store.metadata(name)
        connection_str = meta.kind_meta.get('snowflake_connection')
        sql = sql or meta.kind_meta.get('sql')
        chunksize = chunksize or meta.kind_meta.get('chunksize')
        # guard clause: without a connection string we cannot do anything
        if not connection_str:
            raise ValueError('no connection string')
        connection = self.get_connection(connection_str)
        if sql:
            return pd.read_sql(sql, connection, chunksize=chunksize)
        return connection

    def put(self, obj, name, sql=None, copy=False, append=True, chunksize=None,
            transform=None, *args, **kwargs):
        """ store a snowflake connection, optionally with sql or a data copy

        :param obj: the snowflake connection string
        :param name: the name of the dataset (str)
        :param sql: optional, sql to store alongside the connection, or to
           execute when copy=True
        :param copy: if True execute sql and store the result as a native
           omegaml dataset (requires sql)
        :param append: if False replace existing data on copy
        :param chunksize: number of rows to read from snowflake per chunk
        :param transform: optional callable applied to each chunk DataFrame
           before insert (copy=True only)
        :param attributes: optional attributes dict stored on the Metadata
        :return: the saved Metadata object
        :raises ValueError: if copy=True and no sql is given
        """
        attributes = kwargs.pop('attributes', None) or {}
        kind_meta = {
            'snowflake_connection': str(obj),
            'sql': sql,
            'chunksize': chunksize,
        }
        if copy:
            if not sql:
                raise ValueError('a valid SQL statement is required with copy=True')
            metadata = self.copy_from_sql(sql, obj, name, chunksize=chunksize,
                                          append=append, transform=transform, **kwargs)
        else:
            metadata = self.data_store.metadata(name)
            if metadata is not None:
                # keep the existing entry, only refresh the connection details
                metadata.kind_meta.update(kind_meta)
            else:
                metadata = self.data_store.make_metadata(name, self.KIND,
                                                         kind_meta=kind_meta,
                                                         attributes=attributes)
        metadata.attributes.update(attributes)
        return metadata.save()

    def get_connection(self, connection_str):
        """ open a sqlalchemy connection, closing it again on any failure

        :param connection_str: the snowflake sqlalchemy URL
        :return: an open sqlalchemy Connection
        """
        connection = None
        try:
            engine = create_engine(connection_str)
            connection = engine.connect()
            # probe the connection so errors surface here, not on first use
            connection.execute('select current_version()').fetchone()
        except Exception:
            # avoid leaking a half-open connection before re-raising
            if connection is not None:
                connection.close()
            raise
        return connection

    def copy_from_sql(self, sql, connstr, name, chunksize=10000,
                      append=False, transform=None, **kwargs):
        """ execute sql on snowflake and store the result as a native dataset

        reads in chunks of chunksize rows and inserts each chunk; shows a
        tqdm progress bar if tqdm is installed
        """
        connection = self.get_connection(connstr)
        chunksize = chunksize or 10000  # avoid None
        pditer = pd.read_sql(sql, connection, chunksize=chunksize, **kwargs)
        try:
            import tqdm
        except ImportError:
            # tqdm is optional; insert without a progress bar
            meta = self._chunked_insert(pditer, name, append=append,
                                        transform=transform)
        else:
            with tqdm.tqdm(unit='rows') as pbar:
                meta = self._chunked_insert(pditer, name, append=append,
                                            transform=transform, pbar=pbar)
        return meta

    def _chunked_insert(self, pditer, name, append=True, transform=None, pbar=None):
        # insert each chunk; returns the last Metadata, or None if the
        # query yielded no chunks (previously raised UnboundLocalError)
        meta = None
        for i, df in enumerate(pditer):
            if pbar is not None:
                pbar.update(len(df))
            # always append from the second chunk onwards, else we would
            # overwrite the chunks inserted so far
            should_append = (i > 0) or append
            if transform:
                df = transform(df)
            try:
                meta = self.data_store.put(df, name, append=should_append)
            except Exception as e:
                # include a sample of the failing chunk to aid debugging
                rows = df.iloc[0:10].to_dict()
                raise ValueError(f"{e}: {rows}") from e
        return meta
""" | |
omegaml patch to fast_insert | |
version: 0.10.*, 0.11.* | |
""" | |
import os | |
import math | |
from multiprocessing import Pool | |
from omegaml.store.fastinsert import dfchunker, repeat, insert_chunk | |
# single instance multiprocessing pool, created lazily by fast_insert()
pool = None
def fast_insert(df, omstore, name, chunk_size=int(1e4)):
    """
    fast insert of dataframe to mongodb

    Depending on size use single-process or multiprocessing. Typically
    multiprocessing is faster on datasets with > 10'000 data elements
    (rows x columns). Note this may max out your CPU and may use
    processor count * chunksize of additional memory. The chunksize is
    set to 10'000. The processor count is half the CPUs reported by the
    operating system (at least 1).

    :param df: dataframe
    :param omstore: the OmegaStore to use. will be used to get the mongo_url
    :param name: the dataset name in OmegaStore to use. will be used to get the
        collection name from the omstore
    :param chunk_size: threshold (rows x columns) above which multiprocessing
        is used, and the number of rows per chunk when it is
    """
    global pool
    if len(df) * len(df.columns) > chunk_size:
        mongo_url = omstore.mongo_url
        collection_name = omstore.collection(name).name
        # we crossed upper limits of single threaded processing, use a Pool
        # use the cached pool; os.cpu_count() may return None, default to 1
        cores = max(1, math.ceil((os.cpu_count() or 1) / 2))
        pool = pool or Pool(processes=cores)
        jobs = zip(dfchunker(df, size=chunk_size),
                   repeat(mongo_url), repeat(collection_name))
        # pool.map consumes the iterable directly, no wrapper needed
        pool.map(insert_chunk, jobs)
    else:
        # still within bounds for single threaded inserts
        omstore.collection(name).insert_many(df.to_dict(orient='records'))
# monkey-patch omegaml's fast_insert on affected versions only
from omegaml import version as omversion
if omversion.startswith(('0.10', '0.11')):
    print(f"*** applying fast_insert patch to omegaml-{omversion}")
    from omegaml.store import base
    base.fast_insert = fast_insert
# make the snowflake dialect known to sqlalchemy
# source: https://stackoverflow.com/a/60726909/890242
from sqlalchemy.dialects import registry
registry.register('snowflake', 'snowflake.sqlalchemy', 'dialect')
print(f"snowflake plugin {version}: to install execute the following line of code")
print("> om.datasets.register_backend(SnowflakeDataBackend.KIND, SnowflakeDataBackend)")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## omega|ml - snowflake plugin\n", | |
"\n", | |
"This plugin enables working with snowflake data sources directly in omega|ml\n", | |
"\n", | |
"* store & retrieve connections to snowflake for dynamic queries using SQL (dynamic: at runtime)\n", | |
"* store & retrieve views to snowflake (storing connection & static SQL)\n", | |
"* copy data from snowflake to omega|ml for further processing\n", | |
"\n", | |
"Installation\n", | |
"\n", | |
"1. install dependencies: `pip install -U snowflake-sqlalchemy==1.2.1`\n", | |
"2. install the plugin using getgist: `getgist -y omegaml omx_snowflake.py`\n", | |
"3. load the plugin: `import omx_snowflake`\n", | |
"4. register the plugin: `om.datasets.register_backend(SnowflakeDataBackend.KIND, SnowflakeDataBackend)`\n", | |
"\n", | |
"Details see below\n", | |
"\n", | |
"Usage\n", | |
"\n", | |
"`om.datasets.put('snowflake://user:password@account', 'omega-dataset-name', sql='select ...', copy=True)`\n", | |
"\n", | |
"details see `help(omx_snowflake)`\n", | |
"\n", | |
"Version history\n", | |
"\n", | |
"- 0.1.0 - initial version (without support for copying data)\n", | |
"- 0.1.1 - support copying of data\n", | |
"- 0.1.2 - provide more robustness in parallel inserts on copy " | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# install dependencies\n", | |
"!pip install -q -U snowflake-sqlalchemy==1.2.1" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
" Fetching https://api.github.com/users/omegaml/gists\u001b[0m\n", | |
" Reading https://gist.githubusercontent.com/omegaml/8979e42667803c5a938e7bdbe31bfb85/raw/1371141c2ba53a3a963ad3a7c1c58f53cf9878e5/omx_snowflake.py\u001b[0m\n", | |
" Saving omx_snowflake.py\u001b[0m\n", | |
"\u001b[32m Done!\u001b[0m\n" | |
] | |
} | |
], | |
"source": [ | |
"# install the plugin\n", | |
"!pip install -q getgist\n", | |
"!rm -f *snowflake.py && getgist -y omegaml omx_snowflake.py" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"snowflake plugin 0.1.3: to install execute the following line of code\n", | |
"> om.datasets.register_backend(SnowflakeDataBackend.KIND, SnowflakeDataBackend)\n" | |
] | |
}, | |
{ | |
"data": { | |
"text/plain": [ | |
"OmegaStore(bucket=omegaml, prefix=data/)" | |
] | |
}, | |
"execution_count": 1, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# load the plugin\n", | |
"import omx_snowflake\n", | |
"from omx_snowflake import SnowflakeDataBackend\n", | |
"om.datasets.register_backend(SnowflakeDataBackend.KIND, SnowflakeDataBackend)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Help on module omx_snowflake:\n", | |
"\n", | |
"NAME\n", | |
" omx_snowflake\n", | |
"\n", | |
"CLASSES\n", | |
" omegaml.backends.basedata.BaseDataBackend(builtins.object)\n", | |
" SnowflakeDataBackend\n", | |
" \n", | |
" class SnowflakeDataBackend(omegaml.backends.basedata.BaseDataBackend)\n", | |
" | Snowflake plugin for omegaml\n", | |
" | \n", | |
" | Installation:\n", | |
" | copy/paste above into a cell, execute, then run this to register \n", | |
" | \n", | |
" | Alternatively install getgist\n", | |
" | \n", | |
" | !pip install getgist\n", | |
" | !getgist \n", | |
" | \n", | |
" | Pre-Requisites:\n", | |
" | make sure you have the following packages installed\n", | |
" | \n", | |
" | !pip install -U snowflake-sqlalchemy==1.2.1\n", | |
" | \n", | |
" | Usage:\n", | |
" | # define your snowflake connection\n", | |
" | snowflake_constr = f'snowflake://{user}:{password}@{account}/'\n", | |
" | \n", | |
" | # store in any of three ways\n", | |
" | \n", | |
" | # -- just the connection\n", | |
" | om.datasets.put(snowflake_constr, 'mysnowflake')\n", | |
" | om.datasets.get('mysnowflake')\n", | |
" | => the sql connection object\n", | |
" | \n", | |
" | # -- store connection with a predefined sql\n", | |
" | om.datasets.put(snowflake_constr, 'mysnowflake', sql='select ....')\n", | |
" | om.datasets.get('mysnowflake')\n", | |
" | => will return a pandas dataframe. specify chunksize to return an interable of dataframes\n", | |
" | \n", | |
" | # -- copy the result of the snowflake query to omegaml\n", | |
" | om.datasets.put(snowflake_constr, 'mysnowflake', sql='select ...', copy=True)\n", | |
" | om.datasets.get('mysnowflake')\n", | |
" | => will return a pandas dataframe (without executing any additional queries)\n", | |
" | => can also use with om.datasets.getl('mysnowflake') to return a MDataFrame \n", | |
" | \n", | |
" | Advanced:\n", | |
" | \n", | |
" | om.datasets.put() supports the following additional keyword arguments\n", | |
" | \n", | |
" | chunksize=int specify the number of rows to read from snowflake in one chunk. \n", | |
" | defaults to 10000\n", | |
" | \n", | |
" | parse_dates=['col', ...] list of column names to parse for date, time or datetime.\n", | |
" | see pd.read_sql for details\n", | |
" | \n", | |
" | transform=callable a callable, is passed the DataFrame of each chunk before it\n", | |
" | is inserted into the database. use to provide custom transformations.\n", | |
" | only works on copy=True\n", | |
" | \n", | |
" | as well as other kwargs supported by pd.read_sql\n", | |
" | \n", | |
" | Method resolution order:\n", | |
" | SnowflakeDataBackend\n", | |
" | omegaml.backends.basedata.BaseDataBackend\n", | |
" | builtins.object\n", | |
" | \n", | |
" | Methods defined here:\n", | |
" | \n", | |
" | copy_from_sql(self, sql, connstr, name, chunksize=10000, append=False, transform=None, **kwargs)\n", | |
" | \n", | |
" | get(self, name, sql=None, chunksize=None, *args, **kwargs)\n", | |
" | get an obj \n", | |
" | \n", | |
" | :param name: the name of the object (str)\n", | |
" | :return: the object as it was originally stored\n", | |
" | \n", | |
" | get_connection(self, connection_str)\n", | |
" | \n", | |
" | put(self, obj, name, sql=None, copy=False, append=True, chunksize=None, transform=None, *args, **kwargs)\n", | |
" | put an obj \n", | |
" | \n", | |
" | :param obj: the object to store (object)\n", | |
" | :param name: the name of the object (str)\n", | |
" | :param attributes: the attributes dict (dict, optional)\n", | |
" | :param kwargs: other kwargs to be passed to the Metadata object\n", | |
" | :return: the Metadata object\n", | |
" | \n", | |
" | ----------------------------------------------------------------------\n", | |
" | Class methods defined here:\n", | |
" | \n", | |
" | supports(obj, name, *args, **kwargs) from builtins.type\n", | |
" | test if this backend supports this obj\n", | |
" | \n", | |
" | ----------------------------------------------------------------------\n", | |
" | Data and other attributes defined here:\n", | |
" | \n", | |
" | KIND = 'snowflake.conx'\n", | |
" | \n", | |
" | ----------------------------------------------------------------------\n", | |
" | Methods inherited from omegaml.backends.basedata.BaseDataBackend:\n", | |
" | \n", | |
" | __init__(self, model_store=None, data_store=None, **kwargs)\n", | |
" | Initialize self. See help(type(self)) for accurate signature.\n", | |
" | \n", | |
" | getl(self, *args, **kwargs)\n", | |
" | get an lazy implementation to access the obj\n", | |
" | \n", | |
" | A lazy implementation is a proxy to the object that can be \n", | |
" | evaluated using the :code:`.value` property. The proxy should\n", | |
" | ensure that any operations applied on the object are delayed until\n", | |
" | the .value property is accessed. Typically this is to ensure that\n", | |
" | the actual computation is executed on the cluster, not on the local\n", | |
" | machine. \n", | |
" | \n", | |
" | :param name: the name of the object (str)\n", | |
" | :return: the proxy to the object as it was originally stored\n", | |
" | \n", | |
" | ----------------------------------------------------------------------\n", | |
" | Data descriptors inherited from omegaml.backends.basedata.BaseDataBackend:\n", | |
" | \n", | |
" | __dict__\n", | |
" | dictionary for instance variables (if defined)\n", | |
" | \n", | |
" | __weakref__\n", | |
" | list of weak references to the object (if defined)\n", | |
"\n", | |
"FUNCTIONS\n", | |
" fast_insert(df, omstore, name, chunk_size=10000)\n", | |
" fast insert of dataframe to mongodb\n", | |
" \n", | |
" Depending on size use single-process or multiprocessing. Typically \n", | |
" multiprocessing is faster on datasets with > 10'000 data elements\n", | |
" (rows x columns). Note this may max out your CPU and may use \n", | |
" processor count * chunksize of additional memory. The chunksize is\n", | |
" set to 10'000. The processor count is the default used by multiprocessing,\n", | |
" typically the number of CPUs reported by the operating system. \n", | |
" \n", | |
" :param df: dataframe\n", | |
" :param omstore: the OmegaStore to use. will be used to get the mongo_url\n", | |
" :param name: the dataset name in OmegaStore to use. will be used to get the \n", | |
" collection name from the omstore\n", | |
"\n", | |
"DATA\n", | |
" omversion = '0.12.1'\n", | |
" pool = None\n", | |
" version = '0.1.3'\n", | |
"\n", | |
"FILE\n", | |
" /tmp/user20/.jupyter/omx_snowflake.py\n", | |
"\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"# get more information\n", | |
"help(omx_snowflake)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 15, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"<Metadata: Metadata(name=secrets,bucket=omegaml,prefix=data/,kind=python.data,created=2020-03-28 13:41:14.823000)>" | |
] | |
}, | |
"execution_count": 15, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"secrets = {\n", | |
" 'snowflake': {\n", | |
" 'user': 'USER',\n", | |
" 'password': 'PASSWORD',\n", | |
" 'account': 'ACCOUNT',\n", | |
" }\n", | |
"}\n", | |
"om.datasets.put(secrets, 'secrets', append=False)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 9, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"secrets = om.datasets.get('secrets')[0]" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 12, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# build connection string\n", | |
"from getpass import getpass\n", | |
"#user = input('snowflake user name> ')\n", | |
"#password = getpass('snowflake password> ')\n", | |
"#account = input('snowflake account (remove .snowflake.com)> ')\n", | |
"snowflake_cxstr = 'snowflake://{user}:{password}@{account}/'.format(**secrets['snowflake'])" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 13, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"<sqlalchemy.engine.base.Connection at 0x7ffb755cbe80>" | |
] | |
}, | |
"execution_count": 13, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# store just the connection\n", | |
"om.datasets.drop('mysnowflake', force=True)\n", | |
"om.datasets.put(snowflake_cxstr, 'mysnowflake')\n", | |
"om.datasets.get('mysnowflake')" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 14, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style scoped>\n", | |
" .dataframe tbody tr th:only-of-type {\n", | |
" vertical-align: middle;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>COUNT(*)</th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>0</th>\n", | |
" <td>6001215</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" COUNT(*)\n", | |
"0 6001215" | |
] | |
}, | |
"execution_count": 14, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# store a connection reference with sql \n", | |
"om.datasets.drop('mysnowflake', force=True)\n", | |
"om.datasets.put(snowflake_cxstr, 'mysnowflake', \n", | |
" sql='select count(*) from snowflake_sample_data.tpch_sf1.lineitem')\n", | |
"om.datasets.get('mysnowflake')" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style scoped>\n", | |
" .dataframe tbody tr th:only-of-type {\n", | |
" vertical-align: middle;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>COUNT(*)</th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>0</th>\n", | |
" <td>6001215</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" COUNT(*)\n", | |
"0 6001215" | |
] | |
}, | |
"execution_count": 5, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# query the connection with a specific sql, returning a pandas dataframe\n", | |
"om.datasets.drop('mysnowflake', force=True)\n", | |
"om.datasets.put(snowflake_cxstr, 'mysnowflake')\n", | |
"om.datasets.get('mysnowflake', \n", | |
" sql='select count(*) from snowflake_sample_data.tpch_sf1.lineitem')" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"1rows [00:00, 7.71rows/s]\n" | |
] | |
}, | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style scoped>\n", | |
" .dataframe tbody tr th:only-of-type {\n", | |
" vertical-align: middle;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>COUNT(*)</th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>0</th>\n", | |
" <td>6001215</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" COUNT(*)\n", | |
"0 6001215" | |
] | |
}, | |
"execution_count": 6, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# copy the dataset to a native omegaml dataset\n", | |
"om.datasets.put(snowflake_cxstr, \n", | |
" 'mysnowflake', \n", | |
" sql='select count(*) from snowflake_sample_data.tpch_sf1.lineitem',\n", | |
" copy=True)\n", | |
"om.datasets.get('mysnowflake')" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"100000rows [00:09, 14153.82rows/s]\n" | |
] | |
}, | |
{ | |
"data": { | |
"text/plain": [ | |
"100000" | |
] | |
}, | |
"execution_count": 7, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# copy the dataset to a native omegaml dataset\n", | |
"om.datasets.drop('mysnowflake', force=True)\n", | |
"om.datasets.put(snowflake_cxstr, \n", | |
" 'mysnowflake', \n", | |
" sql='select * from snowflake_sample_data.tpch_sf1.lineitem limit 100000',\n", | |
" parse_dates=['l_shipdate', 'l_receiptdate', 'l_commitdate'],\n", | |
" chunksize=50000,\n", | |
" append=False,\n", | |
" copy=True)\n", | |
"len(om.datasets.getl('mysnowflake'))" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.6.8" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment