Skip to content

Instantly share code, notes, and snippets.

@davengeo
Last active October 19, 2015 21:07
Show Gist options
  • Save davengeo/fb6062a1a2fc0a674242 to your computer and use it in GitHub Desktop.
Save davengeo/fb6062a1a2fc0a674242 to your computer and use it in GitHub Desktop.
Observable Stream created from Infinispan Cache
/*
* Copyright (c) 2015.
* me@davengeo.com
*/
package org.daven.demo.rxcache;
import org.infinispan.Cache;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import javax.annotation.PostConstruct;
import java.util.EventListener;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * Exposes modifications of an embedded Infinispan cache as an RxJava
 * {@link Observable} of the new values.
 *
 * <p>The stream is process-wide (static): every subscriber obtained through
 * {@link #getStream()} receives each value written to the cache after the
 * modification has been applied. Modifications that happen while nobody is
 * subscribed are logged and otherwise dropped.
 */
@Component
public class CacheStream {

    private static final Logger LOG = LoggerFactory.getLogger(CacheStream.class);

    /**
     * Currently registered subscribers. A copy-on-write list lets the cache
     * listener iterate while other threads subscribe, and lets stale
     * entries be removed during that iteration.
     */
    private static final List<Subscriber<? super String>> SUBSCRIBERS =
            new CopyOnWriteArrayList<>();

    /**
     * Shared stream; subscribing only registers the subscriber, values are
     * pushed later from the cache listener callback.
     */
    private static final Observable<String> STREAM = Observable
            .create((Subscriber<? super String> subscriber) -> {
                register(subscriber);
            })
            // Registration runs on an io scheduler thread, so a value written
            // immediately after subscribe() returns may not be delivered.
            .subscribeOn(Schedulers.io());

    Cache<String, String> cache;

    /**
     * Starts a default cache manager, obtains the {@code "registry-1"} cache
     * and attaches the listener that feeds the stream.
     *
     * <p>NOTE(review): the cache manager is never stopped here; confirm
     * whether the owning application is expected to shut it down.
     */
    @PostConstruct
    public void init() {
        DefaultCacheManager defaultCacheManager = new DefaultCacheManager();
        defaultCacheManager.start();
        cache = defaultCacheManager.getCache("registry-1", true);
        cache.addListener(new EmitWhenChanged());
    }

    /**
     * Stores {@code value} under the fixed key {@code "one"}.
     *
     * @param value the value to write to the cache
     */
    public void put(String value) {
        cache.put("one", value);
    }

    /**
     * Returns the shared stream of values written to the cache.
     *
     * @return the observable of modified cache values
     */
    public static Observable<String> getStream() {
        return STREAM;
    }

    /** Adds a subscriber so that later cache modifications are delivered to it. */
    private static void register(Subscriber<? super String> subscriber) {
        SUBSCRIBERS.add(subscriber);
    }

    /**
     * Delivers {@code value} to every live subscriber and drops the ones that
     * have unsubscribed. Does nothing when there are no subscribers.
     */
    private static void emit(String value) {
        for (Subscriber<? super String> subscriber : SUBSCRIBERS) {
            if (subscriber.isUnsubscribed()) {
                // Safe during iteration: the iterator works on a snapshot.
                SUBSCRIBERS.remove(subscriber);
            } else {
                subscriber.onNext(value);
            }
        }
    }

    /**
     * Infinispan listener that forwards modified values to the stream.
     * Static because it only touches static state of the enclosing class.
     */
    @Listener
    static class EmitWhenChanged {

        /**
         * Logs every modification event and emits the value only for events
         * whose {@code isPre()} is false.
         *
         * @param event the modification event delivered by Infinispan
         */
        @CacheEntryModified
        public void modified(CacheEntryModifiedEvent<String, String> event) {
            LOG.info("value:{},pre:{},created:{}",
                    event.getValue(),
                    event.isPre(),
                    event.isCreated());
            if (!event.isPre()) {
                emit(event.getValue());
            }
        }
    }
}
@davengeo
Copy link
Author

See http://rxmarbles.com/#merge
for the next chapter.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment