Skip to content

Instantly share code, notes, and snippets.

@vimiix
Last active October 19, 2021 03:35
Show Gist options
  • Save vimiix/40f2339e6fd2973efe0d8c4c0e436afc to your computer and use it in GitHub Desktop.
Save vimiix/40f2339e6fd2973efe0d8c4c0e436afc to your computer and use it in GitHub Desktop.
批量发送ping包测试网络连通性
import asyncio
async def _async_ping(ip):
    """Ping a single address and report whether it answered.

    Probe strategy: send 3 echo requests, 0.5 s apart, with an overall
    timeout of 2 s. The address counts as reachable when the ``ping``
    command exits with status 0.

    Args:
        ip (str): Address to probe.

    Returns:
        tuple: ``(ip, reachable)`` -- ``ip`` is the argument exactly as it
        was passed in, ``reachable`` is a bool.
    """
    import sys  # local import so this block does not rely on file-level imports

    target = str(ip).strip()
    # An empty target, or one starting with "-", can never name a host.
    # Refuse it up front so ping cannot parse it as a command-line option.
    if not target or target.startswith("-"):
        return ip, False

    # BSD/macOS ping takes the overall timeout via "-t"; on Linux (iputils)
    # "-t" sets the TTL instead and the overall deadline flag is "-w".
    timeout_flag = "-w" if sys.platform.startswith("linux") else "-t"

    try:
        # Argument list rather than a shell string: the address reaches ping
        # verbatim and is never interpreted by a shell.
        proc = await asyncio.create_subprocess_exec(
            "ping", "-c", "3", "-i", "0.5", timeout_flag, "2", target,
            # The output is never inspected, so discard it instead of piping.
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL)
    except OSError:
        # ping is missing or could not be started: report as unreachable,
        # the same verdict the shell's non-zero exit status used to produce.
        return ip, False

    try:
        returncode = await proc.wait()
    except asyncio.CancelledError:
        # The caller gave up on this probe; do not leave ping running.
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # process already exited
        raise
    return ip, returncode == 0
async def _async_ping_ips(ips, wait_timeout=5):
    """Ping all addresses concurrently and collect the results.

    Args:
        ips (iterable of str): Addresses to probe.
        wait_timeout (float): Seconds to wait for the probes as a whole.
            Probes still running after that are cancelled and reported as
            unreachable.

    Returns:
        dict: ``{ip: bool}`` with one entry per distinct address.
    """
    # Materialise once: the input is needed for both the tasks and the
    # result dict, and a one-shot iterator would be exhausted by the first.
    ips = list(ips)
    # Every address starts out unreachable; finished probes overwrite this.
    result = {ip: False for ip in ips}
    if not ips:
        # asyncio.wait() raises ValueError when given no tasks.
        return result

    # Iterating the dict yields each distinct address once, so duplicates
    # in the input do not spawn duplicate probes.
    tasks = [asyncio.create_task(_async_ping(ip)) for ip in result]
    completed, pending = await asyncio.wait(tasks, timeout=wait_timeout)

    for task in pending:
        task.cancel()
    if pending:
        # Give the cancelled tasks a chance to unwind before returning.
        await asyncio.gather(*pending, return_exceptions=True)

    for task in completed:
        ip, reachable = task.result()
        result[ip] = reachable
    return result
def parallel_ping(ips):
    """Ping a batch of addresses in parallel to test network connectivity.

    Args:
        ips (iterable of str): Addresses to probe.

    Returns:
        dict: ``{ip: bool}``; an empty dict when no addresses are given.
    """
    ips = list(ips)
    if not ips:
        # Nothing to probe, so there is no need to start an event loop.
        return {}
    # asyncio.run() creates and closes its own loop. It replaces
    # get_event_loop(), which is deprecated when no loop is running.
    return asyncio.run(_async_ping_ips(ips))
if __name__ == '__main__':
    # Demo run. The result is printed so the script actually produces the
    # output shown below.
    print(parallel_ping(['8.8.8.8', '1.2.3.4']))
    # Example output (from the original author's run):
    # {'8.8.8.8': True, '1.2.3.4': False}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment