/**
 * Copyright 2009 T Jake Luciani
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package lucandra;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
import org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute;
import org.apache.lucene.analysis.tokenattributes.TermAttribute;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Similarity;
import org.apache.thrift.TException;
public class IndexUpdater {

    private final String indexName;
    private final Cassandra.Iface client;
    private final ColumnPath docAllColumnPath;
    private boolean autoCommit;
    private Map<String, Map<String, List<Mutation>>> mutationMap;
    private List<String> allIndexedTerms;
    private Similarity similarity = Similarity.getDefault(); // how to normalize field lengths into norms

    private static final Logger logger = Logger.getLogger(IndexUpdater.class);

    public IndexUpdater(String indexName, Cassandra.Iface client) {
        this.indexName = indexName;
        this.client = client;
        autoCommit = true;
        docAllColumnPath = new ColumnPath(CassandraUtils.docColumnFamily);
        mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
    }
    @SuppressWarnings("unchecked")
    private void addDocument(Document doc, Analyzer analyzer, String docId, boolean update) throws CorruptIndexException, IOException {

        if (!update)
            allIndexedTerms = new ArrayList<String>();

        // check for special field name
        if (docId == null)
            docId = doc.get(CassandraUtils.documentIdField);

        if (docId == null)
            docId = Long.toHexString((long) (System.nanoTime() + (Math.random() * System.nanoTime())));

        int position = 0;

        for (Fieldable field : (List<Fieldable>) doc.getFields()) {

            // Indexed field
            if (field.isIndexed()) {

                TokenStream tokens = field.tokenStreamValue();

                if (tokens == null) {
                    tokens = analyzer.tokenStream(field.name(), new StringReader(field.stringValue()));
                }

                // collect term information per field
                Map<String, Map<String, List<Number>>> allTermInformation = new HashMap<String, Map<String, List<Number>>>();

                int lastOffset = 0;
                if (position > 0) {
                    position += analyzer.getPositionIncrementGap(field.name());
                }
                // Build the termPositions vector for all terms
                tokens.reset(); // reset the TokenStream to the first token

                // set up the token attributes we are working with

                // offsets
                OffsetAttribute offsetAttribute = null;
                if (field.isStoreOffsetWithTermVector())
                    offsetAttribute = (OffsetAttribute) tokens.addAttribute(OffsetAttribute.class);

                // positions
                PositionIncrementAttribute posIncrAttribute = null;
                if (field.isStorePositionWithTermVector())
                    posIncrAttribute = (PositionIncrementAttribute) tokens.addAttribute(PositionIncrementAttribute.class);

                TermAttribute termAttribute = (TermAttribute) tokens.addAttribute(TermAttribute.class);

                // store the field's norm per term per document rather than per field;
                // this adds more to write but less to read on the other side
                int tokensInField = 0;
                while (tokens.incrementToken()) {
                    tokensInField++;
                    String term = CassandraUtils.createColumnName(field.name(), termAttribute.term());

                    allIndexedTerms.add(term);

                    // fetch all collected information for this term
                    Map<String, List<Number>> termInfo = allTermInformation.get(term);

                    if (termInfo == null) {
                        termInfo = new HashMap<String, List<Number>>();
                        allTermInformation.put(term, termInfo);
                    }

                    // term frequency
                    {
                        List<Number> termFrequency = termInfo.get(CassandraUtils.termFrequencyKey);

                        if (termFrequency == null) {
                            termFrequency = new ArrayList<Number>();
                            termFrequency.add(0);
                            termInfo.put(CassandraUtils.termFrequencyKey, termFrequency);
                        }

                        // increment
                        termFrequency.set(0, termFrequency.get(0).intValue() + 1);
                    }

                    // position vector
                    if (field.isStorePositionWithTermVector()) {
                        position += (posIncrAttribute.getPositionIncrement() - 1);

                        List<Number> positionVector = termInfo.get(CassandraUtils.positionVectorKey);

                        if (positionVector == null) {
                            positionVector = new ArrayList<Number>();
                            termInfo.put(CassandraUtils.positionVectorKey, positionVector);
                        }

                        positionVector.add(++position);
                    }

                    // term offsets
                    if (field.isStoreOffsetWithTermVector()) {

                        List<Number> offsetVector = termInfo.get(CassandraUtils.offsetVectorKey);
                        if (offsetVector == null) {
                            offsetVector = new ArrayList<Number>();
                            termInfo.put(CassandraUtils.offsetVectorKey, offsetVector);
                        }

                        offsetVector.add(lastOffset + offsetAttribute.startOffset());
                        offsetVector.add(lastOffset + offsetAttribute.endOffset());
                    }
                }
                List<Number> bnorm = null;
                if (!field.getOmitNorms()) {
                    bnorm = new ArrayList<Number>();
                    float norm = doc.getBoost();
                    norm *= field.getBoost();
                    norm *= similarity.lengthNorm(field.name(), tokensInField);
                    bnorm.add(Similarity.encodeNorm(norm));
                }

                for (Map.Entry<String, Map<String, List<Number>>> term : allTermInformation.entrySet()) {

                    // Terms are stored within a unique key combination.
                    // This is required since Cassandra loads all columns
                    // in a key/column family into memory
                    String key = indexName + CassandraUtils.delimeter + term.getKey();

                    // Mix in the norm for this field alongside each term:
                    // more writes but faster on the read side
                    if (!field.getOmitNorms()) {
                        term.getValue().put(CassandraUtils.normsKey, bnorm);
                    }

                    CassandraUtils.addToMutationMap(mutationMap, CassandraUtils.termVecColumnFamily, docId.getBytes(), CassandraUtils.hashKey(key), null, term.getValue());
                }
            }
            // Untokenized fields go in without a termPosition
            if (field.isIndexed() && !field.isTokenized()) {
                String term = CassandraUtils.createColumnName(field.name(), field.stringValue());
                allIndexedTerms.add(term);

                String key = indexName + CassandraUtils.delimeter + term;

                Map<String, List<Number>> termMap = new HashMap<String, List<Number>>();
                termMap.put(CassandraUtils.positionVectorKey, CassandraUtils.emptyArray);

                CassandraUtils.addToMutationMap(mutationMap, CassandraUtils.termVecColumnFamily, docId.getBytes(), CassandraUtils.hashKey(key), null, termMap);
            }

            // Stores each field as a column under this doc key
            if (field.isStored()) {

                byte[] _value = field.isBinary() ? field.getBinaryValue() : field.stringValue().getBytes("UTF-8");

                // the trailing byte flags whether the value is binary
                byte[] value = new byte[_value.length + 1];
                System.arraycopy(_value, 0, value, 0, _value.length);
                value[value.length - 1] = (byte) (field.isBinary() ? Byte.MAX_VALUE : Byte.MIN_VALUE);

                String key = indexName + CassandraUtils.delimeter + docId;

                CassandraUtils.addToMutationMap(mutationMap, CassandraUtils.docColumnFamily, field.name().getBytes(), CassandraUtils.hashKey(key), value, null);
            }
        }
        // Finally, store meta-data so we can delete this document later
        String key = indexName + CassandraUtils.delimeter + docId;
        CassandraUtils.addToMutationMap(mutationMap, CassandraUtils.docColumnFamily, CassandraUtils.documentMetaField.getBytes(), CassandraUtils.hashKey(key),
                CassandraUtils.toBytes(allIndexedTerms), null);

        if (autoCommit)
            CassandraUtils.robustBatchInsert(client, mutationMap);
    }
    private List<String> deleteDocuments(Term term, Document doc) throws CorruptIndexException, IOException {

        List<String> docIds = new ArrayList<String>();

        try {
            ColumnParent cp = new ColumnParent(CassandraUtils.termVecColumnFamily);
            String key = indexName + CassandraUtils.delimeter + CassandraUtils.createColumnName(term);

            List<ColumnOrSuperColumn> docs = client.get_slice(CassandraUtils.keySpace, CassandraUtils.hashKey(key), cp,
                    new SlicePredicate().setSlice_range(new SliceRange(new byte[] {}, new byte[] {}, true, Integer.MAX_VALUE)), ConsistencyLevel.ONE);

            // delete by documentId
            for (ColumnOrSuperColumn docInfo : docs) {
                docIds.add(new String(docInfo.getSuper_column().getName()));
                deleteLucandraDocument(docInfo.getSuper_column().getName(), term, doc);
            }
        } catch (InvalidRequestException e) {
            throw new RuntimeException(e);
        } catch (UnavailableException e) {
            throw new RuntimeException(e);
        } catch (TException e) {
            throw new RuntimeException(e);
        } catch (TimedOutException e) {
            throw new RuntimeException(e);
        } catch (NotFoundException e) {
            throw new RuntimeException(e);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }

        return docIds;
    }
    @SuppressWarnings("unchecked")
    private void deleteLucandraDocument(byte[] docId, Term term, Document doc) throws InvalidRequestException, NotFoundException, UnavailableException,
            TimedOutException, TException, IOException, ClassNotFoundException {

        // field-name prefixes ("field" + delimiter) for every field in the incoming document
        List<String> lstDelTerms = new ArrayList<String>();
        for (Fieldable field : (List<Fieldable>) doc.getFields()) {
            lstDelTerms.add(field.name() + CassandraUtils.delimeter);
        }

        String key = indexName + CassandraUtils.delimeter + new String(docId);
        String delKey;

        ColumnOrSuperColumn column = client.get(CassandraUtils.keySpace, CassandraUtils.hashKey(key), CassandraUtils.metaColumnPath, ConsistencyLevel.ONE);

        allIndexedTerms = (List<String>) CassandraUtils.fromBytes(column.column.value);

        // queue Cassandra deletes for every previously indexed term whose field
        // also appears in the incoming document
        for (String termStr : allIndexedTerms) {
            delKey = termStr.substring(0, termStr.indexOf(CassandraUtils.delimeter)) + CassandraUtils.delimeter;
            key = indexName + CassandraUtils.delimeter + termStr;
            if (lstDelTerms.contains(delKey)) {
                CassandraUtils.addToMutationMap(mutationMap, CassandraUtils.termVecColumnFamily, docId, CassandraUtils.hashKey(key), null, null);
            }
        }

        // drop those same terms from the in-memory list so the re-written
        // meta-data no longer references them
        for (String delTerm : allIndexedTerms) {
            if (lstDelTerms.contains(delTerm.substring(0, delTerm.indexOf(CassandraUtils.delimeter)) + CassandraUtils.delimeter))
                lstDelTerms.add(delTerm);
        }
        allIndexedTerms.removeAll(lstDelTerms);

        if (autoCommit)
            CassandraUtils.robustBatchInsert(client, mutationMap);
    }
    public void updateDocument(Term updateTerm, Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
        // re-index only the documents that currently match updateTerm
        for (String docId : deleteDocuments(updateTerm, doc)) {
            addDocument(doc, analyzer, docId, true);
        }
    }
}
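
/*
 * Usage sketch (illustrative, not part of the original gist). It shows how
 * this class might be wired to a Cassandra 0.6-era Thrift client and used to
 * re-index one document. The host, port, index name, field names and analyzer
 * below are assumptions. Note that updateDocument only re-indexes documents
 * that already match the update term, since addDocument is private here.
 *
 *   import org.apache.lucene.analysis.standard.StandardAnalyzer;
 *   import org.apache.lucene.document.Field;
 *   import org.apache.lucene.util.Version;
 *   import org.apache.thrift.protocol.TBinaryProtocol;
 *   import org.apache.thrift.transport.TSocket;
 *   import org.apache.thrift.transport.TTransport;
 *
 *   TTransport transport = new TSocket("localhost", 9160);
 *   transport.open();
 *   Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
 *
 *   IndexUpdater updater = new IndexUpdater("wikipedia", client);
 *
 *   Document doc = new Document();
 *   doc.add(new Field("docId", "42", Field.Store.YES, Field.Index.NOT_ANALYZED));
 *   doc.add(new Field("body", "updated body text", Field.Store.YES, Field.Index.ANALYZED));
 *
 *   updater.updateDocument(new Term("docId", "42"), doc, new StandardAnalyzer(Version.LUCENE_29));
 *
 *   transport.close();
 */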