Skip to content

Instantly share code, notes, and snippets.

@Romeh
Created December 5, 2017 16:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Romeh/d421a3657089572c4a164714f63ae728 to your computer and use it in GitHub Desktop.
Save Romeh/d421a3657089572c4a164714f63ae728 to your computer and use it in GitHub Desktop.
/**
* the main ignite snapshot implementation based into SnapshotStore
*/
@Slf4j
public class IgniteSnapshotStore extends SnapshotStore {
private final Serializer serializer;
private final Store<SnapshotItem> storage;
private final IgniteCache<Long, SnapshotItem> cache;
private final BiFunction<Config, ActorSystem, IgniteCache<Long, SnapshotItem>> snapshotCacheProvider =
new SnapshotCacheProvider();
public IgniteSnapshotStore(Config config) throws NotSerializableException {
ActorSystem actorSystem = context().system();
storage = new Store<>(actorSystem);
serializer = SerializationExtension.get(actorSystem).serializerFor(Snapshot.class);
cache = snapshotCacheProvider.apply(config, actorSystem);
}
private static Set<Long> listsToSetLong(List<List<?>> list) {
return list.stream().flatMap(Collection::stream).filter(o -> o instanceof Long).map(o -> (Long) o).collect(Collectors.toSet());
}
@Override
public Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
return storage.execute(persistenceId, cache, (entityIdParam, cacheParam) -> {
if (log.isDebugEnabled()) {
log.debug("doLoadAsync '{}' {} {}", persistenceId, criteria.minSequenceNr(), criteria.toString());
}
try (QueryCursor<Cache.Entry<Long, SnapshotItem>> query = cache
.query(new SqlQuery<Long, SnapshotItem>(SnapshotItem.class, "sequenceNr >= ? AND sequenceNr <= ? AND timestamp >= ? AND timestamp <= ? and persistenceId=?")
.setArgs(criteria.minSequenceNr(), criteria.maxSequenceNr(), criteria.minTimestamp(), criteria.maxTimestamp(), persistenceId))) {
List<Cache.Entry<Long, SnapshotItem>> iterator = query.getAll();
final Optional<Cache.Entry<Long, SnapshotItem>> max = iterator.stream().max((o1, o2) -> {
if (o1.getValue().getSequenceNr() > o2.getValue().getSequenceNr()) {
return 1;
} else if (o1.getValue().getTimestamp() > o2.getValue().getTimestamp()) {
return 1;
} else {
return -1;
}
});
return Optional.ofNullable(max.isPresent() ? convert(persistenceId, max.get().getValue()) : null);
}
});
}
@Override
public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
return storage.execute(metadata.persistenceId(), cache, (entityIdParam, cacheParam) -> {
if (log.isDebugEnabled()) {
log.debug("doSaveAsync '{}' ({})", metadata.persistenceId(), metadata.sequenceNr());
}
SnapshotItem item = convert(metadata, snapshot);
cache.put(item.getSequenceNr(), item);
return null;
});
}
@Override
public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
return storage.execute(metadata.persistenceId(), cache, (entityIdParam, cacheParam) -> {
if (log.isDebugEnabled()) {
log.debug("doDeleteAsync '{}' ({})", metadata.persistenceId(), metadata.sequenceNr());
}
cache.remove(metadata.sequenceNr());
return null;
});
}
@Override
public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
return storage.execute(persistenceId, cache, (entityIdParam, cacheParam) -> {
if (log.isDebugEnabled()) {
log.debug("doDeleteAsync '{}' ({}; {})", persistenceId, criteria.minSequenceNr(), criteria.maxSequenceNr());
}
List<List<?>> seq = cache
.query(new SqlFieldsQuery("select sequenceNr from SnapshotItem where sequenceNr >= ? AND sequenceNr <= ? AND timestamp >= ? AND timestamp <= ? and persistenceId=?")
.setArgs(criteria.minSequenceNr(), criteria.maxSequenceNr(), criteria.minTimestamp(), criteria.maxTimestamp(), persistenceId))
.getAll();
Set<Long> keys = listsToSetLong(seq);
if (log.isDebugEnabled()) {
log.debug("remove keys {}", keys);
}
cache.removeAll(keys);
return null;
});
}
private SnapshotItem convert(SnapshotMetadata metadata, Object snapshot) {
return new SnapshotItem(metadata.sequenceNr(), metadata.persistenceId(), metadata.timestamp(), serializer.toBinary(new Snapshot(snapshot)));
}
private SelectedSnapshot convert(String persistenceId, SnapshotItem item) {
SnapshotMetadata metadata = new SnapshotMetadata(persistenceId, item.getSequenceNr(), item.getTimestamp());
Snapshot snapshot = (Snapshot) serializer.fromBinary(item.getPayload());
return SelectedSnapshot.create(metadata, snapshot.data());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment