Skip to content

Instantly share code, notes, and snippets.

public static JavaPairRDD<LongWritable, JsonObject> fetchBigQueryRDD(JavaSparkContext jsc,
String projectId,
String fullyQualifiedInputTableId,
String bucket,
int numPartitions,
double sampleSize)
throws Exception {
Configuration hadoopConfiguration = jsc.hadoopConfiguration();
16/03/10 00:44:17 INFO com.google.cloud.hadoop.io.bigquery.ShardedExportToCloudStorage: Table 'dailymotion-data:video_catalog.snapshot2016030700' to be exported has 127841354 rows and 40712201343 bytes
[Stage 0:> (0 + 30) / 30]
[Stage 0:> (0 + 30) / 30]
[Stage 0:> (0 + 30) / 30]
[Stage 0:> (0 + 30) / 30]
[Stage 0:> (0 + 30) / 30]
[Stage 0:> (0 + 30) / 30]
[Stage 0:> (0 + 30) / 30]
[Stage 0:> (0 + 30) / 30]
@ramv-dailymotion
ramv-dailymotion / MahoutCoOccurrence.java
Created February 13, 2016 19:18
Calling Mahout Spark SimilarityAnalysis from Java
public static String similarityAnalysis(JavaRDD<DmRating> ratingJavaRDD,
JavaSparkContext javaSparkContext,
String outdir,
String modelName)
throws Exception{
JavaRDD<Tuple2<String, String>> userIdVideoId = ratingJavaRDD.mapPartitions( ratingIterator -> {
ArrayList<Tuple2<String, String>> list = new ArrayList<>();
while(ratingIterator.hasNext()){
DmRating rating = ratingIterator.next();
@ramv-dailymotion
ramv-dailymotion / RetryHttpInitializerWrapper.java
Created January 13, 2016 16:08
RetryHttpInitializerWrapper for Resumable Upload
import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.HttpBackOffIOExceptionHandler;
import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.common.base.Preconditions;
@ramv-dailymotion
ramv-dailymotion / ResumableUpload.java
Created January 13, 2016 16:02
Resumable Upload of files to Google Cloud Storage
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.googleapis.media.MediaHttpUploader;
import com.google.api.client.googleapis.media.MediaHttpUploaderProgressListener;
import com.google.api.client.http.*;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.services.storage.Storage;
public static void saveAlsKMeansRecosAsParquet(JavaPairRDD<Integer, Tuple2<DmRating, Integer>> userIdRatingClusterIdRDD,
int numPartitions,
JavaSparkContext javaSparkContext,
String outdir){
JavaRDD<DmRating> dmRatingJavaRDD = userIdRatingClusterIdRDD.map(new Function<Tuple2<Integer, Tuple2<DmRating, Integer>>, DmRating>() {
public DmRating call(Tuple2<Integer, Tuple2<DmRating, Integer>> v1) throws Exception {
//Integer userId = v1._1();
Tuple2<DmRating, Integer> values = v1._2();
DmRating rating = values._1();
public static JavaRDD<String> getJsonUserIdVideoIdRDD(JavaRDD<Rating> cachedRating,
JavaPairRDD<Integer, Integer> userIdClusterId,
int numPartitions, String outDir){
/*
convert the JavaRDD<Rating> to JavaPairRDD<Integer,DmRating>
*/
JavaPairRDD<Integer,DmRating> userIdDmRating = cachedRating.mapToPair(new PairFunction<Rating, Integer, DmRating>() {
public Tuple2<Integer, DmRating> call(Rating dmRating) throws Exception {
return new Tuple2<>(dmRating.user(), (DmRating)dmRating);
}