Created
August 9, 2017 02:20
-
-
Save eoconnell/31db4480e26aed3a697173fd6a8fc2fd to your computer and use it in GitHub Desktop.
Reactive Geode CqListener
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.*; | |
import org.apache.geode.cache.*; | |
import org.apache.geode.cache.client.ClientCache; | |
import org.apache.geode.cache.client.ClientCacheFactory; | |
import org.apache.geode.cache.client.ClientRegionFactory; | |
import org.apache.geode.cache.client.ClientRegionShortcut; | |
import org.apache.geode.cache.query.*; | |
import org.apache.geode.cache.query.internal.cq.CqAttributesImpl; | |
import java.util.Random; | |
import java.util.concurrent.TimeUnit; | |
public class RxCq { | |
static class CqOnSubscribe implements FlowableOnSubscribe<CqEvent> { | |
private final QueryService queryService; | |
private final String name; | |
private final String query; | |
public CqOnSubscribe(QueryService queryService, String name, String query) { | |
this.queryService = queryService; | |
this.name = name; | |
this.query = query; | |
} | |
@Override | |
public void subscribe(FlowableEmitter<CqEvent> e) throws Exception { | |
CqListenerOnSubscribe cqListener = new CqListenerOnSubscribe(); | |
cqListener.subscribe(e); | |
CqQuery cqQuery = queryService.newCq(name, query, cqAttributesWithListener(cqListener)); | |
new CqQueryInitialResultsOnSubscribe(cqQuery).subscribe(e); | |
} | |
private CqAttributes cqAttributesWithListener(CqListener listener) { | |
CqAttributesImpl cqAttributes = new CqAttributesImpl(); | |
cqAttributes.addCqListener(listener); | |
return cqAttributes; | |
} | |
} | |
static class CqQueryInitialResultsOnSubscribe implements FlowableOnSubscribe<CqEvent> { | |
private final CqQuery cqQuery; | |
public CqQueryInitialResultsOnSubscribe(CqQuery cqQuery) { | |
this.cqQuery = cqQuery; | |
} | |
@Override | |
public void subscribe(FlowableEmitter<CqEvent> emitter) throws Exception { | |
CqResults<Struct> results = cqQuery.executeWithInitialResults(); | |
results.stream() | |
.map(struct -> new InitialCqEvent(cqQuery, struct.get("key"), struct.get("value"))) | |
.forEach(emitter::onNext); | |
} | |
} | |
static class InitialCqEvent implements CqEvent { | |
private final CqQuery cqQuery; | |
private final Object key; | |
private final Object value; | |
public InitialCqEvent(CqQuery aCqQuery, Object aKey, Object aValue) { | |
cqQuery = aCqQuery; | |
key = aKey; | |
value = aValue; | |
} | |
@Override | |
public CqQuery getCq() { | |
return cqQuery; | |
} | |
@Override | |
public Operation getBaseOperation() { | |
return Operation.GET; | |
} | |
@Override | |
public Operation getQueryOperation() { | |
return Operation.GET; | |
} | |
@Override | |
public Object getKey() { | |
return key; | |
} | |
@Override | |
public Object getNewValue() { | |
return value; | |
} | |
@Override | |
public Throwable getThrowable() { | |
return null; | |
} | |
@Override | |
public byte[] getDeltaValue() { | |
return new byte[0]; | |
} | |
@Override | |
public String toString() { | |
StringBuffer buffer = new StringBuffer(); | |
buffer.append("CqEvent [").append("CqName=").append(cqQuery.getName()) | |
.append("; base operation=").append(getBaseOperation()).append("; cq operation=") | |
.append(getQueryOperation()).append("; key=").append(key).append("; value=") | |
.append(value).append("]"); | |
return buffer.toString(); | |
} | |
} | |
static class CqListenerOnSubscribe implements FlowableOnSubscribe<CqEvent>, CqListener { | |
private FlowableEmitter<CqEvent> emitter; | |
@Override | |
public void subscribe(FlowableEmitter<CqEvent> emitter) throws Exception { | |
this.emitter = emitter; | |
} | |
@Override | |
public void onEvent(CqEvent aCqEvent) { | |
emitter.onNext(aCqEvent); | |
} | |
@Override | |
public void onError(CqEvent aCqEvent) { | |
emitter.onError(aCqEvent.getThrowable()); | |
} | |
@Override | |
public void close() { | |
emitter.onComplete(); | |
} | |
} | |
public static void main(String[] args) throws QueryException, InterruptedException { | |
ClientCache cache = new ClientCacheFactory().setPoolSubscriptionEnabled(true).create(); | |
ClientRegionFactory<Object, Object> rf = cache.createClientRegionFactory(ClientRegionShortcut.PROXY); | |
Region<Object, Object> testRegion = rf.create("hello"); | |
testRegion.put(1, "Hello"); | |
testRegion.put(2, "World"); | |
QueryService qs = cache.getQueryService(); | |
Query query = qs.newQuery("SELECT * FROM /hello"); | |
SelectResults<Object> result = (SelectResults<Object>) query.execute(); | |
System.out.println(result.asList()); | |
Thread t = new Thread(() -> { | |
Random rand = new Random(); | |
try { | |
int i = 1; | |
while (true) { | |
TimeUnit.SECONDS.sleep(2); | |
if (rand.nextDouble() > 0.7) { | |
testRegion.destroy(i++ % 10); | |
} else { | |
testRegion.put(i++ % 10, System.currentTimeMillis()); | |
} | |
} | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}); | |
t.start(); | |
Flowable<CqEvent> flow = Flowable.create(new CqOnSubscribe(cache.getQueryService(), "testcq", "SELECT * FROM /hello"), BackpressureStrategy.BUFFER); | |
flow.subscribe(System.out::println, Throwable::printStackTrace); | |
t.join(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment