SQL and Dask
""" | |
conda install -c conda-forge postgresql psycopg2 sqlalchemy | |
initdb -D /tmp/dask_db | |
postgresql -D /tmp/dask_db | |
pg_ctl -D /tmp/dask_db -l logfile start | |
createuser --encrypted --pwprompt dask | |
createdb --owner=dask dask_db | |
""" | |
import pandas as pd
import dask
import dask.dataframe as dd
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql

uri = 'postgresql+psycopg2://dask:dask@localhost:5432/dask_db'
engine = create_engine(uri, echo=True)
conn = engine.connect()
ddf_1 = dask.datasets.timeseries(start='2000-01-01', end='2000-01-10')
ddf_2 = dask.datasets.timeseries(start='2000-01-01', end='2000-01-02', freq='1h')
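# The demo data: each frame has a DatetimeIndex named 'timestamp' and
# columns id (int), name (str), x, y (floats); peek at a few rows.
print(ddf_1.head())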
# GOAL -- perform a std on a column after filtering/groupby,
# and assume the data is very large
joined = ddf_1.merge(ddf_2, on='id', suffixes=('', '_2'))
filtered = joined[joined.y > 0]
result = filtered.groupby('name').x.std()
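# For reference, the pure-dask answer on the in-memory data (the merge
# shuffles, so this can take a moment) -- the rest of this file rebuilds
# the same result by way of PostgreSQL:
print(result.compute())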
# create empty table
# ddf_1._meta.to_sql('timeseries', engine, if_exists="replace", index=True)
# write data (takes about a minute)
ddf_1.to_sql('timeseries_1', engine, if_exists="replace", index=True)
ddf_2.to_sql('timeseries_2', engine, if_exists="replace", index=True)
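# (Assumption: recent dask versions let to_sql write partitions
# concurrently; worth trying if the sequential writes above are too slow.)
# ddf_1.to_sql('timeseries_1', uri, if_exists="replace", index=True, parallel=True)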
# full query
query = "select * from %s where y > 0" % ('timeseries_1')
df = pd.read_sql(query, conn)
# split query on something
start = '2000-01-01'
end = '2000-01-02 23:59:59'
query = "select * from %s where y > 0 and timestamp between '%s' and '%s'" % ('timeseries_1', start, end)
df = pd.read_sql(query, conn)
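# The aggregation can also run entirely inside PostgreSQL; stddev() is
# the sample standard deviation, matching pandas' default ddof=1.
agg_query = "select name, stddev(x) from timeseries_1 where y > 0 group by name"
print(pd.read_sql(agg_query, conn))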
# > splitting is chunking!
def load_data(table, start, end):
    # half-open interval so adjacent chunks neither overlap nor leave gaps
    query = "select * from %s where y > 0 and timestamp >= '%s' and timestamp < '%s'" % (table, start, end)
    df = pd.read_sql(query, conn)
    return df
dates = list(pd.date_range(start='2000-01-01', end='2000-01-11', freq='1d'))
# consecutive pairs -> contiguous chunks (pairing every other date would
# silently drop alternate days)
dates = list(zip(dates[:-1], dates[1:]))
dfs = []
for start, end in dates:
    dfs.append(dask.delayed(load_data)('timeseries_1', start, end))
ddf = dd.from_delayed(dfs)
ddf.compute()
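# (Assumption: passing an explicit meta stops from_delayed from loading a
# chunk up front just to infer the schema; reset_index() mirrors read_sql
# returning 'timestamp' as an ordinary column.)
meta = ddf_1._meta.reset_index()
ddf = dd.from_delayed(dfs, meta=meta)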
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Text, TIMESTAMP, BigInteger, Float, and_

Base = declarative_base()
class TimeSeries1(Base):
    __tablename__ = 'timeseries_1'
    id = Column(BigInteger, primary_key=True)
    timestamp = Column(TIMESTAMP)
    name = Column(Text)
    x = Column(Float)
    y = Column(Float)

    def __repr__(self):
        return "<TimeSeries1(name='%s', id='%d')>" % (self.name, self.id)
class TimeSeries2(Base):
    __tablename__ = 'timeseries_2'
    id = Column(BigInteger, primary_key=True)
    timestamp = Column(TIMESTAMP)
    name = Column(Text)
    x = Column(Float)
    y = Column(Float)

    def __repr__(self):
        return "<TimeSeries2(name='%s', id='%d')>" % (self.name, self.id)
Session = sessionmaker(bind=conn)
session = Session()
print(session.query(TimeSeries1).first())
print(session.query(TimeSeries1).filter(TimeSeries1.y > 0).first().y)
# build the query incrementally: filter on y, then add a timestamp range
query = session.query(TimeSeries1).filter(TimeSeries1.y > 0)
query = query.filter(and_(TimeSeries1.timestamp > start, TimeSeries1.timestamp < end))
df = pd.read_sql_query(query.statement, conn)
# sqlalchemy lets you programmatically build queries
query = session.query(TimeSeries1).join(TimeSeries2, TimeSeries1.id == TimeSeries2.id).filter(TimeSeries1.y > 0)
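# Inspect the SQL that SQLAlchemy generates (this is what the otherwise
# unused postgresql import above is for); literal_binds inlines the
# filter values so the output can be pasted straight into psql.
print(query.statement.compile(dialect=postgresql.dialect(),
                              compile_kwargs={"literal_binds": True}))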
# > splitting is chunking!
def load_data(start, end):
    # half-open interval again, so chunk boundaries neither overlap nor gap
    query = session.query(TimeSeries1).join(TimeSeries2, TimeSeries1.id == TimeSeries2.id) \
        .filter(TimeSeries1.y > 0) \
        .filter(and_(TimeSeries1.timestamp >= start, TimeSeries1.timestamp < end))
    df = pd.read_sql(query.statement, conn)
    return df
dates = list(pd.date_range(start='2000-01-01', end='2000-01-11', freq='1d'))
dates = list(zip(dates[:-1], dates[1:]))  # consecutive pairs again
dfs = []
for start, end in dates:
    dfs.append(dask.delayed(load_data)(start, end))
ddf = dd.from_delayed(dfs)
ddf.groupby('name').x.std().compute()
# Social Engineering
# Do joins in the database and query the entire table with materialized views
mat_view_query = """
CREATE MATERIALIZED VIEW ts_filtered AS
SELECT timeseries_1.id, timeseries_1.timestamp, timeseries_1.name, timeseries_1.x, timeseries_1.y
FROM timeseries_1 JOIN timeseries_2 ON timeseries_1.id = timeseries_2.id
WHERE timeseries_1.y > 0
"""
conn.execute(mat_view_query)
# would be nice if I could hand read_sql_table a connection rather than a URI
ddf = dd.read_sql_table('ts_filtered', uri, 'timestamp')
ddf.groupby('name').x.std().compute()
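# A materialized view is a snapshot; if the base tables change, refresh
# it before re-reading:
# conn.execute("REFRESH MATERIALIZED VIEW ts_filtered")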