@ramv-dailymotion
Created March 12, 2016 00:09
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.gson.JsonObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

// LOGGER is assumed to be declared in the enclosing class (e.g. an SLF4J Logger).
public static JavaPairRDD<LongWritable, JsonObject> fetchBigQueryRDD(JavaSparkContext jsc,
        String projectId,
        String fullyQualifiedInputTableId,
        String bucket,
        int numPartitions,
        double sampleSize) throws Exception {
    Configuration hadoopConfiguration = jsc.hadoopConfiguration();
    // Set the job-level projectId.
    hadoopConfiguration.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
    // Use this bucket for the temporary BigQuery export data written by the InputFormat.
    hadoopConfiguration.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket);
    // Configure the input table for BigQuery access.
    BigQueryConfiguration.configureBigQueryInput(hadoopConfiguration, fullyQualifiedInputTableId);
    LOGGER.debug(hadoopConfiguration.get("fs.gs.system.bucket"));
    // Each record is a (byte offset, JSON row) pair produced by the BigQuery connector.
    JavaPairRDD<LongWritable, JsonObject> tableData = jsc.newAPIHadoopRDD(
            hadoopConfiguration,
            GsonBigQueryInputFormat.class,
            LongWritable.class,
            JsonObject.class);
    // Note: count() forces a full read of the export before the repartition below.
    LOGGER.info("Number of rows in the table {}", tableData.count());
    tableData = tableData.repartition(numPartitions)
            .persist(StorageLevel.MEMORY_AND_DISK_SER_2());
    return tableData;
}
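For context, a driver that calls this helper might look like the sketch below. This is an assumption-laden illustration, not part of the gist: the project ID, bucket, table, and the "word" column are placeholders, fetchBigQueryRDD is assumed to be in the same class, and the Spark application must have the BigQuery Hadoop connector and Gson on its classpath.

```java
import com.google.gson.JsonObject;
import org.apache.hadoop.io.LongWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class BigQueryFetchExample {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("bigquery-fetch-example");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            // All identifiers below are placeholders for a real project/bucket/table.
            JavaPairRDD<LongWritable, JsonObject> rows = fetchBigQueryRDD(
                    jsc,
                    "my-project-id",
                    "publicdata:samples.shakespeare", // project:dataset.table
                    "my-temp-bucket",
                    100,   // numPartitions
                    1.0);  // sampleSize (unused by the helper)

            // Keep only the JSON payloads and pull out a single column.
            JavaRDD<String> words = rows.values()
                    .map(json -> json.get("word").getAsString());
            words.take(10).forEach(System.out::println);
        }
    }
}
```

Running this requires a live Spark cluster with GCP credentials, so it is a shape-of-the-code sketch rather than a standalone test.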