SQL and Dask
"""
conda install -c conda-forge postgresql psycopg2 sqlalchemy
initdb -D /tmp/dask_db
postgresql -D /tmp/dask_db
pg_ctl -D /tmp/dask_db -l logfile start
createuser --encrypted --pwprompt dask
createdb --owner=dask dask_db
"""
import pandas as pd
import dask
import dask.dataframe as dd
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql
# note: the dialect name is 'postgresql+psycopg2'; the old 'postgres+...'
# alias was removed in SQLAlchemy 1.4
uri = 'postgresql+psycopg2://dask:dask@localhost:5432/dask_db'
engine = create_engine(uri, echo=True)
conn = engine.connect()
ddf_1 = dask.datasets.timeseries(start='2000-01-01', end='2000-01-10')
ddf_2 = dask.datasets.timeseries(start='2000-01-01', end='2000-01-02', freq='1h')
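# both demo frames carry a DatetimeIndex named 'timestamp' and columns
# id (int), name (str), x (float), y (float); peek before writing anything
print(ddf_1.head())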
# GOAL -- compute a std on a column after a join, a filter, and a groupby,
# assuming the data is too large to fit in memory. In pseudocode:
#
#   df = ddf_1.merge(ddf_2, on='id')
#   df = df[df.y > 0]
#   result = df.groupby('name').x.std()
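# at this demo scale the answer also comes straight from Dask, which gives
# a baseline to check the SQL paths against (a sketch: merging frames that
# share column names produces pandas-style _x/_y suffixes)
merged = ddf_1.merge(ddf_2, on='id')
baseline = merged[merged.y_x > 0].groupby('name_x').x_x.std().compute()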
# create an empty table from the metadata alone
# ddf_1._meta.to_sql('timeseries', engine, if_exists="replace", index=True)
# write the data (takes about a minute); dask's to_sql takes the connection
# string itself, since an Engine object cannot be serialized out to workers
ddf_1.to_sql('timeseries_1', uri, if_exists="replace", index=True)
ddf_2.to_sql('timeseries_2', uri, if_exists="replace", index=True)
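# cheap sanity check on what actually landed in the database
print(pd.read_sql('select count(*) from timeseries_1', conn))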
# full query: pull every matching row in one shot
query = "select * from %s where y > 0" % 'timeseries_1'
df = pd.read_sql(query, conn)
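# hedged aside: pandas can also bind parameters rather than interpolating
# values into the SQL string (sqlalchemy.text plus a params dict)
from sqlalchemy import text
df = pd.read_sql(text("select * from timeseries_1 where y > :y"), conn, params={"y": 0})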
# split the query on something -- here, a timestamp range
start = '2000-01-01'
end = '2000-01-02 23:59:59'
query = "select * from %s where y > 0 and timestamp between '%s' and '%s'" % ('timeseries_1', start, end)
df = pd.read_sql(query, conn)
# > splitting is chunking!
def load_data(table, start, end):
    # half-open interval so adjacent chunks neither overlap nor leave gaps
    query = "select * from %s where y > 0 and timestamp >= '%s' and timestamp < '%s'" % (table, start, end)
    df = pd.read_sql(query, conn)
    return df
dates = list(pd.date_range(start='2000-01-01', end='2000-01-11', freq='1d'))
# pair consecutive boundaries; zip(dates[::2], dates[1::2]) would silently
# drop every other day from the result
dates = zip(dates[:-1], dates[1:])
dfs = []
for start, end in dates:
    dfs.append(dask.delayed(load_data)('timeseries_1', start, end))
ddf = dd.from_delayed(dfs)
ddf.compute()
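# hedged alternative: dd.read_sql_table builds the partitions itself when
# given a numeric/datetime index column; the y > 0 filter then runs in
# Dask rather than in the database
ddf_auto = dd.read_sql_table('timeseries_1', uri, index_col='timestamp', npartitions=10)
ddf_auto = ddf_auto[ddf_auto.y > 0]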
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Text, TIMESTAMP, BigInteger, Float, and_

Base = declarative_base()

class TimeSeries1(Base):
    __tablename__ = 'timeseries_1'
    id = Column(BigInteger, primary_key=True)
    timestamp = Column(TIMESTAMP)
    name = Column(Text)
    x = Column(Float)
    y = Column(Float)

    def __repr__(self):
        return "<TimeSeries1(name='%s', id='%d')>" % (self.name, self.id)
class TimeSeries2(Base):
    __tablename__ = 'timeseries_2'
    id = Column(BigInteger, primary_key=True)
    timestamp = Column(TIMESTAMP)
    name = Column(Text)
    x = Column(Float)
    y = Column(Float)

    def __repr__(self):
        return "<TimeSeries2(name='%s', id='%d')>" % (self.name, self.id)
Session = sessionmaker(bind=conn)
session = Session()
print(session.query(TimeSeries1).first())
print(session.query(TimeSeries1).filter(TimeSeries1.y > 0).first().y)
query = session.query(TimeSeries1).filter(TimeSeries1.y > 0)
# filters compose: extend the same query with a time-window restriction
query = session.query(TimeSeries1).filter(TimeSeries1.y > 0)\
               .filter(and_(TimeSeries1.timestamp > start, TimeSeries1.timestamp < end))
df = pd.read_sql_query(query.statement, conn)
# sqlalchemy lets you build queries programmatically
query = session.query(TimeSeries1).join(TimeSeries2, TimeSeries1.id == TimeSeries2.id).filter(TimeSeries1.y > 0)
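# the postgresql dialect imported above will render the generated SQL for
# inspection (literal_binds inlines the bound values)
print(query.statement.compile(dialect=postgresql.dialect(),
                              compile_kwargs={"literal_binds": True}))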
# > splitting is chunking!
def load_data(start, end):
    # half-open interval again, so chunk boundaries line up exactly
    query = session.query(TimeSeries1).join(TimeSeries2, TimeSeries1.id == TimeSeries2.id) \
                   .filter(TimeSeries1.y > 0) \
                   .filter(and_(TimeSeries1.timestamp >= start, TimeSeries1.timestamp < end))
    df = pd.read_sql(query.statement, conn)
    return df
dates = list(pd.date_range(start='2000-01-01', end='2000-01-11', freq='1d'))
dates = zip(dates[:-1], dates[1:])  # consecutive boundaries, as above
dfs = []
for start, end in dates:
    dfs.append(dask.delayed(load_data)(start, end))
ddf = dd.from_delayed(dfs)
ddf.groupby('name').x.std().compute()
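# from_delayed infers metadata by computing the first chunk up front;
# passing meta explicitly avoids that round trip (a sketch -- the dtypes
# must match what the query returns):
# ddf = dd.from_delayed(dfs, meta={'id': 'i8', 'timestamp': 'datetime64[ns]',
#                                  'name': 'object', 'x': 'f8', 'y': 'f8'})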
# Social engineering
# Do the join in the database and let Dask query the result as one plain
# table, via a materialized view
mat_view_query = """
CREATE MATERIALIZED VIEW ts_filtered AS
SELECT timeseries_1.id, timeseries_1.timestamp, timeseries_1.name, timeseries_1.x, timeseries_1.y
FROM timeseries_1 JOIN timeseries_2 ON timeseries_1.id = timeseries_2.id
WHERE timeseries_1.y > 0
"""
conn.execute(mat_view_query)
# would be nice if I could hand read_sql_table a connection rather than a URI
ddf = dd.read_sql_table('ts_filtered', uri, 'timestamp')
ddf.groupby('name').x.std().compute()
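# materialized views are snapshots; after new writes to the base tables,
# refresh before re-reading:
# conn.execute("REFRESH MATERIALIZED VIEW ts_filtered")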