Topline Backfill Validation
@acmiyaguchi · Created May 3, 2017 23:46
# coding: utf-8
# In[1]:
# Paths to the backfilled (new) and original (old) weekly topline CSVs.
new_path = "s3://net-mozaws-prod-us-west-2-pipeline-analysis/amiyaguchi/topline-dashboard-data/topline-weekly.csv"
old_path = "s3://telemetry-private-analysis-2/executive-report-weekly/data/v4-weekly.csv.gz"

# Use the topline_summary schema (with report_start renamed to date) to
# select a consistent set of columns from both CSVs.
topline = spark.read.parquet('s3://net-mozaws-prod-us-west-2-pipeline-analysis/amiyaguchi/topline_summary/v1/mode=weekly')
subset = topline.withColumnRenamed('report_start', 'date').columns

new = spark.read.csv(new_path, header=True).select(subset)
old = spark.read.csv(old_path, header=True).select(subset)
# In[2]:
print(new.count())
print(old.count())
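# A small optional check (an addition to the notebook, not part of the
# original analysis): the backfill should yield the same number of rows
# as the original export if the date ranges line up.
assert new.count() == old.count(), "row counts differ between old and new"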
# In[52]:
from pyspark.sql import Row, functions as F

dimensions = set(['geo', 'channel', 'os', 'date'])
metrics = set(subset) - dimensions

# Rename the metric columns in the new dataset with a 'test_' prefix so
# they can be told apart from the original columns after the join.
new_columns = {c: c for c in new.columns}
for metric in metrics:
    new_columns[metric] = 'test_' + metric
cols = [new_columns[c] for c in new.columns]
renamed = new.rdd.toDF(cols)

# Join the datasets on the dimension columns and fill missing values with 0.
joined = old.join(renamed, on=list(dimensions)).na.fill(0)
def calc_error(row):
    """Compute the absolute error between each original metric and its
    'test_' counterpart, carrying the dimension columns through."""
    d = {}
    for dim in dimensions:
        d[dim] = row[dim]
    for metric in metrics:
        test_data = row['test_' + metric]
        orig_data = row[metric]
        try:
            a = float(orig_data)
            b = float(test_data)
            abs_err = abs(a - b)
            # rel_err = abs(a - b) / a
            d[metric] = abs_err
        except ValueError as e:
            print("Error converting rdd: {}, {}, {}"
                  .format(metric, orig_data, test_data))
            print(e)
            # Keep the schema consistent so Row(**d) has every field.
            d[metric] = None
        except ZeroDivisionError:
            # Only reachable if the relative error above is enabled.
            d[metric] = 0
    return Row(**d)
# Compute per-row absolute errors and summarize them per metric.
error = joined.rdd.map(calc_error).toDF().select(subset)
error.describe().show()
# There is a slight error in the number of hours. Every other field is exactly the same.
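# A minimal sketch of how to check that claim directly (uses only the
# frames defined above): count rows with a nonzero absolute error for
# each metric; only 'hours' should come back nonzero.
for metric in metrics:
    print(metric, error.where(F.col(metric) != 0.0).count())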
# In[53]:
# Break down the nonzero hours error by dimension, excluding the 'all'
# rollup rows, ordered from largest error to smallest.
breakdown = (
    error
    .select('date', 'geo', 'channel', 'os', 'hours')
    .where("geo <> 'all' and channel <> 'all' and os <> 'all'")
    .where('hours <> 0.0')
    .orderBy(F.desc('hours'))
)
print(breakdown.count())
breakdown.describe().show(truncate=False)
breakdown.show()
# In[49]:
# Cube over every combination of dimensions to see where the error
# concentrates, sorted by the total hours error.
breakdown.cube(list(dimensions)).sum().orderBy(F.desc('sum(hours)')).show()
# Based on the breakdown, the dataset is off by a total of 0.2 hours, or 12 minutes.
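# A quick cross-check of that figure (a sketch using the `error` frame
# above, not part of the original notebook): sum the absolute hours
# error directly instead of reading it off the cube output.
print(error.agg(F.sum('hours').alias('total_hours_error')).first()[0])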