Skip to content

Instantly share code, notes, and snippets.

@mumrah
Created March 28, 2013 18:21
Embed
What would you like to do?
Java class to index NYTimes data into Solr. Data available here http://archive.ics.uci.edu/ml/datasets/Bag+of+Words. Beware of hard coded paths and urls!
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;
public class Indexer {
public static class Payload {
public final int docId;
public final int wordId;
public final int count;
public Payload(int docId, int wordId, int count) {
this.docId = docId;
this.wordId = wordId;
this.count = count;
}
@Override
public String toString() {
return "Payload[docId=" + docId + ", wordId=" + wordId + ", count=" + count + "]";
}
}
public static class IndexerThread implements Runnable {
private final SolrServer solr;
private final AtomicLong id;
private final String[] dictionary;
private final BlockingQueue<Payload> queue;
private final int batchSize = 20000;
private final int commitSize = 2000000;
private final SolrInputDocument[] docs = new SolrInputDocument[batchSize];
private volatile long i = 0;
public IndexerThread(AtomicLong id, BlockingQueue<Payload> queue, String[] dictionary) {
this.solr = new HttpSolrServer("http://localhost:8983/solr/nytimes");
this.id = id;
this.queue = queue;
this.dictionary = dictionary;
for(int i=0; i<docs.length; i++) {
docs[i] = new SolrInputDocument();
}
}
@Override
public void run() {
long _id = 0;
while(true) {
try {
Payload payload = queue.poll(1, TimeUnit.SECONDS);
if(payload == null) {
break;
}
// Send and possibly commit a batch
if((i != 0) && (i % batchSize == 0)) {
System.err.println("add");
solr.add(Arrays.asList(docs));
}
if((_id != 0) && (_id % commitSize == 0)) { // only one thread does this
System.err.println("commit");
solr.commit(true, false);
}
SolrInputDocument doc = docs[(int)(i % batchSize)];
doc.clear();
_id = id.getAndIncrement();
System.err.println(_id);
doc.setField("id", _id);
doc.setField("docId", payload.docId);
doc.setField("threadId", Thread.currentThread().getName());
doc.setField("wordId", payload.wordId);
doc.setField("word", dictionary[payload.wordId]);
doc.setField("count", payload.count);
i++;
} catch (Exception e) {
System.err.println(e);
throw new RuntimeException(e);
}
}
try {
solr.add(Arrays.asList(docs));
solr.commit(true, false);
} catch (Exception e) {
System.err.println(e);
throw new RuntimeException(e);
}
}
}
public static class ReaderThread implements Runnable {
File file;
BlockingQueue<Payload> queue;
public ReaderThread(File file, BlockingQueue<Payload> queue) {
this.file = file;
this.queue = queue;
}
@Override
public void run() {
try {
BufferedReader reader = new BufferedReader(new FileReader(file));
reader.readLine();
reader.readLine();
reader.readLine();
while (reader.ready()) {
String line = reader.readLine();
String[] toks = line.split(" ");
Payload p = new Payload(Integer.valueOf(toks[0]), Integer.valueOf(toks[1]), Integer.valueOf(toks[2]));
try {
queue.put(p);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
reader.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) throws Exception {
List<String> dictionary = new ArrayList<String>();
BufferedReader reader = new BufferedReader(new FileReader(new File("vocab.nytimes.txt")));
dictionary.add(""); // the dictionary is 1-indexed
while(reader.ready()) {
String word = reader.readLine();
dictionary.add(word);
}
BlockingQueue<Payload> queue = new LinkedBlockingQueue<Payload>(100000);
AtomicLong id = new AtomicLong();
ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(new ReaderThread(new File("docword.nytimes.txt"), queue));
for(int i=0; i<4; i++) {
executor.submit(new IndexerThread(id, queue, dictionary.toArray(new String[]{})));
Thread.sleep(5000);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment