Created: March 12, 2016, 00:09
-
-
Save ramv-dailymotion/e6458f8e80e7d9b4d7bc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static JavaPairRDD<LongWritable, JsonObject> fetchBigQueryRDD(JavaSparkContext jsc, | |
String projectId, | |
String fullyQualifiedInputTableId, | |
String bucket, | |
int numPartitions, | |
double sampleSize) | |
throws Exception { | |
Configuration hadoopConfiguration = jsc.hadoopConfiguration(); | |
// Set the job-level projectId. | |
hadoopConfiguration.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId); | |
// Use the systemBucket for temporary BigQuery export data used by the InputFormat. | |
hadoopConfiguration.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket); | |
// Configure input for BigQuery access | |
BigQueryConfiguration.configureBigQueryInput(hadoopConfiguration, fullyQualifiedInputTableId); | |
LOGGER.debug(hadoopConfiguration.get("fs.gs.system.bucket")); | |
JavaPairRDD<LongWritable, JsonObject> tableData = jsc.newAPIHadoopRDD(hadoopConfiguration, GsonBigQueryInputFormat.class, LongWritable.class, JsonObject.class); | |
LOGGER.info("Number of rows in the table {}", tableData.count()); | |
tableData = tableData.repartition(numPartitions).persist(StorageLevel.MEMORY_AND_DISK_SER_2()); | |
return tableData; | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.