A solution to exercise 1 of the Fluo Tour: http://fluo.apache.org/tour/exercise-1/
package ft;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Collections2;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.observer.StringObserver;

import static ft.DocLoader.CONTENT_COL;
import static ft.DocLoader.REF_COUNT_COL;
import static ft.DocLoader.REF_STATUS_COL;
public class ContentObserver implements StringObserver {

  public static final Column PROCESSED_COL = new Column("doc", "processed");
  public static final Column WORD_COUNT = new Column("word", "docCount");

  /**
   * Utility method to split the content of a document into words. Duplicate
   * words are filtered out in adjustCounts().
   */
  private List<String> tokenize(String content) {
    return Arrays.asList(content.split("[\\W]+"));
  }
  /**
   * Adds the passed delta to the count of each unique word.
   */
  private void adjustCounts(TransactionBase tx, int delta, List<String> words) {
    Set<String> uniqueWords = new HashSet<>(words);

    Collection<RowColumn> wordKeys =
        Collections2.transform(uniqueWords, s -> new RowColumn("w:" + s, WORD_COUNT));

    // read the current counts for all words with a single call
    Map<RowColumn, String> currentCounts = tx.gets(wordKeys);

    for (RowColumn wordKey : wordKeys) {
      int count = Integer.parseInt(currentCounts.getOrDefault(wordKey, "0")) + delta;
      if (count == 0) {
        tx.delete(wordKey.getsRow(), WORD_COUNT);
      } else {
        tx.set(wordKey.getsRow(), WORD_COUNT, count + "");
      }
    }
  }
  @Override
  public void process(TransactionBase tx, String row, Column col) throws Exception {
    Map<Column, String> colVals = tx.gets(row, CONTENT_COL, REF_STATUS_COL, PROCESSED_COL);

    String content = colVals.get(CONTENT_COL);
    // the status column may have been deleted by an earlier run of this observer
    String status = colVals.getOrDefault(REF_STATUS_COL, "");
    String processed = colVals.getOrDefault(PROCESSED_COL, "false");

    if (status.equals("referenced") && processed.equals("false")) {
      adjustCounts(tx, +1, tokenize(content));
      tx.set(row, PROCESSED_COL, "true");
    }

    if (status.equals("unreferenced")) {
      for (Column c : new Column[] {PROCESSED_COL, CONTENT_COL, REF_COUNT_COL, REF_STATUS_COL}) {
        tx.delete(row, c);
      }

      if (processed.equals("true")) {
        adjustCounts(tx, -1, tokenize(content));
      }
    }
  }
}
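
Both ContentObserver and DocLoader depend on the tour's Document class, which is not included in this gist. Below is a minimal sketch of it, assuming Guava's SHA-1 hashing for the content hash; the exact hash function is an assumption, and any stable content hash works with DocLoader.

package ft;

import java.nio.charset.StandardCharsets;

import com.google.common.hash.Hashing;

public class Document {
  public final String uri;
  public final String content;

  public Document(String uri, String content) {
    this.uri = uri;
    this.content = content;
  }

  // hash of the document's content; SHA-1 via Guava is an assumption here
  public String hash() {
    return Hashing.sha1().hashString(content, StandardCharsets.UTF_8).toString();
  }
}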
package ft;

import org.apache.fluo.api.client.Loader;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Column;

public class DocLoader implements Loader {

  private final Document doc;

  public static final Column HASH_COL = new Column("uri", "hash");
  public static final Column REF_COUNT_COL = new Column("doc", "refc");
  public static final Column REF_STATUS_COL = new Column("doc", "refs");
  public static final Column CONTENT_COL = new Column("doc", "content");

  public DocLoader(Document doc) {
    this.doc = doc;
  }

  @Override
  public void load(TransactionBase tx, Context context) throws Exception {
    String newHash = doc.hash();
    String oldHash = tx.gets("u:" + doc.uri, HASH_COL);

    // the document's content is unchanged, so there is nothing to update
    if (newHash.equals(oldHash)) {
      return;
    }

    tx.set("u:" + doc.uri, HASH_COL, newHash);

    if (oldHash != null) {
      // this URI no longer points at the old content, so decrement its reference count
      int orc = Integer.parseInt(tx.gets("d:" + oldHash, REF_COUNT_COL)) - 1;
      if (orc == 0) {
        // flag the now unreferenced content so ContentObserver cleans it up
        tx.set("d:" + oldHash, REF_STATUS_COL, "unreferenced");
      }
      tx.set("d:" + oldHash, REF_COUNT_COL, orc + "");
    }

    // increment the reference count of the new content, storing it on first reference
    int nrc = Integer.parseInt(tx.gets("d:" + newHash, REF_COUNT_COL, "0")) + 1;
    if (nrc == 1) {
      tx.set("d:" + newHash, REF_STATUS_COL, "referenced");
      tx.set("d:" + newHash, CONTENT_COL, doc.content);
    }
    tx.set("d:" + newHash, REF_COUNT_COL, nrc + "");
  }
}
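
For context, here is a sketch of how the pieces might be wired together: an ObserverProvider that registers ContentObserver on the reference-status column, plus a small driver that submits documents through a LoaderExecutor. The notification type, the configuration file name, and the example URIs are assumptions, not part of the gist.

package ft;

import java.io.File;

import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.LoaderExecutor;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.observer.ObserverProvider;

import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;

public class AppObserverProvider implements ObserverProvider {
  @Override
  public void provide(Registry obsRegistry, Context ctx) {
    // rerun ContentObserver whenever a document's reference status changes;
    // STRONG notifications are an assumption, the gist does not specify them
    obsRegistry.forColumn(DocLoader.REF_STATUS_COL, STRONG)
        .useStrObserver(new ContentObserver());
  }

  // Hypothetical driver: load two URIs that point at identical content so the
  // dedup logic in DocLoader is exercised.
  public static void main(String[] args) {
    FluoConfiguration conf = new FluoConfiguration(new File("fluo-app.properties"));
    try (FluoClient client = FluoFactory.newClient(conf);
        LoaderExecutor le = client.newLoaderExecutor()) {
      le.execute(new DocLoader(new Document("http://news.com/a", "bla bla bla")));
      le.execute(new DocLoader(new Document("http://blog.org/b", "bla bla bla")));
    }
  }
}

Because both URIs hash to the same d:<hash> row, the second load only bumps the reference count; ContentObserver increments each word's count exactly once.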