Last active
November 30, 2016 20:35
-
-
Save keith-turner/84e644eb78c9b00c620acb797c64b2b1 to your computer and use it in GitHub Desktop.
Modify part 3 of http://fluo.apache.org/tour/exercise-1/ to create an inverted index in an external table.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ft; | |
import java.util.Optional; | |
import java.util.function.Consumer; | |
import org.apache.accumulo.core.client.Connector; | |
import org.apache.accumulo.core.client.ZooKeeperInstance; | |
import org.apache.accumulo.core.client.security.tokens.PasswordToken; | |
import org.apache.accumulo.core.data.Mutation; | |
import org.apache.fluo.api.client.TransactionBase; | |
import org.apache.fluo.api.config.FluoConfiguration; | |
import org.apache.fluo.api.config.SimpleConfiguration; | |
import org.apache.fluo.recipes.accumulo.export.AccumuloExporter; | |
import org.apache.fluo.recipes.core.export.ExportQueue; | |
import org.apache.fluo.recipes.core.export.SequencedExport; | |
import org.apache.fluo.recipes.test.FluoITHelper; | |
public class ExternalIndex { | |
public static String EXPORT_QUEUE_ID = "eq"; | |
public static class CountChange { | |
private final Long oldCount; | |
private final Long newCount; | |
public CountChange(){ | |
oldCount = null; | |
newCount = null; | |
} | |
public CountChange(Optional<Long> oldCount, Optional<Long> newCount) { | |
this.oldCount = oldCount.orElse(null); | |
this.newCount = newCount.orElse(null); | |
} | |
public Long getOldCount() { | |
return oldCount; | |
} | |
public Long getNewCount() { | |
return newCount; | |
} | |
} | |
public static class SimpleExporter extends AccumuloExporter<String, CountChange> { | |
@Override | |
protected void translate(SequencedExport<String, CountChange> export, | |
Consumer<Mutation> consumer) { | |
CountChange change = export.getValue(); | |
String word = export.getKey(); | |
//use row of <count>:<word> | |
//TODO create a mutation that deletes old count (use seq for timestamp) | |
//TODO create a mutation that inserts new count (use seq for timestamp) | |
} | |
} | |
/** | |
* Create the external export table and configure an export queue to write to it. | |
*/ | |
public static void setup(FluoConfiguration config) { | |
String instance = config.getAccumuloInstance(); | |
String zookeepers = config.getAccumuloZookeepers(); | |
String user = config.getAccumuloUser(); | |
String pass = config.getAccumuloPassword(); | |
String table = "queryTable"; | |
try { | |
ZooKeeperInstance zk = new ZooKeeperInstance(instance, zookeepers); | |
Connector conn = zk.getConnector(user, new PasswordToken(pass)); | |
conn.tableOperations().create(table); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
// Create config for export table. | |
AccumuloExporter.Configuration exportTableCfg = | |
new AccumuloExporter.Configuration(instance, zookeepers, user, pass, table); | |
// Create config for export queue. | |
ExportQueue.Options eqOpts = new ExportQueue.Options(EXPORT_QUEUE_ID, SimpleExporter.class, | |
String.class, CountChange.class, 3).setExporterConfiguration(exportTableCfg); | |
// Configure export queue. This will modify fluoConfig. | |
ExportQueue.configure(config, eqOpts); | |
} | |
public static void printTable(FluoConfiguration config){ | |
String table = "queryTable"; | |
try { | |
ZooKeeperInstance zk = new ZooKeeperInstance(config.getAccumuloInstance(), config.getAccumuloZookeepers()); | |
Connector conn = zk.getConnector(config.getAccumuloUser(), new PasswordToken(config.getAccumuloPassword())); | |
FluoITHelper.printAccumuloTable(conn, table); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
private ExportQueue<String, CountChange> exportQueue; | |
ExternalIndex(SimpleConfiguration appConfig) { | |
exportQueue = ExportQueue.getInstance(EXPORT_QUEUE_ID, appConfig); | |
} | |
public void exportTo(TransactionBase tx, String word, Optional<Long> oldCount, | |
Optional<Long> newCount) { | |
exportQueue.add(tx, word, new CountChange(oldCount, newCount)); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) throws Exception { | |
String tmpDir = Files.createTempDirectory(Paths.get("target"), "mini").toString(); | |
// System.out.println("tmp dir : "+tmpDir); | |
FluoConfiguration fluoConfig = new FluoConfiguration(); | |
//must start mini accumulo before mini fluo so we can create export table | |
System.out.print("Starting MiniAccumulo ... "); | |
MiniAccumuloCluster mac = new MiniAccumuloCluster(new MiniAccumuloConfig(new File(tmpDir), "secret")); | |
mac.start(); | |
AccumuloExportITBase.configureFromMAC(fluoConfig, mac); | |
System.out.println("started."); | |
fluoConfig.setApplicationName("ftour"); | |
fluoConfig.setAccumuloTable("ftour"); | |
preInit(fluoConfig); | |
//when MiniFluo does not create MiniAccumulo, then MiniFluo does not initialize | |
FluoFactory.newAdmin(fluoConfig).initialize( | |
new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true)); | |
System.out.print("Starting MiniFluo ... "); | |
try (MiniFluo mini = FluoFactory.newMiniFluo(fluoConfig); | |
FluoClient client = FluoFactory.newClient(mini.getClientConfiguration())) { | |
System.out.println("started."); | |
excercise(mini, client); | |
//TODO print after each load | |
ExternalIndex.printTable(fluoConfig); | |
}finally{ | |
mac.stop(); | |
} | |
} | |
static void preInit(FluoConfiguration fluoConfig) { | |
fluoConfig.addObserver(new ObserverSpecification(ContentObserver.class.getName())); | |
WordCounter.configure(fluoConfig, 3); | |
ExternalIndex.setup(fluoConfig); | |
//TODO setup KryoFactory | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class WordCounter { | |
//change WordObserver to look like the following inorder to export count changes | |
public static class WordObserver extends UpdateObserver<String, Long> { | |
private ExternalIndex extIdx; | |
public void init(String mapId, Context observerContext) { | |
extIdx = new ExternalIndex(observerContext.getAppConfiguration()); | |
} | |
@Override | |
public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) { | |
while(updates.hasNext()) { | |
Update<String, Long> u = updates.next(); | |
extIdx.exportTo(tx, u.getKey(), u.getOldValue(), u.getNewValue()); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment