Skip to content

Instantly share code, notes, and snippets.

@minheq
Last active September 25, 2015 11:30
Show Gist options
  • Save minheq/7383fb4c3e9c8ad54a56 to your computer and use it in GitHub Desktop.
Save minheq/7383fb4c3e9c8ad54a56 to your computer and use it in GitHub Desktop.
MapReduce Game 3
import java.util.*;
import java.io.*;
import java.text.*;
import java.lang.*;
import aa.*;
/**
 * Computes order statistics for animal toys from {@code ./Lions-Tigers-Bears.csv} using the
 * {@code aa} MapReduce framework.
 *
 * <p>The computation is repeated for a sweep of degrees of parallelism. Each run's elapsed time
 * (from {@code StopWatch}) is appended to {@code output.txt}.
 *
 * <p>Each CSV row is parsed as {@code toy,category,price,quantity} (see {@link generalMapper}).
 * There is no header handling, so a non-numeric header row makes {@link Double#parseDouble} throw.
 */
public class CalculateAnimalToys {
    /////////////////////////////////////////////////////////
    //
    // Run Parameters - change these accordingly.
    //
    /////////////////////////////////////////////////////////

    // Exclusive upper bound of the degree-of-parallelism sweep in main (runs 1 .. N-1).
    final static int NUMBER_OF_DEGREES = 4;
    // Step between consecutive degrees of parallelism in the sweep.
    final static int MAPPER_NUMBER_INCREMENT = 1;
    // Forwarded unchanged to MapReduce.mapReduce; presumably toggles framework logging — confirm.
    final static boolean VERBOSE = false;
    // Formats per-category average prices with at most two fraction digits.
    final static DecimalFormat df2 = new DecimalFormat(".##");

    private static final String SEPARATOR =
            "--------------------------------------------------------------";

    //-----------------------------------------------------------------
    //------------------ START OF MAIN METHOD ---------------------
    //-----------------------------------------------------------------

    /**
     * Runs the three aggregations (total price, record count, total quantity) for each degree of
     * parallelism, prints the derived statistics and appends the timing to {@code output.txt}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // One mapper per aggregate; all three share the same summing reducer.
        Mapper pricesMapper = new generalMapper("prices");
        Mapper countMapper = new generalMapper("counts");
        Mapper quantityMapper = new generalMapper("quantity");
        Reducer generalReducer = new generalReducer();
        StopWatch watch = new StopWatch();
        MapReduce mapReducer = new MapReduce();
        List<String> animalToysList = readFromCSV();

        // The bound is exclusive: degrees 1 .. NUMBER_OF_DEGREES - 1 are measured.
        for (int degree = 1; degree < NUMBER_OF_DEGREES; degree += MAPPER_NUMBER_INCREMENT) {
            System.out.println("For " + degree + " degrees of parallelism");
            try {
                // Timing covers the three map-reduce passes AND the printing below.
                watch.start();
                HashMap<Object, List> animalToysTotalPrice = mapReducer.mapReduce(
                        pricesMapper, generalReducer, animalToysList, degree, VERBOSE);
                HashMap<Object, List> animalToysTotalCount = mapReducer.mapReduce(
                        countMapper, generalReducer, animalToysList, degree, VERBOSE);
                HashMap<Object, List> animalToysTotalQuantity = mapReducer.mapReduce(
                        quantityMapper, generalReducer, animalToysList, degree, VERBOSE);

                // Key order follows the count map's key set, as before.
                List<String> keysList = new ArrayList<String>(animalToysTotalCount.size());
                for (Object key : animalToysTotalCount.keySet()) {
                    keysList.add(key.toString());
                }

                printAveragePricePerOrder(animalToysTotalPrice, animalToysTotalCount);
                printRecordCounts(keysList, animalToysTotalCount);
                printAveragePricePerToyCategory(
                        keysList, animalToysTotalPrice, animalToysTotalQuantity);
            } catch (Exception e) {
                // Boundary catch: report and still record the timing for this degree.
                e.printStackTrace();
            }
            saveToDisk(degree, watch.getTime());
            System.out.println("The combined mapreduce run ended in " + watch.getTime() + "ms");
            System.out.println();
            watch.reset();
        }
    }

    //---------------------------------------------------------------
    //------------------ END OF MAIN METHOD ---------------------
    //---------------------------------------------------------------

    /**
     * Prints the total of all order prices divided by the total number of records.
     *
     * <p>Assumes each map value is a list whose first element is the reduced {@code Double}.
     * That matches how main reads the results; confirm it against {@code aa.MapReduce}.
     */
    private static void printAveragePricePerOrder(
            Map<Object, List> totalPrice, Map<Object, List> totalCount) {
        double priceSum = 0;
        for (List prices : totalPrice.values()) {
            priceSum += (double) prices.get(0);
        }
        int recordCount = 0;
        for (List counts : totalCount.values()) {
            recordCount += ((Double) counts.get(0)).intValue();
        }
        double average = priceSum / recordCount;
        System.out.println(SEPARATOR);
        System.out.println("The average price per order is $" + average);
    }

    /** Prints the number of CSV records for each toy-category key, in {@code keys} order. */
    private static void printRecordCounts(List<String> keys, Map<Object, List> totalCount) {
        System.out.println(SEPARATOR);
        System.out.println("The number of records for each toy and category combination: ");
        for (String toyCategory : keys) {
            int count = ((Double) totalCount.get(toyCategory).get(0)).intValue();
            System.out.println(toyCategory + ", " + count);
        }
    }

    /** Prints total price divided by total quantity for each toy-category key. */
    private static void printAveragePricePerToyCategory(
            List<String> keys, Map<Object, List> totalPrice, Map<Object, List> totalQuantity) {
        System.out.println(SEPARATOR);
        System.out.println("The average price per individual toy and category combination: ");
        for (String toyCategory : keys) {
            int quantity = ((Double) totalQuantity.get(toyCategory).get(0)).intValue();
            double price = (double) totalPrice.get(toyCategory).get(0);
            System.out.println(toyCategory + ", $" + df2.format(price / quantity));
        }
    }

    /**
     * Appends one {@code "<degree>, <millis>"} line to {@code output.txt}.
     *
     * <p>Failures are reported on stderr rather than thrown.
     *
     * @param degreesOfParallelism the degree of parallelism that was measured
     * @param time elapsed time reported by the stop watch
     */
    public static void saveToDisk(int degreesOfParallelism, long time) {
        // try-with-resources closes the file even if writing fails (previously leaked).
        try (PrintWriter writer =
                new PrintWriter(new BufferedWriter(new FileWriter("output.txt", true)))) {
            writer.println(degreesOfParallelism + ", " + time);
            // PrintWriter swallows IOExceptions; checkError() flushes and surfaces them.
            if (writer.checkError()) {
                System.err.println("Failed to write timing to output.txt");
            }
        } catch (IOException e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * Reads every line of {@code ./Lions-Tigers-Bears.csv}.
     *
     * <p>On an I/O error, the error is reported on stderr and the lines read so far are returned.
     *
     * @return the raw CSV lines, never {@code null}
     */
    public static List<String> readFromCSV() {
        List<String> animalToysList = new ArrayList<String>();
        // try-with-resources closes the reader even when readLine throws (previously leaked).
        try (BufferedReader br = new BufferedReader(new FileReader("./Lions-Tigers-Bears.csv"))) {
            String line;
            while ((line = br.readLine()) != null) {
                animalToysList.add(line);
            }
        } catch (IOException e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
        }
        return animalToysList;
    }

    /**
     * Maps raw CSV lines to partial per-key sums, keyed by {@code "<toy>-<category>"}.
     *
     * <p>The value summed depends on {@code typeOfData}:
     * <ul>
     *   <li>{@code "prices"}: price * quantity
     *   <li>{@code "counts"}: 1 per record
     *   <li>{@code "quantity"}: quantity
     * </ul>
     * Any other value prints an error once per input line and contributes nothing.
     */
    public static class generalMapper implements Mapper {
        private final String typeOfData;

        public generalMapper(String typeOfData) {
            this.typeOfData = typeOfData;
        }

        /**
         * Aggregates the given chunk of CSV lines.
         *
         * @param inputList CSV lines (Strings) of the form {@code toy,category,price,quantity}
         * @return a {@code HashMap<String, Double>} of partial sums for this chunk
         */
        @Override
        public HashMap map(List inputList) {
            HashMap<String, Double> mapResult = new HashMap<String, Double>();
            List<String> rows = inputList;
            for (String row : rows) {
                String[] fields = row.split(",");
                // Only the toy name is trimmed; the category is used verbatim in the key.
                String toy = fields[0].trim();
                String category = fields[1];
                double price = Double.parseDouble(fields[2]);
                double quantity = Double.parseDouble(fields[3]);
                String toyCategory = toy + "-" + category;
                switch (typeOfData) {
                    case "prices":
                        mapResult.merge(toyCategory, price * quantity, Double::sum);
                        break;
                    case "counts":
                        mapResult.merge(toyCategory, 1.0, Double::sum);
                        break;
                    case "quantity":
                        mapResult.merge(toyCategory, quantity, Double::sum);
                        break;
                    default:
                        System.out.println("Invalid type of data constructor");
                        break;
                }
            }
            return mapResult;
        }
    }

    /** Reduces the values collected for one key to their sum. */
    public static class generalReducer implements Reducer {
        /**
         * @param key the key being reduced
         * @param data the partial sums (Doubles) emitted by the mappers for {@code key}
         * @return a single-entry map {@code key -> sum(data)}
         */
        @Override
        public HashMap reduce(Object key, List data) {
            HashMap<Object, Double> reducerResults = new HashMap<Object, Double>();
            List<Double> values = data;
            double sum = 0;
            for (double value : values) {
                sum += value;
            }
            reducerResults.put(key, sum);
            return reducerResults;
        }
    }
}
1, 218
2, 46
3, 47
4, 78
5, 78
6, 63
7, 156
8, 63
9, 47
10, 47
11, 78
12, 109
13, 62
14, 62
15, 93
16, 47
17, 94
18, 62
19, 47
20, 63
21, 63
22, 78
23, 63
24, 250
25, 78
26, 78
27, 109
28, 94
29, 78
30, 62
31, 79
32, 63
33, 63
34, 47
35, 63
36, 63
37, 93
38, 62
39, 63
40, 63
41, 63
42, 63
43, 219
44, 62
45, 78
46, 63
47, 47
48, 62
49, 63
50, 78
51, 62
52, 78
53, 78
54, 125
55, 78
56, 78
57, 125
58, 78
59, 63
60, 63
61, 63
62, 78
63, 78
64, 79
65, 63
66, 78
67, 78
68, 78
69, 93
70, 62
71, 62
72, 62
73, 46
74, 62
75, 63
76, 62
77, 47
78, 63
79, 63
80, 94
81, 78
82, 78
83, 62
84, 63
85, 79
86, 78
87, 62
88, 63
89, 47
90, 62
91, 47
92, 62
93, 62
94, 47
95, 47
96, 47
97, 47
98, 47
99, 47
@minheq
Copy link
Author

minheq commented Sep 24, 2015

The variance in runtime across different degrees of parallelism is negligible beyond 2–4 mappers. The runtime is limited by the number of threads that can run in parallel.

Getting the answers took 4 hours.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment