Created
October 28, 2019 06:33
-
-
Save mauliksoneji/48d7d84976ee5957de90e03ba2314540 to your computer and use it in GitHub Desktop.
BigQuery client to read data from BigQuery into a Spark dataframe
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BigQueryClient(object):
    """Reads a BigQuery table into a Spark DataFrame.

    Uses the Hadoop BigQuery connector: the table is exported to a GCS
    bucket as JSON, read back as an RDD via ``newAPIHadoopRDD``, then
    parsed into a DataFrame with ``spark.read.json``.
    """

    #: Default maximum number of export shards (map tasks).
    DEFAULT_MAP_TASKS = "360"

    def __init__(self, project_id):
        """
        :param project_id: GCP project that owns the input tables and
            is billed for the export.
        """
        self.project_id = project_id

    def _get_conf(self, bucket, dataset_id, table_id, map_tasks=None):
        """Build the Hadoop configuration for the BigQuery input format.

        :param bucket: GCS bucket holding the temporary export path
        :param dataset_id: input dataset id
        :param table_id: table to read
        :param map_tasks: max number of shards as a string; defaults to
            ``DEFAULT_MAP_TASKS`` (kept as a parameter so large tables
            can tune parallelism per call)
        :return: dict of Hadoop configuration properties
        """
        return {
            "fs.gs.project.id": self.project_id,
            "mapred.bq.project.id": self.project_id,  # default project
            "mapred.bq.gcs.bucket": bucket,  # gcs bucket holding the temporary path
            "mapred.bq.input.project.id": self.project_id,
            "mapred.bq.input.dataset.id": dataset_id,
            "mapred.bq.input.table.id": table_id,
            "mapred.map.tasks": map_tasks or self.DEFAULT_MAP_TASKS,
            "google.cloud.auth.service.account.enable": "true",
            "mapred.bq.input.sharded.export.enable": "false",
            "fs.gs.block.size": "268435456",  # 256 MiB GCS block size
        }

    def get_data_frame(self, spark_session, bucket, dataset_id, table_id,
                       map_tasks=None):
        """Read the given BigQuery table into a Spark DataFrame.

        :param spark_session: given Spark session
        :param bucket: GCS bucket to hold the table data temporarily
        :param dataset_id: input dataset id
        :param table_id: table to read
        :param map_tasks: optional max number of export shards (string);
            passed through to :meth:`_get_conf`
        :return: Spark DataFrame with the table contents
        """
        table_data = spark_session.sparkContext.newAPIHadoopRDD(
            "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
            "org.apache.hadoop.io.LongWritable",
            "com.google.gson.JsonObject",
            conf=self._get_conf(bucket, dataset_id, table_id, map_tasks))
        # Each record is (LongWritable key, JSON string); keep only the JSON.
        table_json = table_data.map(lambda x: x[1])
        return spark_session.read.json(table_json)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment