Created
March 13, 2019 19:23
-
-
Save keith-turner/5ef61908f5194a4991ff15d6d658eec9 to your computer and use it in GitHub Desktop.
Accumulo client code to create continuous bulk import load. Created to test changes for apache/accumulo#979
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cmd; | |
import java.net.URI; | |
import java.util.ArrayList; | |
import java.util.Collections; | |
import java.util.List; | |
import java.util.Random; | |
import org.apache.accumulo.core.client.Connector; | |
import org.apache.accumulo.core.client.rfile.RFile; | |
import org.apache.accumulo.core.client.rfile.RFileWriter; | |
import org.apache.accumulo.core.data.Key; | |
import org.apache.accumulo.core.data.Value; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import acbase.CmdUtil; | |
/** | |
* Continuous bulk import client. Creates continuous bulk import to a single tablet to test side | |
* compactions. | |
*/ | |
public class CBI { | |
public static void main(String[] args) throws Exception { | |
if (args.length != 3) { | |
System.err.println("Usage : " + CBI.class.getName() + " <files> <rows> <delay>"); | |
return; | |
} | |
int files = Integer.parseInt(args[0]); | |
int rows = Integer.parseInt(args[1]); | |
int delay = Integer.parseInt(args[2]); | |
Connector conn = CmdUtil.getConnector(); | |
try { | |
conn.tableOperations().delete("cbisc"); | |
} catch (Exception e) { | |
System.out.println(e.getMessage()); | |
} | |
conn.tableOperations().create("cbisc"); | |
conn.tableOperations().setProperty("cbisc", "table.majc.compaction.side-strategy", | |
"org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy"); | |
conn.tableOperations().setProperty("cbisc", "table.split.threshold", "10G"); | |
Configuration conf = new Configuration(true); | |
conf.set("dfs.replication", "1"); | |
FileSystem fs = FileSystem.get(new URI("hdfs://localhost:8020"), conf); | |
try { | |
fs.delete(new Path("/tmp/cbi"), true); | |
fs.delete(new Path("/tmp/cbi-fail"), true); | |
} catch (Exception e) { | |
System.out.println(e.getMessage()); | |
} | |
Random random = new Random(); | |
long count = 0; | |
while (true) { | |
fs.mkdirs(new Path("/tmp/cbi")); | |
for (int i = 0; i < files; i++) { | |
try (RFileWriter writer = RFile.newWriter().to("/tmp/cbi/f" + i + ".rf").withFileSystem(fs) | |
.build()) { | |
List<Key> keys = new ArrayList<>(); | |
for (int j = 0; j < rows; j++) { | |
String row = String.format("%016x", random.nextLong() & 0x7fffffffffffffffL); | |
keys.add(new Key(row, "f", "q")); | |
} | |
Collections.sort(keys); | |
for (Key key : keys) { | |
writer.append(key, new Value(count + "")); | |
count++; | |
} | |
} | |
} | |
fs.mkdirs(new Path("/tmp/cbi-fail")); | |
conn.tableOperations().importDirectory("cbisc", "hdfs://localhost:8020/tmp/cbi", | |
"hdfs://localhost:8020/tmp/cbi-fail", false); | |
fs.delete(new Path("/tmp/cbi"), true); | |
fs.delete(new Path("/tmp/cbi-fail"), true); | |
Thread.sleep(delay); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment