Skip to content

Instantly share code, notes, and snippets.

@Terrance
Created March 31, 2024 11:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Terrance/4efbec3ba72725cc977f2ae8b4500bd1 to your computer and use it in GitHub Desktop.
Save Terrance/4efbec3ba72725cc977f2ae8b4500bd1 to your computer and use it in GitHub Desktop.
SSH tarpit server that accepts connections but stalls each client indefinitely by sending an endless stream of SSH banner lines, so the handshake never reaches authentication.
#!/usr/bin/env python3
import asyncio
import logging
from random import randint
LOG = logging.getLogger(__name__)

# Address the tarpit listens on; PORT can be overridden by the first
# command-line argument when the file is run as a script.
HOST, PORT = "0.0.0.0", 2222

# DELAY: seconds to wait before sending each banner line (second argument).
# LENGTH: each line is a random hex number drawn from 0 .. 2 ** LENGTH
# (third argument).
DELAY, LENGTH = 10, 32
async def handler(_, writer):
    """Stall a single client by feeding it an endless stream of banner lines.

    Every ``DELAY`` seconds one line containing a random hexadecimal number
    (drawn from ``0 .. 2 ** LENGTH``) followed by CRLF is written to the
    client. The loop only ends when the connection fails or an unexpected
    error occurs; the outcome is logged together with the number of lines
    attempted, and the connection is always closed on the way out.

    Args:
        _: Stream reader passed by ``asyncio.start_server``; never read from.
        writer: Stream writer for the connected client.
    """
    peer = writer.get_extra_info("peername")
    # peername is a 4-tuple for IPv6 sockets and may be None if the socket
    # is already gone, so only take the first two items when they exist.
    ip, port = peer[:2] if peer else ("unknown", 0)
    LOG.info("%s:%d connected", ip, port)
    count = 0
    try:
        while True:
            await asyncio.sleep(DELAY)
            count += 1
            LOG.debug("%s:%d receiving line %d", ip, port, count)
            writer.write(b"%x\r\n" % randint(0, 2 ** LENGTH))
            await writer.drain()
    except ConnectionError:
        # Covers reset, aborted and broken-pipe disconnects alike.
        LOG.info("%s:%d disconnected (%d lines)", ip, port, count)
    except TimeoutError:
        LOG.info("%s:%d timed out (%d lines)", ip, port, count)
    except Exception:
        LOG.exception("%s:%d caused exception (%d lines)", ip, port, count)
    finally:
        # Release the socket on every exit path, including task cancellation;
        # closing an already-closed transport is harmless.
        writer.close()
async def main():
    """Start the tarpit listener on HOST:PORT and serve clients until stopped.

    Each accepted connection is handed to ``handler``.
    """
    async with await asyncio.start_server(handler, HOST, PORT) as listener:
        LOG.info("Listening at %s:%d", HOST, PORT)
        await listener.serve_forever()
if __name__ == "__main__":
    import sys

    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")

    # Optional positional overrides, in order: PORT DELAY LENGTH.
    # Any trailing arguments that are omitted keep their module defaults.
    cli_args = sys.argv[1:]
    if len(cli_args) > 0:
        PORT = int(cli_args[0])
    if len(cli_args) > 1:
        DELAY = float(cli_args[1])
    if len(cli_args) > 2:
        LENGTH = int(cli_args[2])

    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment