Skip to content

Instantly share code, notes, and snippets.

@tdsmith
Created October 26, 2020 16:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tdsmith/28c645f0c387013e7929b0a2d07a45c6 to your computer and use it in GitHub Desktop.
import datetime as dt
from functools import partial
import json
from pathlib import Path
import logging
import attr
from google.cloud import bigquery as bq
from google.cloud import storage as storage
from google.api_core import exceptions as bq_ex
import luigi
from pytz import utc
PROJECT = "moz-fx-data-bq-data-science"
logger = logging.getLogger("luigi-interface")
@attr.s
class BigQueryTarget(luigi.Target):
    """A luigi Target backed by a BigQuery table.

    The target is considered to exist when the referenced table can be
    fetched from BigQuery.
    """

    table: bq.table.TableReference = attr.ib()
    client: bq.Client = attr.ib(default=attr.Factory(partial(bq.Client, PROJECT)))
    _table_object: bq.table.Table = attr.ib(default=None)

    @property
    def table_object(self):
        """Fetch and memoize the Table; None when it does not exist.

        A missing table is re-checked on every access, so a table created
        after the first lookup is still picked up.
        """
        if self._table_object is None:
            try:
                self._table_object = self.client.get_table(self.table)
            except bq_ex.NotFound:
                self._table_object = None
        return self._table_object

    def exists(self):
        """Return True when the table is present in BigQuery."""
        return self.table_object is not None
class CsvFromStalableSql(luigi.Task):
    """Return a small result set from an anonymous query.

    Subclasses supply ``query_filename`` (a SQL file) and ``output_filename``
    (the local CSV to write); ``query_parameters`` may optionally name a JSON
    file containing BigQuery query parameters in API representation.

    The task is considered complete only while the output CSV is newer than
    every input file.
    """

    @property
    def query_filename(self):
        raise NotImplementedError

    @property
    def output_filename(self):
        raise NotImplementedError

    # Optional path to a JSON file of query parameters (API representation).
    query_parameters = None
    __client = None

    @property
    def client(self):
        # Lazily construct a single BigQuery client per task instance.
        self.__client = self.__client or bq.Client(PROJECT)
        return self.__client

    def _load_query_parameters(self):
        """Deserialize query parameters from the JSON file, if configured."""
        if not self.query_parameters:
            return []
        # NOTE(review): _query_param_from_api_repr is a private
        # google-cloud-bigquery helper; pin the library version or replace it
        # if it disappears in an upgrade.
        return [
            bq.query._query_param_from_api_repr(i)
            for i in json.loads(Path(self.query_parameters).read_text())
        ]

    def run(self):
        query = Path(self.query_filename).read_text()
        job = self.client.query(
            query,
            bq.job.QueryJobConfig(
                query_parameters=self._load_query_parameters(),
                use_legacy_sql=False,
            ),
        )
        df = job.to_dataframe()
        df.to_csv(self.output_filename, index=False)

    def output(self):
        return luigi.LocalTarget(self.output_filename)

    def complete(self):
        """Staleness-aware completeness check.

        Complete when all requirements are complete, the output file exists,
        and the output is newer than both the query file and (fix) the
        parameters file — previously a changed parameters file did not
        invalidate the cached CSV.
        """
        for task in luigi.task.flatten(self.requires()):
            if not task.complete():
                return False
        output_path = Path(self.output().path)
        if not output_path.exists():
            return False
        output_mtime = output_path.stat().st_mtime
        input_mtime = Path(self.query_filename).stat().st_mtime
        if self.query_parameters:
            input_mtime = max(
                input_mtime, Path(self.query_parameters).stat().st_mtime
            )
        return input_mtime < output_mtime
class TableFromStalableSql(luigi.Task):
    """Materialize a SQL query into a BigQuery destination table.

    Subclasses supply ``query_filename`` (a SQL file) and ``destination_table``
    (a fully-qualified table id); ``query_parameters`` may optionally name a
    JSON file of BigQuery query parameters in API representation.

    The table is rebuilt (WRITE_TRUNCATE) whenever an input file is newer
    than the table's last-modified timestamp.
    """

    @property
    def query_filename(self):
        raise NotImplementedError

    @property
    def destination_table(self):
        raise NotImplementedError

    # Optional path to a JSON file of query parameters (API representation).
    query_parameters = None
    __client = None

    @property
    def client(self):
        # Lazily construct a single BigQuery client per task instance.
        self.__client = self.__client or bq.Client(PROJECT)
        return self.__client

    def _input_mtime(self):
        """Return the newest st_mtime among the query and parameter files."""
        mtime = Path(self.query_filename).stat().st_mtime
        if self.query_parameters:
            # Fix: changing the parameters file must also invalidate the table;
            # previously only the query file was considered.
            mtime = max(mtime, Path(self.query_parameters).stat().st_mtime)
        return mtime

    def run(self):
        logger.info("Running query")
        query = Path(self.query_filename).read_text()
        query_parameters = []
        if self.query_parameters:
            # NOTE(review): _query_param_from_api_repr is a private
            # google-cloud-bigquery helper; pin the library version.
            query_parameters = [
                bq.query._query_param_from_api_repr(i)
                for i in json.loads(Path(self.query_parameters).read_text())
            ]
        job = self.client.query(
            query,
            bq.job.QueryJobConfig(
                destination=self.destination_table,
                query_parameters=query_parameters,
                use_legacy_sql=False,
                write_disposition=bq.job.WriteDisposition.WRITE_TRUNCATE,
            )
        )
        # Block until the job finishes; max_results keeps the result fetch
        # trivial since only completion matters here.
        job.result(max_results=1)

    def output(self):
        ref = bq.table.TableReference.from_string(self.destination_table)
        return BigQueryTarget(ref, self.client)

    def complete(self):
        """Complete when the destination table is newer than all input files."""
        for task in luigi.task.flatten(self.requires()):
            if not task.complete():
                return False
        table = self.output().table_object
        if table is None:
            logger.info("Destination table does not exist")
            return False
        query_mtime_dt = dt.datetime.fromtimestamp(self._input_mtime(), utc)
        logger.info(f"Local: {query_mtime_dt} Remote: {table.modified}")
        return query_mtime_dt < table.modified
class ExtractedCsvFromStalableTable(luigi.Task):
    """Extract a BigQuery table to GCS as gzipped CSV and download it locally.

    Subclasses supply ``source_table``, ``storage_url`` (a gs:// URL for the
    intermediate extract), and ``output_filename`` (the local download path).
    The task is stale when the source table was modified after the local file
    was written.
    """

    @property
    def source_table(self):
        raise NotImplementedError

    @property
    def output_filename(self):
        raise NotImplementedError

    @property
    def storage_url(self):
        """gs://bucket/filename.csv.gz"""
        raise NotImplementedError

    __client = None

    @property
    def client(self):
        # Lazily construct a single BigQuery client per task instance.
        self.__client = self.__client or bq.Client(PROJECT)
        return self.__client

    def run(self):
        job = self.client.extract_table(
            self.source_table,
            self.storage_url,
            job_config=bq.job.ExtractJobConfig(
                compression=bq.job.Compression.GZIP,
                destination_format=bq.job.DestinationFormat.CSV,
            )
        )
        # Block until the extract lands in GCS before downloading it.
        job.result()
        storage_client = storage.client.Client(self.client.project)
        with open(self.output_filename, "wb") as f:
            storage_client.download_blob_to_file(self.storage_url, f)

    def output(self):
        return luigi.LocalTarget(self.output_filename)

    def complete(self):
        """Complete when the local file exists and is newer than the table."""
        for task in luigi.task.flatten(self.requires()):
            if not task.complete():
                return False
        output = Path(self.output_filename)
        if not output.exists():
            return False
        output_mtime = dt.datetime.fromtimestamp(output.stat().st_mtime, utc)
        # Robustness fix: a source table deleted out from under us means the
        # task cannot be complete; previously NotFound escaped the scheduler.
        try:
            source_table_obj = self.client.get_table(self.source_table)
        except bq_ex.NotFound:
            return False
        return source_table_obj.modified < output_mtime
####
# Experiment-specific stuff goes here.
# Run this like `PYTHONPATH=$PWD luigi --module build BuildReport --local-scheduler`
class ExtractJob(TableFromStalableSql):
    """Run extract.sql (with extract_params.json) into the doh_enrolled table."""

    query_filename = "extract.sql"
    query_parameters = "extract_params.json"
    destination_table = "moz-fx-data-bq-data-science.tdsmith.20191219_doh_enrolled"
class SummaryTable(TableFromStalableSql):
    """Run summarize.sql over the extract into the engagement-summary table."""

    query_filename = "summarize.sql"
    destination_table = "moz-fx-data-bq-data-science.tdsmith.20191219_doh_engagement_summary"

    def requires(self):
        return ExtractJob()
class SummaryCsv(ExtractedCsvFromStalableTable):
    """Download the engagement-summary table as a local gzipped CSV."""

    # Consistency fix: read the class attribute directly rather than
    # instantiating the task at import time just to fetch a constant.
    source_table = SummaryTable.destination_table
    storage_url = "gs://moz-datascience-tdsmith/20191219_doh_engagement_summary.csv.gz"
    output_filename = "20191219_doh_engagement_summary.csv.gz"

    def requires(self):
        return SummaryTable()
class EnrollmentsCsv(CsvFromStalableSql):
    """Export the results of enrollments.sql to a local CSV."""

    query_filename = "enrollments.sql"
    output_filename = "enrollments.csv"
class HeuristicsConsistencyCsv(CsvFromStalableSql):
    """Export the results of heuristics_consistency.sql to a local CSV."""

    query_filename = "heuristics_consistency.sql"
    query_parameters = "extract_params.json"
    output_filename = "heuristics_consistency.csv"
class HeuristicsDailyCsv(CsvFromStalableSql):
    """Export the results of heuristics_daily.sql to a local CSV."""

    query_filename = "heuristics_daily.sql"
    query_parameters = "extract_params.json"
    output_filename = "heuristics_daily.csv"
class HeuristicsEverCsv(CsvFromStalableSql):
    """Export the results of heuristics_ever.sql to a local CSV."""

    query_filename = "heuristics_ever.sql"
    query_parameters = "extract_params.json"
    output_filename = "heuristics_ever.csv"
class OptoutCsv(CsvFromStalableSql):
    """Export the results of optout.sql to a local CSV."""

    query_filename = "optout.sql"
    query_parameters = "extract_params.json"
    output_filename = "optout.csv"
class PerfSummaryTable(TableFromStalableSql):
    """Run summarize_perf.sql over the extract into the perf-summary table."""

    query_filename = "summarize_perf.sql"
    destination_table = "moz-fx-data-bq-data-science.tdsmith.20191219_doh_perf_summary"

    def requires(self):
        return ExtractJob()
class PerfSummaryCsv(ExtractedCsvFromStalableTable):
    """Download the perf-summary table as a local gzipped CSV."""

    # Consistency fix: read the class attribute directly rather than
    # instantiating the task at import time just to fetch a constant.
    source_table = PerfSummaryTable.destination_table
    storage_url = "gs://moz-datascience-tdsmith/20191219_doh_perf_summary.csv.gz"
    output_filename = "20191219_doh_perf_summary.csv.gz"

    def requires(self):
        return PerfSummaryTable()
class CrashCsv(CsvFromStalableSql):
    """Export the results of crashes.sql to a local CSV."""

    query_filename = "crashes.sql"
    query_parameters = "extract_params.json"
    output_filename = "crashes.csv"
class BuildReport(luigi.Task):
    """Aggregate task: require every CSV artifact needed for the report."""

    def requires(self):
        csv_task_classes = [
            SummaryCsv,
            PerfSummaryCsv,
            EnrollmentsCsv,
            HeuristicsConsistencyCsv,
            HeuristicsDailyCsv,
            HeuristicsEverCsv,
            OptoutCsv,
            CrashCsv,
        ]
        return [task_cls() for task_cls in csv_task_classes]
# PYTHONPATH=$PWD python -m luigi --module query Query --local-scheduler
from dscontrib.tdsmith.luigi import *
import luigi
class EnrollmentsTable(TableFromStalableSql):
    """Run enrollments.sql (with params.json) into the fake-enrollments table."""

    query_filename = 'enrollments.sql'
    query_parameters = 'params.json'
    destination_table = 'moz-fx-data-bq-data-science.tdsmith.2020_03_doh_fake_enrollments'
class ExtractedTable(TableFromStalableSql):
    """Run extract.sql (with params.json) into the doh_pings table."""

    query_filename = 'extract.sql'
    query_parameters = 'params.json'
    destination_table = 'moz-fx-data-bq-data-science.tdsmith.2020_03_doh_pings'

    def requires(self):
        return EnrollmentsTable()
class SummaryTable(TableFromStalableSql):
    """Run summary.sql over the extracted pings into the summary table."""

    query_filename = 'summary.sql'
    destination_table = 'moz-fx-data-bq-data-science.tdsmith.2020_03_doh_summary'

    def requires(self): return ExtractedTable()
class EngagementTable(TableFromStalableSql):
    """Run engagement.sql over the extracted pings into the engagement table."""

    query_filename = 'engagement.sql'
    destination_table = 'moz-fx-data-bq-data-science.tdsmith.2020_03_doh_engagement'

    def requires(self): return ExtractedTable()
class SummaryCsv(ExtractedCsvFromStalableTable):
    """Download the summary table as a local gzipped CSV."""

    source_table = SummaryTable.destination_table
    storage_url = "gs://moz-datascience-tdsmith/2020_03_doh_summary.csv.gz"
    output_filename = "summary.csv.gz"

    def requires(self): return SummaryTable()
class EngagementCsv(ExtractedCsvFromStalableTable):
    """Download the engagement table as a local gzipped CSV."""

    source_table = EngagementTable.destination_table
    storage_url = "gs://moz-datascience-tdsmith/2020_03_doh_engagement.csv.gz"
    output_filename = "engagement.csv.gz"

    def requires(self): return EngagementTable()
class Query(luigi.Task):
    """Top-level task: produce both exported CSV artifacts."""

    def requires(self):
        return tuple(task_cls() for task_cls in (SummaryCsv, EngagementCsv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment