Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import atexit
import os
import socket
import subprocess
import shlex
import time
import signal
def _which(tool):
    """Return the stripped stdout of ``which <tool>``.

    The pipe is read inside a ``with`` block so it is closed (and the child
    process waited for) deterministically instead of being left to the
    garbage collector.  The result is an empty string when ``which`` prints
    nothing.
    """
    with os.popen('which %s' % tool) as pipe:
        return pipe.read().strip()


# Paths of the external tools, resolved once at import time.
tcpdump_bin = _which('tcpdump')
ss_bin = _which('ss')
def tcpdump_start(port):
    """Start a tcpdump capture on interface ``lo`` filtered to ``port``.

    The returned ``subprocess.Popen`` object gets an extra ``close``
    attribute: a callable that sends SIGINT to tcpdump and waits for it to
    exit.  The same callable is registered with ``atexit`` so the capture is
    stopped when the interpreter shuts down.

    Args:
        port: Port number used in the ``port <port>`` capture filter.

    Returns:
        The ``subprocess.Popen`` handle of the running tcpdump process.

    Raises:
        FileNotFoundError: If no tcpdump binary was located (the
            module-level ``tcpdump_bin`` is empty).
    """
    if not tcpdump_bin:
        raise FileNotFoundError('tcpdump binary not found (is it on PATH?)')

    # Pass an argv list straight to Popen: no string re-splitting, so a
    # binary path containing whitespace stays a single argument.
    argv = [
        tcpdump_bin,
        '-B', '16384',
        '--packet-buffered',
        '-n',
        '-ttttt',
        '-i', 'lo',
        'port', str(port),
    ]
    p = subprocess.Popen(argv)
    # Presumably gives tcpdump time to begin capturing before the caller
    # generates traffic.
    time.sleep(1)

    def close():
        """Interrupt tcpdump (if still running) and wait for it to exit."""
        # Safe to call more than once: it is invoked explicitly via p.close
        # and again by the atexit hook.
        if p.poll() is None:
            p.send_signal(signal.SIGINT)
        p.wait()

    p.close = close
    atexit.register(close)
    return p
# Experiment: half-close the accepted (server-side) socket with SHUT_WR and
# observe on the wire that the peer can still send data afterwards.
# Listening socket bound to 127.0.0.1 with port 0; the port actually
# assigned is read back with getsockname() below.
srvd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srvd.bind(('127.0.0.1', 0))
srvd.listen(12)
_, port = srvd.getsockname()
print("[**] Starting tcpdump on port %d" % port)
# Start the capture before connecting so the whole exchange is recorded.
tcpdump = tcpdump_start(port)
# Client socket connecting to the listener over loopback.
cd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cd.connect(('127.0.0.1', port))
# Server-side end of the connection just established.
sd, _ = srvd.accept()
# Shut down only the sending direction of the server-side socket.
sd.shutdown(socket.SHUT_WR)
# Pause between steps (presumably so each step shows up separately in the
# tcpdump output).
time.sleep(1)
# Sending on the side that was shut down for writing raises a broken pipe
# error (kept commented out on purpose):
# sd.send(b"asd")
# Sending from the client towards the half-closed peer still works:
cd.send(b"asd")
time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment