Skip to content

Instantly share code, notes, and snippets.

@jackeylu
Created August 3, 2016 07:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jackeylu/edf600b66273d17db0438065ae362934 to your computer and use it in GitHub Desktop.
Save jackeylu/edf600b66273d17db0438065ae362934 to your computer and use it in GitHub Desktop.
If p2p (peer class loading) is enabled, the continuous query (CQ) fails when a 3rd node joins the cluster.
package example.ignite;
import javax.cache.event.CacheEntryEvent;
import java.util.EventListener;
/**
 * Callback contract for handling cache entry events, with one method per kind of change
 * (insert, update, delete).
 *
 * <p>The event parameters are declared as {@code CacheEntryEvent<?, ?>} instead of the raw
 * type so that implementations and callers are not forced into raw-type warnings. An
 * implementation that still declares a raw {@code CacheEntryEvent} parameter continues to
 * override these methods, because its signature equals the erasure of the one declared here.
 *
 * <p>Created by jackeylv on 2016/8/3.
 */
public interface CacheEventProcessor extends EventListener {
    /**
     * Handles an event reported for an inserted entry.
     *
     * @param event the cache entry event to process
     */
    void entryInserted(CacheEntryEvent<?, ?> event);

    /**
     * Handles an event reported for an updated entry.
     *
     * @param event the cache entry event to process
     */
    void entryUpdated(CacheEntryEvent<?, ?> event);

    /**
     * Handles an event reported for a deleted entry.
     *
     * @param event the cache entry event to process
     */
    void entryDeleted(CacheEntryEvent<?, ?> event);
}
package example.ignite;
import com.tencent.perf.ignite.AlwaysTrueRemoteFilter;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryBasicIdMapper;
import org.apache.ignite.binary.BinaryBasicNameMapper;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.util.typedef.G;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import java.util.concurrent.TimeUnit;
/**
 * The second node: a client that registers a continuous query (CQ) listener on the
 * {@code "Test"} cache.
 *
 * <p>Created by jackeylv on 2016/8/3.
 */
public class CQListenerClient {
    /**
     * Starts a client node, registers the continuous query on the {@code "Test"} cache,
     * puts a single entry and then sleeps so that events can keep arriving.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        IgniteConfiguration icfg = new IgniteConfiguration();
        icfg.setIgniteHome(System.getProperty("user.dir"));
        icfg.setWorkDirectory(System.getProperty("user.dir"));
        // FIXME: if p2p is enabled, the 3rd joining node fails when getting the remote filter.
        icfg.setPeerClassLoadingEnabled(true);

        BinaryConfiguration bCfg = new BinaryConfiguration();
        bCfg.setCompactFooter(true);
        bCfg.setNameMapper(new BinaryBasicNameMapper(false));
        bCfg.setIdMapper(new BinaryBasicIdMapper(false));
        icfg.setMarshaller(new BinaryMarshaller());
        icfg.setBinaryConfiguration(bCfg);

        Ignition.setClientMode(true);

        // Both the node and the cursor are AutoCloseable. Closing them on every exit path
        // (interrupt or failure) stops the node instead of leaving it running after main
        // has terminated. The cursor is closed first, then the node.
        try (Ignite ignite = Ignition.start(icfg)) {
            IgniteCache<String, String> cache = ignite.getOrCreateCache("Test");
            CacheEventProcessor listener = new EventPrinter();

            try (QueryCursor<?> cursor = registerEventListener(cache, listener)) {
                cache.put("Hello", "world");
                TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
            }
        }
    }

    /**
     * Registers a continuous query on {@code cache} and returns its cursor.
     *
     * <p>The query uses auto-unsubscribe, a local listener that logs every received event,
     * and a remote filter factory built from an {@code AlwaysTrueRemoteFilter} wrapping
     * {@code listener}. The logger is taken from the default Ignite instance, so a default
     * instance must already be started when this method is called.
     *
     * @param cache    the cache to run the continuous query against
     * @param listener the processor handed to the remote filter and to the local log output
     * @return the cursor of the running query; closing it cancels the query
     */
    // Raw types are kept on purpose: the signature is part of the existing API, and the
    // type parameters of AlwaysTrueRemoteFilter are not visible from this file.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static QueryCursor registerEventListener(final IgniteCache cache,
                                                    final CacheEventProcessor listener) {
        // Ignition.ignite() is the public API behind the internal alias G.ignite().
        final IgniteLogger logger = Ignition.ignite().log();

        ContinuousQuery qry = new ContinuousQuery();
        qry.setAutoUnsubscribe(true);
        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
            @Override
            public void onUpdated(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> cacheEntryEvents)
                throws CacheEntryListenerException {
                for (CacheEntryEvent<?, ?> event : cacheEntryEvents) {
                    logger.info(Thread.currentThread().getName() + " receive CacheEntryEvent " + event );
                    localEventProcess(listener, event, cache.getName(), logger);
                }
            }
        });

        AlwaysTrueRemoteFilter rmtFilter = new AlwaysTrueRemoteFilter(listener);
        qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFilter));

        return cache.query(qry);
    }

    /**
     * Logs the arguments of a locally received event. The listener is only printed here;
     * none of its callbacks are invoked by this method.
     *
     * @param listener the processor associated with the query
     * @param event    the received cache entry event
     * @param name     the name of the cache the event belongs to
     * @param logger   the logger to write to
     */
    private static void localEventProcess(CacheEventProcessor listener, CacheEntryEvent<?, ?> event, String name, IgniteLogger logger) {
        logger.info("listener = [" + listener + "], event = [" + event + "], name = [" + name + "], logger = [" + logger + "]");
    }
}
package example.ignite;
import javax.cache.event.CacheEntryEvent;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
/**
 * A {@link CacheEventProcessor} that prints every event it receives to standard output.
 *
 * <p>The class is {@link Externalizable} but has no fields, so nothing is written or read
 * during externalization.
 *
 * <p>Created by jackeylv on 2016/8/3.
 */
public class EventPrinter implements CacheEventProcessor, Externalizable {

    /** Public no-arg constructor, as required for {@link Externalizable} classes. */
    public EventPrinter() {
        // Nothing to initialize.
    }

    /** Prints the event prefixed with {@code "EventPrinter.entryInserted "}. */
    @Override
    public void entryInserted(CacheEntryEvent event) {
        System.out.println("EventPrinter.entryInserted " + event);
    }

    /** Prints the event prefixed with {@code "EventPrinter.entryUpdated "}. */
    @Override
    public void entryUpdated(CacheEntryEvent event) {
        System.out.println("EventPrinter.entryUpdated " + event);
    }

    /** Prints the event prefixed with {@code "EventPrinter.entryDeleted "}. */
    @Override
    public void entryDeleted(CacheEntryEvent event) {
        System.out.println("EventPrinter.entryDeleted " + event);
    }

    /** Writes nothing: this class has no state. */
    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        // Intentionally empty.
    }

    /** Reads nothing: this class has no state. */
    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        // Intentionally empty.
    }
}
package example.ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryBasicIdMapper;
import org.apache.ignite.binary.BinaryBasicNameMapper;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
/**
 * The first node of the example: it is started without switching to client mode and
 * acts as the server.
 *
 * <p>Created by jackeylv on 2016/8/3.
 */
public class IgniteServer {
    /**
     * Builds the node configuration and starts the node.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Binary settings: compact footer plus basic name/ID mappers created with 'false'.
        BinaryConfiguration binaryConfig = new BinaryConfiguration();
        binaryConfig.setCompactFooter(true);
        binaryConfig.setNameMapper(new BinaryBasicNameMapper(false));
        binaryConfig.setIdMapper(new BinaryBasicIdMapper(false));

        // The current working directory serves as both Ignite home and work directory.
        String userDir = System.getProperty("user.dir");

        IgniteConfiguration nodeConfig = new IgniteConfiguration();
        nodeConfig.setIgniteHome(userDir);
        nodeConfig.setWorkDirectory(userDir);
        // FIXME: if p2p is enabled, the 3rd joining node fails when getting the remote filter.
        nodeConfig.setPeerClassLoadingEnabled(true);
        nodeConfig.setMarshaller(new BinaryMarshaller());
        nodeConfig.setBinaryConfiguration(binaryConfig);

        Ignition.start(nodeConfig);
    }
}
package example.ignite;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryBasicIdMapper;
import org.apache.ignite.binary.BinaryBasicNameMapper;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import java.util.concurrent.TimeUnit;
/**
 * The 3rd node: a client that tries to put some key-value pairs into the {@code "Test"} cache.
 *
 * <p>Created by jackeylv on 2016/8/3.
 */
public class IgniteSimpleClient {
    /**
     * Starts a client node, puts the keys {@code 0..99} (each mapped to itself) into the
     * {@code "Test"} cache and then stops the node.
     *
     * @param args unused
     * @throws InterruptedException never thrown by the current body; kept so the method
     *                              signature stays unchanged
     */
    public static void main(String[] args) throws InterruptedException {
        IgniteConfiguration icfg = new IgniteConfiguration();
        icfg.setIgniteHome(System.getProperty("user.dir"));
        icfg.setWorkDirectory(System.getProperty("user.dir"));
        // FIXME: if p2p is enabled, the 3rd joining node fails when getting the remote filter.
        icfg.setPeerClassLoadingEnabled(true);

        BinaryConfiguration bCfg = new BinaryConfiguration();
        bCfg.setCompactFooter(true);
        bCfg.setNameMapper(new BinaryBasicNameMapper(false));
        bCfg.setIdMapper(new BinaryBasicIdMapper(false));
        icfg.setMarshaller(new BinaryMarshaller());
        icfg.setBinaryConfiguration(bCfg);

        Ignition.setClientMode(true);

        // Close the node once the puts are done (or have failed); otherwise the started
        // node is never stopped and stays running after main has returned.
        try (Ignite ignite = Ignition.start(icfg)) {
            IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache("Test");
            for (int i = 0; i < 100; i++) {
                cache.put(i, i);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment