package com.spnotes.spark;

import com.twitter.chill.Externalizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import org.elasticsearch.hadoop.mr.WritableArrayWritable;
import scala.Tuple2;

/**
 * Created by user on 8/25/14.
 */
/**
 * Spark driver that reads documents from an Elasticsearch index
 * ({@code hadoop/contact}) using the elasticsearch-hadoop
 * {@link EsInputFormat}, extracts the {@code address[0].city} field from
 * each record, counts occurrences per city, and writes the resulting
 * {@code (city, count)} pairs to {@code file:///tmp/sparkes}.
 *
 * <p>Requires a local Elasticsearch node on {@code localhost:9200} and a
 * Hadoop installation at {@code /usr/local/hadoop}.
 */
public class HelloESInputSpark {

    public static void main(String[] argv) {
        // winutils lookup on some platforms requires hadoop.home.dir to be set
        // before any Hadoop class is touched.
        System.setProperty("hadoop.home.dir", "/usr/local/hadoop");

        SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");
        // Kryo is required for efficiently shipping Hadoop Writable values
        // (Text/MapWritable) between stages.
        conf.set("spark.serializer", org.apache.spark.serializer.KryoSerializer.class.getName());

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // elasticsearch-hadoop connection settings: target node and the
            // index/type to read ("index/type" resource syntax).
            Configuration hadoopConfiguration = new Configuration();
            hadoopConfiguration.set("es.nodes", "localhost:9200");
            hadoopConfiguration.set("es.resource", "hadoop/contact");

            // Each ES document arrives as (docId: Text, source: MapWritable).
            JavaPairRDD<Text, MapWritable> esRDD =
                    sc.newAPIHadoopRDD(hadoopConfiguration, EsInputFormat.class, Text.class, MapWritable.class);
            System.out.println("Count of records founds is " + esRDD.count());

            // Map each document to a (city, 1) pair. The document's "address"
            // field is expected to be an array whose first element is an object
            // containing a "city" field.
            // NOTE(review): this throws NullPointerException if a document lacks
            // "address" or "city" — assumes every document has both; verify
            // against the index mapping.
            JavaPairRDD<String, Integer> cityCountMap =
                    esRDD.mapToPair(new PairFunction<Tuple2<Text, MapWritable>, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(Tuple2<Text, MapWritable> currentEntry) throws Exception {
                            MapWritable valueMap = currentEntry._2();
                            WritableArrayWritable address =
                                    (WritableArrayWritable) valueMap.get(new Text("address"));
                            MapWritable addressMap = (MapWritable) address.get()[0];
                            Text city = (Text) addressMap.get(new Text("city"));
                            return new Tuple2<String, Integer>(city.toString(), 1);
                        }
                    });

            // Sum the per-record 1s into a running count per city.
            JavaPairRDD<String, Integer> cityCount =
                    cityCountMap.reduceByKey(new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer first, Integer second) throws Exception {
                            return first + second;
                        }
                    });

            // Write results to the local filesystem. saveAsTextFile fails if the
            // target directory already exists — remove /tmp/sparkes between runs.
            cityCount.saveAsTextFile("file:///tmp/sparkes");
        } finally {
            // Fix: the context was previously never stopped, leaking the Spark
            // runtime (threads, UI port, block manager) on every run.
            sc.stop();
        }
    }
}