BigQuery client to read data from BigQuery into a Spark DataFrame
class BigQueryClient(object):
    """Reads a BigQuery table into a Spark DataFrame via the BigQuery Hadoop connector."""

    def __init__(self, project_id):
        self.project_id = project_id

    def _get_conf(self, bucket, dataset_id, table_id):
        # Hadoop configuration for the BigQuery input format.
        return {
            "fs.gs.project.id": self.project_id,
            "mapred.bq.project.id": self.project_id,  # default project
            "mapred.bq.gcs.bucket": bucket,  # GCS bucket holding the temporary export path
            "mapred.bq.input.project.id": self.project_id,
            "mapred.bq.input.dataset.id": dataset_id,
            "mapred.bq.input.table.id": table_id,
            "mapred.map.tasks": "360",  # max number of shards; can be parametrized per table later
            "google.cloud.auth.service.account.enable": "true",
            "mapred.bq.input.sharded.export.enable": "false",
            "fs.gs.block.size": "268435456",
        }

    def get_data_frame(self, spark_session, bucket, dataset_id, table_id):
        """
        Read the given table into a Spark DataFrame.

        :param spark_session: active Spark session
        :param bucket: GCS bucket to hold the exported table data temporarily
        :param dataset_id: input dataset id
        :param table_id: table to read
        :return: Spark DataFrame
        """
        # Each record comes back as a (LongWritable offset, JSON string) pair.
        table_data = spark_session.sparkContext.newAPIHadoopRDD(
            "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
            "org.apache.hadoop.io.LongWritable",
            "com.google.gson.JsonObject",
            conf=self._get_conf(bucket, dataset_id, table_id))
        # Keep only the JSON payload and let Spark infer the schema from it.
        table_json = table_data.map(lambda x: x[1])
        return spark_session.read.json(table_json)
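
A minimal usage sketch, assuming a cluster where the BigQuery Hadoop connector and GCS connector are on the classpath (e.g., a Cloud Dataproc cluster with the connector installed); the project, bucket, dataset, and table names below are hypothetical placeholders:

from pyspark.sql import SparkSession

# Hypothetical names for illustration only.
spark = SparkSession.builder.appName("bq-read-example").getOrCreate()

client = BigQueryClient(project_id="my-project")
df = client.get_data_frame(
    spark,
    bucket="my-temp-bucket",   # staging bucket for the BigQuery export
    dataset_id="my_dataset",
    table_id="my_table",
)
df.printSchema()
df.show(5)

Note that reading through spark_session.read.json infers the schema from the exported JSON, so all numeric fields come back with Spark's inferred types rather than the original BigQuery column types.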