Skip to content

Instantly share code, notes, and snippets.

@Bill
Last active July 13, 2019 17:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Bill/7f424cc64e250043632feae3c5b75cf7 to your computer and use it in GitHub Desktop.
Save Bill/7f424cc64e250043632feae3c5b75cf7 to your computer and use it in GitHub Desktop.
A non-thread-safe ring buffer thingie
package com.thoughtpropulsion.reactrode;
import static com.thoughtpropulsion.reactrode.Functional.returning;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import io.vavr.Tuple2;
import io.vavr.control.Option;
/**
 * A fixed-capacity ring buffer for {@link GameStateWithBackpressure}.
 *
 * <p>Elements are addressed by <em>logical</em> indices: the first element offered gets the
 * {@code initialIndex} passed to the constructor, the next one {@code initialIndex + 1}, and so
 * on. Logical indices are mapped onto the physical backing array with
 * {@link Math#floorMod(int, int)}, so negative starting indices work too.
 *
 * <p>An instance of this class may be accessed by only one thread at a time, but may be accessed
 * by more than one thread over its lifetime.
 *
 * <p>NOTE(review): a {@code null} element is stored as-is; on retrieval it is wrapped with
 * {@code Option.of}, which presumably yields an empty {@code Option} -- confirm whether
 * {@code null} elements should be rejected in {@link #offer(Object)}.
 *
 * @param <T> the element type
 */
public class RingBufferSequential<T> {

  /** Maximum number of elements held at once; always at least 1. */
  private final int capacity;

  /** Physical storage; slot {@code bound(i)} holds the element with logical index {@code i}. */
  private final AtomicReference<T>[] buffer;

  /*
   These offsets are in logical coordinates, i.e. coordinates meaningful to the client.
   Constraints:
   1. tail >= head
   2. empty when head == tail
  */
  private final AtomicInteger head; // the occupied position for next get
  private final AtomicInteger tail; // the empty position for next put

  /**
   * Creates an empty buffer whose first element will have logical index 0.
   *
   * @param capacity maximum number of elements; must be greater than 0
   * @throws IllegalArgumentException if {@code capacity} is less than 1
   */
  public RingBufferSequential(final int capacity) {
    this(capacity, 0);
  }

  /**
   * Creates an empty buffer whose first element will have logical index {@code initialIndex}.
   *
   * @param capacity maximum number of elements; must be greater than 0
   * @param initialIndex logical index assigned to the first element offered
   * @throws IllegalArgumentException if {@code capacity} is less than 1
   */
  @SuppressWarnings("unchecked") // generic array creation; the array never escapes this class
  public RingBufferSequential(final int capacity, final int initialIndex) {
    if (capacity < 1) {
      throw new IllegalArgumentException(
          String.format("capacity (%d) must be greater than 0", capacity));
    }
    this.capacity = capacity;
    buffer = (AtomicReference<T>[]) new AtomicReference[capacity];
    for (int i = 0; i < capacity; ++i) {
      buffer[i] = new AtomicReference<>();
    }
    head = new AtomicInteger(initialIndex);
    tail = new AtomicInteger(initialIndex);
    assert isEmpty();
  }

  /**
   * Appends {@code x} at the current tail position if there is room.
   *
   * @param x the element to append
   * @return {@code true} if the element was stored, {@code false} if the buffer was full
   * @throws IllegalStateException if the tail index is {@link Integer#MAX_VALUE} and therefore
   *     cannot be advanced
   */
  public boolean offer(final T x) {
    if (isFull()) {
      return false;
    }
    /*
     This is the only situation where this class can overflow an index
     (i.e. tail or head) since head <= tail always.
    */
    final int currentTail = tail.get();
    if (currentTail == Integer.MAX_VALUE) {
      throw new IllegalStateException(String.format(
          "offer() failed because tail index %d is too large to be incremented", currentTail));
    }
    writeBounded(x);
    tail.incrementAndGet();
    return true;
  }

  /**
   * One key feature of this ring buffer, over a queue, is that it provides O(1) indexed
   * access.
   *
   * @param index logical index of the element to look at
   * @return the element at {@code index} if {@code head <= index < tail}, otherwise an empty
   *     {@code Option}; the buffer is not modified
   */
  public Option<T> peek(final int index) {
    if (head.get() <= index && index < tail.get()) {
      return Option.of(readBounded(index));
    }
    return Option.none();
  }

  /**
   * Looks at the element at the head of the buffer without removing it.
   *
   * @return the current head index paired with the element stored there, or with an empty
   *     {@code Option} when the buffer is empty
   */
  public Tuple2<Integer, Option<T>> peek() {
    final int oldHead = head.get();
    // can't use static method Tuple.of() here because...compiler
    return new Tuple2<>(oldHead, peek(oldHead));
  }

  /**
   * Removes the element at the head of the buffer, if any, and advances the head.
   *
   * @return the head index as it was before the call, paired with the removed element, or with
   *     an empty {@code Option} (head left unchanged) when the buffer is empty
   */
  public Tuple2<Integer, Option<T>> poll() {
    final int oldHead = head.get();
    // can't use static method Tuple.of() here because...compiler
    if (isEmpty()) {
      return new Tuple2<>(oldHead, Option.none());
    }
    final AtomicReference<T> slot = buffer[bound(oldHead)];
    final Option<T> value = Option.of(slot.get());
    // Drop the reference so a consumed element is not kept reachable until the slot is reused.
    // This is invisible to clients: indices below head are never readable through peek(int).
    slot.set(null);
    head.incrementAndGet(); // side-effect: advance head
    return new Tuple2<>(oldHead, value);
  }

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder("RingBufferSequential{");
    sb.append("capacity=").append(capacity);
    sb.append(", head=").append(head.get());
    sb.append(", tail=").append(tail.get());
    sb.append('}');
    return sb.toString();
  }

  /** Stores {@code x} in the physical slot that corresponds to the current tail index. */
  private void writeBounded(final T x) {
    buffer[bound(tail.get())].set(x);
  }

  /** Reads the physical slot that corresponds to logical {@code index}; no range check. */
  private T readBounded(final int index) {
    return buffer[bound(index)].get();
  }

  private boolean isEmpty() {
    return head.get() == tail.get();
  }

  private boolean isFull() {
    return tail.get() - head.get() == capacity;
  }

  /**
   * Convert logical coordinates to physical {@code buffer} offset.
   *
   * @param index a logical index (may be negative)
   * @return the offset into {@code buffer}, in the range {@code [0, capacity)}
   */
  private int bound(final int index) {
    return Math.floorMod(index, capacity);
  }
}
package com.thoughtpropulsion.reactrode;
import static org.assertj.core.api.Assertions.assertThat;
import io.vavr.Tuple2;
import io.vavr.control.Option;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
 * Unit tests for {@link RingBufferSequential}, all run against a buffer of capacity 1.
 */
class RingBufferSequentialTest {

  private RingBufferSequential<String> buffer;

  /** Gives every test a fresh, empty, single-slot buffer. */
  @BeforeEach
  void before() {
    buffer = new RingBufferSequential<>(1);
  }

  /** An empty buffer accepts an element. */
  @Test
  void canPut() {
    final boolean accepted = buffer.offer("hi");
    assertThat(accepted).isTrue();
  }

  /** The element offered is the element polled. */
  @Test
  void canGetWhatWasPut() {
    buffer.offer("hi");
    assertGet("hi");
  }

  /** Offering up to capacity still lets the element be polled back. */
  @Test
  void canFill() {
    buffer.offer("apple");
    assertGet("apple");
  }

  /** A full buffer refuses a further element. */
  @Test
  void capacityLimit() {
    buffer.offer("apple");
    final boolean acceptedWhenFull = buffer.offer("orange");
    assertThat(acceptedWhenFull).isFalse();
  }

  /** Polling frees the slot so a later element can be offered and polled. */
  @Test
  void windowMoves() {
    buffer.offer("apple");
    buffer.poll();
    buffer.offer("orange");
    assertGet("orange");
  }

  /**
   * Polls the buffer and checks that the polled element equals {@code value}; fails if the poll
   * yields nothing.
   */
  private void assertGet(final String value) {
    final Tuple2<Integer, Option<String>> polled = buffer.poll();
    final Option<String> element = polled._2;
    if (element.isEmpty()) {
      throw new AssertionError("expected to get something but got nothing");
    }
    assertThat(element.get()).isEqualTo(value);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment