Skip to content

Instantly share code, notes, and snippets.

@eoconnell
Created August 9, 2017 02:20
Show Gist options
  • Save eoconnell/31db4480e26aed3a697173fd6a8fc2fd to your computer and use it in GitHub Desktop.
Reactive Geode CqListener
import io.reactivex.*;
import org.apache.geode.cache.*;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.query.*;
import org.apache.geode.cache.query.internal.cq.CqAttributesImpl;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class RxCq {
/**
 * Flowable source that registers a Geode continuous query (CQ) and forwards its
 * events to the subscriber.
 *
 * <p>On subscription it attaches a {@link CqListenerOnSubscribe} to a newly created
 * CQ, then executes the CQ and emits its initial result set through
 * {@link CqQueryInitialResultsOnSubscribe}. The CQ is closed when the downstream
 * cancels or the stream terminates.
 */
static class CqOnSubscribe implements FlowableOnSubscribe<CqEvent> {
    private final QueryService queryService;
    private final String name;
    private final String query;

    /**
     * @param queryService service used to create the continuous query
     * @param name         name under which the CQ is registered
     * @param query        query string handed to {@code newCq}
     */
    public CqOnSubscribe(QueryService queryService, String name, String query) {
        this.queryService = queryService;
        this.name = name;
        this.query = query;
    }

    /**
     * Creates the CQ, wires the listener to the emitter and emits the initial results.
     *
     * @param e emitter supplied by {@code Flowable.create}
     * @throws Exception if the CQ cannot be created or executed
     */
    @Override
    public void subscribe(FlowableEmitter<CqEvent> e) throws Exception {
        // Listener callbacks are invoked by Geode and may overlap with the initial
        // results pushed from the subscribing thread; a serialized emitter keeps
        // onNext/onError/onComplete calls from running concurrently.
        FlowableEmitter<CqEvent> serialized = e.serialize();

        CqListenerOnSubscribe cqListener = new CqListenerOnSubscribe();
        cqListener.subscribe(serialized);

        CqQuery cqQuery = queryService.newCq(name, query, cqAttributesWithListener(cqListener));

        // Release the CQ once the subscriber is gone instead of leaking it.
        serialized.setCancellable(cqQuery::close);

        new CqQueryInitialResultsOnSubscribe(cqQuery).subscribe(serialized);
    }

    /** Builds CQ attributes carrying the given listener, using the public factory API. */
    private CqAttributes cqAttributesWithListener(CqListener listener) {
        CqAttributesFactory factory = new CqAttributesFactory();
        factory.addCqListener(listener);
        return factory.create();
    }
}
/**
 * Executes a continuous query and emits each row of its initial result set as an
 * {@link InitialCqEvent}.
 */
static class CqQueryInitialResultsOnSubscribe implements FlowableOnSubscribe<CqEvent> {
    private final CqQuery cqQuery;

    /**
     * @param cqQuery continuous query to execute on subscription
     */
    public CqQueryInitialResultsOnSubscribe(CqQuery cqQuery) {
        this.cqQuery = cqQuery;
    }

    /**
     * Runs {@code executeWithInitialResults()} and pushes one event per returned struct,
     * reading the struct's {@code "key"} and {@code "value"} fields.
     *
     * @param emitter sink receiving the initial events
     * @throws Exception if executing the query fails
     */
    @Override
    public void subscribe(FlowableEmitter<CqEvent> emitter) throws Exception {
        CqResults<Struct> initialRows = cqQuery.executeWithInitialResults();
        for (Struct row : initialRows) {
            Object rowKey = row.get("key");
            Object rowValue = row.get("value");
            emitter.onNext(new InitialCqEvent(cqQuery, rowKey, rowValue));
        }
    }
}
/**
 * {@link CqEvent} implementation representing one entry from a CQ's initial result set.
 *
 * <p>Both the base and the query operation are reported as {@code Operation.GET};
 * there is no throwable and the delta is an empty byte array.
 */
static class InitialCqEvent implements CqEvent {
    private final CqQuery source;
    private final Object entryKey;
    private final Object entryValue;

    /**
     * @param aCqQuery query that produced the entry
     * @param aKey     entry key
     * @param aValue   entry value
     */
    public InitialCqEvent(CqQuery aCqQuery, Object aKey, Object aValue) {
        this.source = aCqQuery;
        this.entryKey = aKey;
        this.entryValue = aValue;
    }

    /** @return the query this event belongs to */
    @Override
    public CqQuery getCq() {
        return source;
    }

    /** @return always {@code Operation.GET} */
    @Override
    public Operation getBaseOperation() {
        return Operation.GET;
    }

    /** @return always {@code Operation.GET} */
    @Override
    public Operation getQueryOperation() {
        return Operation.GET;
    }

    /** @return the entry key passed at construction */
    @Override
    public Object getKey() {
        return entryKey;
    }

    /** @return the entry value passed at construction */
    @Override
    public Object getNewValue() {
        return entryValue;
    }

    /** @return always {@code null} */
    @Override
    public Throwable getThrowable() {
        return null;
    }

    /** @return a new empty byte array on every call */
    @Override
    public byte[] getDeltaValue() {
        return new byte[0];
    }

    /** Renders the CQ name, both operations, the key and the value. */
    @Override
    public String toString() {
        return "CqEvent [" + "CqName=" + source.getName()
            + "; base operation=" + getBaseOperation()
            + "; cq operation=" + getQueryOperation()
            + "; key=" + entryKey
            + "; value=" + entryValue
            + "]";
    }
}
/**
 * Adapter that is both a {@link CqListener} and a {@link FlowableOnSubscribe}:
 * it remembers the emitter handed to {@link #subscribe} and relays every listener
 * callback to it.
 *
 * <p>{@link #subscribe} must be called before any listener callback, otherwise the
 * stored emitter is still {@code null}.
 */
static class CqListenerOnSubscribe implements FlowableOnSubscribe<CqEvent>, CqListener {
    private FlowableEmitter<CqEvent> downstream;

    /**
     * Stores the emitter that later callbacks will forward to.
     *
     * @param emitter sink for CQ events
     */
    @Override
    public void subscribe(FlowableEmitter<CqEvent> emitter) throws Exception {
        downstream = emitter;
    }

    /** Forwards the event as an {@code onNext} signal. */
    @Override
    public void onEvent(CqEvent event) {
        downstream.onNext(event);
    }

    /** Forwards the event's throwable as an {@code onError} signal. */
    @Override
    public void onError(CqEvent errorEvent) {
        Throwable cause = errorEvent.getThrowable();
        downstream.onError(cause);
    }

    /** Signals {@code onComplete} to the emitter. */
    @Override
    public void close() {
        downstream.onComplete();
    }
}
/**
 * Demo entry point.
 *
 * <p>Creates a client cache with subscriptions enabled and a PROXY region named
 * {@code hello}, seeds two entries, prints the result of a one-off query, then starts
 * a background thread that every two seconds either destroys or puts one of the keys
 * 0..9. Finally it subscribes to a CQ-backed {@link Flowable} over the same region and
 * prints each event; errors are printed as stack traces.
 *
 * @param args unused
 * @throws QueryException       if the one-off query fails
 * @throws InterruptedException if interrupted while waiting for the mutator thread
 */
public static void main(String[] args) throws QueryException, InterruptedException {
    ClientCache cache = new ClientCacheFactory().setPoolSubscriptionEnabled(true).create();
    ClientRegionFactory<Object, Object> rf = cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
    Region<Object, Object> testRegion = rf.create("hello");
    testRegion.put(1, "Hello");
    testRegion.put(2, "World");

    QueryService qs = cache.getQueryService();
    Query query = qs.newQuery("SELECT * FROM /hello");
    // The query API returns Object; the suppression is scoped to this one cast.
    @SuppressWarnings("unchecked")
    SelectResults<Object> result = (SelectResults<Object>) query.execute();
    System.out.println(result.asList());

    Thread t = new Thread(() -> {
        Random rand = new Random();
        int i = 1;
        try {
            while (true) {
                TimeUnit.SECONDS.sleep(2);
                int key = i++ % 10;
                if (rand.nextDouble() > 0.7) {
                    try {
                        testRegion.destroy(key);
                    } catch (EntryNotFoundException ignored) {
                        // The key was never put or was already destroyed; without this
                        // catch the exception would end the mutator thread.
                    }
                } else {
                    testRegion.put(key, System.currentTimeMillis());
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and let the thread finish.
            Thread.currentThread().interrupt();
        }
    });
    t.start();

    Flowable<CqEvent> flow = Flowable.create(new CqOnSubscribe(qs, "testcq", "SELECT * FROM /hello"), BackpressureStrategy.BUFFER);
    flow.subscribe(System.out::println, Throwable::printStackTrace);
    t.join();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment