Skip to content

Instantly share code, notes, and snippets.

@knabben
Created April 11, 2018 11:58
Show Gist options
  • Save knabben/4a7b8a0af6726c38880a31fb8927c052 to your computer and use it in GitHub Desktop.
Save knabben/4a7b8a0af6726c38880a31fb8927c052 to your computer and use it in GitHub Desktop.
psql Sniffer
from utils import now_with_ms
from scapy.all import TCP, sniff
def start_scapy(packets_deque, plot_deque, loop):
print('scapy')
def sniffer(pkt):
out = pkt[TCP].payload
if hasattr(out, 'load'):
out = out.load.decode('utf-8')
if '\r\n' not in out and len(out):
packets_deque.append({'data': out, 'ds': now_with_ms()})q
def mysql_printer(pkt):
""" MySQL packet decoder """
exit_list = ['01'] # Ignore response
fetch_list = ['03'] # COM_QUERY
hexdata = hexstr(str(pkt[TCP].payload), onlyhex=1).split()
header, payload = hexdata[:5], hexdata[6:]
if header and payload and header[0] not in exit_list and \
header[-1] in fetch_list:
print("{} - {}\n".format(str(datetime.now()), pkt[TCP].payload))
def pgsql_printer(pkt):
""" PostgreSQL packet decoder """
output = str(pkt[TCP].payload)
if output and not re.search('SELECT (\d+)', output):
try:
grouper = re.search('SELECT (.*)', output)
output = grouper[0][:-5]
except (re.error, TypeError):
return
else:
output = sqlparse.format(
output, reindent=True, keyword_case='upper')
print('{}{}\n{}{}{}'.format(
fg('blue'),
datetime.now().isoformat(), fg('white'), output, attr(
'reset')))
print('-----')
def main():
loop = asyncio.get_event_loop()
packets_deque = deque()
plot_deque = deque()
with ThreadPoolExecutor(max_workers=5) as executor:
tasks = [executor.submit(task, packets_deque, plot_deque, loop)
for task in [start_scapy]]
for task in tasks:
task.result()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment