Topline Backfill Validation
# coding: utf-8

# In[1]:

# Paths to the backfilled (new) and original (old) weekly topline CSVs.
new_path = "s3://net-mozaws-prod-us-west-2-pipeline-analysis/amiyaguchi/topline-dashboard-data/topline-weekly.csv"
old_path = "s3://telemetry-private-analysis-2/executive-report-weekly/data/v4-weekly.csv.gz"

# Use the parquet summary's schema to pick the shared set of columns.
topline = spark.read.parquet('s3://net-mozaws-prod-us-west-2-pipeline-analysis/amiyaguchi/topline_summary/v1/mode=weekly')
subset = topline.withColumnRenamed('report_start', 'date').columns

new = spark.read.csv(new_path, header=True).select(subset)
old = spark.read.csv(old_path, header=True).select(subset)
# In[2]:

print(new.count())
print(old.count())
# In[52]:

from pyspark.sql import Row, functions as F

dimensions = set(['geo', 'channel', 'os', 'date'])
metrics = set(subset) - dimensions

# Prefix the metric columns of the new dataset with 'test_' so they can
# sit alongside the originals after the join.
new_columns = {c: c for c in new.columns}
for metric in metrics:
    new_columns[metric] = 'test_' + metric
cols = [new_columns[c] for c in new.columns]
renamed = new.rdd.toDF(cols)

joined = old.join(renamed, on=list(dimensions)).na.fill(0)


def calc_error(row):
    """Compute the absolute error between the original and test metrics."""
    d = {}
    for dim in dimensions:
        d[dim] = row[dim]
    for metric in metrics:
        test_data = row['test_' + metric]
        orig_data = row[metric]
        try:
            a = float(orig_data)
            b = float(test_data)
            abs_err = abs(a - b)
            #rel_err = abs(a - b) / a
            d[metric] = abs_err
        except ValueError as e:
            print("Error converting rdd: {}, {}, {}"
                  .format(metric, orig_data, test_data))
            print(e)
            # Keep the field so every Row has the same schema for toDF().
            d[metric] = None
        except ZeroDivisionError:
            # Only reachable if the relative error above is enabled.
            d[metric] = 0
    return Row(**d)


error = joined.rdd.map(calc_error).toDF().select(subset)
error.describe().show()
# There is a slight error in the number of hours. Every other field is exactly the same. | |
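# A hedged sketch (not in the original notebook, assuming the `error`
# DataFrame and `metrics` set defined above): counting rows with a nonzero
# absolute error per metric is a quick way to confirm that only `hours`
# disagrees between the two datasets.
for metric in metrics:
    nonzero = error.where(F.col(metric) != 0.0).count()
    print('{}: {} rows with nonzero absolute error'.format(metric, nonzero))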
# In[53]:

# Break down the hours error over the non-aggregate rows.
breakdown = (
    error
    .select('date', 'geo', 'channel', 'os', 'hours')
    .where("geo <> 'all' and channel <> 'all' and os <> 'all'")
    .where('hours <> 0.0')
    .orderBy(F.desc('hours'))
)

print(breakdown.count())
breakdown.describe().show(truncate=False)
breakdown.show()

# In[49]:

# Roll up the error across every combination of dimensions.
breakdown.cube(list(dimensions)).sum().orderBy(F.desc('sum(hours)')).show()
# Based on the breakdown, the dataset is off by a total of 0.2 hours, or 12 minutes.
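# A hedged sketch (not in the original notebook, assuming the `breakdown`
# DataFrame above): summing the per-row absolute errors gives the total
# discrepancy directly, and should agree with the grand-total row of the cube.
total = breakdown.agg(F.sum('hours').alias('total_abs_err')).first()
print('total absolute hours error: {}'.format(total['total_abs_err']))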