@relud
Last active August 20, 2019 00:08
Export client_probe_counts_v1 to Firestore
#!/usr/bin/env bash
# Create an ephemeral Dataproc cluster and submit the Firestore export job.
set -e

CLUSTER="${CLUSTER:-export-to-firestore}"

# Create a cluster that deletes itself after 10 minutes of inactivity, with the
# Firestore client library installed on all nodes via the pip init action.
gcloud beta dataproc clusters create "$CLUSTER" \
  --max-idle=10m \
  --metadata='PIP_PACKAGES=google-cloud-firestore==1.3.0' \
  --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh

# Submit the PySpark job with the BigQuery connector on the classpath.
gcloud dataproc jobs submit pyspark export_to_firestore.py \
  --cluster="$CLUSTER" \
  --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
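
A usage note: both commands run with the standard gcloud CLI (the beta track is needed for --max-idle, which was a beta flag at the time). Because the cluster tears itself down after ten minutes idle, the script can simply be re-run from scratch each time; setting the CLUSTER environment variable before invoking it overrides the default cluster name.
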
#!/usr/bin/env python
"""Read a table from the BigQuery Storage API and write to Cloud Firestore."""
import hashlib

from google.cloud import firestore
from pyspark.sql import SparkSession


def batch_load(group):
    """Write one group of at most 500 rows to Firestore in a single batch."""
    # Firestore clients are not serializable, so create one per executor task.
    client = firestore.Client()
    batch = client.batch()
    for row in group[1]:
        doc = row.asDict()
        # One collection per channel-version pair.
        collection = "{}-{}".format(doc.pop("channel"), doc.pop("app_version"))
        # Deterministic document id derived from the remaining key fields.
        doc_id = hashlib.blake2b(
            "{metric}-{build_id}-{os}-{agg_type}".format(**doc).encode()
        ).hexdigest()
        batch.set(client.collection(collection).document(doc_id), doc)
    batch.commit()
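
# For illustration (hypothetical values): a row with channel="release",
# app_version="70", metric="gc_ms", build_id="20190801000000", os="Windows",
# and agg_type="histogram" lands in collection "release-70" under document id
# blake2b(b"gc_ms-20190801000000-Windows-histogram").hexdigest(); the stored
# document keeps the remaining fields, including the aggregates payload.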
# Run the export: read the table, shape the rows, and fan out batched writes.
(
    SparkSession.builder.appName("export_to_firestore")
    .getOrCreate()
    .read.format("bigquery")
    .option("table", "telemetry.client_probe_counts_v1")
    .load()
    .selectExpr(
        "app_version",
        "channel",
        "metric",
        "app_build_id AS build_id",
        "os",
        "agg_type",
        "aggregates.key_value AS aggregates",
    )
    .where("app_version IS NOT NULL AND channel IS NOT NULL")
    # Firestore commits at most 500 writes per batch, so number the rows and
    # group them into chunks of 500 before handing each chunk to batch_load.
    .rdd.zipWithIndex()
    .map(lambda pair: (pair[1] // 500, pair[0]))
    .groupByKey()
    .foreach(batch_load)
)
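
To spot-check the export afterwards, a minimal read-back sketch is below (not part of the gist). It derives the collection and document id exactly the way batch_load does; the channel, version, and key-field values are hypothetical.

#!/usr/bin/env python
"""Fetch one exported document back from Cloud Firestore (verification sketch)."""
import hashlib

from google.cloud import firestore

# Hypothetical key fields; real values come from telemetry.client_probe_counts_v1.
collection = "release-70"  # "{channel}-{app_version}", as written by batch_load
doc_id = hashlib.blake2b(
    "gc_ms-20190801000000-Windows-histogram".encode()  # "{metric}-{build_id}-{os}-{agg_type}"
).hexdigest()

client = firestore.Client()
snapshot = client.collection(collection).document(doc_id).get()
if snapshot.exists:
    # Remaining fields: metric, build_id, os, agg_type, aggregates.
    print(snapshot.to_dict())
else:
    print("no such document")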