Skip to content

Instantly share code, notes, and snippets.

@keith-turner
Last active June 27, 2017 20:47
Show Gist options
  • Save keith-turner/57e124c715c2542242f11eda85b3128c to your computer and use it in GitHub Desktop.
Save keith-turner/57e124c715c2542242f11eda85b3128c to your computer and use it in GitHub Desktop.
A solution to exercise 1 of the Fluo Tour: http://fluo.apache.org/tour/exercise-1/
package ft;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.Collections2;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.observer.StringObserver;
import static ft.DocLoader.CONTENT_COL;
import static ft.DocLoader.REF_COUNT_COL;
import static ft.DocLoader.REF_STATUS_COL;
/**
 * Observer that adjusts per-word counts when a document row's reference status changes.
 *
 * <p>For a row whose status is {@code "referenced"} and which has not been processed yet, the
 * count of every unique word in the content is incremented by one and the row is flagged as
 * processed. For a row whose status is {@code "unreferenced"}, the document columns are deleted
 * and, if the row had been processed, the counts of its words are decremented by one.
 */
public class ContentObserver implements StringObserver {

  /** Column set to {@code "true"} once a document's words have been added to the counts. */
  public static final Column PROCESSED_COL = new Column("doc", "processed");

  /** Column on a {@code "w:<word>"} row holding the count maintained by this observer. */
  public static final Column WORD_COUNT = new Column("word", "docCount");

  /**
   * Utility method to tokenize the content of a document into unique words.
   *
   * @param content document text, split on runs of non-word characters
   * @return the distinct, non-empty tokens found in {@code content}
   */
  private Set<String> tokenize(String content) {
    Set<String> words = new HashSet<>(Arrays.asList(content.split("[\\W]+")));
    // split() produces an empty token for empty content or content that starts with a
    // separator; it is not a word and must not get its own "w:" row.
    words.remove("");
    return words;
  }

  /**
   * Adds the passed delta to the stored count of each word, deleting the count when it reaches
   * zero.
   *
   * @param tx transaction used to read and write the word counts
   * @param delta amount to add to each word's count (may be negative)
   * @param words unique words whose counts should be adjusted
   */
  private void adjustCounts(TransactionBase tx, int delta, Set<String> words) {
    if (words.isEmpty()) {
      // Nothing to adjust, so skip the read entirely.
      return;
    }

    Collection<RowColumn> wordKeys =
        Collections2.transform(words, s -> new RowColumn("w:" + s, WORD_COUNT));

    // Fetch all current counts in one call; words with no stored count are treated as 0.
    Map<RowColumn, String> currentCounts = tx.gets(wordKeys);

    for (RowColumn wordKey : wordKeys) {
      int count = Integer.parseInt(currentCounts.getOrDefault(wordKey, "0")) + delta;
      if (count == 0) {
        tx.delete(wordKey.getsRow(), WORD_COUNT);
      } else {
        tx.set(wordKey.getsRow(), WORD_COUNT, Integer.toString(count));
      }
    }
  }

  /**
   * Reacts to a notification for a document row by updating word counts and cleaning up
   * unreferenced documents.
   *
   * @param tx transaction used for all reads and writes
   * @param row the document row that triggered the notification
   * @param col the column that triggered the notification (not used)
   */
  @Override
  public void process(TransactionBase tx, String row, Column col) throws Exception {
    Map<Column, String> colVals = tx.gets(row, CONTENT_COL, REF_STATUS_COL, PROCESSED_COL);
    String content = colVals.get(CONTENT_COL);
    String status = colVals.get(REF_STATUS_COL);
    String processed = colVals.getOrDefault(PROCESSED_COL, "false");

    // Constant-first comparisons: the status column may be missing from the row, in which
    // case neither branch applies and the notification is a no-op instead of an NPE.
    if ("referenced".equals(status) && processed.equals("false")) {
      adjustCounts(tx, +1, tokenize(content));
      tx.set(row, PROCESSED_COL, "true");
    }

    if ("unreferenced".equals(status)) {
      for (Column c : new Column[] {PROCESSED_COL, CONTENT_COL, REF_COUNT_COL, REF_STATUS_COL}) {
        tx.delete(row, c);
      }
      // Only undo the counts if they were actually added earlier.
      if (processed.equals("true")) {
        adjustCounts(tx, -1, tokenize(content));
      }
    }
  }
}
package ft;
import org.apache.fluo.api.client.Loader;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Column;
/**
 * Loader that stores a document's hash under its {@code "u:<uri>"} row and maintains a reference
 * count, reference status and content under the {@code "d:<hash>"} row.
 */
public class DocLoader implements Loader {

  public static final Column HASH_COL = new Column("uri", "hash");
  public static final Column REF_COUNT_COL = new Column("doc", "refc");
  public static final Column REF_STATUS_COL = new Column("doc", "refs");
  public static final Column CONTENT_COL = new Column("doc", "content");

  private final Document doc;

  public DocLoader(Document doc) {
    this.doc = doc;
  }

  /**
   * Points the document's URI at its current hash and shifts one reference from the previously
   * stored hash (if any) to the current one. Does nothing when the stored hash is unchanged.
   */
  @Override
  public void load(TransactionBase tx, Context context) throws Exception {
    final String currentHash = doc.hash();
    final String uriRow = "u:" + doc.uri;
    final String previousHash = tx.gets(uriRow, HASH_COL);

    if (currentHash.equals(previousHash)) {
      return;
    }

    tx.set(uriRow, HASH_COL, currentHash);

    if (previousHash != null) {
      dropReference(tx, "d:" + previousHash);
    }
    addReference(tx, "d:" + currentHash);
  }

  /** Decrements the reference count of a document row, marking it unreferenced at zero. */
  private void dropReference(TransactionBase tx, String docRow) {
    final int remaining = Integer.parseInt(tx.gets(docRow, REF_COUNT_COL)) - 1;
    if (remaining == 0) {
      tx.set(docRow, REF_STATUS_COL, "unreferenced");
    }
    tx.set(docRow, REF_COUNT_COL, String.valueOf(remaining));
  }

  /**
   * Increments the reference count of a document row; on the first reference the row is marked
   * referenced and the document content is stored.
   */
  private void addReference(TransactionBase tx, String docRow) {
    final int references = Integer.parseInt(tx.gets(docRow, REF_COUNT_COL, "0")) + 1;
    if (references == 1) {
      tx.set(docRow, REF_STATUS_COL, "referenced");
      tx.set(docRow, CONTENT_COL, doc.content);
    }
    tx.set(docRow, REF_COUNT_COL, String.valueOf(references));
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment