Created
November 18, 2015 18:22
-
-
Save bsmedberg/1009fff7dfc3e22b61e8 to your computer and use it in GitHub Desktop.
Redshift Rollup active-users
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
SELECT | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country, | |
activedays, | |
COUNT(*) | |
FROM | |
( SELECT | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country, | |
DENSE_RANK() OVER (PARTITION | |
BY | |
clientid | |
ORDER BY | |
subsessiondate ASC) AS activedays, | |
ROW_NUMBER() OVER (PARTITION | |
BY | |
clientid | |
ORDER BY | |
subsessiondate DESC) AS rownumber | |
FROM | |
( SELECT | |
subsessiondate, | |
clientid, | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country | |
FROM | |
main_summary_20151103 | |
WHERE | |
subsessiondate <= '2015-11-11'::date | |
AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL | |
SELECT | |
subsessiondate, | |
clientid, | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country | |
FROM main_summary_20151104 | |
WHERE subsessiondate <= '2015-11-11'::date AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL | |
SELECT | |
subsessiondate, | |
clientid, | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country | |
FROM | |
main_summary_20151105 | |
WHERE | |
subsessiondate <= '2015-11-11'::date | |
AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL | |
SELECT | |
subsessiondate, | |
clientid, | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country | |
FROM main_summary_20151106 | |
WHERE subsessiondate <= '2015-11-11'::date AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL | |
SELECT | |
subsessiondate, | |
clientid, | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country | |
FROM | |
main_summary_20151107 | |
WHERE | |
subsessiondate <= '2015-11-11'::date | |
AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL | |
SELECT | |
subsessiondate, | |
clientid, | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country | |
FROM main_summary_20151108 | |
WHERE subsessiondate <= '2015-11-11'::date AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL | |
SELECT | |
subsessiondate, | |
clientid, | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country | |
FROM | |
main_summary_20151109 | |
WHERE | |
subsessiondate <= '2015-11-11'::date | |
AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL | |
SELECT | |
subsessiondate, | |
clientid, | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country | |
FROM main_summary_20151110 | |
WHERE subsessiondate <= '2015-11-11'::date AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL | |
SELECT | |
subsessiondate, | |
clientid, | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
\activeexperimentid, | |
activeexperimentbranch, | |
country | |
FROM | |
main_summary_20151111 | |
WHERE | |
subsessiondate <= '2015-11-11'::date | |
AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL | |
SELECT | |
subsessiondate, | |
clientid, | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country | |
FROM main_summary_20151112 | |
WHERE subsessiondate <= '2015-11-11'::date AND subsessiondate > '2015-11-11'::date - '1 week'::interval UNION ALL | |
SELECT | |
subsessiondate, | |
clientid, | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country | |
FROM | |
main_summary_20151113 | |
WHERE | |
subsessiondate <= '2015-11-11'::date | |
AND subsessiondate > '2015-11-11'::date - '1 week'::interval ) | |
) | |
WHERE rownumber = 1 | |
GROUP BY | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country, | |
activedays |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Run TimeTime the query spent executing. 12m 51.22s | |
XN HashAggregate (cost=2000713134745.34..2000713136714.66 rows=787725 width=392) | |
-> XN Subquery Scan derived_table2 (cost=2000618332158.43..2000712859041.91 rows=7877241 width=392) | |
-> XN Window (cost=2000618332158.43..2000693165941.18 rows=1575448058 width=420) | |
-> XN Sort (cost=2000618332158.43..2000622270778.57 rows=1575448058 width=420) | |
-> XN Window (cost=1000306762767.01..1000377657929.62 rows=1575448058 width=420) | |
-> XN Sort (cost=1000306762767.01..1000310701387.15 rows=1575448058 width=420) | |
-> XN Network (cost=0.00..66088538.20 rows=1575448058 width=420) | |
-> XN Subquery Scan derived_table1 (cost=0.00..66088538.20 rows=1575448058 width=420) | |
-> XN Append (cost=0.00..50334057.62 rows=1575448058 width=134) | |
-> XN Subquery Scan "*SELECT* 1" (cost=0.00..669407.32 rows=700 width=134) | |
-> XN Subquery Scan "*SELECT* 2" (cost=0.00..1262759.27 rows=2150495 width=133) | |
-> XN Subquery Scan "*SELECT* 3" (cost=0.00..2671036.42 rows=88249006 width=133) | |
-> XN Subquery Scan "*SELECT* 4" (cost=0.00..3104343.45 rows=121095165 width=133) | |
-> XN Subquery Scan "*SELECT* 5" (cost=0.00..3809646.76 rows=150234292 width=133) | |
-> XN Subquery Scan "*SELECT* 6" (cost=0.00..4914730.34 rows=194861282 width=133) | |
-> XN Subquery Scan "*SELECT* 7" (cost=0.00..6878443.50 rows=273509358 width=133) | |
-> XN Subquery Scan "*SELECT* 8" (cost=0.00..7796914.55 rows=309879023 width=133) | |
-> XN Subquery Scan "*SELECT* 9" (cost=0.00..7811881.64 rows=308848772 width=133) | |
-> XN Subquery Scan "*SELECT* 10" (cost=0.00..6255473.21 rows=110897657 width=133) | |
-> XN Subquery Scan "*SELECT* 11" (cost=0.00..5159421.16 rows=15722308 width=133) | |
-> XN Seq Scan on main_summary_20151103 (cost=0.00..669400.32 rows=700 width=134) | |
-> XN Seq Scan on main_summary_20151104 (cost=0.00..1241254.32 rows=2150495 width=133) | |
-> XN Seq Scan on main_summary_20151105 (cost=0.00..1788546.36 rows=88249006 width=133) | |
-> XN Seq Scan on main_summary_20151106 (cost=0.00..1893391.80 rows=121095165 width=133) | |
-> XN Seq Scan on main_summary_20151107 (cost=0.00..2307303.84 rows=150234292 width=133) | |
-> XN Seq Scan on main_summary_20151108 (cost=0.00..2966117.52 rows=194861282 width=133) | |
-> XN Seq Scan on main_summary_20151109 (cost=0.00..4143349.92 rows=273509358 width=133) | |
-> XN Seq Scan on main_summary_20151110 (cost=0.00..4698124.32 rows=309879023 width=133) | |
-> XN Seq Scan on main_summary_20151111 (cost=0.00..4723393.92 rows=308848772 width=133) | |
-> XN Seq Scan on main_summary_20151112 (cost=0.00..5146496.64 rows=110897657 width=133) | |
-> XN Seq Scan on main_summary_20151113 (cost=0.00..5002198.08 rows=15722308 width=133) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import psycopg2 | |
import os | |
import sys | |
import csv | |
from datetime import datetime, date, timedelta | |
from cStringIO import StringIO | |
from gzip import GzipFile | |
import boto3 | |
import shutil | |
latency_interval = 10 | |
default_bucket = 'telemetry-public-analysis-2' | |
current_cutoff = date(2015, 11, 13) | |
def compress_in_memory(): | |
"""Returns (write_fd, read_fd) to compress to gzip""" | |
read_fd = StringIO() | |
write_fd = GzipFile(mode="wb", fileobj=read_fd) | |
return write_fd, read_fd | |
def write_to_s3(fd, path, bucket, mimetype='text/plain'): | |
"""Write the contents of file-like object `fd` to S3. If any properties | |
are specified, write those to S3 as well.""" | |
s3 = boto3.resource('s3') | |
s3.Object(bucket, path).put(Body=fd, ContentEncoding='gzip', ContentType=mimetype) | |
def date_range(date, days, cutoff): | |
"""Iterate from `date` for the next `days`""" | |
for d in range(0, days): | |
curd = date + timedelta(days=d) | |
if curd > cutoff: | |
return | |
yield curd | |
def put_counts(cur, date): | |
from_template = ''' | |
SELECT | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country, | |
reason, | |
subsessionlength, | |
abortsplugin, | |
abortscontent, | |
abortsgmplugin, | |
crashesdetectedplugin, | |
pluginhangs, | |
crashesdetectedcontent, | |
crashesdetectedgmplugin, | |
crashsubmitattemptmain, | |
crashsubmitattemptcontent, | |
crashsubmitattemptplugin, | |
crashsubmitsuccessmain, | |
crashsubmitsuccesscontent, | |
crashsubmitsuccessplugin | |
FROM {tablename} | |
WHERE subsessiondate = %(day)s''' | |
final_template = ''' | |
SELECT | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country, | |
SUM(CASE WHEN reason = 'aborted-session' THEN 1 ELSE 0 END), | |
SUM(subsessionlength), | |
SUM(abortsplugin), | |
SUM(abortscontent), | |
SUM(abortsgmplugin), | |
SUM(crashesdetectedplugin), | |
SUM(pluginhangs), | |
SUM(crashesdetectedcontent), | |
SUM(crashesdetectedgmplugin), | |
SUM(crashsubmitattemptmain), | |
SUM(crashsubmitattemptcontent), | |
SUM(crashsubmitattemptplugin), | |
SUM(crashsubmitsuccessmain), | |
SUM(crashsubmitsuccesscontent), | |
SUM(crashsubmitsuccessplugin) | |
FROM ( {unionclause} ) | |
GROUP BY | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country''' | |
union_query = ' UNION ALL '.join( | |
from_template.format(tablename='main_summary_{}'.format(d.strftime('%Y%m%d'))) | |
for d in date_range(date - timedelta(days=1), latency_interval + 1, current_cutoff)) | |
final_query = final_template.format(unionclause=union_query) | |
cur.execute(final_query, {'day': date}) | |
writefd, readfd = compress_in_memory() | |
with writefd: | |
outcsv = csv.writer(writefd) | |
outcsv.writerow(( | |
'buildversion', | |
'buildid', | |
'buildarchitecture', | |
'channel', | |
'os', | |
'osversion', | |
'osservicepackmajor', | |
'osservicepackminor', | |
'locale', | |
'activeexperimentid', | |
'activeexperimentbranch', | |
'country', | |
'abortedsessioncount' | |
'subsessionlengths', | |
'abortsplugin', | |
'abortscontent', | |
'abortsgmplugin', | |
'crashesdetectedplugin', | |
'pluginhangs', | |
'crashesdetectedcontent', | |
'crashesdetectedgmplugin', | |
'crashsubmitattemptmain', | |
'crashsubmitattemptcontent', | |
'crashsubmitattemptplugin', | |
'crashsubmitsuccessmain', | |
'crashsubmitsuccesscontent', | |
'crashsubmitsuccessplugin')) | |
for r in cur: | |
outcsv.writerow(r) | |
readfd.seek(0) | |
write_to_s3(readfd, 'stability-rollups/{year}/{date}-main.csv.gz'.format(year=date.year, date=date.strftime('%Y%m%d')), default_bucket) | |
def put_actives(cur, date, weekly): | |
if weekly: | |
where_clause = '''subsessiondate <= %(day)s::date AND subsessiondate > %(day)s::date - '1 week'::interval''' | |
else: | |
where_clause = '''subsessiondate = %(day)s''' | |
from_template = ''' | |
SELECT | |
subsessiondate, | |
clientid, | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country | |
FROM {tablename} | |
WHERE {whereclause}''' | |
final_template = ''' | |
SELECT | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country, | |
activedays, | |
COUNT(*) | |
FROM ( | |
SELECT | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country, | |
DENSE_RANK() OVER (PARTITION BY clientid ORDER BY subsessiondate ASC) AS activedays, | |
ROW_NUMBER() OVER (PARTITION BY clientid ORDER BY subsessiondate DESC) AS rownumber | |
FROM ( {unionclause} ) | |
) | |
WHERE rownumber = 1 | |
GROUP BY | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country, | |
activedays''' | |
if weekly: | |
dates = date_range(date - timedelta(days=8), latency_interval + 1 + 7, current_cutoff) | |
else: | |
dates = date_range(date - timedelta(days=1), latency_interval + 1, current_cutoff) | |
union_query = ' UNION ALL '.join( | |
from_template.format(tablename='main_summary_{}'.format(d.strftime('%Y%m%d')), whereclause=where_clause) | |
for d in dates) | |
final_query = final_template.format(unionclause=union_query) | |
#print "Executing query: ", cur.mogrify(final_query, {'day': date}) | |
#sys.exit(1) | |
cur.execute(final_query, {'day': date}) | |
writefd, readfd = compress_in_memory() | |
with writefd: | |
outcsv = csv.writer(writefd) | |
outcsv.writerow(( | |
'buildversion', | |
'buildid', | |
'buildarchitecture', | |
'channel', | |
'os', | |
'osversion', | |
'osservicepackmajor', | |
'osservicepackminor', | |
'locale', | |
'activeexperimentid', | |
'activeexperimentbranch', | |
'country', | |
'active_days', | |
'active_users')) | |
for r in cur: | |
outcsv.writerow(r) | |
readfd.seek(0) | |
if weekly: | |
segment = 'weekly' | |
else: | |
segment = 'daily' | |
write_to_s3(readfd, 'stability-rollups/{year}/{date}-active-{segment}.csv.gz'.format(year=date.year, date=date.strftime('%Y%m%d'), segment=segment), default_bucket) | |
def put_crashes(cur, date): | |
from_template = ''' | |
SELECT | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country, | |
hascrashenvironment | |
FROM {tablename} | |
WHERE crashdate = %(day)s''' | |
final_template = ''' | |
SELECT | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country, | |
hascrashenvironment, | |
COUNT(*) | |
FROM ( {unionclause} ) | |
GROUP BY | |
buildversion, | |
buildid, | |
buildarchitecture, | |
channel, | |
os, | |
osversion, | |
osservicepackmajor, | |
osservicepackminor, | |
locale, | |
activeexperimentid, | |
activeexperimentbranch, | |
country, | |
hascrashenvironment''' | |
union_query = ' UNION ALL '.join( | |
from_template.format(tablename='crash_summary_{}'.format(d.strftime('%Y%m%d'))) | |
for d in date_range(date - timedelta(days=1), latency_interval + 1, current_cutoff)) | |
final_query = final_template.format(unionclause=union_query) | |
cur.execute(final_query, {'day': date}) | |
writefd, readfd = compress_in_memory() | |
with writefd: | |
outcsv = csv.writer(writefd) | |
outcsv.writerow(( | |
'buildversion', | |
'buildid', | |
'buildarchitecture', | |
'channel', | |
'os', | |
'osversion', | |
'osservicepackmajor', | |
'osservicepackminor', | |
'locale', | |
'activeexperimentid', | |
'activeexperimentbranch', | |
'country', | |
'hascrashenvironment', | |
'crashes')) | |
for r in cur: | |
outcsv.writerow(r) | |
readfd.seek(0) | |
write_to_s3(readfd, 'stability-rollups/{year}/{date}-crashes.csv.gz'.format(year=date.year, date=date.strftime('%Y%m%d')), default_bucket) | |
weekday_saturday = 5 | |
def put_daily(cur, date): | |
put_counts(cur, date) | |
put_actives(cur, date, False) | |
if True: # date.weekday() == weekday_saturday: | |
put_actives(cur, date, True) | |
put_crashes(cur, date) | |
if __name__ == '__main__': | |
date, = sys.argv[1:] | |
date = datetime.strptime(date, '%Y%m%d').date() | |
import getpass | |
connectionstr = getpass.getpass("Database connection string: ") | |
conn = psycopg2.connect(connectionstr) | |
cur = conn.cursor() | |
put_daily(cur, date) | |
print "Done" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment