Skip to content

Instantly share code, notes, and snippets.

@keith-turner
Last active December 30, 2022 20:39
Show Gist options
  • Save keith-turner/f2159111b025e600a6e0abbaba1d92f3 to your computer and use it in GitHub Desktop.
Save keith-turner/f2159111b025e600a6e0abbaba1d92f3 to your computer and use it in GitHub Desktop.
Scan server test programs

Testing Accumulo offline scans

Wrote some test programs to exercise offline scans in Accumulo. These programs are expected to run in separate processes; this prevents table operations from clearing the client-side tablet cache used by scans.

# start some scan servers
accumulo WriteRead accumulo-client.properties &> writeread.log &
accumulo ModifyTable accumulo-client.properties &> modifytable.log &

Running the above will create two processes. One process continually reads the table using both a Scanner and a BatchScanner and validates the results. The other process randomly takes the table offline, brings it online, splits, merges, and compacts it.

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
/**
 * Continuously and randomly mutates the {@code testscan} table so that a concurrently running
 * {@code WriteRead} process can verify that (eventual-consistency) scans still return correct
 * data. Every 5 seconds one of the following is chosen at random: toggle online/offline (~5%),
 * add splits, merge, or compact (~31.7% each).
 *
 * <p>Split, merge, and compact temporarily bring an offline table online and take it back
 * offline afterwards, so the online/offline state is only ever changed by the toggle action.
 */
public class ModifyTable {
  private static final Logger log = LoggerFactory.getLogger(ModifyTable.class);

  /** Table created and populated by WriteRead. */
  private static final String TABLE = "testscan";

  /** Exclusive upper bound of the row numbers WriteRead writes (rows are "%09d" formatted). */
  private static final int MAX_ROW = 1000000;

  /** An operation on the table that may throw any Accumulo client exception. */
  @FunctionalInterface
  private interface TableAction {
    void run() throws Exception;
  }

  /**
   * Entry point.
   *
   * @param args {@code args[0]} is the path to an accumulo-client.properties file
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      throw new IllegalArgumentException("Usage: ModifyTable <accumulo-client.properties>");
    }
    try (var client = Accumulo.newClient().from(args[0]).build()) {
      run(client);
    }
  }

  /** Loops forever, applying one randomly chosen table operation every 5 seconds. */
  public static void run(AccumuloClient client) throws Exception {
    Random rand = new Random();
    while (true) {
      var p = rand.nextDouble();
      if (p < .05) {
        switchTableState(client);
      } else if (p < .3666) {
        splitTable(client, rand);
      } else if (p < .6833) {
        mergeTable(client, rand);
      } else {
        compactTable(client);
      }
      Thread.sleep(5000);
    }
  }

  /**
   * Runs {@code action} with the table online. If the table was offline, it is brought online
   * first and taken offline again afterwards. The restore is done in a {@code finally}, so the
   * table state is restored even when the action fails.
   */
  private static void withTableOnline(AccumuloClient client, TableAction action)
      throws Exception {
    boolean takeOffline = false;
    if (!client.tableOperations().isOnline(TABLE)) {
      client.tableOperations().online(TABLE, true);
      takeOffline = true;
    }
    try {
      action.run();
    } finally {
      if (takeOffline) {
        client.tableOperations().offline(TABLE, true);
      }
    }
  }

  /** Fully compacts the table, waiting for the compaction to finish. */
  private static void compactTable(AccumuloClient client) throws Exception {
    withTableOnline(client, () -> {
      client.tableOperations().compact(TABLE, new CompactionConfig().setWait(true));
      log.info("Compacted table");
    });
  }

  /**
   * Merges a range of the table. A pseudo-random subset of the current splits is discarded; the
   * merge then spans from the first to the last remaining split, or covers the whole table when
   * at most one split remains.
   */
  private static void mergeTable(AccumuloClient client, Random rand) throws Exception {
    var splits = new TreeSet<>(client.tableOperations().listSplits(TABLE));
    if (splits.isEmpty()) {
      log.info("Table has no splits, so not merging");
      return;
    }
    withTableOnline(client, () -> {
      int d = rand.nextInt(4) + 1;
      // remove a random subset of splits (d == 1 removes all of them)
      splits.removeIf(split -> split.hashCode() % d == 0);
      if (splits.size() <= 1) {
        client.tableOperations().merge(TABLE, null, null);
        log.info("Merged entire table");
      } else {
        var begin = splits.first();
        var end = splits.last();
        client.tableOperations().merge(TABLE, begin, end);
        log.info("Merged {} to {}", begin, end);
      }
    });
  }

  /** Adds between 1 and 5 random split points drawn from the row space WriteRead uses. */
  private static void splitTable(AccumuloClient client, Random rand) throws Exception {
    withTableOnline(client, () -> {
      int numToAdd = rand.nextInt(5) + 1;
      SortedSet<Text> splits = new TreeSet<>();
      for (int i = 0; i < numToAdd; i++) {
        splits.add(new Text(String.format("%09d", rand.nextInt(MAX_ROW))));
      }
      client.tableOperations().addSplits(TABLE, splits);
      log.info("Added splits : {}", splits);
    });
  }

  /** Toggles the table between online and offline, waiting for the transition to complete. */
  private static void switchTableState(AccumuloClient client) throws Exception {
    if (client.tableOperations().isOnline(TABLE)) {
      client.tableOperations().offline(TABLE, true);
      log.info("Took table offline");
    } else {
      client.tableOperations().online(TABLE, true);
      log.info("Brought table online");
    }
  }
}
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.BitSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Recreates and populates the {@code testscan} table, then loops forever scanning it with both a
 * Scanner and a BatchScanner (eventual consistency) and validating that exactly the written data
 * comes back. Intended to run concurrently with ModifyTable in a separate process.
 */
public class WriteRead {
  private static final Logger log = LoggerFactory.getLogger(WriteRead.class);

  /** Table this program (re)creates, writes, and validates. */
  private static final String TABLE = "testscan";

  /** Number of rows written; row i is "%09d"-formatted and has value i. */
  private static final int NUM_ROWS = 1000000;

  /**
   * Entry point.
   *
   * @param args {@code args[0]} is the path to an accumulo-client.properties file
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      throw new IllegalArgumentException("Usage: WriteRead <accumulo-client.properties>");
    }
    try (var client = Accumulo.newClient().from(args[0]).build()) {
      run(client);
    }
  }

  /**
   * Deletes and recreates the table, writes {@link #NUM_ROWS} rows, flushes, then scans and
   * validates forever. Each pass logs the Scanner and BatchScanner durations in seconds.
   */
  public static void run(AccumuloClient client) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException, InterruptedException {
    if (client.tableOperations().exists(TABLE)) {
      client.tableOperations().delete(TABLE);
    }
    client.tableOperations().create(TABLE);
    try (var writer = client.createBatchWriter(TABLE)) {
      for (int i = 0; i < NUM_ROWS; i++) {
        Mutation m = new Mutation(String.format("%09d", i));
        m.put("f", "q", Integer.toString(i));
        writer.addMutation(m);
      }
    }
    // Flush so the data lives in files; offline scans cannot read in-memory data.
    client.tableOperations().flush(TABLE, null, null, true);
    while (true) {
      long t1 = System.currentTimeMillis();
      try (var scanner = client.createScanner(TABLE)) {
        scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
        scanner.setRange(new Range());
        validate(scanner);
      }
      long t2 = System.currentTimeMillis();
      try (var scanner = client.createBatchScanner(TABLE)) {
        scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
        scanner.setRanges(List.of(new Range()));
        validate(scanner);
      }
      long t3 = System.currentTimeMillis();
      log.info(String.format("Finished scans %.2f %.2f", (t2 - t1) / 1000.0, (t3 - t2) / 1000.0));
      Thread.sleep(100);
    }
  }

  /**
   * Consumes {@code scanner} and verifies it returned every row in [0, NUM_ROWS) exactly once,
   * with each value equal to its row number.
   *
   * @throws RuntimeException on a row/value mismatch, an out-of-range row, a duplicate row, or a
   *     missing row (reporting the first missing row)
   */
  private static void validate(ScannerBase scanner) {
    // BitSet avoids boxing up to a million Integers per validation pass.
    BitSet seen = new BitSet(NUM_ROWS);
    for (Map.Entry<Key, Value> e : scanner) {
      int r = Integer.parseInt(e.getKey().getRowData().toString());
      int v = Integer.parseInt(e.getValue().toString());
      if (r != v) {
        throw new RuntimeException("row != value " + r + " " + v);
      }
      if (r < 0 || r >= NUM_ROWS) {
        throw new RuntimeException("unexpected value " + r);
      }
      if (seen.get(r)) {
        throw new RuntimeException("duplicate seen " + r);
      }
      seen.set(r);
      // TODO could do order check for scanner only
    }
    // Rows are range-checked and de-duplicated above, so the count falls short exactly when some
    // row is missing; report the first such row to aid debugging.
    int count = seen.cardinality();
    if (count != NUM_ROWS) {
      throw new RuntimeException(
          "unexpected size " + count + ", first missing " + seen.nextClearBit(0));
    }
  }
}
@DomGarguilo
Copy link

Would this be useful if added to the accumulo-testing repo? It could potentially serve as a stress test similar to continuous ingest maybe.

@keith-turner
Copy link
Author

@DomGarguilo I think this test would be useful to add to accumulo-testing, but I was not sure how to add it. It is close to a random walk test, but it has stricter requirements (run exactly two processes, each program in its own process) that random walk would not support out of the box.

It could potentially serve as a stress test similar to continuous ingest maybe.

Maybe it would be useful as its own top level test, I had not thought of that. I was trying to think of how to jam it in as a random walk test.

If we move forward with making more table operations like split, merge, compact support offline tables, then this would be a handy test to stress that functionality. We could probably come up with new random walk test to stress those new features also.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment