Skip to content

Instantly share code, notes, and snippets.

@Denis1990
Last active September 18, 2016 17:50
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save Denis1990/2978c92567efa17060bb618f66d02ce0 to your computer and use it in GitHub Desktop.
KMeans driver code
/**
 * Driver for a k-means clustering job implemented with the Map Reduce paradigm.
 *
 * <p>The driver samples the initial cluster centers at random from a local
 * document record file, writes them to a local centers file so that they are
 * accessible in the mapper setup method, and then submits one MapReduce job
 * over the given input path.
 */
public class KMeans extends Configured implements Tool {

    /** Local file with one document record per line; the initial centers are sampled from it. */
    private static final String DOC_TERM_VECTOR_FILE = "/home/denis/docTermVector";

    /** Local file the chosen centers are written to. */
    private static final String CENTERS_FILE = "/home/denis/centers";

    /** Number of indexed documents. */
    private final int indexedDocuments;

    static {
        // load the kmeans configuration file
        Configuration.addDefaultResource("conf/kmeans-configuration.xml");
    }

    /**
     * @param indexedDocuments number of documents in the index; center line numbers
     *                         are drawn from {@code [0, indexedDocuments)}
     */
    public KMeans(int indexedDocuments) {
        this.indexedDocuments = indexedDocuments;
    }

    /**
     * Runs the tool through {@link ToolRunner}, prints the resulting exit code and
     * terminates the JVM with it (1 if the run threw an exception).
     */
    public static void main(String[] args) {
        int exitCode;
        try {
            // 25 is the number of documents our index has
            exitCode = ToolRunner.run(new KMeans(25), args);
            System.out.println(exitCode);
        } catch (Exception e) {
            e.printStackTrace();
            exitCode = 1;
        }
        System.exit(exitCode);
    }

    /**
     * Prepares the initial centers and runs the k-means MapReduce job.
     *
     * @param args {@code args[0]} is the job input path, {@code args[1]} the job
     *             output path (deleted recursively before the job starts)
     * @return 0 if the job completed successfully, 1 if it failed, 2 on bad usage
     * @throws IllegalArgumentException if the {@code clusters} configuration value is
     *         missing, not positive, or larger than the number of indexed documents
     * @throws IllegalStateException if the document record file has too few lines to
     *         supply every requested center
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: KMeans <input path> <output path>");
            return 2;
        }
        Configuration conf = getConf();
        FileSystem fs = FileSystem.get(conf);
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        int clusters = conf.getInt("clusters", 0);
        if (clusters <= 0 || clusters > indexedDocuments) {
            throw new IllegalArgumentException(
                    "Configuration value 'clusters' must be between 1 and " + indexedDocuments
                            + " but was " + conf.get("clusters"));
        }

        // initialize the clusters data and write the centers to a file so that
        // they are accessible in the mapper setup method
        boolean[] selected = pickCenterIndices(clusters);
        writeCenters(readInitialCenters(selected, clusters));

        fs.delete(output, true);
        Job job = createJob(conf, input, output);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Chooses {@code clusters} distinct line numbers uniformly at random.
     *
     * @return a mask of length {@code indexedDocuments}; entry {@code i} is true when
     *         line {@code i} was chosen as an initial center
     */
    private boolean[] pickCenterIndices(int clusters) {
        int[] candidates = new int[indexedDocuments];
        for (int i = 0; i < indexedDocuments; i++) {
            candidates[i] = i;
        }
        Random rand = new Random(System.currentTimeMillis());
        boolean[] selected = new boolean[indexedDocuments];
        // Partial Fisher-Yates shuffle: each step draws from the not-yet-chosen
        // tail, so no index can be picked twice.
        for (int i = 0; i < clusters; i++) {
            int swapWith = i + rand.nextInt(indexedDocuments - i);
            int chosen = candidates[swapWith];
            candidates[swapWith] = candidates[i];
            candidates[i] = chosen;
            selected[chosen] = true;
        }
        return selected;
    }

    /**
     * Reads the document record file and parses the lines flagged in {@code selected}.
     *
     * <p>Declared {@code throws Exception} because the checked exceptions of
     * {@code DocumentRecordParser.parse} are not visible from this class.
     * NOTE(review): the file is read with the platform default charset while the
     * centers file is written as UTF-8 — confirm this is intended.
     *
     * @return the parsed centers, in file order; always {@code clusters} elements
     */
    private List<ImmutableTriple<Integer, String, PointVector>> readInitialCenters(
            boolean[] selected, int clusters) throws Exception {
        // id docTermVector document index
        List<ImmutableTriple<Integer, String, PointVector>> centers = new ArrayList<>(clusters);
        try (BufferedReader br = new BufferedReader(new FileReader(DOC_TERM_VECTOR_FILE))) {
            for (int i = 0; i < indexedDocuments && centers.size() < clusters; i++) {
                String line = br.readLine();
                if (line == null) {
                    break; // file is shorter than indexedDocuments
                }
                if (selected[i]) {
                    centers.add(DocumentRecordParser.parse(line));
                }
            }
        }
        if (centers.size() < clusters) {
            throw new IllegalStateException(
                    DOC_TERM_VECTOR_FILE + " ended after supplying " + centers.size()
                            + " of the " + clusters + " requested centers");
        }
        return centers;
    }

    /**
     * Writes one center per line as {@code right TAB left TAB middle}, UTF-8 encoded.
     */
    private void writeCenters(List<ImmutableTriple<Integer, String, PointVector>> centers)
            throws java.io.IOException {
        try (BufferedWriter bw =
                new BufferedWriter(new FileWriterWithEncoding(CENTERS_FILE, "UTF-8"))) {
            for (ImmutableTriple<Integer, String, PointVector> t : centers) {
                bw.write(t.right.toString());
                bw.write("\t");
                bw.write(String.valueOf(t.left));
                bw.write("\t");
                bw.write(t.middle);
                bw.write("\n");
            }
        }
    }

    /**
     * Builds the "kmeans" job: text input/output, {@code KMeansMapper} and
     * {@code KMeansReducer}, {@code PointVector} map output keys and values.
     */
    private Job createJob(Configuration conf, Path input, Path output) throws Exception {
        Job job = Job.getInstance(conf);
        job.setJobName("kmeans");
        job.setJarByClass(KMeans.class);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        job.setMapperClass(KMeansMapper.class);
        job.setReducerClass(KMeansReducer.class);
        job.setMapOutputKeyClass(PointVector.class);
        job.setMapOutputValueClass(PointVector.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        return job;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment