Last active
March 17, 2020 15:05
-
-
Save BriFuture/5789fef5db9d233d2a405c0cfd6a8462 to your computer and use it in GitHub Desktop.
查找局域网中的可用设备,相应机器上应该以 server 模式运行该脚本,查找机器上以 local 模式运行该脚本
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# -*- coding: utf-8 -*- | |
""" | |
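## How to find the IP address of a specific device
There are several ways to find the IP address of a device (call it device A) on a local network:
1. Run a program on device A that periodically sends a broadcast packet (UDP broadcast) to the local network, and run another program on device B that listens on the corresponding port. When device B receives a message in the expected format it treats the message as valid; the remote IP address printed on the command line is then the address being looked for.
2. Run a program on device A that listens on a pre-agreed port, and have device B send a message to that port on every IP address (broadcast). When the remote device replies, its IP address is known.
Note: when sending a UDP broadcast packet, setting the IP address to "255.255.255.255" broadcasts to the whole network, and setting it to "192.168.0.255" broadcasts to the "192.168.0.0/24" network. Setting it to another address such as "192.168.255.255" makes the program fail at run time.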
## Source code:
https://gist.github.com/BriFuture/5789fef5db9d233d2a405c0cfd6a8462
update: 19-03-01
1. automatically check whether SERVER_RECV_PORT is usable; if it is not usable and this script is running in server mode,
then the UDP server is not started,
2. replace the print function with a logger; when running in local mode, the logger also streams messages to the console (stderr)
"""
# Purpose: send broadcast packets or receive heartbeat packets.
# UDP port the server-mode process listens on; local mode broadcasts its probe here.
SERVER_RECV_PORT = 7000
# UDP port heartbeats are broadcast to; the local-mode process listens on it.
LOCAL_RECV_PORT = 7007
import socket | |
def createLogger(stream = False):
    """Configure the root logger to write to ``~/.shells/broadcast.log``.

    The root logger is set to DEBUG and given a file handler for
    ``broadcast.log`` inside the ``.shells`` directory of the user's home
    (the directory is created when missing). The function is safe to call
    more than once: a handler is only added when an equivalent one is not
    already attached, so log lines are not duplicated.

    Args:
        stream: When true, also attach a ``logging.StreamHandler`` created
            with no arguments (which writes to ``sys.stderr``).

    Returns:
        The configured root logger.
    """
    import logging
    import os
    from pathlib import Path

    log_dir = Path.home() / ".shells"
    # exist_ok avoids the check-then-create race of a separate exists() test.
    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = log_dir / "broadcast.log"

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")

    # FileHandler stores the absolute path of its file in baseFilename.
    target = os.path.abspath(str(log_file))
    has_file_handler = any(
        isinstance(handler, logging.FileHandler)
        and getattr(handler, "baseFilename", None) == target
        for handler in logger.handlers
    )
    if not has_file_handler:
        fh = logging.FileHandler(log_file)
        fh.setFormatter(formatter)
        fh.setLevel(logging.DEBUG)
        logger.addHandler(fh)

    if stream:
        # Exact type match: subclasses such as FileHandler do not count as
        # an already-installed console handler.
        has_stream_handler = any(
            type(handler) is logging.StreamHandler for handler in logger.handlers
        )
        if not has_stream_handler:
            sh = logging.StreamHandler()
            sh.setLevel(logging.DEBUG)
            sh.setFormatter(formatter)
            logger.addHandler(sh)
    return logger
# Shared logger used by the functions below; None until the __main__ section
# assigns the result of createLogger().
logger = None
# Mode flag; the __main__ section sets it when the first CLI argument is "local".
isLocal = False
def local_machine():
    """Broadcast one discovery probe and log the first reply, if any.

    Sends the payload ``b"'test'"`` to 255.255.255.255 on SERVER_RECV_PORT
    and waits up to 3 seconds for a datagram. A reply is logged at INFO with
    the sender's IP address and the raw payload; a timeout or other socket
    error while receiving is logged at WARNING. Errors raised while sending
    propagate to the caller.
    """
    # The context manager closes the socket on every exit path, including
    # an exception raised by sendto().
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as con:
        con.settimeout(3)
        con.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Sending to a broadcast address requires SO_BROADCAST.
        con.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        serIp = "255.255.255.255"
        con.sendto(b"'test'", (serIp, SERVER_RECV_PORT))
        logger.info("Wait Response")
        try:
            msg, addr = con.recvfrom(1024)
        except OSError as e:
            # socket.timeout is a subclass of OSError.
            logger.warning(e)
        else:
            logger.info(f"Recv from {addr[0]}: {msg}")
from socketserver import UDPServer, BaseRequestHandler | |
import time | |
import json | |
# Length of the literal '"camera_1":' prefix (11 characters).
# No live code in this file reads it.
MsgHeaderLen = len('"camera_1":')
class UdpHandler(BaseRequestHandler):
    """Datagram handler that answers every packet with the current time."""

    def handle(self):
        """Log the received datagram and reply with ``time.ctime()``.

        ``self.request`` is unpacked as a ``(data, socket)`` pair. The reply
        is the ASCII-encoded ``time.ctime()`` string and is sent back to the
        address and port the datagram came from (``self.client_address``).
        """
        msg, reply_sock = self.request
        timestamp = time.ctime()
        logger.info(f"recved from {self.client_address}: {msg}")
        reply_sock.sendto(timestamp.encode("ascii"), self.client_address)
class Interval(object):
    """Periodic heartbeat broadcaster built around a single UDP socket."""

    def __init__(self):
        """Create a UDP socket with address reuse, broadcast and a 5 s timeout."""
        self.con = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        for option in (socket.SO_REUSEADDR, socket.SO_BROADCAST):
            self.con.setsockopt(socket.SOL_SOCKET, option, 1)
        self.con.settimeout(5)

    def send(self):
        """Broadcast one ``b"HeartBeat"`` datagram and log the outcome.

        The datagram goes to 255.255.255.255 on LOCAL_RECV_PORT. A reply is
        logged at INFO; any exception raised while sending or receiving
        (including the receive timeout) is logged at WARNING and swallowed.
        """
        destination = ("255.255.255.255", LOCAL_RECV_PORT)
        try:
            self.con.sendto(b"HeartBeat", destination)
            msg, addr = self.con.recvfrom(1024)
            logger.info(f"Recv from {addr[0]}: {msg}")
        except Exception as e:
            logger.warning("Heart beat sent but no response: {}".format(e))

    def serve_forever(self):
        """Send a heartbeat, sleep 10 seconds, and repeat without end."""
        while True:
            self.send()
            time.sleep(10)
def server():
    """Run server mode: a heartbeat thread plus the discovery responder.

    Starts a daemon thread running ``Interval.serve_forever`` and then
    serves UDP requests on SERVER_RECV_PORT (all interfaces) with
    ``UdpHandler`` until ``serve_forever()`` stops. An ``Exception`` raised
    while serving is logged at WARNING; errors raised while binding the
    port propagate to the caller.
    """
    from threading import Thread

    inter = Interval()
    # Daemon thread: it does not keep the process alive once serving ends.
    heartbeat_thread = Thread(target=inter.serve_forever, daemon=True)
    heartbeat_thread.start()
    # The context manager calls server_close() on exit, so the listening
    # socket is released even when serve_forever() raises. The local name
    # also avoids shadowing this function's own name.
    with UDPServer(("", SERVER_RECV_PORT), UdpHandler) as udp_server:
        try:
            udp_server.serve_forever()
        except Exception as e:
            logger.warning(e)
def isServerPortAvailable(port: int) -> bool:
    """Probe whether something already answers on UDP ``port`` of 127.0.0.1.

    Sends the payload ``b"'test port available'"`` to ``127.0.0.1:port`` and
    waits up to 3 seconds for any datagram in return.

    Args:
        port: UDP port to probe. The probe is sent to this port (previously
            the argument was ignored and SERVER_RECV_PORT was always used).

    Returns:
        ``False`` when a reply arrives (the port is treated as taken),
        ``True`` when receiving times out or fails with a socket error.
    """
    import logging

    # createLogger() configures and returns the root logger, so logging
    # through the root logger reaches the same handlers while also working
    # when this function runs before the module-level `logger` is assigned.
    log = logging.getLogger()
    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(3)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 0)
        sock.sendto(b"'test port available'", ("127.0.0.1", port))
        try:
            sock.recvfrom(1024)
        except OSError as e:
            # socket.timeout is a subclass of OSError.
            log.info("{} is available".format(port))
            log.warning(e)
            return True
    log.info("{} is not available".format(port))
    return False
if __name__ == '__main__':
    # Entry point. `python3 <script> local` runs local mode (send one
    # discovery probe, then listen for heartbeats on LOCAL_RECV_PORT); any
    # other invocation runs server mode, provided the port probe reports
    # SERVER_RECV_PORT as free.
    import sys

    isLocal = len(sys.argv) > 1 and sys.argv[1] == "local"
    # Configure logging first so that the port probe below can log its
    # result; console output is only enabled in local mode.
    logger = createLogger(stream=isLocal)
    if isLocal:
        logger.info("Running Local Mode")
        local_machine()
        # Named heartbeat_server so the module-level function server() is
        # not rebound; the context manager closes the socket on exit.
        with UDPServer(("", LOCAL_RECV_PORT), UdpHandler) as heartbeat_server:
            try:
                heartbeat_server.serve_forever()
            except Exception as e:
                logger.warning(e)
    elif isServerPortAvailable(SERVER_RECV_PORT):
        logger.info("Running Server Mode")
        server()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment