Skip to content

Instantly share code, notes, and snippets.

@pradeepbn
Created November 16, 2021 01:47
Show Gist options
  • Save pradeepbn/65ef387a164953fb3f90057860b12da1 to your computer and use it in GitHub Desktop.
Save pradeepbn/65ef387a164953fb3f90057860b12da1 to your computer and use it in GitHub Desktop.
Test client to reproduce the shutdown sequence issue
import org.apache.bookkeeper.client.*;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class testBKAPI {
public static void main(String [] args) {
try {
String connectionString = "127.0.0.1:2181"; // For a single-node, local ZooKeeper cluster
BookKeeper bkClient = new BookKeeper(connectionString);
LedgerHandle writelh = bkClient.createLedger(1, 1, 1,
BookKeeper.DigestType.CRC32C,
"du".getBytes(StandardCharsets.UTF_8));
// LedgerHandle writelh = bkClient.openLedger(0, BookKeeper.DigestType.CRC32C, "du".getBytes(StandardCharsets.UTF_8));
System.out.println("LedgerID = " + writelh.getId());
// writelh.asyncAddEntry();
long entryId = writelh.addEntry("Check_Check".getBytes(StandardCharsets.UTF_8));
CountDownLatch wDoneSignal = new CountDownLatch(200000);
for (int i = 0; i < 200000; i ++) {
// writelh.addEntry(("Check_Check_1 " + i).getBytes(StandardCharsets.UTF_8));
writelh.asyncAddEntry(("Check_Check_1 " + i).getBytes(StandardCharsets.UTF_8),
(int rc, LedgerHandle lh, long entryIdCb, Object ctx) -> {
((CountDownLatch)ctx).countDown();
}, wDoneSignal );
}
wDoneSignal.await();
System.out.println("Done writing to ledger");
CountDownLatch rDoneSignal = new CountDownLatch(200000);
for (int j = 0; j < 199999; j++) {
// LedgerEntries entries = writelh.read(j, j + 1);
TestReadAsync rCb = new TestReadAsync();
writelh.asyncReadEntries(j, j+1, rCb, rDoneSignal);
// entries.forEach(entry -> {
// System.out.println("Data " + new String(entry.getEntryBytes(), StandardCharsets.UTF_8));
// });
// TimeUnit.SECONDS.sleep(1);
}
System.out.println("Waiting for reads to be done");
rDoneSignal.await();
} catch (InterruptedException | IOException | BKException e) {
e.printStackTrace();
} catch (org.apache.bookkeeper.client.api.BKException e) {
e.printStackTrace();
}
System.out.println("hello world");
}
static class TestReadAsync implements AsyncCallback.ReadCallback {
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
Object ctx) {
while (seq.hasMoreElements()) {
byte[] entry = seq.nextElement().getEntry();
System.out.println("RC code: " + rc);
// System.out.println("Data " + new String(entry, StandardCharsets.UTF_8));
}
((CountDownLatch)ctx).countDown();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment