
@tzolov
Last active July 19, 2016 18:36
// HAWQInputFormat2 is a custom, proof-of-concept variant of the stock HAWQInputFormat
// (note the com.pivotal.tzolov.poc.hif package).
import com.pivotal.tzolov.poc.hif.HAWQInputFormat2;
import com.pivotal.hawq.mapreduce.HAWQRecord;
import org.apache.spark.deploy.SparkHadoopUtil;
// Create separate Hadoop configuration instances for each input table
val hadoopConf1 = SparkHadoopUtil.get.newConfiguration();
val hadoopConf2 = SparkHadoopUtil.get.newConfiguration();
// Retrieve each HAWQ table's metadata and store it in that table's dedicated Hadoop configuration.
HAWQInputFormat2.setInput(hadoopConf1, "phd1.localdomain:5432/postgres", "gpadmin", "", "employees");
HAWQInputFormat2.setInput(hadoopConf2, "phd1.localdomain:5432/postgres", "gpadmin", "", "employee_expenditure");
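// newAPIHadoopRDD yields (Void, HAWQRecord) pairs; map(_._2) keeps only the HAWQRecord values.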
val rdd1 = sc.newAPIHadoopRDD(hadoopConf1, classOf[HAWQInputFormat2], classOf[Void], classOf[HAWQRecord]).map(_._2);
val rdd2 = sc.newAPIHadoopRDD(hadoopConf2, classOf[HAWQInputFormat2], classOf[Void], classOf[HAWQRecord]).map(_._2);
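// Project the columns of interest. HAWQRecord fields are read by 1-based index (JDBC style);
// here employees is assumed to be (id, name) and employee_expenditure (employee_id, amount, month).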
val employees = rdd1.map(p => (p.getInt(1), p.getString(2)));
val employee_expenditure = rdd2.map(p => (p.getInt(1), (p.getInt(2), p.getString(3))));
// Group the employee_expenditure dataset by employee ID (the first tuple element, _._1) and then
// join the result with the employees dataset.
val employee_expenditure_join = employees.join(employee_expenditure.groupBy(_._1))
employee_expenditure_join.collect.foreach(println)
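// Expected output (row order may vary):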
(1,(Botev,ArrayBuffer((1,(20000,Jan)), (1,(15000,Feb)), (1,(20000,Mar)))))
(3,(Rakovski,ArrayBuffer((3,(500,Mar)))))
(2,(Levski,ArrayBuffer((2,(10000,Jan)), (2,(10000,Feb)))))
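// A possible follow-up (sketch, not part of the original snippet): total expenditure per employee,
// computed from the joined RDD. Assumes the tuple layout produced above:
// (empId, (name, Iterable[(empId, (amount, month))])).
val expenditure_totals = employee_expenditure_join.map { case (id, (name, expenses)) =>
  (id, name, expenses.map(_._2._1).sum)
}
expenditure_totals.collect.foreach(println)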