Created
March 28, 2013 18:21
-
-
Save mumrah/5265594 to your computer and use it in GitHub Desktop.
Java class to index NYTimes data into Solr. Data available here: http://archive.ics.uci.edu/ml/datasets/Bag+of+Words. Beware of hard-coded paths and URLs!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ExecutorService; | |
import java.io.BufferedReader; | |
import java.io.FileReader; | |
import java.io.File; | |
import java.io.IOException; | |
import java.util.Arrays; | |
import java.util.ArrayList; | |
import java.util.List; | |
import org.apache.solr.client.solrj.SolrServer; | |
import org.apache.solr.client.solrj.impl.HttpSolrServer; | |
import org.apache.solr.common.SolrInputDocument; | |
public class Indexer { | |
public static class Payload { | |
public final int docId; | |
public final int wordId; | |
public final int count; | |
public Payload(int docId, int wordId, int count) { | |
this.docId = docId; | |
this.wordId = wordId; | |
this.count = count; | |
} | |
@Override | |
public String toString() { | |
return "Payload[docId=" + docId + ", wordId=" + wordId + ", count=" + count + "]"; | |
} | |
} | |
public static class IndexerThread implements Runnable { | |
private final SolrServer solr; | |
private final AtomicLong id; | |
private final String[] dictionary; | |
private final BlockingQueue<Payload> queue; | |
private final int batchSize = 20000; | |
private final int commitSize = 2000000; | |
private final SolrInputDocument[] docs = new SolrInputDocument[batchSize]; | |
private volatile long i = 0; | |
public IndexerThread(AtomicLong id, BlockingQueue<Payload> queue, String[] dictionary) { | |
this.solr = new HttpSolrServer("http://localhost:8983/solr/nytimes"); | |
this.id = id; | |
this.queue = queue; | |
this.dictionary = dictionary; | |
for(int i=0; i<docs.length; i++) { | |
docs[i] = new SolrInputDocument(); | |
} | |
} | |
@Override | |
public void run() { | |
long _id = 0; | |
while(true) { | |
try { | |
Payload payload = queue.poll(1, TimeUnit.SECONDS); | |
if(payload == null) { | |
break; | |
} | |
// Send and possibly commit a batch | |
if((i != 0) && (i % batchSize == 0)) { | |
System.err.println("add"); | |
solr.add(Arrays.asList(docs)); | |
} | |
if((_id != 0) && (_id % commitSize == 0)) { // only one thread does this | |
System.err.println("commit"); | |
solr.commit(true, false); | |
} | |
SolrInputDocument doc = docs[(int)(i % batchSize)]; | |
doc.clear(); | |
_id = id.getAndIncrement(); | |
System.err.println(_id); | |
doc.setField("id", _id); | |
doc.setField("docId", payload.docId); | |
doc.setField("threadId", Thread.currentThread().getName()); | |
doc.setField("wordId", payload.wordId); | |
doc.setField("word", dictionary[payload.wordId]); | |
doc.setField("count", payload.count); | |
i++; | |
} catch (Exception e) { | |
System.err.println(e); | |
throw new RuntimeException(e); | |
} | |
} | |
try { | |
solr.add(Arrays.asList(docs)); | |
solr.commit(true, false); | |
} catch (Exception e) { | |
System.err.println(e); | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
public static class ReaderThread implements Runnable { | |
File file; | |
BlockingQueue<Payload> queue; | |
public ReaderThread(File file, BlockingQueue<Payload> queue) { | |
this.file = file; | |
this.queue = queue; | |
} | |
@Override | |
public void run() { | |
try { | |
BufferedReader reader = new BufferedReader(new FileReader(file)); | |
reader.readLine(); | |
reader.readLine(); | |
reader.readLine(); | |
while (reader.ready()) { | |
String line = reader.readLine(); | |
String[] toks = line.split(" "); | |
Payload p = new Payload(Integer.valueOf(toks[0]), Integer.valueOf(toks[1]), Integer.valueOf(toks[2])); | |
try { | |
queue.put(p); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
reader.close(); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
List<String> dictionary = new ArrayList<String>(); | |
BufferedReader reader = new BufferedReader(new FileReader(new File("vocab.nytimes.txt"))); | |
dictionary.add(""); // the dictionary is 1-indexed | |
while(reader.ready()) { | |
String word = reader.readLine(); | |
dictionary.add(word); | |
} | |
BlockingQueue<Payload> queue = new LinkedBlockingQueue<Payload>(100000); | |
AtomicLong id = new AtomicLong(); | |
ExecutorService executor = Executors.newCachedThreadPool(); | |
executor.submit(new ReaderThread(new File("docword.nytimes.txt"), queue)); | |
for(int i=0; i<4; i++) { | |
executor.submit(new IndexerThread(id, queue, dictionary.toArray(new String[]{}))); | |
Thread.sleep(5000); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment