Skip to content

Instantly share code, notes, and snippets.

@allanwax
Created August 7, 2015 23:18
Show Gist options
  • Save allanwax/6cb5385737e671f76eb9 to your computer and use it in GitHub Desktop.
Save allanwax/6cb5385737e671f76eb9 to your computer and use it in GitHub Desktop.
Scan a redis cluster in parallel
package com.findology.util.jediscluster;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import redis.clients.jedis.*;
/**
 * Scans the master nodes of a Redis cluster in parallel and hands every key that
 * matches a pattern to {@link #action(String)}.
 *
 * <p>One scan task per master is submitted to a shared cached thread pool, so
 * {@link #action(String)} is invoked from pool threads and may be invoked
 * concurrently; implementations must be thread-safe.
 *
 * Created by allan.wax on 7/27/2015.
 */
public abstract class ClusterScanner {
    private static final Logger log = Logger.getLogger(ClusterScanner.class);

    /** Shared pool that runs one scan task per master node. */
    protected static final ExecutorService executor = Executors.newCachedThreadPool();

    /**
     * Cache of "host:port" -> "resolved-ip:port". It is touched from several pool
     * threads, so every access must hold the map's own monitor.
     */
    protected static final HashMap<String, String> hostMap = new HashMap<>();

    private final JedisCluster jedisCluster;

    /** When true, extra progress information is logged at INFO level. */
    private static boolean TEST_MODE = false;

    /**
     * @param jedisCluster cluster client used to discover the cluster's nodes
     */
    public ClusterScanner(JedisCluster jedisCluster) {
        this.jedisCluster = jedisCluster;
    }

    /**
     * Called once for every key returned by the scan.
     *
     * @param key a key that matched the scan pattern
     */
    public abstract void action(String key);

    /**
     * Removes the cluster-bus suffix from a node address as printed by
     * {@code CLUSTER NODES}. Newer servers print {@code ip:port@cport} (optionally
     * followed by {@code ,hostname}); older servers print plain {@code ip:port}.
     *
     * @param address address column of a {@code CLUSTER NODES} line
     * @return the address without anything from the first {@code '@'} onward
     */
    private static String stripClusterBusPort(String address) {
        int at = address.indexOf('@');
        return at >= 0 ? address.substring(0, at) : address;
    }

    /**
     * Replaces the host part of a "host:port" string with its resolved IP address,
     * caching the result in {@link #hostMap}.
     *
     * @param hostAndPort address in "host:port" form
     * @return "ip:port", or the input unchanged when it cannot be parsed or resolved
     */
    private String normalizeHostAndPort(String hostAndPort) {
        synchronized (hostMap) {
            String cached = hostMap.get(hostAndPort);
            if (cached != null) {
                return cached;
            }
        }
        try {
            int colon = hostAndPort.lastIndexOf(':');
            // The lookup happens outside the lock so a slow DNS query cannot stall
            // the other scan threads; at worst two threads resolve the same name.
            InetAddress address = InetAddress.getByName(hostAndPort.substring(0, colon));
            String normalized = address.getHostAddress() + hostAndPort.substring(colon);
            synchronized (hostMap) {
                hostMap.put(hostAndPort, normalized);
            }
            return normalized;
        }
        catch (Exception e) {
            // Best effort: fall back to the address exactly as it was given.
            return hostAndPort;
        }
    }

    /**
     * Scan task for a single cluster node. It is an inner (non-static) class because
     * it reports keys through the enclosing scanner's {@link #action(String)}.
     */
    private class ScannerImpl implements Callable<Long> {
        private final String clusterInstance;
        private final String match;

        /**
         * @param clusterInstance node address in "host:port" form
         * @param match           pattern passed to SCAN's MATCH option
         */
        public ScannerImpl(String clusterInstance, String match) {
            this.clusterInstance = clusterInstance;
            this.match = match;
        }

        /**
         * Iterates the node's keyspace with SCAN and passes every returned key to
         * {@link ClusterScanner#action(String)}. Failures are logged rather than
         * propagated, so one broken node does not hide the results of the others.
         *
         * @return number of keys seen on this node before the scan ended or failed
         */
        @Override
        public Long call() throws Exception {
            long count = 0; /* TEST_MODE */
            int scanCount = 0;
            try {
                String hostAndPort = normalizeHostAndPort(clusterInstance);
                // Split on the LAST colon so an IPv6 literal keeps its own colons.
                int colon = hostAndPort.lastIndexOf(':');
                if (colon <= 0) {
                    throw new IllegalArgumentException("Not a host:port address: " + hostAndPort);
                }
                String host = hostAndPort.substring(0, colon);
                int port = Integer.parseInt(hostAndPort.substring(colon + 1));
                try (Jedis jedis = new Jedis(host, port)) {
                    ScanParams params = new ScanParams().match(match).count(100);
                    String scanMarker = "0";
                    ScanResult<String> results = null;
                    do {
                        results = jedis.scan(scanMarker, params);
                        scanCount++;
                        List<String> keys = results.getResult();
                        if (keys != null && keys.size() > 0) {
                            count += keys.size(); /* TEST_MODE */
                            for (String key : keys) {
                                action(key);
                            }
                        }
                        scanMarker = results.getStringCursor();
                    } while (!scanMarker.equals("0")); // cursor "0" marks the end of the iteration
                }
                if (TEST_MODE) {
                    log.info("Found " + count + " keys for " + hostAndPort + " in " + scanCount + " scans"); /* TEST_MODE */
                }
            }
            catch (Exception e) {
                // Pass the throwable itself so the stack trace reaches the log.
                log.error("Scan of " + clusterInstance + " failed after " + count + " keys", e);
            }
            return count;
        }
    }

    /**
     * Scans the cluster for keys matching {@code match}, waiting at most 15 seconds.
     *
     * @param match pattern passed to SCAN's MATCH option
     */
    public void scan(String match) {
        scan(match, 15);
    }

    /**
     * Scans every usable master node in parallel for keys matching {@code match}.
     * Blocks until all node scans have finished or {@code maxSeconds} has elapsed;
     * scans still running at the deadline are cancelled and reported in the log.
     *
     * @param match      pattern passed to SCAN's MATCH option
     * @param maxSeconds maximum time to wait for all node scans, in seconds
     */
    public void scan(String match, int maxSeconds) {
        if (TEST_MODE) {
            log.info("Start scan for '" + match + "'");
        }
        Map<String, JedisPool> jedisPools = jedisCluster.getClusterNodes();
        String nodeList = null;
        Exception lastFailure = null;
        // get the list of nodes (masters and slaves) from the first member that answers
        for (JedisPool pool : jedisPools.values()) {
            // try-with-resources closes the connection on every path, including the
            // one where getResource() itself throws and there is nothing to close.
            try (Jedis j = pool.getResource()) {
                nodeList = j.clusterNodes();
                break;
            }
            catch (Exception e) {
                lastFailure = e; // remember why, then try the next member
            }
        }
        if (nodeList == null) {
            log.error("The cluster is screwed. Can't use any members.", lastFailure);
            return;
        }
        List<ScannerImpl> scanners = new ArrayList<>();
        // pick out the masters
        for (String node : nodeList.split("\n")) {
            // Expected columns: <id> <address> <flags> ...
            String[] info = node.trim().split("\\s+");
            if (info.length < 3) {
                continue; // blank or malformed line
            }
            String flags = info[2];
            if (flags.contains("fail") || flags.contains("handshake") || flags.contains("noaddr")) {
                continue;
            }
            if (flags.contains("master")) {
                String address = stripClusterBusPort(info[1]);
                if (address.isEmpty() || address.startsWith(":")) {
                    continue; // no usable host to connect to
                }
                scanners.add(new ScannerImpl(address, match));
            }
        }
        if (!scanners.isEmpty()) {
            try {
                // invokeAll returns the futures in the same order as the tasks.
                List<Future<Long>> outcomes = executor.invokeAll(scanners, maxSeconds, TimeUnit.SECONDS);
                for (int i = 0; i < outcomes.size(); i++) {
                    if (outcomes.get(i).isCancelled()) {
                        log.warn("Scan of " + scanners.get(i).clusterInstance + " did not finish within "
                                + maxSeconds + " seconds; results for that node are incomplete");
                    }
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                log.error("Interrupted while scanning for '" + match + "'", e);
            }
            catch (Exception e) {
                log.error("Unable to run scan for '" + match + "'", e);
            }
        }
    }

    /* TEST */
    /**
     * Manual smoke test: counts all keys of the cluster reachable through
     * host "test2" port 17000 and logs the total and the elapsed time.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TEST_MODE = true;
        Logger rootLogger = Logger.getRootLogger();
        // Install a console appender only when log4j has not been configured already.
        if (!rootLogger.getAllAppenders().hasMoreElements()) {
            rootLogger.setLevel(Level.INFO);
            rootLogger.addAppender(new ConsoleAppender(new org.apache.log4j.PatternLayout("%d{yyyy-MM-dd HH:mm:ss.SSS} [%t; %C{1}] %-5p -- %m%n")));
        }
        Set<HostAndPort> jcNodes = new HashSet<>();
        jcNodes.add(new HostAndPort("test2", 17000));
        final JedisCluster jc = new JedisCluster(jcNodes);
        // action() runs on several pool threads, hence the atomic counter.
        final AtomicInteger ai = new AtomicInteger(0);
        ClusterScanner scanner = new ClusterScanner(jc) {
            @Override
            public void action(String key) {
                ai.incrementAndGet();
            }
        };
        String scanFor = "*";
        long start = System.currentTimeMillis();
        scanner.scan(scanFor);
        log.info("Scan for '" + scanFor + "' found " + ai + " keys in " + ((System.currentTimeMillis() - start) / 1000.0) + " seconds");
        // The shared executor's worker threads would otherwise keep the JVM alive for a while.
        System.exit(0);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment