Skip to content

Instantly share code, notes, and snippets.

@s1monw
Created April 28, 2015 14:32
Show Gist options
  • Save s1monw/06ee69f8d0dc86ec89db to your computer and use it in GitHub Desktop.
diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsChannelReader.java b/src/main/java/org/elasticsearch/index/translog/fs/FsChannelReader.java
index 295a4e8..eeb85cd 100644
--- a/src/main/java/org/elasticsearch/index/translog/fs/FsChannelReader.java
+++ b/src/main/java/org/elasticsearch/index/translog/fs/FsChannelReader.java
@@ -68,19 +68,7 @@ public abstract class FsChannelReader implements Closeable, Comparable<FsChannel
return read(buffer, location.translogLocation, location.size);
}
- /**
- * reads an operation from the given position and changes to point at the start of the next
- * operation
- */
- public Translog.Operation readOpAndAdvancePosition(ByteBuffer reusableBuffer, AtomicLong position) throws IOException {
- final long pos = position.get();
- int opSize = readSize(reusableBuffer, pos);
- Translog.Operation op = read(reusableBuffer, pos, opSize);
- position.addAndGet(opSize);
- return op;
- }
-
- protected int readSize(ByteBuffer reusableBuffer, long position) {
+ public int readSize(ByteBuffer reusableBuffer, long position) {
// read op size from disk
assert reusableBuffer.capacity() >= 4 : "reusable buffer must have capacity >=4 when reading opSize. got [" + reusableBuffer.capacity() + "]";
try {
@@ -99,7 +87,7 @@ public abstract class FsChannelReader implements Closeable, Comparable<FsChannel
* reads an operation at the given position and returns it. The buffer length is equal to the number
* of bytes reads.
*/
- protected Translog.Operation read(ByteBuffer reusableBuffer, long position, int opSize) throws IOException {
+ public Translog.Operation read(ByteBuffer reusableBuffer, long position, int opSize) throws IOException {
final ByteBuffer buffer;
if (reusableBuffer.capacity() >= opSize) {
buffer = reusableBuffer;
diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java b/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java
index 2e0d1d0..7364ccb 100644
--- a/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java
+++ b/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java
@@ -37,12 +37,11 @@ public class FsChannelSnapshot implements Closeable {
protected final FsChannelReader reader;
protected final AtomicBoolean closed = new AtomicBoolean(false);
- // we use an atomic long to allow passing it by reference :(
- protected final AtomicLong position;
+ protected long position;
public FsChannelSnapshot(FsChannelReader reader) {
this.reader = reader;
- this.position = new AtomicLong(reader.firstPosition());
+ this.position = reader.firstPosition();
}
public long translogId() {
@@ -54,10 +53,13 @@ public class FsChannelSnapshot implements Closeable {
}
public Translog.Operation next(ByteBuffer reusableBuffer) throws IOException {
- if (position.get() >= reader.sizeInBytes()) {
+ if (position >= reader.sizeInBytes()) {
return null;
}
- return reader.readOpAndAdvancePosition(reusableBuffer, position);
+ int opSize = reader.readSize(reusableBuffer, position);
+ Translog.Operation op = reader.read(reusableBuffer, position, opSize);
+ position += opSize;
+ return op;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment