Skip to content

Instantly share code, notes, and snippets.

@comzyh
Last active May 2, 2022 12:14
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save comzyh/2d593fc3875d80d08b40f5e72c28b1f9 to your computer and use it in GitHub Desktop.
Save comzyh/2d593fc3875d80d08b40f5e72c28b1f9 to your computer and use it in GitHub Desktop.
A Python version of Breed Enter that can be used under Linux.
#!/usr/bin/env python3
import socket
import time
import threading
import signal
import sys
import os
# Shared run flag: start_breed_enter() sets it to True once a socket is ready,
# receive_thread_func() sets it to False, and every worker loop polls it.
sending = None
def send_thread_func(s, nic=None, interval=0.1):
    """Broadcast ``BREED:ABORT`` on socket *s* while discovery is running.

    Every *interval* seconds one datagram is sent to the broadcast address
    ``255.255.255.255`` on UDP port 37541.  The loop ends when the
    module-level ``sending`` flag becomes falsy, or when the socket reports
    an error, in which case a message naming *nic* is printed.

    Args:
        s: Socket-like object providing ``sendto``.
        nic: Interface name, used only in the "stop" message.
        interval: Pause in seconds before each datagram (default 0.1).
    """
    global sending
    while sending:
        time.sleep(interval)
        try:
            s.sendto(b'BREED:ABORT', ('255.255.255.255', 37541))
        except OSError:
            # Only socket-level failures are expected here; anything else is
            # a programming error and should surface with its traceback.
            print("Stop sending on {}".format(nic))
            break
def receive_thread_func(s, nic=None):
    """Wait on socket *s* for the reply to the abort broadcast.

    Datagrams are read while the module-level ``sending`` flag is truthy.
    When one equals ``b'BREED:ABORTED'`` the sender's address is printed
    (plain and as an ``http://`` URL) and ``sending`` is set to ``False``,
    which ends every loop that polls the flag.

    If the socket itself fails, the failure is reported and the function
    returns without touching ``sending``, so sockets bound to other
    interfaces keep running.

    Args:
        s: Socket-like object providing ``recvfrom``.
        nic: Interface name, used only in the printed messages.
    """
    global sending
    while sending:
        try:
            data, addr = s.recvfrom(2048)
        except OSError as err:
            print("Stop receiving on {}: {}".format(nic, err))
            return
        if data == b'BREED:ABORTED':
            print("BREED now running on {}\t{}".format(nic, addr[0]))
            print("http://{}".format(addr[0]))
            break
    sending = False
def start_breed_enter():
    """Broadcast the abort request on every network interface and wait.

    For each entry under ``/sys/class/net/`` a UDP broadcast socket is bound
    to that device (``SO_BINDTODEVICE``) on port 37540, and one receiver and
    one sender daemon thread are started on it.  The function then polls the
    module-level ``sending`` flag every 0.5 s until a receiver clears it or
    the user presses Ctrl-C, prints ``QUIT`` and exits with status 0.

    Entries whose socket setup raises ``OSError`` (for example an entry that
    is not a real device, or insufficient privileges for
    ``SO_BINDTODEVICE``) are reported and skipped instead of aborting the
    whole scan with a traceback.
    """
    global sending
    # Prefer the named constant; 25 is the Linux value used as a fallback
    # when this Python build does not export it.
    bind_to_device = getattr(socket, 'SO_BINDTODEVICE', 25)
    nics = os.listdir('/sys/class/net/')
    for nic in nics:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            s.setsockopt(socket.SOL_SOCKET, bind_to_device,
                         nic.encode('utf-8'))
            s.bind(('0.0.0.0', 37540))
        except OSError as err:
            # Release the half-configured socket and carry on with the
            # remaining interfaces.
            s.close()
            print("Skip {}: {}".format(nic, err))
            continue
        sending = True
        receive_thread = threading.Thread(target=receive_thread_func, args=(s, nic))
        send_thread = threading.Thread(target=send_thread_func, args=(s, nic))
        # Daemon threads never keep the process alive after sys.exit().
        receive_thread.daemon = True
        send_thread.daemon = True
        receive_thread.start()
        send_thread.start()
        print('start listening on {}'.format(nic))
    print("Waiting breed to be discovered....")
    while sending:
        try:
            time.sleep(0.5)
        except KeyboardInterrupt:
            break
    print("QUIT")
    sys.exit(0)
def main():
    """Script entry point: hand control to the discovery loop."""
    start_breed_enter()
# Start discovery only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
#!/usr/bin/env python3
import socket
import time
import threading
import signal
import sys
# Shared run flag: start_breed_enter() sets it to True after binding the
# socket, receive_thread_func() sets it to False, and every loop polls it.
sending = None
def send_thread_func(s, interval=0.1):
    """Broadcast ``BREED:ABORT`` on socket *s* while discovery is running.

    Every *interval* seconds one datagram is sent to the broadcast address
    ``255.255.255.255`` on UDP port 37541.  The loop ends when the
    module-level ``sending`` flag becomes falsy, or when the socket reports
    an error, which is printed instead of killing the thread with a
    traceback.

    Args:
        s: Socket-like object providing ``sendto``.
        interval: Pause in seconds before each datagram (default 0.1).
    """
    global sending
    while sending:
        time.sleep(interval)
        try:
            s.sendto(b'BREED:ABORT', ('255.255.255.255', 37541))
        except OSError as err:
            print("Stop sending: {}".format(err))
            break
def receive_thread_func(s):
    """Wait on socket *s* for the reply to the abort broadcast.

    Datagrams are read while the module-level ``sending`` flag is truthy.
    When one equals ``b'BREED:ABORTED'`` the sender's address is printed
    (plain and as an ``http://`` URL).  On exit ``sending`` is set to
    ``False``, which ends every loop that polls the flag.

    A socket error is reported and also ends the wait: this is the only
    receiver, so nothing could be discovered after it fails.

    Args:
        s: Socket-like object providing ``recvfrom``.
    """
    global sending
    while sending:
        try:
            data, addr = s.recvfrom(2048)
        except OSError as err:
            print("Stop receiving: {}".format(err))
            break
        if data == b'BREED:ABORTED':
            print("BREED now running on {}".format(addr[0]))
            print("http://{}".format(addr[0]))
            break
    sending = False
def start_breed_enter():
    """Open one broadcast socket, start the worker threads and wait.

    A UDP socket with ``SO_BROADCAST`` enabled is bound to port 37540 on all
    addresses; a receiver and a sender daemon thread share it.  The function
    then polls the module-level ``sending`` flag every 0.5 s until it is
    cleared or Ctrl-C is pressed, prints ``QUIT`` and exits with status 0.
    """
    global sending
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    sock.bind(('0.0.0.0', 37540))
    sending = True
    # Receiver first, then sender; daemon threads never keep the process
    # alive after sys.exit().
    workers = [
        threading.Thread(target=receive_thread_func, args=(sock,), daemon=True),
        threading.Thread(target=send_thread_func, args=(sock,), daemon=True),
    ]
    for worker in workers:
        worker.start()
    print("Waiting breed to be discovered....")
    while sending:
        try:
            time.sleep(0.5)
        except KeyboardInterrupt:
            break
    print("QUIT")
    sys.exit(0)
def main():
    """Script entry point: hand control to the discovery loop."""
    start_breed_enter()
# Start discovery only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment