Skip to content

Instantly share code, notes, and snippets.

@jheusser
Created October 9, 2016 18:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jheusser/28621f722f0afa6137e80ef0e0afc276 to your computer and use it in GitHub Desktop.
Save jheusser/28621f722f0afa6137e80ef0e0afc276 to your computer and use it in GitHub Desktop.
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import net.openhft.chronicle.core.time.TimeProvider;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.StoreFileListener;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.WireType;
public class RollCycleTest {
Path path;
@Before
public void setUp() throws Exception {
path = Files.createTempDirectory("rollCycleTest");
}
@After
public void tearDown() throws IOException {
Files.walk(path)
.collect(Collectors.toCollection(LinkedList::new))
.descendingIterator()
.forEachRemaining(path -> {try { Files.deleteIfExists(path); } catch (Exception e) {}});
}
class TestTimeProvider implements TimeProvider {
volatile long now = System.currentTimeMillis();
@Override
public long currentTimeMillis() {
return now;
}
public void add(long addInMs) {
now += addInMs;
}
}
class ParallelQueueObserver implements Runnable, StoreFileListener {
SingleChronicleQueue queue;
CountDownLatch progressLatch;
int documentsRead;
public ParallelQueueObserver(TimeProvider timeProvider, Path path) {
queue = SingleChronicleQueueBuilder.binary(path)
.rollCycle(RollCycles.DAILY).timeProvider(timeProvider).storeFileListener(this).wireType(WireType.FIELDLESS_BINARY).build();
documentsRead = 0;
progressLatch = new CountDownLatch(1);
}
@Override
public void run() {
ExcerptTailer tailer = queue.createTailer();
progressLatch.countDown();
while(!Thread.currentThread().isInterrupted()) {
String readText = tailer.readText();
if(readText != null) {
System.out.println("Read a document " + readText);
documentsRead++;
}
}
}
public void await() throws Exception {
progressLatch.await();
}
public int documentsRead() {
return documentsRead;
}
@Override
public void onAcquired(int cycle, File file) {
System.out.println("Acquiring " + file);
}
@Override
public void onReleased(int cycle, File file) {
System.out.println("Releasing " + file);
}
}
@Test
public void newRollCycleIgnored() throws Exception {
TestTimeProvider timeProvider = new TestTimeProvider();
ParallelQueueObserver observer = new ParallelQueueObserver(timeProvider, path);
try(SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(path)
.rollCycle(RollCycles.DAILY).timeProvider(timeProvider).wireType(WireType.FIELDLESS_BINARY).build()) {
ExcerptAppender appender = queue.acquireAppender();
// uncomment next line to make the test pass
// appender.writeText("Day 1 data");
Thread thread = new Thread(observer);
thread.start();
observer.await();
// two days pass
timeProvider.add(TimeUnit.DAYS.toMillis(2));
appender.writeText("Day 3 data");
// allow parallel tailer to finish iteration
Thread.sleep(2000);
thread.interrupt();
}
assertEquals(2, observer.documentsRead);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment