Skip to content

Instantly share code, notes, and snippets.

@sormuras
Last active August 29, 2015 14:05
Show Gist options
  • Save sormuras/3fea6f6ade327b2a4249 to your computer and use it in GitHub Desktop.
Save sormuras/3fea6f6ade327b2a4249 to your computer and use it in GitHub Desktop.
Not thread-safe Prevayler implementation
package org.prevayler.contrib.eight;
import static java.lang.String.format;
import static java.nio.file.Files.createLink;
import static java.nio.file.Files.deleteIfExists;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Date;
import org.prevayler.Clock;
import org.prevayler.Prevayler;
import org.prevayler.Query;
import org.prevayler.SureTransactionWithQuery;
import org.prevayler.Transaction;
import org.prevayler.TransactionWithQuery;
import org.prevayler.implementation.clock.MachineClock;
public class PrevaylerEight<P> implements Prevayler<P> {
class Journal implements Closeable {
/**
 * One section of the journal, backed by a view on the journal buffer and written to through its
 * own object output stream.
 */
class Slice {
    /** Number of transactions counted for this slice. */
    long transactionCounter;

    /** View on the journal buffer that this slice writes into. */
    ByteBuffer buffer;

    /** Object output stream that serializes into this slice's buffer. */
    ObjectOutputStream stream;

    /**
     * Starts a slice on the given buffer: puts a zero transaction counter at the buffer's current
     * position, opens the object stream on the buffer and writes the current instant as UTF text.
     *
     * @param buffer view on the journal buffer to write into
     * @throws IOException if the object stream cannot be created or written
     */
    Slice(ByteBuffer buffer) throws IOException {
        this.transactionCounter = 0;
        this.buffer = buffer;
        this.buffer.putLong(this.transactionCounter);
        this.stream = new ObjectOutputStream(new ByteBufferOutputStream(this.buffer));
        this.stream.writeUTF(Instant.now().toString());
    }

    /**
     * Writes a time stamp followed by the object to the stream and flushes it.
     *
     * @param object object to serialize
     * @param time time stamp stored in front of the object
     * @throws IOException if writing fails; the buffer is reset to the position marked on entry
     */
    <T> void write(T object, long time) throws IOException {
        buffer.mark(); // remember where this entry starts
        try {
            stream.writeLong(time);
            stream.writeObject(object);
            stream.flush();
        } catch (IOException failure) {
            buffer.reset(); // go back to the marked position
            throw failure;
        }
    }
}
/**
* Active slice, i.e. the one new transactions are written to.
*/
Slice slice;
/**
* Number of slices read from the journal file when it was loaded.
*/
int sliceCounter;
/**
* Number of all transactions in the journal file. That is the sum of the transactions of all slices.
*/
long transactionCounter;
/**
* Random access journal file.
*/
RandomAccessFile file;
/**
* Byte buffer whose content is the memory-mapped journal file.
*/
MappedByteBuffer memory;
/**
 * Opens the journal file, maps it into memory, replays every stored transaction against the
 * prevalent system of the enclosing prevayler and finally starts a fresh slice for new writes.
 *
 * <p>Layout as read here: an int slice counter, a long journal-wide transaction counter, then per
 * slice a long transaction count followed by an object stream that holds the slice's start instant
 * as UTF text and that many (long time, object) pairs.
 *
 * @param journalFile file backing the journal; its length is set to {@code size} when it is empty
 * @param size number of bytes of the file to map
 * @throws Exception if the file cannot be opened, mapped or read, or if a replayed transaction
 *         throws; the journal file is closed before the exception propagates
 */
@SuppressWarnings("unchecked")
Journal(File journalFile, long size) throws Exception {
    this.file = new RandomAccessFile(journalFile, "rw");
    try {
        if (file.length() == 0) {
            // logger.info(format("Creating initial journal file. size = %d", size));
            file.setLength(size);
        }
        this.memory = file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, size);
        this.sliceCounter = memory.getInt(); // first entry is the slice counter
        this.transactionCounter = memory.getLong(); // second entry is the journal transaction counter
        // logger.info(format("Loading journal with %d transaction(s) separated in %d slices(s)...", transactionCounter, sliceCounter));
        // reload journal slices
        for (int number = 0; number < sliceCounter; number++) {
            long transactions = memory.getLong();
            try (ObjectInputStream in = new ObjectInputStream(new ByteBufferInputStream(memory))) {
                @SuppressWarnings("unused")
                Instant start = Instant.parse(in.readUTF());
                // logger.info(format("Loading slice #%d with %d transaction(s) starting from %s...", number, transactions, start));
                // long index: the stored count is a long and must not be compared against a wrapping int
                for (long replayed = 0; replayed < transactions; replayed++) {
                    long time = in.readLong();
                    Object object = in.readObject();
                    Date executionTime = new Date(time);
                    if (object instanceof Transaction) {
                        ((Transaction<P>) object).executeOn(prevalentSystem, executionTime);
                    } else {
                        ((TransactionWithQuery<P, ?>) object).executeAndQuery(prevalentSystem, executionTime);
                    }
                }
            }
        }
        this.slice = new Slice(memory.slice());
        // okay, write slice counter (without increasing the related field) absolutely and be done
        memory.putInt(0, sliceCounter + 1);
    } catch (Throwable failure) {
        // Do not leak the open file handle when loading fails half-way.
        try {
            file.close();
        } catch (IOException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
        throw failure;
    }
}
/**
 * Restarts the journal: rewinds the mapped buffer, zeroes both counters, writes the header
 * (slice counter plus one, then the transaction counter) and opens a new active slice behind it.
 *
 * @throws IOException if the new slice cannot be created
 */
void clear() throws IOException {
    memory.clear(); // TODO Erase buffer?
    sliceCounter = 0;
    transactionCounter = 0;
    memory.putInt(sliceCounter + 1);
    memory.putLong(transactionCounter);
    slice = new Slice(memory.slice());
}
/**
 * Closes the active slice's stream, forces the mapped buffer and closes the journal file.
 *
 * <p>Each step runs even if an earlier one throws, so a failing stream or force does not leave the
 * file handle open.
 *
 * @throws IOException if closing the stream or the file fails
 */
@Override
public void close() throws IOException {
    try {
        slice.stream.close();
    } finally {
        try {
            memory.force();
        } finally {
            file.close();
        }
    }
}
/**
 * Counts one more transaction, both for the active slice (stored at index 0 of the slice buffer)
 * and for the whole journal (stored at index 4 of the mapped buffer, behind the int slice counter).
 */
void commit() {
    long sliceTotal = ++slice.transactionCounter;
    slice.buffer.putLong(0, sliceTotal);
    long journalTotal = ++transactionCounter;
    memory.putLong(4, journalTotal);
}
/**
 * Writes the object together with its time stamp to the active slice and hands the very same
 * object back.
 *
 * @param object object to write
 * @param time time stamp stored with the object
 * @return the passed object
 * @throws Error wrapping the {@link IOException} if writing fails
 */
<T> T copy(T object, long time) {
    try {
        slice.write(object, time);
        return object;
    } catch (IOException e) {
        throw new Error(e);
    }
}
/**
 * Reports how much of the mapped journal is in use.
 *
 * @return 100 minus the active slice's remaining bytes expressed as a percentage of the mapped
 *         buffer's capacity
 */
double usage() {
    double freePercent = slice.buffer.remaining() * 100d / memory.capacity();
    return 100d - freePercent;
}
}
class Snapshot {
/**
* Point on the time-line when the snapshot was taken.
* NOTE(review): never assigned in the code visible here - confirm whether it is still needed.
*/
Instant timestamp;
/**
* Number of transactions that led to this system snapshot. Read from the snapshot file on reload
* and set to the prevayler's age when a snapshot is taken.
*/
long transactionCounter;
/**
 * Loads the prevalent system stored in the given snapshot file.
 *
 * <p>The file is read as an object stream holding an {@link Instant}, the long transaction counter
 * (kept in {@link #transactionCounter}) and the system itself.
 *
 * @param snap snapshot file to read
 * @param initialPrevalentSystem system to use when the file does not exist
 * @return the stored system, or {@code initialPrevalentSystem} if there is no snapshot file
 * @throws Error wrapping any exception raised while reading the file
 */
@SuppressWarnings("unchecked")
P reload(File snap, P initialPrevalentSystem) {
    if (snap.exists()) {
        try (ObjectInputStream stream = new ObjectInputStream(new FileInputStream(snap))) {
            @SuppressWarnings("unused")
            Instant instant = (Instant) stream.readObject();
            // logger.info("Loading " + snap.getCanonicalPath() + " taken on " + instant + "...");
            transactionCounter = stream.readLong();
            P storedSystem = (P) stream.readObject();
            // logger.info("Loaded " + storedSystem + " (transactionCounter=" + transactionCounter + ")");
            return storedSystem;
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    // logger.info(format("No %s file found. Using initial prevalent system.", snap));
    return initialPrevalentSystem;
}
/**
 * Serializes the current prevalent system into a new, time-stamped file below {@code base}.
 *
 * <p>The file receives the current instant, the prevayler's age (also stored in
 * {@link #transactionCounter}) and the prevalent system, in that order.
 *
 * @param base directory the snapshot file is created in
 * @return the written snapshot file
 * @throws Exception if the file cannot be written
 */
File take(File base) throws Exception {
    Instant now = Instant.now();
    // ':' is replaced because the instant's text ends up in a file name
    String fileName = "snap-" + now.toString().replace(':', '-') + ".shot";
    File shot = new File(base, fileName);
    try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(shot))) {
        out.writeObject(now);
        transactionCounter = age();
        out.writeLong(transactionCounter);
        out.writeObject(prevalentSystem);
    }
    // logger.info(format("%d bytes saved to %s", shot.length(), shot));
    return shot;
}
}
// Directory that holds the snapshot files and the journal file.
private final File base;
// Source of the time stamps handed to queries and transactions.
private final Clock clock;
// private final Logger logger;
// System all queries and transactions are executed on; reloaded from "snap.shot" when that file exists.
private final P prevalentSystem;
// Snapshot support; also carries the transaction count stored with the last snapshot.
private Snapshot snapshot;
// Memory-mapped journal that transactions are written to before they are executed.
private Journal journal;
/**
 * Creates a prevayler with default settings: base directory "PrevalenceBase", journal file
 * "sliced.journal" and a journal size of 10 MiB.
 *
 * @param prevalentSystem initial system, used when no snapshot file exists
 * @throws Exception if the snapshot or the journal cannot be loaded
 */
public PrevaylerEight(P prevalentSystem) throws Exception {
    this(
        prevalentSystem,
        new File("PrevalenceBase"),
        "sliced.journal",
        10 * 1024 * 1024);
}
public PrevaylerEight(P prevalentSystem, File base, String journalName, long journalSize) throws Exception {
// this.logger = Logger.getLogger(getClass().getName() + "(" + prevalentSystem.getClass().getSimpleName() + ")");
this.base = base;
if (base.mkdirs())
// logger.info(base.getAbsolutePath() + " created.")
;
this.clock = new MachineClock();
this.snapshot = new Snapshot();
this.prevalentSystem = snapshot.reload(new File(base, "snap.shot"), prevalentSystem);
this.journal = new Journal(new File(base, journalName), journalSize);
}
/**
 * Returns the age of the prevalent system in transactions.
 *
 * @return the snapshot's transaction count plus the journal's transaction count
 */
public long age() {
    long fromSnapshot = snapshot.transactionCounter;
    long fromJournal = journal.transactionCounter;
    return fromSnapshot + fromJournal;
}
/**
 * Returns the clock this prevayler takes its time stamps from.
 */
@Override
public Clock clock() {
    return this.clock;
}
/**
 * Closes the journal of this prevayler.
 *
 * @throws IOException if closing the journal fails
 */
@Override
public void close() throws IOException {
    this.journal.close();
}
/**
 * Runs the query on the prevalent system with the clock's current time. Nothing is journaled.
 *
 * @param sensitiveQuery query to run
 * @return the query's result
 * @throws Exception whatever the query throws
 */
@Override
public <R> R execute(Query<? super P, R> sensitiveQuery) throws Exception {
    // no copy needed - queries are transient by contract
    Date now = clock().time();
    return sensitiveQuery.query(prevalentSystem, now);
}
/**
 * Runs a transaction-with-query that declares no checked exceptions, by delegating to the
 * general {@link TransactionWithQuery} variant.
 *
 * @param sureTransactionWithQuery transaction to journal and run
 * @return the transaction's result
 * @throws RuntimeException as thrown by the delegate, or wrapping an unexpected checked exception
 */
@Override
public <R> R execute(SureTransactionWithQuery<? super P, R> sureTransactionWithQuery) {
    // Widening the static type selects the TransactionWithQuery overload instead of recursing.
    TransactionWithQuery<? super P, R> general = sureTransactionWithQuery;
    try {
        return execute(general);
    } catch (RuntimeException unchecked) {
        throw unchecked;
    } catch (Exception unexpected) {
        throw new RuntimeException("Unexpected exception thrown!", unexpected);
    }
}
/**
 * Writes the transaction to the journal, executes it on the prevalent system and then commits
 * the journal entry.
 *
 * <p>NOTE(review): if the transaction throws, {@code journal.commit()} is skipped although the
 * entry has already been written - confirm that this is the intended behaviour.
 *
 * @param transaction transaction to journal and run
 */
@Override
public void execute(Transaction<? super P> transaction) {
    final Date now = clock().time();
    final long timestamp = now.getTime();
    journal.copy(transaction, timestamp).executeOn(prevalentSystem, now);
    journal.commit();
}
/**
 * Writes the transaction to the journal, executes it on the prevalent system, commits the
 * journal entry and returns the transaction's result.
 *
 * <p>NOTE(review): if the transaction throws, {@code journal.commit()} is skipped although the
 * entry has already been written - confirm that this is the intended behaviour.
 *
 * @param transactionWithQuery transaction to journal and run
 * @return the transaction's result
 * @throws Exception whatever the transaction throws
 */
@Override
public <R> R execute(TransactionWithQuery<? super P, R> transactionWithQuery) throws Exception {
    final Date now = clock().time();
    final long timestamp = now.getTime();
    final R answer = journal.copy(transactionWithQuery, timestamp).executeAndQuery(prevalentSystem, now);
    journal.commit();
    return answer;
}
/**
 * Returns the system that queries and transactions are executed on.
 */
@Override
public P prevalentSystem() {
    return this.prevalentSystem;
}
/**
 * Takes a snapshot of the prevalent system, re-creates "snap.shot" in the base directory as a
 * link to the new snapshot file and clears the journal.
 *
 * @return the "snap.shot" file in the base directory
 * @throws Exception if writing the snapshot, replacing the link or clearing the journal fails
 */
@Override
public File takeSnapshot() throws Exception {
    File taken = snapshot.take(base);
    Path latestLink = new File(base, "snap.shot").toPath();
    // replace the previous link, if any, with one pointing at the file just written
    deleteIfExists(latestLink);
    createLink(latestLink, taken.toPath());
    journal.clear();
    return latestLink.toFile();
}
/**
 * Describes this prevayler by the simple class name of its prevalent system, its age and the
 * journal usage in percent.
 */
@Override
public String toString() {
    String systemName = prevalentSystem.getClass().getSimpleName();
    long currentAge = age();
    double journalUsage = journal.usage();
    return format("PrevaylerEight(%s)[age=%d,usage=%.2f%%]", systemName, currentAge, journalUsage);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment