Created
July 17, 2020 01:47
-
-
Save libra146/bfc6332aed81bbd8b8572dc8400f56c1 to your computer and use it in GitHub Desktop.
基于asyncio的全端口异步扫描器
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import time | |
from asyncio import Queue, TimeoutError, gather | |
from socket import socket, AF_INET, SOCK_STREAM | |
from typing import List | |
from async_timeout import timeout | |
""" | |
基于asyncio的全端口异步扫描器 | |
""" | |
class ScanPort(object):
    """Asynchronous TCP connect scanner for a single host.

    Ports to probe are placed on an asyncio queue and consumed by a fixed
    number of worker tasks. Every port that accepts a TCP connection within
    the timeout is appended to ``result`` and reported on stdout.

    Attributes:
        ip: Target address handed to ``loop.sock_connect``.
        port: Ports to scan; a falsy value means the full range 1-65535.
        result: Ports found open, in the order they were detected.
        loop: Event loop the worker tasks are created on.
        queue: Queue of ports still waiting to be probed.
        timeout: Per-port connect timeout in seconds.
        concurrency: Number of worker tasks started by ``start``.
    """

    def __init__(self, ip: str = '', time_out: float = 0.1, port: List[int] = None, concurrency: int = 500):
        """Store the scan parameters and bind the scanner to an event loop.

        Args:
            ip: Target address; must be non-empty.
            time_out: Per-port connect timeout in seconds.
            port: Explicit list of ports; ``None`` or empty scans 1-65535.
            concurrency: Number of concurrent worker tasks; must be >= 1.

        Raises:
            ValueError: If ``ip`` is empty or ``concurrency`` is below 1.
        """
        if not ip:
            raise ValueError(f'wrong ip! {ip}')
        if concurrency < 1:
            # With no workers nothing would ever drain the queue and
            # start() would block on queue.join() forever.
            raise ValueError(f'wrong concurrency! {concurrency}')
        self.ip = ip
        self.port = port
        self.result: List[int] = []
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # Newer Python versions no longer create a loop implicitly;
            # create one and install it so `scan.loop.run_until_complete`
            # keeps working for existing callers.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        self.queue = Queue()
        self.timeout = time_out
        # Number of concurrent workers.
        self.concurrency = concurrency

    async def _probe(self, port: int):
        """Try one TCP connect to ``port`` and record it if it succeeds."""
        started = time.monotonic()
        sock = socket(AF_INET, SOCK_STREAM)
        # loop.sock_connect() requires a non-blocking socket; a blocking one
        # would stall the whole event loop for the duration of the connect.
        sock.setblocking(False)
        try:
            await asyncio.wait_for(
                self.loop.sock_connect(sock, (self.ip, port)),
                self.timeout,
            )
        except (asyncio.TimeoutError, OSError):
            # Closed, filtered or unreachable port. Which exception is raised
            # differs between platforms (timeout, PermissionError,
            # ConnectionRefusedError, ...); OSError covers the socket-level
            # failures so a worker is never killed by one of them.
            return
        else:
            elapsed = time.monotonic() - started
        finally:
            # Always release the descriptor, whatever the outcome.
            sock.close()
        self.result.append(port)
        print(time.strftime('%Y-%m-%d %H:%M:%S'), port, 'open', round(elapsed, 2))

    async def scan(self):
        """Worker loop: take ports off the queue and probe them until cancelled."""
        while True:
            port = await self.queue.get()
            try:
                await self._probe(port)
            finally:
                # Mark the item done even on an unexpected error, otherwise
                # queue.join() in start() would never return.
                self.queue.task_done()

    async def start(self):
        """Queue every port, run the workers until the queue drains, print the duration."""
        began = time.monotonic()
        targets = self.port if self.port else range(1, 65535 + 1)
        for number in targets:
            self.queue.put_nowait(number)
        workers = [self.loop.create_task(self.scan()) for _ in range(self.concurrency)]
        # Blocks until task_done() has been called for every queued port.
        await self.queue.join()
        # The workers loop forever, so stop them explicitly.
        for worker in workers:
            worker.cancel()
        # Wait until all worker tasks are cancelled.
        await gather(*workers, return_exceptions=True)
        print(f'扫描所用时间为:{time.monotonic() - began:.2f}')
if __name__ == '__main__':
    # Script entry point: scan every port on the local machine and drive the
    # scan to completion on the scanner's own event loop.
    scanner = ScanPort('127.0.0.1')
    event_loop = scanner.loop
    event_loop.run_until_complete(scanner.start())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment