Skip to content

Instantly share code, notes, and snippets.

@keith-turner
Created March 13, 2019 19:23
Show Gist options
  • Save keith-turner/5ef61908f5194a4991ff15d6d658eec9 to your computer and use it in GitHub Desktop.
Accumulo client code that generates a continuous bulk import load. Created to test the changes in apache/accumulo#979.
package cmd;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import acbase.CmdUtil;
/**
 * Continuous bulk import client. Creates continuous bulk import load against a single tablet to
 * test side compactions.
 *
 * <p>Usage: {@code CBI <files> <rows> <delay>}. The table {@code cbisc} is dropped (best effort)
 * and recreated once at startup. Each iteration of the endless main loop then does the following:
 *
 * <ul>
 *   <li>writes {@code files} RFiles of {@code rows} random rows each under {@code /tmp/cbi};
 *   <li>bulk imports that directory into the table;
 *   <li>warns if any files ended up in the failure directory;
 *   <li>removes both directories;
 *   <li>sleeps {@code delay} milliseconds.
 * </ul>
 *
 * <p>The program runs until it is killed.
 */
public class CBI {

  /** Table that every iteration bulk imports into. */
  private static final String TABLE = "cbisc";

  /** Namenode URI; both the FileSystem and the bulk import paths are resolved against it. */
  private static final String HDFS_URI = "hdfs://localhost:8020";

  /** Directory the RFiles for one iteration are written to. */
  private static final String BULK_DIR = "/tmp/cbi";

  /** Failure directory passed to importDirectory; files that fail to import end up here. */
  private static final String FAIL_DIR = "/tmp/cbi-fail";

  /**
   * Entry point.
   *
   * @param args {@code <files> <rows> <delay>}: the number of RFiles per iteration (must be > 0),
   *     the rows per RFile (must be > 0), and the pause between iterations in milliseconds (must
   *     be >= 0)
   * @throws NumberFormatException if an argument is not an integer
   * @throws Exception on any Accumulo or HDFS failure outside the best-effort cleanup steps
   */
  public static void main(String[] args) throws Exception {
    if (args.length != 3) {
      printUsage();
      return;
    }
    int files = Integer.parseInt(args[0]);
    int rows = Integer.parseInt(args[1]);
    int delay = Integer.parseInt(args[2]);
    // Validate up front: a negative delay would otherwise only fail inside Thread.sleep, after the
    // first import has already run, and zero/negative counts would silently generate no load.
    if (files <= 0 || rows <= 0 || delay < 0) {
      System.err.println("files and rows must be positive and delay must be non-negative");
      printUsage();
      return;
    }

    Connector conn = CmdUtil.getConnector();
    recreateTable(conn);
    FileSystem fs = openFileSystem();

    Path bulkDir = new Path(BULK_DIR);
    Path failDir = new Path(FAIL_DIR);
    Random random = new Random();
    // Running counter stored as the value of every written entry, across files and iterations.
    long count = 0;
    while (true) {
      fs.mkdirs(bulkDir);
      for (int i = 0; i < files; i++) {
        count = writeFile(fs, BULK_DIR + "/f" + i + ".rf", rows, random, count);
      }
      fs.mkdirs(failDir);
      conn.tableOperations().importDirectory(TABLE, HDFS_URI + BULK_DIR, HDFS_URI + FAIL_DIR,
          false);
      // Report before the failure dir is deleted, so import failures are not silently discarded.
      reportFailures(fs, failDir);
      fs.delete(bulkDir, true);
      fs.delete(failDir, true);
      Thread.sleep(delay);
    }
  }

  /** Prints the command-line usage to stderr. */
  private static void printUsage() {
    System.err.println("Usage : " + CBI.class.getName() + " <files> <rows> <delay>");
  }

  /** Drops the test table (best effort), recreates it, and sets the properties under test. */
  private static void recreateTable(Connector conn) throws Exception {
    try {
      conn.tableOperations().delete(TABLE);
    } catch (Exception e) {
      // Best effort, e.g. the table does not exist yet; report and continue.
      System.out.println(e.getMessage());
    }
    conn.tableOperations().create(TABLE);
    // Side-compaction strategy property exercised by the change under test.
    conn.tableOperations().setProperty(TABLE, "table.majc.compaction.side-strategy",
        "org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy");
    // Very high split threshold so the table stays a single tablet.
    conn.tableOperations().setProperty(TABLE, "table.split.threshold", "10G");
  }

  /** Connects to HDFS and removes leftover directories from a previous run (best effort). */
  private static FileSystem openFileSystem() throws Exception {
    Configuration conf = new Configuration(true);
    // Single replica; presumably targeting a one-datanode dev cluster — TODO confirm.
    conf.set("dfs.replication", "1");
    FileSystem fs = FileSystem.get(new URI(HDFS_URI), conf);
    try {
      fs.delete(new Path(BULK_DIR), true);
      fs.delete(new Path(FAIL_DIR), true);
    } catch (Exception e) {
      System.out.println(e.getMessage());
    }
    return fs;
  }

  /**
   * Writes one RFile containing {@code rows} random rows (family "f", qualifier "q").
   *
   * @param count the value to store in the first entry; it is incremented per entry
   * @return the counter value after the last entry written
   */
  private static long writeFile(FileSystem fs, String file, int rows, Random random, long count)
      throws Exception {
    List<Key> keys = new ArrayList<>(rows);
    for (int j = 0; j < rows; j++) {
      // Fixed-width hex of a non-negative long, so lexicographic row order matches numeric order.
      String row = String.format("%016x", random.nextLong() & 0x7fffffffffffffffL);
      keys.add(new Key(row, "f", "q"));
    }
    // RFileWriter requires keys to be appended in sorted order.
    Collections.sort(keys);
    try (RFileWriter writer = RFile.newWriter().to(file).withFileSystem(fs).build()) {
      for (Key key : keys) {
        writer.append(key, new Value(String.valueOf(count)));
        count++;
      }
    }
    return count;
  }

  /** Prints a warning to stderr if any files were left in the failure directory. */
  private static void reportFailures(FileSystem fs, Path failDir) throws Exception {
    int failed = fs.listStatus(failDir).length;
    if (failed > 0) {
      System.err.println("WARNING: " + failed + " file(s) failed to bulk import into " + TABLE);
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment