@bsmedberg
Created November 18, 2015 18:22
Redshift Rollup active-users
SELECT
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country,
activedays,
COUNT(*)
FROM
( SELECT
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country,
DENSE_RANK() OVER (PARTITION BY clientid ORDER BY subsessiondate ASC) AS activedays,
ROW_NUMBER() OVER (PARTITION BY clientid ORDER BY subsessiondate DESC) AS rownumber
FROM
( SELECT
subsessiondate,
clientid,
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country
FROM
main_summary_20151103
WHERE
subsessiondate <= '2015-11-11'::date
AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL
SELECT
subsessiondate,
clientid,
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country
FROM main_summary_20151104
WHERE subsessiondate <= '2015-11-11'::date AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL
SELECT
subsessiondate,
clientid,
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country
FROM
main_summary_20151105
WHERE
subsessiondate <= '2015-11-11'::date
AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL
SELECT
subsessiondate,
clientid,
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country
FROM main_summary_20151106
WHERE subsessiondate <= '2015-11-11'::date AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL
SELECT
subsessiondate,
clientid,
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country
FROM
main_summary_20151107
WHERE
subsessiondate <= '2015-11-11'::date
AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL
SELECT
subsessiondate,
clientid,
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country
FROM main_summary_20151108
WHERE subsessiondate <= '2015-11-11'::date AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL
SELECT
subsessiondate,
clientid,
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country
FROM
main_summary_20151109
WHERE
subsessiondate <= '2015-11-11'::date
AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL
SELECT
subsessiondate,
clientid,
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country
FROM main_summary_20151110
WHERE subsessiondate <= '2015-11-11'::date AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL
SELECT
subsessiondate,
clientid,
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country
FROM
main_summary_20151111
WHERE
subsessiondate <= '2015-11-11'::date
AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL
SELECT
subsessiondate,
clientid,
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country
FROM main_summary_20151112
WHERE subsessiondate <= '2015-11-11'::date AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL
SELECT
subsessiondate,
clientid,
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country
FROM
main_summary_20151113
WHERE
subsessiondate <= '2015-11-11'::date
AND subsessiondate > '2015-11-11'::date - '1 week'::interval )
)
WHERE rownumber = 1
GROUP BY
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country,
activedays
Run time (time the query spent executing): 12m 51.22s
XN HashAggregate (cost=2000713134745.34..2000713136714.66 rows=787725 width=392)
-> XN Subquery Scan derived_table2 (cost=2000618332158.43..2000712859041.91 rows=7877241 width=392)
-> XN Window (cost=2000618332158.43..2000693165941.18 rows=1575448058 width=420)
-> XN Sort (cost=2000618332158.43..2000622270778.57 rows=1575448058 width=420)
-> XN Window (cost=1000306762767.01..1000377657929.62 rows=1575448058 width=420)
-> XN Sort (cost=1000306762767.01..1000310701387.15 rows=1575448058 width=420)
-> XN Network (cost=0.00..66088538.20 rows=1575448058 width=420)
-> XN Subquery Scan derived_table1 (cost=0.00..66088538.20 rows=1575448058 width=420)
-> XN Append (cost=0.00..50334057.62 rows=1575448058 width=134)
-> XN Subquery Scan "*SELECT* 1" (cost=0.00..669407.32 rows=700 width=134)
-> XN Subquery Scan "*SELECT* 2" (cost=0.00..1262759.27 rows=2150495 width=133)
-> XN Subquery Scan "*SELECT* 3" (cost=0.00..2671036.42 rows=88249006 width=133)
-> XN Subquery Scan "*SELECT* 4" (cost=0.00..3104343.45 rows=121095165 width=133)
-> XN Subquery Scan "*SELECT* 5" (cost=0.00..3809646.76 rows=150234292 width=133)
-> XN Subquery Scan "*SELECT* 6" (cost=0.00..4914730.34 rows=194861282 width=133)
-> XN Subquery Scan "*SELECT* 7" (cost=0.00..6878443.50 rows=273509358 width=133)
-> XN Subquery Scan "*SELECT* 8" (cost=0.00..7796914.55 rows=309879023 width=133)
-> XN Subquery Scan "*SELECT* 9" (cost=0.00..7811881.64 rows=308848772 width=133)
-> XN Subquery Scan "*SELECT* 10" (cost=0.00..6255473.21 rows=110897657 width=133)
-> XN Subquery Scan "*SELECT* 11" (cost=0.00..5159421.16 rows=15722308 width=133)
-> XN Seq Scan on main_summary_20151103 (cost=0.00..669400.32 rows=700 width=134)
-> XN Seq Scan on main_summary_20151104 (cost=0.00..1241254.32 rows=2150495 width=133)
-> XN Seq Scan on main_summary_20151105 (cost=0.00..1788546.36 rows=88249006 width=133)
-> XN Seq Scan on main_summary_20151106 (cost=0.00..1893391.80 rows=121095165 width=133)
-> XN Seq Scan on main_summary_20151107 (cost=0.00..2307303.84 rows=150234292 width=133)
-> XN Seq Scan on main_summary_20151108 (cost=0.00..2966117.52 rows=194861282 width=133)
-> XN Seq Scan on main_summary_20151109 (cost=0.00..4143349.92 rows=273509358 width=133)
-> XN Seq Scan on main_summary_20151110 (cost=0.00..4698124.32 rows=309879023 width=133)
-> XN Seq Scan on main_summary_20151111 (cost=0.00..4723393.92 rows=308848772 width=133)
-> XN Seq Scan on main_summary_20151112 (cost=0.00..5146496.64 rows=110897657 width=133)
-> XN Seq Scan on main_summary_20151113 (cost=0.00..5002198.08 rows=15722308 width=133)
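For context: the query above is an instance of the weekly active-users rollup that the script below generates (put_actives with weekly=True) for 2015-11-11; the run time and EXPLAIN plan are the Redshift output for that run.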
import psycopg2
import os
import sys
import csv
from datetime import datetime, date, timedelta
from cStringIO import StringIO
from gzip import GzipFile
import boto3
import shutil
latency_interval = 10
default_bucket = 'telemetry-public-analysis-2'
current_cutoff = date(2015, 11, 13)
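# Interpretation (not stated in the original gist): latency_interval is how many
# days after a given subsession date its pings may still land in later
# main_summary_YYYYMMDD tables, and current_cutoff is the newest daily table
# available; date_range() below truncates every scan window at that cutoff.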
def compress_in_memory():
    """Returns (write_fd, read_fd) to compress to gzip"""
    read_fd = StringIO()
    write_fd = GzipFile(mode="wb", fileobj=read_fd)
    return write_fd, read_fd
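# Pattern used by the rollup functions below: write gzip-compressed CSV rows to
# write_fd, close it (flushing the gzip trailer), then seek(0) on read_fd and
# hand it to write_to_s3().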
def write_to_s3(fd, path, bucket, mimetype='text/plain'):
    """Write the contents of file-like object `fd` to S3. If any properties
    are specified, write those to S3 as well."""
    s3 = boto3.resource('s3')
    s3.Object(bucket, path).put(Body=fd, ContentEncoding='gzip', ContentType=mimetype)
def date_range(date, days, cutoff):
    """Iterate from `date` for the next `days`, stopping at `cutoff`"""
    for d in range(0, days):
        curd = date + timedelta(days=d)
        if curd > cutoff:
            return
        yield curd
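# Illustrative sketch (these dates are examples, not from the gist): with
# current_cutoff = date(2015, 11, 13) and latency_interval = 10,
#   list(date_range(date(2015, 11, 11), latency_interval + 1, current_cutoff))
# yields only the 11th, 12th and 13th; the cutoff stops the walk even though
# eleven days were requested.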
def put_counts(cur, date):
    """Roll up per-day session and crash counters for `date` and upload the -main CSV to S3."""
    from_template = '''
SELECT
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country,
reason,
subsessionlength,
abortsplugin,
abortscontent,
abortsgmplugin,
crashesdetectedplugin,
pluginhangs,
crashesdetectedcontent,
crashesdetectedgmplugin,
crashsubmitattemptmain,
crashsubmitattemptcontent,
crashsubmitattemptplugin,
crashsubmitsuccessmain,
crashsubmitsuccesscontent,
crashsubmitsuccessplugin
FROM {tablename}
WHERE subsessiondate = %(day)s'''
    final_template = '''
SELECT
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country,
SUM(CASE WHEN reason = 'aborted-session' THEN 1 ELSE 0 END),
SUM(subsessionlength),
SUM(abortsplugin),
SUM(abortscontent),
SUM(abortsgmplugin),
SUM(crashesdetectedplugin),
SUM(pluginhangs),
SUM(crashesdetectedcontent),
SUM(crashesdetectedgmplugin),
SUM(crashsubmitattemptmain),
SUM(crashsubmitattemptcontent),
SUM(crashsubmitattemptplugin),
SUM(crashsubmitsuccessmain),
SUM(crashsubmitsuccesscontent),
SUM(crashsubmitsuccessplugin)
FROM ( {unionclause} )
GROUP BY
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country'''
    union_query = ' UNION ALL '.join(
        from_template.format(tablename='main_summary_{}'.format(d.strftime('%Y%m%d')))
        for d in date_range(date - timedelta(days=1), latency_interval + 1, current_cutoff))
    final_query = final_template.format(unionclause=union_query)
    cur.execute(final_query, {'day': date})
    writefd, readfd = compress_in_memory()
    with writefd:
        outcsv = csv.writer(writefd)
        outcsv.writerow((
            'buildversion',
            'buildid',
            'buildarchitecture',
            'channel',
            'os',
            'osversion',
            'osservicepackmajor',
            'osservicepackminor',
            'locale',
            'activeexperimentid',
            'activeexperimentbranch',
            'country',
            'abortedsessioncount',
            'subsessionlengths',
            'abortsplugin',
            'abortscontent',
            'abortsgmplugin',
            'crashesdetectedplugin',
            'pluginhangs',
            'crashesdetectedcontent',
            'crashesdetectedgmplugin',
            'crashsubmitattemptmain',
            'crashsubmitattemptcontent',
            'crashsubmitattemptplugin',
            'crashsubmitsuccessmain',
            'crashsubmitsuccesscontent',
            'crashsubmitsuccessplugin'))
        for r in cur:
            outcsv.writerow(r)
    readfd.seek(0)
    write_to_s3(readfd, 'stability-rollups/{year}/{date}-main.csv.gz'.format(year=date.year, date=date.strftime('%Y%m%d')), default_bucket)
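# For example, running put_counts for 2015-11-13 uploads
# stability-rollups/2015/20151113-main.csv.gz to the telemetry-public-analysis-2 bucket.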
def put_actives(cur, date, weekly):
    """Count active clients per dimension bucket and number of active days for `date`,
    either for that single day or for the trailing week, and upload the CSV to S3."""
    if weekly:
        where_clause = '''subsessiondate <= %(day)s::date AND subsessiondate > %(day)s::date - '1 week'::interval'''
    else:
        where_clause = '''subsessiondate = %(day)s'''
    from_template = '''
SELECT
subsessiondate,
clientid,
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country
FROM {tablename}
WHERE {whereclause}'''
    final_template = '''
SELECT
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country,
activedays,
COUNT(*)
FROM (
SELECT
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country,
DENSE_RANK() OVER (PARTITION BY clientid ORDER BY subsessiondate ASC) AS activedays,
ROW_NUMBER() OVER (PARTITION BY clientid ORDER BY subsessiondate DESC) AS rownumber
FROM ( {unionclause} )
)
WHERE rownumber = 1
GROUP BY
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country,
activedays'''
    if weekly:
        dates = date_range(date - timedelta(days=8), latency_interval + 1 + 7, current_cutoff)
    else:
        dates = date_range(date - timedelta(days=1), latency_interval + 1, current_cutoff)
    union_query = ' UNION ALL '.join(
        from_template.format(tablename='main_summary_{}'.format(d.strftime('%Y%m%d')), whereclause=where_clause)
        for d in dates)
    final_query = final_template.format(unionclause=union_query)
    #print "Executing query: ", cur.mogrify(final_query, {'day': date})
    #sys.exit(1)
    cur.execute(final_query, {'day': date})
    writefd, readfd = compress_in_memory()
    with writefd:
        outcsv = csv.writer(writefd)
        outcsv.writerow((
            'buildversion',
            'buildid',
            'buildarchitecture',
            'channel',
            'os',
            'osversion',
            'osservicepackmajor',
            'osservicepackminor',
            'locale',
            'activeexperimentid',
            'activeexperimentbranch',
            'country',
            'active_days',
            'active_users'))
        for r in cur:
            outcsv.writerow(r)
    readfd.seek(0)
    if weekly:
        segment = 'weekly'
    else:
        segment = 'daily'
    write_to_s3(readfd, 'stability-rollups/{year}/{date}-active-{segment}.csv.gz'.format(year=date.year, date=date.strftime('%Y%m%d'), segment=segment), default_bucket)
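# The two variants land at stability-rollups/{year}/{date}-active-daily.csv.gz and
# stability-rollups/{year}/{date}-active-weekly.csv.gz; the weekly run widens the
# table scan to start eight days before `date` so the whole trailing week is covered.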
def put_crashes(cur, date):
    """Count crash pings from the crash_summary_* tables for `date` and upload the -crashes CSV to S3."""
    from_template = '''
SELECT
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country,
hascrashenvironment
FROM {tablename}
WHERE crashdate = %(day)s'''
    final_template = '''
SELECT
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country,
hascrashenvironment,
COUNT(*)
FROM ( {unionclause} )
GROUP BY
buildversion,
buildid,
buildarchitecture,
channel,
os,
osversion,
osservicepackmajor,
osservicepackminor,
locale,
activeexperimentid,
activeexperimentbranch,
country,
hascrashenvironment'''
    union_query = ' UNION ALL '.join(
        from_template.format(tablename='crash_summary_{}'.format(d.strftime('%Y%m%d')))
        for d in date_range(date - timedelta(days=1), latency_interval + 1, current_cutoff))
    final_query = final_template.format(unionclause=union_query)
    cur.execute(final_query, {'day': date})
    writefd, readfd = compress_in_memory()
    with writefd:
        outcsv = csv.writer(writefd)
        outcsv.writerow((
            'buildversion',
            'buildid',
            'buildarchitecture',
            'channel',
            'os',
            'osversion',
            'osservicepackmajor',
            'osservicepackminor',
            'locale',
            'activeexperimentid',
            'activeexperimentbranch',
            'country',
            'hascrashenvironment',
            'crashes'))
        for r in cur:
            outcsv.writerow(r)
    readfd.seek(0)
    write_to_s3(readfd, 'stability-rollups/{year}/{date}-crashes.csv.gz'.format(year=date.year, date=date.strftime('%Y%m%d')), default_bucket)
weekday_saturday = 5

def put_daily(cur, date):
    """Run all of the rollups for `date`."""
    put_counts(cur, date)
    put_actives(cur, date, False)
    if True:  # date.weekday() == weekday_saturday:
        put_actives(cur, date, True)
    put_crashes(cur, date)
if __name__ == '__main__':
    date, = sys.argv[1:]
    date = datetime.strptime(date, '%Y%m%d').date()
    import getpass
    connectionstr = getpass.getpass("Database connection string: ")
    conn = psycopg2.connect(connectionstr)
    cur = conn.cursor()
    put_daily(cur, date)
    print "Done"