Skip to content

Instantly share code, notes, and snippets.

@mdellavo
Created August 19, 2011 12:20
Show Gist options
  • Select an option

  • Save mdellavo/1156683 to your computer and use it in GitHub Desktop.

Select an option

Save mdellavo/1156683 to your computer and use it in GitHub Desktop.
sqlalchemy-cte-mssql
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import FromClause, column, Select
class CommonTableExpression(FromClause):
"""Represent the 'inside' of a common table
expression."""
def __init__(self, name, colnames):
self.name = name
self.colnames = colnames
def _populate_column_collection(self):
self._columns.update(
(name, column(name))
for name in self.colnames
)
@classmethod
def create(cls, name, colnames):
target = CommonTableExpression(name, colnames)
class ctx(object):
def __enter__(self):
return target
def __exit__(self, *arg, **kw):
pass
return ctx()
class SelectFromCTE(FromClause):
"""Represent the 'outside' of the CTE.
Ultimately this would be integrated into Select
itself, since we just want a Select with an
extra clause on top. "CommonTableExpression" objects
would be pulled from the FROM clause
and rendered on top.
"""
def __init__(self, inner_thing, stmt):
self.inner_thing = inner_thing
self.stmt = stmt
def _populate_column_collection(self):
for name, c in zip(self.inner_thing.colnames, self.stmt.c):
c._make_proxy(self, name)
@compiles(CommonTableExpression)
def _recur_inner_thing(element, compiler, **kw):
return element.name
@compiles(SelectFromCTE)
def _recur_outer_thing(element, compiler, **kw):
cte = (
"WITH %s(%s) AS (\n"
"%s\n"
")\n"% (
element.inner_thing.name,
", ".join(element.inner_thing.colnames),
compiler.process(element.stmt, **kw) )
)
compiler._cte = cte
text = "SELECT * FROM %s" % element.inner_thing.name
# FIXME
if kw.get('asfrom'):
text = "(%s) as x" % text
return text
@compiles(Select)
def visit_select_cte(element, compiler, **kw):
text = compiler.visit_select(element)
if hasattr(compiler, '_cte') and not bool(compiler.stack):
text = compiler._cte + text + "\n OPTION (MAXRECURSION 0)"
if kw.get('asfrom'):
text = "(%s)" % text
return text
from sqlalchemy import select, DateTime, Integer
from sqlalchemy.sql.expression import func, cast, bindparam, text
columns = ['date', 'year', 'month', 'day', 'day_of_week', 'day_of_year',
'week', 'quarter']
def column_exps(exp):
return [ exp, func.YEAR(exp), func.MONTH(exp), func.DAY(exp),
func.DATEPART( text('dw'), exp),
func.DATEPART( text('dy'), exp),
func.DATEPART( text('wk'), exp),
func.DATEPART( text('q'), exp) ]
with CommonTableExpression.create('all_dates', columns) as all_dates:
start_exp = cast(bindparam('start', type_=DateTime), DateTime)
stop_exp = cast(bindparam('stop', type_=DateTime), DateTime)
step_exp = bindparam('step', type_=Integer)
next_exp = func.DATEADD( text('dd'), step_exp, all_dates.c.date )
s1 = select(column_exps(start_exp))
s2 = select(column_exps(next_exp), from_obj=all_dates).where(next_exp <= stop_exp)
s = s1.union_all(s2)
all_dates = SelectFromCTE(all_dates, s)
from sqlalchemy.orm import mapper
class Date(object):
query = Session.query_property()
@classmethod
def range(cls, start, stop, step=1):
return cls.query.params(start=start, stop=stop, step=step)
mapper(Date, all_dates, primary_key=[all_dates.c.date])
query = Date.range('2011-01-01 00:00:00', '2012-02-01 00:00:00') \
.filter(Date.day_of_week == 7) \
.order_by(Date.date.desc())
for d in query:
print d.date, "\t", d.year, "\t", d.month, "\t", d.day, "\t", \
d.day_of_week, "\t", d.day_of_year, "\t", d.week, "\t", d.quarter
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment