Skip to content

Instantly share code, notes, and snippets.

@krishna81m
Forked from mp911de/Client.java
Created January 10, 2020 23:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save krishna81m/70615951adc437ba2bfbe2397595ab24 to your computer and use it in GitHub Desktop.
Save krishna81m/70615951adc437ba2bfbe2397595ab24 to your computer and use it in GitHub Desktop.
JMX Monitoring Demo of lettuce 3.4-SNAPSHOT
package com.lambdaworks.redis.experimental.mbean;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.springframework.test.util.ReflectionTestUtils;
import rx.functions.Action1;
import rx.functions.Func1;
import com.google.common.collect.Lists;
import com.lambdaworks.redis.ConnectionId;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.event.Event;
import com.lambdaworks.redis.event.connection.*;
import io.netty.channel.EventLoopGroup;
/**
 * Demo for a JMX-based client management.
 * <p>
 * A {@code Client} registers itself as an MBean, mirrors connection events published on the client's event bus
 * as {@link Connection} MBeans and periodically unregisters the MBeans of connections that have been
 * disconnected for longer than the retention period.
 *
 * @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a>
 */
public class Client implements ClientMBean {

    /** Seconds a disconnected connection stays registered; also the interval of the cleanup task. */
    private static final long DISCONNECTED_RETENTION_SECONDS = 10;

    /** Source of the {@code id} key of the client's ObjectName; guarded by the class lock, see {@link #nextId()}. */
    private static int counter;

    private final RedisClient redisClient;
    private final MBeanServer mBeanServer;
    private final ObjectName self;
    private final RedisConnection<String, String> monitoringConnection;
    private Map<ConnectionId, Connection> connections = new ConcurrentHashMap<ConnectionId, Connection>();

    /**
     * Creates the client MBean, registers it and starts tracking connection events.
     *
     * @param redisClient the client to monitor
     * @param mBeanServer the server on which the client and connection MBeans are registered
     * @throws JMException if the ObjectName is malformed or the registration fails
     */
    private Client(RedisClient redisClient, final MBeanServer mBeanServer) throws JMException {
        this.redisClient = redisClient;
        this.mBeanServer = mBeanServer;

        StringBuilder id = new StringBuilder();
        id.append("name=RedisClient,");
        id.append("id=").append(nextId());
        this.self = ObjectName.getInstance("redis.client.lettuce:" + id);
        this.mBeanServer.registerMBean(this, self);

        // Subscribe only after mBeanServer and self are assigned: handleEvent reads both, so an event
        // delivered earlier would hit null fields. Subscribing before connect() keeps the monitoring
        // connection's own events visible.
        subscribeToConnectionEvents();

        this.monitoringConnection = redisClient.connect();
        scheduleStaleConnectionCleanup();
    }

    /**
     * Allocates the next client id. Synchronized because a plain increment is not atomic and two clients
     * created concurrently must not end up with the same ObjectName.
     */
    private static synchronized int nextId() {
        return counter++;
    }

    /** Routes every {@link ConnectionEvent} published on the client's event bus to {@link #handleEvent}. */
    private void subscribeToConnectionEvents() {
        redisClient.getResources().eventBus().get().filter(new Func1<Event, Boolean>() {
            @Override
            public Boolean call(Event event) {
                return event instanceof ConnectionEvent;
            }
        }).cast(ConnectionEvent.class).doOnNext(new Action1<ConnectionEvent>() {
            @Override
            public void call(ConnectionEvent connectionEvent) {
                try {
                    handleEvent(connectionEvent);
                } catch (JMException e) {
                    throw new IllegalStateException(e);
                }
            }
        }).subscribe();
    }

    /** Schedules the periodic removal of connections that stayed disconnected past the retention period. */
    private void scheduleStaleConnectionCleanup() {
        redisClient.getResources().eventExecutorGroup().scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                removeStaleConnections();
            }
        }, DISCONNECTED_RETENTION_SECONDS, DISCONNECTED_RETENTION_SECONDS, TimeUnit.SECONDS);
    }

    /** Unregisters and forgets every connection whose disconnect timestamp is older than the retention period. */
    private void removeStaleConnections() {
        // Work on a snapshot so removals do not interfere with the iteration.
        Map<ConnectionId, Connection> snapshot = new HashMap<ConnectionId, Connection>(connections);
        long threshold = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(DISCONNECTED_RETENTION_SECONDS);

        for (Map.Entry<ConnectionId, Connection> entry : snapshot.entrySet()) {
            Long disconnectedSince = entry.getValue().getDisconnectedSince();
            if (disconnectedSince == null || disconnectedSince >= threshold) {
                continue;
            }
            try {
                mBeanServer.unregisterMBean(entry.getValue().getObjectName());
            } catch (JMException e) {
                // Best effort: the entry is dropped below even if the MBean could not be unregistered.
                e.printStackTrace();
            }
            connections.remove(entry.getKey());
        }
    }

    /**
     * Creates and registers a {@link Connection} MBean for a connection seen for the first time and updates
     * its state according to the event type.
     *
     * @param connectionEvent the event to apply; also used as the map key identifying the connection
     * @throws JMException if the connection MBean cannot be created or registered
     */
    private void handleEvent(ConnectionEvent connectionEvent) throws JMException {
        Connection connection = connections.get(connectionEvent);
        if (connection == null) {
            connection = new Connection(self, connectionEvent);
            connections.put(connectionEvent, connection);
            mBeanServer.registerMBean(connection, connection.getObjectName());
        }

        if (connectionEvent instanceof ConnectedEvent) {
            connection.updateState("connected");
            connection.setDisconnectedSince(null);
        }
        if (connectionEvent instanceof ConnectionActivatedEvent) {
            connection.updateState("activated");
            connection.setDisconnectedSince(null);
        }
        if (connectionEvent instanceof DisconnectedEvent) {
            connection.updateState("disconnected");
            connection.setDisconnectedSince(System.currentTimeMillis());
        }
        if (connectionEvent instanceof ConnectionDeactivatedEvent) {
            connection.updateState("deactivated");
            connection.setDisconnectedSince(System.currentTimeMillis());
        }
    }

    /**
     * Creates a client MBean registered on the platform MBean server.
     *
     * @param redisClient the client to monitor
     * @return the registered client MBean
     * @throws JMException if the registration fails
     */
    public static Client create(RedisClient redisClient) throws JMException {
        return new Client(redisClient, ManagementFactory.getPlatformMBeanServer());
    }

    /**
     * Reads the channel count of the client through its {@code getChannelCount} getter via reflection.
     *
     * @return the channel count, or {@code -1} if the getter returned {@code null}
     */
    @Override
    public int getConnectionCount() {
        Object o = ReflectionTestUtils.invokeGetterMethod(redisClient, "getChannelCount");
        if (o != null) {
            return (Integer) o;
        }
        return -1;
    }

    /**
     * Sums the executors of all event loop groups held in the client's {@code eventLoopGroups} field,
     * which is read via reflection.
     *
     * @return number of all I/O threads
     */
    @Override
    public int getIoThreadCount() {
        Map<?, EventLoopGroup> eventLoopGroups = (Map) ReflectionTestUtils.getField(redisClient, "eventLoopGroups");
        int result = 0;
        for (EventLoopGroup eventExecutors : eventLoopGroups.values()) {
            result += Lists.newArrayList(eventExecutors.iterator()).size();
        }
        return result;
    }

    /**
     * Counts the executors of the client resources' event executor group.
     *
     * @return number of computation threads
     */
    @Override
    public int getWorkerThreadCount() {
        return Lists.newArrayList(redisClient.getResources().eventExecutorGroup().iterator()).size();
    }

    /**
     * @return the result of {@code info()} issued on the monitoring connection
     */
    @Override
    public String redisInfo() {
        return monitoringConnection.info();
    }

    /**
     * @return the result of {@code clientList()} issued on the monitoring connection
     */
    @Override
    public String clientList() {
        return monitoringConnection.clientList();
    }
}
package com.lambdaworks.redis.experimental.mbean;
/**
* Management interface implemented by {@link Client}: exposes connection and thread counts as
* attributes and two Redis diagnostic commands as operations.
*/
public interface ClientMBean {
/**
* Returns the number of connections.
*
* @return number of connections
*/
int getConnectionCount();
/**
* Returns the number of all I/O threads.
*
* @return number of all I/O threads
*/
int getIoThreadCount();
/**
* Returns the number of computation threads.
*
* @return number of computation threads
*/
int getWorkerThreadCount();
/**
* Runs the Redis {@code INFO} command.
*
* @return The result of the Redis {@code INFO} command
*/
String redisInfo();
/**
* Runs the Redis {@code CLIENT LIST} command.
*
* @return The result of the Redis {@code CLIENT LIST} command
*/
String clientList();
}
package com.lambdaworks.redis.experimental.mbean;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import javax.management.JMException;
import javax.management.ObjectName;
import com.lambdaworks.redis.ConnectionId;
/**
 * MBean describing a single connection: its local and remote address and its last known state.
 * <p>
 * The ObjectName is derived from the owning client's ObjectName plus the connection's addresses.
 * {@code state} and {@code disconnectedSince} are {@code volatile} because they are written by the thread
 * delivering connection events and read by other threads (JMX attribute reads, the cleanup task).
 *
 * @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a>
 */
public class Connection implements ConnectionMBean {

    private final ObjectName objectName;
    private final SocketAddress local;
    private final SocketAddress remote;
    private volatile String state = "unknown";
    private volatile Long disconnectedSince = null;

    /**
     * Builds the connection MBean and its ObjectName.
     *
     * @param clientObjectName ObjectName of the owning client; supplies the domain and the {@code name} and
     *        {@code id} keys
     * @param connectionId supplies the local and remote address of the connection
     * @throws JMException if the resulting ObjectName is malformed
     */
    public Connection(ObjectName clientObjectName, ConnectionId connectionId) throws JMException {
        local = connectionId.localAddress();
        remote = connectionId.remoteAddress();

        StringBuilder id = new StringBuilder();
        id.append("name=").append(clientObjectName.getKeyProperty("name"));
        id.append(",id=").append(clientObjectName.getKeyProperty("id"));

        if (remote instanceof InetSocketAddress) {
            InetSocketAddress isa = (InetSocketAddress) remote;
            id.append(",protocol=TCP");
            id.append(",host=").append(isa.getHostName());
            id.append(",port=").append(isa.getPort());
        } else {
            id.append(",protocol=").append(remote.getClass().getSimpleName());
            id.append(",remote=").append(remote.toString());
        }

        if (local instanceof InetSocketAddress) {
            InetSocketAddress isa = (InetSocketAddress) local;
            id.append(",localPort=").append(isa.getPort());
        } else {
            // The "local" key must carry the local address (it previously carried the remote one).
            id.append(",local=").append(local);
        }

        objectName = ObjectName.getInstance(clientObjectName.getDomain() + ":" + id);
    }

    /**
     * @return the ObjectName under which this connection is to be registered
     */
    public ObjectName getObjectName() {
        return objectName;
    }

    /**
     * @return the timestamp set by {@link #setDisconnectedSince(Long)}, or {@code null} if none is set
     */
    public Long getDisconnectedSince() {
        return disconnectedSince;
    }

    /**
     * @param disconnectedSince the disconnect timestamp, or {@code null} to clear it
     */
    public void setDisconnectedSince(Long disconnectedSince) {
        this.disconnectedSince = disconnectedSince;
    }

    @Override
    public String getLocalAddress() {
        return local.toString();
    }

    @Override
    public String getRemoteAddress() {
        return remote.toString();
    }

    @Override
    public String getState() {
        return state;
    }

    /**
     * Replaces the reported connection state.
     *
     * @param state the new state label
     */
    void updateState(String state) {
        this.state = state;
    }
}
package com.lambdaworks.redis.experimental.mbean;
/**
* Management interface implemented by {@link Connection}: exposes the addresses and the state of a
* single connection as read-only attributes.
*/
public interface ConnectionMBean {
/**
* Returns the local address of the connection.
*
* @return the local address
*/
String getLocalAddress();
/**
* Returns the remote address of the connection.
*
* @return the remote address
*/
String getRemoteAddress();
/**
* Returns the connection state.
*
* @return the connection state
*/
String getState();
}
package com.lambdaworks.redis.experimental.mbean;
import org.junit.Test;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.TestSettings;
/**
 * JMX Demo Runner. Registers JMX Monitoring and creates/closes periodically connections.
 * <p>
 * The loop runs until an exception (for example an interrupt or a failed connect) ends it; every connection
 * opened by the loop is closed and the client is shut down on the way out.
 *
 * @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a>
 */
public class JMXDemo {

    /**
     * Registers the JMX client MBean, then opens, pings and closes a connection in an endless loop.
     *
     * @throws Exception if MBean registration fails, the thread is interrupted or a Redis call fails
     */
    @Test
    public void run() throws Exception {
        RedisClient redisClient = RedisClient.create(RedisURI.Builder.redis(TestSettings.host(), TestSettings.port()).build());
        try {
            Client.create(redisClient);
            while (true) {
                Thread.sleep(5000);
                System.out.println("Connecting");
                RedisConnection<String, String> connect = redisClient.connect();
                try {
                    connect.ping();
                    Thread.sleep(1000);
                } finally {
                    // Close even when ping() or sleep() throws so the connection is not leaked.
                    connect.close();
                }
            }
        } finally {
            // Release the client's threads and connections once the loop is left by an exception.
            redisClient.shutdown();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment