Skip to content

Instantly share code, notes, and snippets.

@mathieuancelin
Last active December 24, 2015 23:39
Show Gist options
  • Save mathieuancelin/0fcacb380513733e52fe to your computer and use it in GitHub Desktop.
Save mathieuancelin/0fcacb380513733e52fe to your computer and use it in GitHub Desktop.
Issue with Excerpt length
import net.openhft.chronicle.ExcerptCommon;
import java.nio.charset.Charset;
public class AsyncAPIRequest implements WALRecord {
public String tenant;
public long time;
public long nanotime;
public String clazz;
public String payload;
public int size;
public static boolean writeBytes = true;
public AsyncAPIRequest(String tenant, String clazz, String payload){
this.tenant = tenant;
this.time = System.currentTimeMillis();
this.nanotime = System.nanoTime();
this.clazz = clazz;
this.payload = payload;
this.size = payload.length();
}
public AsyncAPIRequest() {
}
@Override
public int getCapacity(){
int value = 10000 + ((this.payload.length() * 8) * 3);
if (value > 16000000) {
return 16000000;
}
return value;
}
@Override
public void readMarshallable(ExcerptCommon in) throws IllegalStateException {
this.tenant = in.readUTF();
this.time = in.readLong();
this.nanotime = in.readLong();
this.clazz = in.readUTF();
if (writeBytes) {
this.size = in.readInt();
byte[] bytes = new byte[size];
in.read(bytes, 0, size);
this.payload = new String(bytes, Charset.forName("UTF-8"));
} else {
this.payload = in.readUTF();
}
}
@Override
public void writeMarshallable(ExcerptCommon out) {
out.writeUTF(tenant);
out.writeLong(time);
out.writeLong(nanotime);
out.writeUTF(clazz);
if (writeBytes) {
out.writeInt(size);
byte[] bytes = payload.getBytes(Charset.forName("UTF-8"));
out.write(bytes, 0, bytes.length);
} else {
out.writeUTF(payload);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof AsyncAPIRequest)) return false;
AsyncAPIRequest that = (AsyncAPIRequest) o;
if (!tenant.equals(that.tenant)) return false;
if (time != that.time) return false;
if (nanotime != that.nanotime) return false;
if (!payload.equals(that.payload)) return false;
if (!clazz.equals(that.clazz)) return false;
return true;
}
@Override
public int hashCode() {
int result = tenant.hashCode();
result = 31 * result + (int) (time ^ (time >>> 32));
result = 31 * result + (int) (nanotime ^ (nanotime >>> 32));
result = 31 * result + clazz.hashCode();
result = 31 * result + payload.hashCode();
return result;
}
}
import com.google.common.io.Files;
import AsyncAPIRequest;
import WALRecord;
import WriteAheadLog;
import net.openhft.chronicle.ExcerptTailer;
import org.junit.Test;
import java.io.File;
public class WalFailWithUTFTest {
@Test
public void littleMessageWithComputeSize() throws Exception {
final String payload = "Hello World!!!";
final int payloadSize = payload.length();
AsyncAPIRequest.writeBytes = true;
testWriteReadInWAL(payload, payloadSize);
}
@Test
public void littleMessageWithComputeSizeAndWriteUTF() throws Exception {
final String payload = "Hello World!!!";
final int payloadSize = payload.length();
AsyncAPIRequest.writeBytes = false;
// This does not fail, because of readUTF/writeUTF and normal payload
testWriteReadInWAL(payload, payloadSize);
}
@Test
public void standardMessageWithComputeSize() throws Exception {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 1; i++) {
builder.append(BLOB);
}
final String payload = builder.toString();
final int payloadSize = payload.length();
AsyncAPIRequest.writeBytes = true;
testWriteReadInWAL(payload, payloadSize);
}
@Test
public void standardMessageWithComputeSizeAndWriteUTF() throws Exception {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 1; i++) {
builder.append(BLOB);
}
final String payload = builder.toString();
final int payloadSize = payload.length();
AsyncAPIRequest.writeBytes = false;
// This does not fail, because of readUTF/writeUTF and normal payload
testWriteReadInWAL(payload, payloadSize);
}
@Test
public void bigMessageWithComputeSize() throws Exception {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 10; i++) {
builder.append(BLOB);
}
final String payload = builder.toString();
final int payloadSize = payload.length();
AsyncAPIRequest.writeBytes = true;
testWriteReadInWAL(payload, payloadSize);
}
@Test
public void bigMessageWithComputeSizeAndWriteUTF() throws Exception {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 10; i++) {
builder.append(BLOB);
}
final String payload = builder.toString();
final int payloadSize = payload.length();
AsyncAPIRequest.writeBytes = false;
// This fail, because of readUTF/writeUTF and big payload ???
testWriteReadInWAL(payload, payloadSize);
}
@Test
public void bigMessageWithComputeSizeAndExtremePayload() throws Exception {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 100; i++) {
builder.append(BLOB);
}
final String payload = builder.toString();
final int payloadSize = payload.length();
AsyncAPIRequest.writeBytes = true;
testWriteReadInWAL(payload, payloadSize);
}
@Test
public void bigMessageWithComputeSizeAndExtremePayloadAndUTF() throws Exception {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 100; i++) {
builder.append(BLOB);
}
final String payload = builder.toString();
final int payloadSize = payload.length();
AsyncAPIRequest.writeBytes = false;
// This fail, because of readUTF/writeUTF and big payload ???
testWriteReadInWAL(payload, payloadSize);
}
@Test
public void bigMessageWithComputeSizeAndVeryExtremePayload() throws Exception {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 500; i++) {
builder.append(BLOB);
}
final String payload = builder.toString();
final int payloadSize = payload.length();
AsyncAPIRequest.writeBytes = true;
testWriteReadInWAL(payload, payloadSize);
}
@Test
public void bigMessageWithComputeSizeAndVeryExtremePayloadAndUTF() throws Exception {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 500; i++) {
builder.append(BLOB);
}
final String payload = builder.toString();
final int payloadSize = payload.length();
AsyncAPIRequest.writeBytes = false;
// This fail, because of readUTF/writeUTF and big payload ???
testWriteReadInWAL(payload, payloadSize);
}
public void testWriteReadInWAL(final String payload, final int expectedSize) throws Exception {
File tmp = Files.createTempDir();
WriteAheadLog wal = new WriteAheadLog(tmp);
try {
final WALRecord record = new AsyncAPIRequest("id", "clazz" , payload);
wal.append(record); // write in WAL
wal.redo(new WriteAheadLog.Function() {
@Override
public void copyDataToBuffer(long index, ExcerptTailer excerpt) {
AsyncAPIRequest read = new AsyncAPIRequest();
read.readMarshallable(excerpt); // read from WAL
if (expectedSize != read.payload.length()) System.err.println("The payload is truncated from %s characters to actually %s characters", expectedSize, read.payload.length());
Assert.assertEquals(expectedSize, read.payload.length());
Assert.assertEquals(payload, read.payload);
}
@Override
public void copyDataFromBuffer() {
// Nothing to do here
}
});
} finally {
wal.close();
tmp.delete();
tmp.deleteOnExit();
}
}
// length is 10117 characters
public static final String
}
public interface WALRecord extends ExcerptMarshallable {
int getCapacity();
}
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import net.openhft.chronicle.ChronicleConfig;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.ExcerptTailer;
import net.openhft.chronicle.IndexedChronicle;
public class WriteAheadLog {
private String walPath;
private IndexedChronicle chronicle;
private ExcerptAppender appender;
public WriteAheadLog(File directory) {
this(directory.getAbsolutePath());
}
public WriteAheadLog(final String walPath) {
assert walPath != null;
try {
this.initChronicleAndExcerpt(walPath);
} catch (IOException e) {
System.err.println("Cannot create Write Ahead Log", e);
}
}
private synchronized void initChronicleAndExcerpt(final String path) throws IOException {
this.walPath = path;
this.chronicle = createIndexedChronicle(path);
this.appender = this.chronicle.createAppender();
}
protected static String getWalPathFormat() {
return new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS").format(new Date(System.currentTimeMillis()));
}
private IndexedChronicle createIndexedChronicle(final String path) throws IOException {
return new IndexedChronicle(path, ChronicleConfig.SMALL);
}
public synchronized void append(final WALRecord walRecord) {
this.appender.startExcerpt(walRecord.getCapacity());
walRecord.writeMarshallable(this.appender);
this.appender.finish();
}
public long size() {
return this.chronicle.size();
}
public synchronized void rotate() throws IOException {
this.rotate(this.getDataFileName().getParent() + "/" + this.getWalPathFormat());
}
public synchronized void rotate(final String newPath) throws IOException {
this.close();
this.initChronicleAndExcerpt(newPath);
}
public synchronized void close() throws IOException {
this.appender.close();
this.chronicle.close();
}
public File getDataFileName() {
return new File(this.walPath + ".data");
}
public String getPath() {
return this.walPath;
}
public interface Function {
void copyDataToBuffer(long index, ExcerptTailer excerpt);
void copyDataFromBuffer();
}
public void redo(final Function function) throws IOException {
this.redo(size(), function);
}
public void redo(final long toIndex, final Function function) throws IOException {
final ExcerptTailer tailer = this.chronicle.createTailer();
int i = 0;
while(i < toIndex && tailer.nextIndex()) {
function.copyDataToBuffer(tailer.index(), tailer);
tailer.finish();
i++;
}
function.copyDataFromBuffer();
tailer.close();
this.close();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment