Last active
October 19, 2021 03:35
-
-
Save vimiix/40f2339e6fd2973efe0d8c4c0e436afc to your computer and use it in GitHub Desktop.
批量发送ping包测试网络连通性
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
async def _async_ping(ip): | |
"""发送ping包 | |
探测策略:发送3次包,每次间隔0.5s, 超时时间为2s | |
判断依据:指令执行的退出状态码是否正常为0 | |
Args: | |
ip (string): IP | |
Returns: | |
2-tuple: (IP, bool) | |
""" | |
proc = await asyncio.create_subprocess_shell( | |
f"ping -c 3 -i 0.5 -t 2 {ip}", | |
stdout=asyncio.subprocess.PIPE, | |
stderr=asyncio.subprocess.PIPE) | |
await proc.communicate() | |
return ip, proc.returncode == 0 | |
async def _async_ping_ips(ips, wait_timeout=5): | |
"""创建批量 ping 的异步任务并启动并发执行,等待退出 | |
Args: | |
ips (string list): IP列表 | |
Returns: | |
dict: {IP: bool} | |
""" | |
tasks = [asyncio.create_task(_async_ping(ip)) for ip in ips] | |
completed, pending = await asyncio.wait(tasks, timeout=wait_timeout) | |
for t in pending: | |
t.cancel() | |
result = {ip: False for ip in ips} | |
for t in completed: | |
res = t.result() | |
result[res[0]] = res[1] | |
return result | |
def parallel_ping(ips):
    """Ping a batch of addresses in parallel to test network connectivity.

    Synchronous entry point: it starts an event loop, runs the concurrent
    probes to completion and returns their results. It must not be called
    from code that is already running inside an event loop.

    Args:
        ips (list of str): IP addresses to probe.

    Returns:
        dict: ``{ip: bool}``
    """
    # asyncio.run() creates and closes its own loop; calling
    # asyncio.get_event_loop() with no running loop is deprecated.
    return asyncio.run(_async_ping_ips(ips))
if __name__ == '__main__':
    # Demo run: print the result so the script actually shows its output.
    print(parallel_ping(['8.8.8.8', '1.2.3.4']))
    # Example output (when 8.8.8.8 answers and 1.2.3.4 does not):
    # {'8.8.8.8': True, '1.2.3.4': False}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment