Skip to content

Instantly share code, notes, and snippets.

@keith-turner
Last active November 30, 2016 20:35
Show Gist options
  • Save keith-turner/84e644eb78c9b00c620acb797c64b2b1 to your computer and use it in GitHub Desktop.
Save keith-turner/84e644eb78c9b00c620acb797c64b2b1 to your computer and use it in GitHub Desktop.
Modify part 3 of the Fluo tour exercise (http://fluo.apache.org/tour/exercise-1/) to build an inverted index in an external Accumulo table.
package ft;
import java.util.Locale;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Mutation;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.recipes.accumulo.export.AccumuloExporter;
import org.apache.fluo.recipes.core.export.ExportQueue;
import org.apache.fluo.recipes.core.export.SequencedExport;
import org.apache.fluo.recipes.test.FluoITHelper;
public class ExternalIndex {

  /** Id of the export queue that carries word-count changes to the external table. */
  public static final String EXPORT_QUEUE_ID = "eq";

  /** Name of the external Accumulo table that holds the inverted index. */
  private static final String QUERY_TABLE = "queryTable";

  /**
   * A word's count before and after a change. A {@code null} count corresponds to an empty
   * {@code Optional} passed to the two-argument constructor.
   */
  public static class CountChange {
    private final Long oldCount;
    private final Long newCount;

    /**
     * Creates an empty change. NOTE(review): presumably required by the export queue's serializer
     * (e.g. Kryo) to instantiate the class reflectively — confirm before removing.
     */
    public CountChange() {
      oldCount = null;
      newCount = null;
    }

    /**
     * @param oldCount count before the change, or empty if there was none
     * @param newCount count after the change, or empty if there is none
     */
    public CountChange(Optional<Long> oldCount, Optional<Long> newCount) {
      this.oldCount = oldCount.orElse(null);
      this.newCount = newCount.orElse(null);
    }

    /** @return the count before the change, or {@code null} if absent */
    public Long getOldCount() {
      return oldCount;
    }

    /** @return the count after the change, or {@code null} if absent */
    public Long getNewCount() {
      return newCount;
    }
  }

  /**
   * Translates each exported {@link CountChange} into Accumulo mutations on index rows of the form
   * {@code <count>:<word>}.
   */
  public static class SimpleExporter extends AccumuloExporter<String, CountChange> {

    @Override
    protected void translate(SequencedExport<String, CountChange> export,
        Consumer<Mutation> consumer) {
      CountChange change = export.getValue();
      String word = export.getKey();
      Long oldCount = change.getOldCount();
      Long newCount = change.getNewCount();

      if (oldCount != null && oldCount.equals(newCount)) {
        // Same row would be deleted and re-inserted at the same timestamp; in Accumulo the delete
        // would hide the insert, so emit nothing.
        return;
      }

      // Using the export sequence number as the timestamp lets Accumulo's versioning resolve
      // exports for the same word that are applied out of order: later changes carry higher
      // timestamps, and a delete hides entries at or below its timestamp.
      long seq = export.getSequence();

      if (oldCount != null) {
        Mutation delete = new Mutation(indexRow(oldCount, word));
        delete.putDelete("", "", seq);
        consumer.accept(delete);
      }

      if (newCount != null) {
        Mutation insert = new Mutation(indexRow(newCount, word));
        insert.put("", "", seq, "");
        consumer.accept(insert);
      }
    }
  }

  /**
   * Builds the index row {@code <count>:<word>}. The count is zero-padded to 10 digits so that
   * Accumulo's lexicographic row ordering matches numeric order for counts below 10^10.
   * {@link Locale#ROOT} keeps the digits ASCII regardless of the default locale.
   */
  private static String indexRow(long count, String word) {
    return String.format(Locale.ROOT, "%010d:%s", count, word);
  }

  /**
   * Creates the external export table (if it does not already exist) and configures an export
   * queue that writes to it. This modifies {@code config}.
   *
   * @param config Fluo configuration supplying the Accumulo connection settings
   * @throws RuntimeException if connecting to Accumulo or creating the table fails
   */
  public static void setup(FluoConfiguration config) {
    String instance = config.getAccumuloInstance();
    String zookeepers = config.getAccumuloZookeepers();
    String user = config.getAccumuloUser();
    String pass = config.getAccumuloPassword();

    try {
      ZooKeeperInstance zk = new ZooKeeperInstance(instance, zookeepers);
      Connector conn = zk.getConnector(user, new PasswordToken(pass));
      // Tolerate a pre-existing table so setup can be re-run against the same Accumulo instance.
      if (!conn.tableOperations().exists(QUERY_TABLE)) {
        conn.tableOperations().create(QUERY_TABLE);
      }
    } catch (Exception e) {
      throw new RuntimeException("Failed to create export table " + QUERY_TABLE, e);
    }

    // Create config for export table.
    AccumuloExporter.Configuration exportTableCfg =
        new AccumuloExporter.Configuration(instance, zookeepers, user, pass, QUERY_TABLE);

    // Create config for export queue. NOTE(review): the final argument (3) is presumably the
    // bucket count of the queue — confirm against ExportQueue.Options.
    ExportQueue.Options eqOpts = new ExportQueue.Options(EXPORT_QUEUE_ID, SimpleExporter.class,
        String.class, CountChange.class, 3).setExporterConfiguration(exportTableCfg);

    // Configure export queue. This will modify config.
    ExportQueue.configure(config, eqOpts);
  }

  /**
   * Prints the contents of the external index table using the Accumulo settings in
   * {@code config}.
   *
   * @throws RuntimeException if connecting to Accumulo or printing fails
   */
  public static void printTable(FluoConfiguration config) {
    try {
      ZooKeeperInstance zk =
          new ZooKeeperInstance(config.getAccumuloInstance(), config.getAccumuloZookeepers());
      Connector conn =
          zk.getConnector(config.getAccumuloUser(), new PasswordToken(config.getAccumuloPassword()));
      FluoITHelper.printAccumuloTable(conn, QUERY_TABLE);
    } catch (Exception e) {
      throw new RuntimeException("Failed to print table " + QUERY_TABLE, e);
    }
  }

  private ExportQueue<String, CountChange> exportQueue;

  /** Looks up the export queue registered by {@link #setup} in the application configuration. */
  ExternalIndex(SimpleConfiguration appConfig) {
    exportQueue = ExportQueue.getInstance(EXPORT_QUEUE_ID, appConfig);
  }

  /**
   * Adds a count change for {@code word} to the export queue as part of {@code tx}.
   *
   * @param tx transaction the export is added within
   * @param word the word whose count changed
   * @param oldCount count before the change, or empty if none
   * @param newCount count after the change, or empty if none
   */
  public void exportTo(TransactionBase tx, String word, Optional<Long> oldCount,
      Optional<Long> newCount) {
    exportQueue.add(tx, word, new CountChange(oldCount, newCount));
  }
}
/**
 * Starts MiniAccumulo and MiniFluo, runs the exercise, prints the external index table, and
 * stops MiniAccumulo even if any step fails.
 */
public static void main(String[] args) throws Exception {
  String tmpDir = Files.createTempDirectory(Paths.get("target"), "mini").toString();

  FluoConfiguration fluoConfig = new FluoConfiguration();

  // MiniAccumulo must be started before MiniFluo so the export table can be created.
  System.out.print("Starting MiniAccumulo ... ");
  MiniAccumuloCluster mac =
      new MiniAccumuloCluster(new MiniAccumuloConfig(new File(tmpDir), "secret"));
  mac.start();
  // Everything after start() is inside try/finally so the cluster is stopped on any failure,
  // including failures during configuration or initialization.
  try {
    AccumuloExportITBase.configureFromMAC(fluoConfig, mac);
    System.out.println("started.");

    fluoConfig.setApplicationName("ftour");
    fluoConfig.setAccumuloTable("ftour");

    preInit(fluoConfig);

    // When MiniFluo does not create MiniAccumulo, MiniFluo does not initialize the application,
    // so initialize it here. The admin client is closed once initialization is done.
    try (FluoAdmin admin = FluoFactory.newAdmin(fluoConfig)) {
      admin.initialize(
          new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true));
    }

    System.out.print("Starting MiniFluo ... ");
    try (MiniFluo mini = FluoFactory.newMiniFluo(fluoConfig);
        FluoClient client = FluoFactory.newClient(mini.getClientConfiguration())) {
      System.out.println("started.");

      excercise(mini, client);

      // TODO print after each load
      ExternalIndex.printTable(fluoConfig);
    }
  } finally {
    mac.stop();
  }
}
/**
 * Applies the configuration needed before the Fluo application is initialized. It registers
 * {@code ContentObserver}, configures {@code WordCounter}, and sets up the external index export.
 *
 * @param fluoConfig configuration that is modified in place
 */
static void preInit(FluoConfiguration fluoConfig) {
  String contentObserverClassName = ContentObserver.class.getName();
  ObserverSpecification contentObserverSpec = new ObserverSpecification(contentObserverClassName);
  fluoConfig.addObserver(contentObserverSpec);

  WordCounter.configure(fluoConfig, 3);

  // Creates the external table and registers the export queue that writes to it.
  ExternalIndex.setup(fluoConfig);

  // TODO setup KryoFactory
}
public class WordCounter {

  // Change WordObserver to look like the following in order to export count changes.

  /**
   * Receives word-count updates and forwards each word's old and new count to the external index.
   */
  public static class WordObserver extends UpdateObserver<String, Long> {

    private ExternalIndex extIdx;

    /** Builds the external-index handle from this observer's application configuration. */
    public void init(String mapId, Context observerContext) {
      extIdx = new ExternalIndex(observerContext.getAppConfiguration());
    }

    /** Exports every update in {@code updates} within {@code tx}. */
    @Override
    public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) {
      updates.forEachRemaining(update -> extIdx.exportTo(tx, update.getKey(),
          update.getOldValue(), update.getNewValue()));
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment