Java app: Publisher/Receiver
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Minimal UDP multicast demo: every running instance publishes a {@code "ping"}
 * datagram to a multicast group once per second and prints the datagrams it
 * receives from other hosts in the same group.
 *
 * <p>The group address and port are read from the {@code MULTICAST_ADDR} and
 * {@code MULTICAST_PORT} environment variables.
 */
public class MulticastDiscovery {

    /**
     * Starts one publisher and one receiver and blocks until the receiver finishes.
     *
     * @param args unused
     * @throws IllegalArgumentException if a required environment variable is missing or empty
     * @throws NumberFormatException if {@code MULTICAST_PORT} is not an integer
     * @throws Exception if the receiver task fails (wrapped in an
     *         {@link java.util.concurrent.ExecutionException})
     */
    public static void main(String[] args) throws Exception {
        // Validate configuration before any thread is started so a bad
        // environment fails fast with a message naming the variable.
        String multicastAddr = requireEnv("MULTICAST_ADDR");
        int multicastPort = Integer.parseInt(requireEnv("MULTICAST_PORT"));

        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            service.submit(new MulticastPublisher(multicastAddr, multicastPort));
            service.submit(new MulticastReceiver(multicastAddr, multicastPort)).get();
        } finally {
            // The pool threads are non-daemon: without this the publisher loop
            // would keep the JVM alive after the receiver has stopped or failed.
            service.shutdownNow();
        }
    }

    /**
     * Returns the value of a mandatory environment variable.
     *
     * @param name the variable name
     * @return the non-empty value
     * @throws IllegalArgumentException if the variable is unset or empty
     */
    private static String requireEnv(String name) {
        String value = System.getenv(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(
                    "Missing required environment variable " + name);
        }
        return value;
    }

    /**
     * Joins the multicast group and prints every datagram that did not
     * originate from this host. Stops when a remote peer sends {@code "end"}
     * or when the running thread is interrupted between receives.
     */
    static class MulticastReceiver implements Runnable {
        /** Receive buffer; payloads longer than this are truncated by the socket. */
        private final byte[] buf = new byte[256];
        private final String addr;
        private final int port;

        /**
         * @param addr multicast group address (host name or literal IP)
         * @param port UDP port to bind and listen on
         */
        MulticastReceiver(String addr, int port) {
            this.addr = addr;
            this.port = port;
        }

        /**
         * Runs the receive loop.
         *
         * @throws RuntimeException wrapping any failure raised while binding,
         *         joining the group or receiving
         */
        @Override
        public void run() {
            try (MulticastSocket socket = new MulticastSocket(port)) {
                String selfIp = getSelfIP();
                InetAddress group = InetAddress.getByName(addr);
                // NOTE: joinGroup(InetAddress) is deprecated since Java 14 in favour of
                // joinGroup(SocketAddress, NetworkInterface); kept so the default
                // interface selection stays exactly as before.
                socket.joinGroup(group);
                while (!Thread.currentThread().isInterrupted()) {
                    DatagramPacket packet = new DatagramPacket(buf, buf.length);
                    socket.receive(packet);
                    // Decode with an explicit charset so both ends agree
                    // regardless of the platform default.
                    String received = new String(
                            packet.getData(),
                            packet.getOffset(),
                            packet.getLength(),
                            StandardCharsets.UTF_8);
                    // Multicast loops back to the sender; ignore our own packets.
                    if (!packet.getAddress().getHostAddress().equals(selfIp)) {
                        System.out.println(String.format("[%s] received '%s' from %s:%d",
                                selfIp,
                                received,
                                packet.getAddress(),
                                packet.getPort()));
                        if ("end".equals(received)) {
                            break;
                        }
                    }
                }
                socket.leaveGroup(group);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Sends a {@code "ping"} datagram to the multicast group once per second
     * until the running thread is interrupted.
     */
    static class MulticastPublisher implements Runnable {
        private static final byte[] MESSAGE_BUF = "ping".getBytes(StandardCharsets.UTF_8);
        private final String addr;
        private final int port;

        /**
         * @param addr multicast group address (host name or literal IP)
         * @param port UDP port the datagrams are addressed to
         */
        MulticastPublisher(String addr, int port) {
            this.addr = addr;
            this.port = port;
        }

        /**
         * Runs the publish loop.
         *
         * @throws RuntimeException wrapping any failure raised while resolving
         *         the group or sending
         */
        @Override
        public void run() {
            try (DatagramSocket socket = new DatagramSocket()) {
                InetAddress group = InetAddress.getByName(addr);
                while (!Thread.currentThread().isInterrupted()) {
                    DatagramPacket packet =
                            new DatagramPacket(MESSAGE_BUF, MESSAGE_BUF.length, group, port);
                    socket.send(packet);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // Interruption is the normal shutdown signal: restore the flag
                // and finish quietly instead of reporting it as a failure.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Returns the textual IP address of the local host, used to filter out
     * datagrams this instance sent itself.
     *
     * @throws UnknownHostException if the local host name cannot be resolved
     */
    private static String getSelfIP() throws UnknownHostException {
        InetAddress inetAddress = InetAddress.getLocalHost();
        return inetAddress.getHostAddress();
    }
}
Kube deployment:
# Deployment that runs two replicas of the multicast demo container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: p2p-deployment
  labels:
    app: p2p
spec:
  # Two pods, so each one has a peer to receive pings from.
  replicas: 2
  selector:
    matchLabels:
      app: p2p
  template:
    metadata:
      labels:
        app: p2p
    spec:
      containers:
        - name: p2p
          image: p2p:1.0-SNAPSHOT
          # Read by the Java application at startup via System.getenv.
          env:
            - name: MULTICAST_ADDR
              value: "230.0.0.0"
            - name: MULTICAST_PORT
              value: "4446"
Test:
kubectl apply -f deployment.yaml
kubectl get pods -o wide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
p2p-deployment-5b98b7f4b4-npzkh 1/1 Running 0 3m7s 10.1.0.53 docker-desktop <none> <none>
p2p-deployment-5b98b7f4b4-vh9dk 1/1 Running 0 3m7s 10.1.0.54 docker-desktop <none> <none>
kubectl logs -f p2p-deployment-5b98b7f4b4-vh9dk
[10.1.0.54] received 'ping' from /10.1.0.53:49766
[10.1.0.54] received 'ping' from /10.1.0.53:49766
....
kubectl logs -f p2p-deployment-5b98b7f4b4-npzkh
[10.1.0.53] received 'ping' from /10.1.0.54:47230
[10.1.0.53] received 'ping' from /10.1.0.54:47230