Created
October 10, 2011 18:43
-
-
Save vega113/1276144 to your computer and use it in GitHub Desktop.
A snippet with sample methods modified to return ListenableFuture
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class DeltaStoreBasedWaveletState implements WaveletState { | |
// Some more declarations. | |
** Executor to evaluate results from future when they become ready. */ | |
private final Executor continuationExecutor = Executors.newSingleThreadExecutor(); | |
/** Keyed by appliedAtVersion. */ | |
private final ConcurrentMap<HashedVersion, ByteStringMessage<ProtocolAppliedWaveletDelta>> | |
appliedDeltasToPersist = | |
new ConcurrentHashMap<HashedVersion, ByteStringMessage<ProtocolAppliedWaveletDelta>>(); | |
/** Keyed by appliedAtVersion. */ | |
private final ConcurrentMap<HashedVersion, TransformedWaveletDelta> transformedDeltasToPersist = | |
new ConcurrentHashMap<HashedVersion, TransformedWaveletDelta>(); | |
/** Keyed by appliedAtVersion. */ | |
private final ConcurrentMap<HashedVersion, ListenableFuture<ByteStringMessage<ProtocolAppliedWaveletDelta>>> appliedDeltasCache = | |
new MapMaker().expireAfterAccess(5, TimeUnit.MINUTES).makeMap(); | |
/** Keyed by appliedAtVersion. */ | |
private final ConcurrentMap<HashedVersion, ListenableFuture<TransformedWaveletDelta>> transformedDeltasCache = | |
new MapMaker().expireAfterAccess(5, TimeUnit.MINUTES).makeMap(); | |
// .... | |
/** | |
* @return An entry keyed by a hashed version with the given version number, | |
* if any, otherwise null. | |
* @throws IOException | |
*/ | |
private ListenableFuture<WaveletDeltaRecord> lookup(final long version) { | |
ListenableFutureTask<WaveletDeltaRecord> futRecord = | |
new ListenableFutureTask<WaveletDeltaRecord>(new Callable<WaveletDeltaRecord>() { | |
@Override | |
public WaveletDeltaRecord call() throws IOException { | |
return deltasAccess.getDelta(version); | |
}}); | |
readAccessExecutor.execute(futRecord); | |
return futRecord; | |
} | |
@Override | |
public ListenableFuture<HashedVersion> getHashedVersion(final long version) { | |
final SettableFuture<HashedVersion> futVersion = SettableFuture.create(); | |
if (version == 0) { | |
futVersion.set(versionZero); | |
return futVersion; | |
} else if (snapshot == null) { | |
futVersion.set(null); | |
return futVersion; | |
} else if (version == snapshot.getVersion()) { | |
futVersion.set(snapshot.getHashedVersion()); | |
return futVersion; | |
} else { | |
final ListenableFuture<WaveletDeltaRecord> futRecord = lookup(version); | |
futRecord.addListener(new Runnable() { | |
@Override | |
public void run() { | |
WaveletDeltaRecord delta = null; | |
try { | |
delta = FutureUtil.getResultOrPropagateException(futRecord, IOException.class); | |
} catch (IOException e) { | |
LOG.warning("Problem when looking up delta " + version, e); | |
futVersion.setException(e); | |
} catch (InterruptedException e) { | |
futVersion.setException(e); | |
} | |
futVersion.set(delta != null ? delta.transformed.getResultingVersion() : null); | |
} | |
}, continuationExecutor); | |
return futVersion; | |
} | |
} | |
@Override | |
public ListenableFuture<TransformedWaveletDelta> getTransformedDelta( | |
final HashedVersion beginVersion) { | |
TransformedWaveletDelta delta = transformedDeltasToPersist.get(beginVersion); | |
if (delta != null) { | |
SettableFuture<TransformedWaveletDelta> futDelta = SettableFuture.create(); | |
futDelta.set(delta); | |
return futDelta; | |
} else { | |
ListenableFuture<TransformedWaveletDelta> futDelta = transformedDeltasCache.get(beginVersion); | |
if (futDelta == null) { | |
ListenableFutureTask<TransformedWaveletDelta> futDeltaNew = | |
new ListenableFutureTask<TransformedWaveletDelta>( | |
new Callable<TransformedWaveletDelta>() { | |
@Override | |
public TransformedWaveletDelta call() throws PersistenceException { | |
try { | |
return deltasAccess.getTransformedDelta(beginVersion.getVersion()); | |
} catch (IOException e) { | |
throw new PersistenceException("Cannot load transformed delta " | |
+ beginVersion, e); | |
} | |
} | |
}); | |
futDelta = transformedDeltasCache.putIfAbsent(beginVersion, futDeltaNew); | |
if (futDelta == null) { | |
futDelta = futDeltaNew; | |
readAccessExecutor.execute(futDeltaNew); | |
} | |
} | |
return futDelta; | |
} | |
} | |
// End of the class. | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment