Skip to content

Instantly share code, notes, and snippets.

@nshaw
Last active August 26, 2022 07:47
Show Gist options
  • Save nshaw/5111802 to your computer and use it in GitHub Desktop.
Save nshaw/5111802 to your computer and use it in GitHub Desktop.
v3. Sorted nodes visible to ClusterExecutor, added default heartbeat value to support LR 6.0 v2. Added more logging to improve readability, removed some logic which was examining packet content, changed duration to be based on configured heartbeat interval v1. First revision
import com.liferay.portal.kernel.cluster.ClusterExecutorUtil
import com.liferay.portal.kernel.cluster.ClusterNode
import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream
import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream
import com.liferay.portal.kernel.util.DateUtil
import com.liferay.portal.kernel.util.GetterUtil
import com.liferay.portal.kernel.util.PropsUtil
import com.liferay.portal.kernel.util.StringBundler
import com.liferay.util.transport.DatagramHandler
import com.liferay.util.transport.MulticastDatagramHandler
import com.liferay.util.transport.MulticastTransport
import com.liferay.util.transport.Transport
import org.apache.commons.logging.Log
import org.apache.commons.logging.LogFactory
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.zip.GZIPInputStream
////////
//This script does two things when executed through the Server Admin - Script console.
// 1) Display the cluster nodes currently known to the ClusterExecutor.
// 2) Attach a multicast handler to the control channel and record any nodes sending traffic (e.g. heartbeats) on that channel.
//
// If you turn up logging to at least info on com.liferay.util.transport, you'll also see each datagram packet logged to the
// server logs.
///////
try {
boolean clusteringEnabled = ClusterExecutorUtil.isEnabled();
println "Clustering enabled: " + clusteringEnabled;
if (!clusteringEnabled) {
return;
}
println "-----"
List<ClusterNode> nodes = ClusterExecutorUtil.getClusterNodes()
int numberOfNodes = nodes.size();
println "Number of nodes registered in ClusterExecutor: " + numberOfNodes
List<ClusterNode> sortedNodes = nodes.sort(new Comparator<ClusterNode>() {
int compare(ClusterNode o1, ClusterNode o2) {
return o1.getHostName().compareTo(o2.getHostName());
}
})
for (ClusterNode node : sortedNodes) {
println node.getHostName() + "," +
node.getInetAddress() + "," +
node.getPort() + "," +
node.getClusterNodeId()
}
// Defaults for the heartbeat/control-channel
String host =
PropsUtil.get("multicast.group.address[\"cluster-link-control\"]")
Integer port = GetterUtil.getInteger(
PropsUtil.get("multicast.group.port[\"cluster-link-control\"]")
);
Integer heartbeat = GetterUtil.getInteger(
PropsUtil.get("cluster.executor.heartbeat.interval"),
5
);
Boolean gzipData = false;
Boolean shortData = true;
// Try a fairly long test duration - four times the heartbeat
int heartbeatMultiplier = 4;
int testDurationInMilliseconds = heartbeatMultiplier*heartbeat;
CustomMulticastDatagramHandler handler = new CustomMulticastDatagramHandler(
gzipData.booleanValue(), shortData.booleanValue());
println "-----"
println "Created transport against multicast group " + host + ":" + port;
CustomMulticastTransport transport = new CustomMulticastTransport(
handler, host, port);
// if (shortData.booleanValue()) {
// println "Truncating to 96 bytes.";
// }
println "Started up heartbeat listener at " + DateUtil.newDate();
transport.connect();
ExecutorService service = Executors.newSingleThreadExecutor();
try {
service.submit(transport).get(testDurationInMilliseconds,
TimeUnit.MILLISECONDS);
}
catch (TimeoutException e) {
//no-op, expected when the transport is timed out by the executor
}
finally {
transport.interrupt();
service.shutdown();
println "Shutdown heartbeat listener at " + DateUtil.newDate();
println "-----"
}
String[] addresses = handler.getAddresses()
int numberSeen = addresses.length
println "Number of nodes observed through control traffic: " + numberSeen
println "Addresses seen: " + addresses
println "-----"
if ((numberSeen <= 1) || (numberSeen != numberOfNodes)) {
println "WARNING: clustering does not appear to be working correctly."
}
}
catch (Exception e) {
println "Exception " + e;
}
public class CustomMulticastDatagramHandler implements DatagramHandler {
public CustomMulticastDatagramHandler(boolean gzipData, boolean shortData) {
_gzipData = gzipData;
_shortData = shortData;
}
public void errorReceived(Throwable t) {
_log.error(t, t);
}
public void process(DatagramPacket packet) {
if (_log.isInfoEnabled()) {
byte[] bytes = packet.getData();
if (_gzipData) {
try {
bytes = getUnzippedBytes(bytes);
}
catch (Exception e) {
_log.error(e, e);
}
}
if (_shortData) {
byte[] temp = new byte[96];
System.arraycopy(bytes, 0, temp, 0, 96);
bytes = temp;
}
StringBundler sb = new StringBundler(4);
sb.append("[");
sb.append(packet.getSocketAddress());
sb.append("] ");
sb.append(new String(bytes));
_log.info(sb);
}
//CUSTOM
_socketAddresses.add(packet.getSocketAddress().toString());
//CUSTOM END
}
//CUSTOM START
public String[] getAddresses() {
return _socketAddresses.toArray();
}
//CUSTOM END
protected byte[] getUnzippedBytes(byte[] bytes) throws Exception {
InputStream is = new GZIPInputStream(
new UnsyncByteArrayInputStream(bytes));
UnsyncByteArrayOutputStream ubaos = new UnsyncByteArrayOutputStream(
bytes.length);
byte[] buffer = new byte[1500];
int c = 0;
while (true) {
if (c == -1) {
break;
}
c = is.read(buffer, 0, 1500);
if (c != -1) {
ubaos.write(buffer, 0, c);
}
}
is.close();
ubaos.flush();
ubaos.close();
return ubaos.toByteArray();
}
private static Log _log = LogFactory.getLog(MulticastDatagramHandler.class);
private boolean _gzipData;
private boolean _shortData;
//CUSTOM START
private Set<String> _socketAddresses = new TreeSet<String>();
//CUSTOM END
}
public class CustomMulticastTransport extends Thread implements Transport {
public CustomMulticastTransport(DatagramHandler handler, String host, int port) {
//Not working in groovy script super("MulticastListener-" + host + port);
setDaemon(true);
_handler = handler;
_host = host;
_port = port;
}
public synchronized void connect() throws IOException {
if (_socket == null) {
_socket = new MulticastSocket(_port);
}
else if (_socket.isConnected() && _socket.isBound()) {
return;
}
_address = InetAddress.getByName(_host);
_socket.joinGroup(_address);
_connected = true;
start();
}
public synchronized void disconnect() {
// Interrupt all processing
if (_address != null) {
try {
_socket.leaveGroup(_address);
_address = null;
}
catch (IOException ioe) {
_log.error("Unable to leave group", ioe);
}
}
_connected = false;
interrupt();
_socket.close();
}
public boolean isConnected() {
return _connected;
}
@Override
public void run() {
try {
while (_connected && !this.isInterrupted()) {
_socket.receive(_inboundPacket);
_handler.process(_inboundPacket);
}
}
catch (IOException ioe) {
_log.error("Unable to process ", ioe);
_socket.disconnect();
_connected = false;
_handler.errorReceived(ioe);
}
}
public synchronized void sendMessage(byte[] bytes) throws IOException {
_outboundPacket.setData(bytes);
_outboundPacket.setAddress(_address);
_outboundPacket.setPort(_port);
_socket.send(_outboundPacket);
}
public synchronized void sendMessage(String message) throws IOException {
sendMessage(message.getBytes());
}
private static Log _log = LogFactory.getLog(MulticastTransport.class);
private InetAddress _address;
private boolean _connected;
private DatagramHandler _handler;
private String _host;
private byte[] _inboundBuffer = new byte[4096];
private DatagramPacket _inboundPacket = new DatagramPacket(
_inboundBuffer, _inboundBuffer.length);
private byte[] _outboundBuffer = new byte[4096];
private DatagramPacket _outboundPacket = new DatagramPacket(
_outboundBuffer, _outboundBuffer.length);
private int _port;
private MulticastSocket _socket;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment