Skip to content

Instantly share code, notes, and snippets.

@mauliksoneji
Created October 28, 2019 06:37
Show Gist options
  • Save mauliksoneji/0a8c12d3c7ecbe2c4794dbd039e03815 to your computer and use it in GitHub Desktop.
Save mauliksoneji/0a8c12d3c7ecbe2c4794dbd039e03815 to your computer and use it in GitHub Desktop.
A client for reading GCS (Google Cloud Storage) data into a Spark DataFrame.
class GCSClient(object):
    """Thin wrapper around a SparkSession for reading from and writing to
    Google Cloud Storage (GCS) via the GCS Hadoop connector."""

    def __init__(self, spark, projectId):
        """Configure the session's Hadoop configuration for GCS access.

        spark     -- an active SparkSession
        projectId -- GCP project id used by the GCS connector
        """
        self.spark = spark
        # Hoist the repeated JVM hadoopConfiguration() lookup out of each set().
        conf = self.spark._jsc.hadoopConfiguration()
        conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
        conf.set("fs.AbstractFileSystem.gs.impl",
                 "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
        conf.set("fs.gs.project.id", projectId)
        # Authenticate with the service-account credentials available on the cluster.
        conf.set("fs.gs.auth.service.account.enable", "true")

    def get_json_as_df(self, path):
        """Read JSON data at *path* into a Spark DataFrame."""
        return self.spark.read.json(path=path)

    def get_parquet_as_df(self, path):
        """Read parquet data into a Spark DataFrame.

        path -- a single path string, or a list of paths to read together
        """
        if isinstance(path, list):
            return self.spark.read.parquet(*path)
        return self.spark.read.parquet(path)

    def export_to_gcs(self, data_frame, export_dir):
        """Write *data_frame* as parquet to *export_dir*, replacing existing data.

        mode("overwrite") lets Spark handle removal of the existing output,
        instead of manually deleting the path through the Hadoop FileSystem
        JVM API before writing (which destroyed data even if the write failed).
        """
        data_frame.write.mode("overwrite").parquet(export_dir)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment