Last active
August 29, 2015 14:05
-
-
Save sormuras/3fea6f6ade327b2a4249 to your computer and use it in GitHub Desktop.
Not thread-safe Prevayler implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.prevayler.contrib.eight; | |
import static java.lang.String.format; | |
import static java.nio.file.Files.createLink; | |
import static java.nio.file.Files.deleteIfExists; | |
import java.io.Closeable; | |
import java.io.File; | |
import java.io.FileInputStream; | |
import java.io.FileOutputStream; | |
import java.io.IOException; | |
import java.io.ObjectInputStream; | |
import java.io.ObjectOutputStream; | |
import java.io.RandomAccessFile; | |
import java.nio.ByteBuffer; | |
import java.nio.MappedByteBuffer; | |
import java.nio.channels.FileChannel; | |
import java.nio.file.Path; | |
import java.time.Instant; | |
import java.util.Date; | |
import org.prevayler.Clock; | |
import org.prevayler.Prevayler; | |
import org.prevayler.Query; | |
import org.prevayler.SureTransactionWithQuery; | |
import org.prevayler.Transaction; | |
import org.prevayler.TransactionWithQuery; | |
import org.prevayler.implementation.clock.MachineClock; | |
public class PrevaylerEight<P> implements Prevayler<P> { | |
class Journal implements Closeable { | |
class Slice { | |
/** | |
* Amount of all transaction in this slice. | |
*/ | |
long transactionCounter; | |
/** | |
* Sliced view on the journal buffer. | |
*/ | |
ByteBuffer buffer; | |
/** | |
* Object output stream writing to this slices buffer. | |
*/ | |
ObjectOutputStream stream; | |
Slice(ByteBuffer buffer) throws IOException { | |
this.buffer = buffer; | |
buffer.putLong(transactionCounter = 0); | |
this.stream = new ObjectOutputStream(new ByteBufferOutputStream(buffer)); | |
stream.writeUTF(Instant.now().toString()); | |
} | |
<T> void write(T object, long time) throws IOException { | |
buffer.mark(); | |
try { | |
stream.writeLong(time); | |
stream.writeObject(object); | |
stream.flush(); | |
} catch (IOException e) { | |
buffer.reset(); | |
throw e; | |
} | |
} | |
} | |
/** | |
* Active slice. | |
*/ | |
Slice slice; | |
/** | |
* Amount of all slices in the journal file. | |
*/ | |
int sliceCounter; | |
/**
* Amount of all transactions in the journal file. That is the sum of all transactions of all slices.
*/
long transactionCounter; | |
/** | |
* Random access journal file. | |
*/ | |
RandomAccessFile file; | |
/** | |
* Direct byte buffer whose content is the memory-mapped journal file. | |
*/ | |
MappedByteBuffer memory; | |
@SuppressWarnings("unchecked") | |
Journal(File journalFile, long size) throws Exception { | |
this.file = new RandomAccessFile(journalFile, "rw"); | |
if (file.length() == 0) { | |
// logger.info(format("Creating initial journal file. size = %d", size)); | |
file.setLength(size); | |
} | |
this.memory = file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, size); | |
this.sliceCounter = memory.getInt(); // first entry is the slice counter | |
this.transactionCounter = memory.getLong(); // second entry is the journal transaction counter | |
// logger.info(format("Loading journal with %d transaction(s) separated in %d slices(s)...", transactionCounter, sliceCounter)); | |
// reload journal slices | |
for (int number = 0; number < sliceCounter; number++) { | |
long transactions = memory.getLong(); | |
try (ObjectInputStream in = new ObjectInputStream(new ByteBufferInputStream(memory))) { | |
@SuppressWarnings("unused") | |
Instant start = Instant.parse(in.readUTF()); | |
// logger.info(format("Loading slice #%d with %d transaction(s) starting from %s...", number, transactions, start)); | |
for (int j = 0; j < transactions; j++) { | |
long time = in.readLong(); | |
Object object = in.readObject(); | |
Date executionTime = new Date(time); | |
if (object instanceof Transaction) | |
((Transaction<P>) object).executeOn(prevalentSystem, executionTime); | |
else | |
((TransactionWithQuery<P, ?>) object).executeAndQuery(prevalentSystem, executionTime); | |
} | |
} | |
} | |
this.slice = new Slice(memory.slice()); | |
// okay, write slice counter (without increasing the related field) absolutely and be done | |
memory.putInt(0, sliceCounter + 1); | |
} | |
void clear() throws IOException { | |
sliceCounter = 0; | |
memory.clear(); // TODO Erase buffer? | |
memory.putInt(sliceCounter + 1); | |
memory.putLong(transactionCounter = 0); | |
slice = new Slice(memory.slice()); | |
} | |
@Override | |
public void close() throws IOException { | |
slice.stream.close(); | |
memory.force(); | |
file.close(); | |
} | |
void commit() { | |
slice.buffer.putLong(0, ++slice.transactionCounter); | |
memory.putLong(4, ++transactionCounter); | |
} | |
<T> T copy(T object, long time) { | |
try { | |
slice.write(object, time); | |
} catch (IOException e) { | |
throw new Error(e); | |
} | |
return object; | |
} | |
double usage() { | |
return 100d - slice.buffer.remaining() * 100d / memory.capacity(); | |
} | |
} | |
class Snapshot { | |
/** | |
* Point on the time-line when the snapshot was taken. | |
*/ | |
Instant timestamp; | |
/** | |
* Number of transactions that led to this system snapshot. | |
*/ | |
long transactionCounter; | |
@SuppressWarnings("unchecked") | |
P reload(File snap, P initialPrevalentSystem) { | |
if (!snap.exists()) { | |
// logger.info(format("No %s file found. Using initial prevalent system.", snap)); | |
return initialPrevalentSystem; | |
} | |
try (ObjectInputStream stream = new ObjectInputStream(new FileInputStream(snap))) { | |
@SuppressWarnings("unused") | |
Instant instant = (Instant) stream.readObject(); | |
// logger.info("Loading " + snap.getCanonicalPath() + " taken on " + instant + "..."); | |
transactionCounter = stream.readLong(); | |
P storedSystem = (P) stream.readObject(); | |
// logger.info("Loaded " + storedSystem + " (transactionCounter=" + transactionCounter + ")"); | |
return storedSystem; | |
} catch (Exception e) { | |
throw new Error(e); | |
} | |
} | |
File take(File base) throws Exception { | |
Instant now = Instant.now(); | |
File shot = new File(base, "snap-" + now.toString().replace(':', '-') + ".shot"); | |
try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(shot))) { | |
out.writeObject(now); | |
out.writeLong(transactionCounter = age()); | |
out.writeObject(prevalentSystem); | |
} | |
// logger.info(format("%d bytes saved to %s", shot.length(), shot)); | |
return shot; | |
} | |
} | |
private final File base; | |
private final Clock clock; | |
// private final Logger logger; | |
private final P prevalentSystem; | |
private Snapshot snapshot; | |
private Journal journal; | |
/**
 * Creates a prevayler that stores its data in the {@code PrevalenceBase}
 * directory, using a journal file named {@code sliced.journal} of 10 MiB.
 *
 * @param prevalentSystem initial system, used when no snapshot exists
 * @throws Exception if the snapshot or the journal cannot be loaded
 */
public PrevaylerEight(P prevalentSystem) throws Exception {
    this(prevalentSystem, new File("PrevalenceBase"), "sliced.journal", 10L * 1024L * 1024L);
}
public PrevaylerEight(P prevalentSystem, File base, String journalName, long journalSize) throws Exception { | |
// this.logger = Logger.getLogger(getClass().getName() + "(" + prevalentSystem.getClass().getSimpleName() + ")"); | |
this.base = base; | |
if (base.mkdirs()) | |
// logger.info(base.getAbsolutePath() + " created.") | |
; | |
this.clock = new MachineClock(); | |
this.snapshot = new Snapshot(); | |
this.prevalentSystem = snapshot.reload(new File(base, "snap.shot"), prevalentSystem); | |
this.journal = new Journal(new File(base, journalName), journalSize); | |
} | |
public long age() { | |
return snapshot.transactionCounter + journal.transactionCounter; | |
} | |
@Override | |
public Clock clock() { | |
return clock; | |
} | |
@Override | |
public void close() throws IOException { | |
journal.close(); | |
} | |
@Override | |
public <R> R execute(Query<? super P, R> sensitiveQuery) throws Exception { | |
return sensitiveQuery.query(prevalentSystem, clock().time()); // no copy needed - queries are transient by contract | |
} | |
@Override | |
public <R> R execute(SureTransactionWithQuery<? super P, R> sureTransactionWithQuery) { | |
try { | |
return execute((TransactionWithQuery<? super P, R>) sureTransactionWithQuery); | |
} catch (RuntimeException runtime) { | |
throw runtime; | |
} catch (Exception checked) { | |
throw new RuntimeException("Unexpected exception thrown!", checked); | |
} | |
} | |
@Override | |
public void execute(Transaction<? super P> transaction) { | |
Date date = clock().time(); | |
Transaction<? super P> copy = journal.copy(transaction, date.getTime()); | |
copy.executeOn(prevalentSystem, date); | |
journal.commit(); | |
} | |
@Override | |
public <R> R execute(TransactionWithQuery<? super P, R> transactionWithQuery) throws Exception { | |
Date date = clock().time(); | |
TransactionWithQuery<? super P, R> copy = journal.copy(transactionWithQuery, date.getTime()); | |
R result = copy.executeAndQuery(prevalentSystem, date); | |
journal.commit(); | |
return result; | |
} | |
@Override | |
public P prevalentSystem() { | |
return prevalentSystem; | |
} | |
@Override | |
public File takeSnapshot() throws Exception { | |
File taken = snapshot.take(base); | |
Path snapshot = new File(base, "snap.shot").toPath(); | |
deleteIfExists(snapshot); | |
createLink(snapshot, taken.toPath()); | |
journal.clear(); | |
return snapshot.toFile(); | |
} | |
@Override | |
public String toString() { | |
return format("PrevaylerEight(%s)[age=%d,usage=%.2f%%]", prevalentSystem.getClass().getSimpleName(), age(), journal.usage()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment