Skip to content

Instantly share code, notes, and snippets.

@astropenguin
Created September 16, 2021 07:15
Show Gist options
  • Save astropenguin/7aa072c934cdda4000ac2d06ec115167 to your computer and use it in GitHub Desktop.
Save astropenguin/7aa072c934cdda4000ac2d06ec115167 to your computer and use it in GitHub Desktop.
import socket
import time
from datetime import datetime
BUFSIZE = 4096
LOCAL_IP = "192.168.101.88"
MULTICAST_IP = "239.0.0.1"
MULTICAST_PORT = 6000
RECV_INTERVAL = 0.0
def log(message: str, sep: str = ", ") -> str:
"""Simple print-based logger."""
print(datetime.now().isoformat()[:-3], message, sep=sep)
return message
def main() -> None:
"""Receive data from a multicast address/port."""
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.setsockopt(
socket.SOL_SOCKET,
socket.SO_REUSEADDR,
1,
)
sock.bind(("", MULTICAST_PORT))
sock.setsockopt(
socket.IPPROTO_IP,
socket.IP_ADD_MEMBERSHIP,
socket.inet_aton(MULTICAST_IP) + socket.inet_aton(LOCAL_IP),
)
while True:
log(sock.recv(BUFSIZE).decode())
time.sleep(RECV_INTERVAL)
if __name__ == "__main__":
main()
import socket
import time
from datetime import datetime
from uuid import uuid4
LOCAL_IP = "192.168.101.88"
MULTICAST_IP = "239.0.0.1"
MULTICAST_PORT = 6000
SEND_INTERVAL = 1.0
def log(message: str, sep: str = ", ") -> str:
"""Simple print-based logger."""
print(datetime.now().isoformat()[:-3], message, sep=sep)
return message
def main() -> None:
"""Send data to a multicast address/port."""
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.setsockopt(
socket.IPPROTO_IP,
socket.IP_MULTICAST_IF,
socket.inet_aton(LOCAL_IP),
)
sock.connect((MULTICAST_IP, MULTICAST_PORT))
while True:
data = log(str(uuid4())[:8])
sock.send(data.encode())
time.sleep(SEND_INTERVAL)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment