Created
September 16, 2021 07:15
-
-
Save astropenguin/7aa072c934cdda4000ac2d06ec115167 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import time | |
from datetime import datetime | |
# Maximum number of bytes read from the socket per recv() call.
BUFSIZE = 4096
# IPv4 address of the local interface used when joining the multicast group.
LOCAL_IP = "192.168.101.88"
# IPv4 multicast group address to join.
MULTICAST_IP = "239.0.0.1"
# UDP port the receiver binds to.
MULTICAST_PORT = 6000
# Seconds to sleep after each received datagram (0.0 means no added delay).
RECV_INTERVAL = 0.0
def log(message: str, sep: str = ", ") -> str:
    """Print a timestamped message and return the message unchanged.

    Args:
        message: Text to print after the timestamp.
        sep: Separator placed between the timestamp and the message.

    Returns:
        The ``message`` argument, so the call can be used inline.
    """
    # Request millisecond precision explicitly. Slicing ``isoformat()[:-3]``
    # corrupts the timestamp whenever ``microsecond == 0``: the fractional
    # part is then omitted and the slice cuts into the seconds field.
    timestamp = datetime.now().isoformat(timespec="milliseconds")
    print(timestamp, message, sep=sep)
    return message
def main() -> None:
    """Receive data from a multicast address/port and log each datagram.

    Opens a UDP socket, binds it to ``MULTICAST_PORT`` on all interfaces,
    joins the group ``MULTICAST_IP`` via the interface ``LOCAL_IP``, and then
    loops forever logging every received payload as text.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        # Set before bind() so the option is in effect for that bind.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(("", MULTICAST_PORT))

        # Membership request: group address followed by the address of the
        # local interface on which to join the group.
        membership = socket.inet_aton(MULTICAST_IP) + socket.inet_aton(LOCAL_IP)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership)

        while True:
            # Datagrams come from the network and may not be valid UTF-8;
            # substitute undecodable bytes instead of letting a single bad
            # packet raise UnicodeDecodeError and stop the receiver.
            log(sock.recv(BUFSIZE).decode(errors="replace"))
            time.sleep(RECV_INTERVAL)
# Start the receiver only when this file is executed as a script.
if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import time | |
from datetime import datetime | |
from uuid import uuid4 | |
# IPv4 address of the local interface used for outgoing multicast packets.
LOCAL_IP = "192.168.101.88"
# IPv4 multicast group address the data is sent to.
MULTICAST_IP = "239.0.0.1"
# Destination UDP port.
MULTICAST_PORT = 6000
# Seconds to sleep between consecutive sends.
SEND_INTERVAL = 1.0
def log(message: str, sep: str = ", ") -> str:
    """Print a timestamped message and return the message unchanged.

    Args:
        message: Text to print after the timestamp.
        sep: Separator placed between the timestamp and the message.

    Returns:
        The ``message`` argument, so the call can be used inline.
    """
    # Request millisecond precision explicitly. Slicing ``isoformat()[:-3]``
    # corrupts the timestamp whenever ``microsecond == 0``: the fractional
    # part is then omitted and the slice cuts into the seconds field.
    timestamp = datetime.now().isoformat(timespec="milliseconds")
    print(timestamp, message, sep=sep)
    return message
def main() -> None:
    """Periodically send a short random ID to the multicast address/port.

    Each iteration logs the first eight characters of a fresh UUID4, sends
    that same text as a datagram, and then sleeps for ``SEND_INTERVAL``
    seconds. The loop never terminates on its own.
    """
    group = (MULTICAST_IP, MULTICAST_PORT)

    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender:
        # Select the LOCAL_IP interface for outgoing multicast traffic.
        outgoing_iface = socket.inet_aton(LOCAL_IP)
        sender.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, outgoing_iface)

        # Connecting fixes the destination so plain send() can be used below.
        sender.connect(group)

        while True:
            token = str(uuid4())[:8]
            # log() prints the token and hands it back unchanged.
            payload = log(token)
            sender.send(payload.encode())
            time.sleep(SEND_INTERVAL)
# Start the sender only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment