Skip to content

Instantly share code, notes, and snippets.

@dmgcodevil
Created December 28, 2019 03:48
Show Gist options
  • Save dmgcodevil/22e50eb8f415602bb8d128fe28cdb209 to your computer and use it in GitHub Desktop.
Save dmgcodevil/22e50eb8f415602bb8d128fe28cdb209 to your computer and use it in GitHub Desktop.
Using UDP multicast for pods/peers discovery in Kubernetes

Java app: Publisher/Receiver

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MulticastDiscovery {

    public static void main(String[] args) throws Exception {
        String multicastAddr = System.getenv("MULTICAST_ADDR");
        int multicastPort = Integer.parseInt(System.getenv("MULTICAST_PORT"));

        ExecutorService service = Executors.newFixedThreadPool(2);
        service.submit(new MulticastPublisher(multicastAddr, multicastPort));
        service.submit(new MulticastReceiver(multicastAddr, multicastPort)).get();
    }

    static class MulticastReceiver implements Runnable {

        private final byte[] buf = new byte[256];

        private final String addr;
        private final int port;

        MulticastReceiver(String addr, int port) {
            this.addr = addr;
            this.port = port;
        }


        @Override
        public void run() {
            try (MulticastSocket socket = new MulticastSocket(port)) {
                String selfIp = getSelfIP();
                InetAddress group = InetAddress.getByName(addr);
                socket.joinGroup(group);
                while (!Thread.currentThread().isInterrupted()) {
                    DatagramPacket packet = new DatagramPacket(buf, buf.length);
                    socket.receive(packet);
                    String received = new String(
                            packet.getData(), 0, packet.getLength());

                    if (!packet.getAddress().getHostAddress().equals(selfIp)) {
                        System.out.println(String.format("[%s] received '%s' from %s:%d",
                                selfIp,
                                received,
                                packet.getAddress(),
                                packet.getPort()));

                        if ("end".equals(received)) {
                            break;
                        }
                    }


                }
                socket.leaveGroup(group);

            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    static class MulticastPublisher implements Runnable {

        private final static byte[] MESSAGE_BUF = "ping".getBytes();
        private final String addr;
        private final int port;

        MulticastPublisher(String addr, int port) {
            this.addr = addr;
            this.port = port;
        }

        @Override
        public void run() {
            try (DatagramSocket socket = new DatagramSocket()) {
                InetAddress group = InetAddress.getByName(addr);
                while (!Thread.currentThread().isInterrupted()) {
                    DatagramPacket packet = new DatagramPacket(MESSAGE_BUF, MESSAGE_BUF.length, group, port);
                    socket.send(packet);
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

    }

    private static String getSelfIP() throws UnknownHostException {
        InetAddress inetAddress = InetAddress.getLocalHost();
        return inetAddress.getHostAddress();
    }

}

Kube deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: p2p-deployment
  labels:
    app: p2p
spec:
  replicas: 2
  selector:
    matchLabels:
      app: p2p
  template:
    metadata:
      labels:
        app: p2p
    spec:
      containers:
        - name: p2p
          image: p2p:1.0-SNAPSHOT
          env:
            - name: MULTICAST_ADDR
              value: "230.0.0.0"
            - name: MULTICAST_PORT
              value: "4446"

Test:

  1. kubectl apply -f deployment.yaml
  2. kubectl get pods -o wide
NAME                              READY   STATUS    RESTARTS   AGE    IP          NODE             NOMINATED NODE   READINESS GATES
p2p-deployment-5b98b7f4b4-npzkh   1/1     Running   0          3m7s   10.1.0.53   docker-desktop   <none>           <none>
p2p-deployment-5b98b7f4b4-vh9dk   1/1     Running   0          3m7s   10.1.0.54   docker-desktop   <none>           <none>
  1. kubectl logs -f p2p-deployment-5b98b7f4b4-vh9dk
[10.1.0.54] received 'ping' from /10.1.0.53:49766
[10.1.0.54] received 'ping' from /10.1.0.53:49766
....

kubectl logs -f p2p-deployment-5b98b7f4b4-npzkh

[10.1.0.53] received 'ping' from /10.1.0.54:47230
[10.1.0.53] received 'ping' from /10.1.0.54:47230
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment