Skip to content

Instantly share code, notes, and snippets.

@xupeng
Created May 6, 2014 02:11
Show Gist options
  • Save xupeng/d720dd84ff3d83f8c0c9 to your computer and use it in GitHub Desktop.
Save xupeng/d720dd84ff3d83f8c0c9 to your computer and use it in GitHub Desktop.
mysqlquerysniffer.py
#!/usr/bin/env python
#coding=utf8
""" A MySQL query sniffer
"""
__author__ = "Xupeng Yun <recordus@gmail.com>"
__version__ = "$Revision: 8817 $"
__date__ = "$Date: 2012-07-09 12:18:28 +0800 (Mon, 09 Jul 2012) $"
import os
import re
import sys
import pcap
import dpkt
import time
import struct
import socket
from datetime import datetime
from optparse import OptionParser
COM_QUERY = 3
def sniff(interface, port=3306, src_ip=None, dst_ip=None, regex=None,
start_timpstamp=None, stop_timpstamp=None):
"""Sniff MySQL queries(COM_QUERY)"""
pc = pcap.pcap(interface)
_filter = 'tcp dst port %s' % port
if src_ip:
_filter += ' and src %s' % src_ip
if dst_ip:
_filter += ' and dst %s' % dst_ip
pc.setfilter(_filter)
re_match = regex and re.compile(regex, re.I) or None
partials = {}
for ptime, pdata in pc:
if start_timpstamp and ptime < start_timpstamp:
continue
if stop_timpstamp and ptime > stop_timpstamp:
continue
ether_pkt = dpkt.ethernet.Ethernet(pdata)
ip_pkt = ether_pkt.data
tcp_pkt = ip_pkt.data
tcp_data = tcp_pkt.data
src = socket.inet_ntoa(ip_pkt.src)
sport = tcp_pkt.sport
dst = socket.inet_ntoa(ip_pkt.dst)
dport = tcp_pkt.dport
ack = tcp_pkt.ack
if partials.has_key(ack):
partial = partials[ack]
partial.append(tcp_data)
query = ''.join(partial[1:])
if partial[0] == len(query):
del partials[ack]
yield (ptime, query, src, sport, dst, dport)
continue
# MySQL command header are 4 bytes
# --------------------------------------------------
# | packet length(3 bytes) | packet number(1 byte) |
# --------------------------------------------------
if len(tcp_data) < 4:
continue
pkt_header = tcp_data[:3] + '\x00' + tcp_data[3]
pkt_len, pkt_no = struct.unpack('<IB', pkt_header)
if pkt_len < 2:
continue
pkt_data = tcp_data[4:]
command_type, = struct.unpack('B', pkt_data[0])
if command_type != COM_QUERY:
continue
query = pkt_data[1:]
query_length = pkt_len - 1
if len(query) < query_length:
partial = partials.setdefault(tcp_pkt.ack, [])
if not partial:
partial.append(query_length)
partial.append(query)
continue
if not re_match or re_match.search(query):
yield (ptime, query, src, sport, dst, dport)
def str_to_timestamp(timestr):
if not timestr or not isinstance(timestr, basestring):
return None
try:
_datetime = datetime.strptime(timestr, '%Y-%m-%d %H:%M:%S')
return time.mktime(_datetime.timetuple())
except ValueError:
return None
def main():
parser = OptionParser()
parser.add_option('-i', '--interface')
parser.add_option('-p', '--port')
parser.add_option('-s', '--src-ip')
parser.add_option('-d', '--dst-ip')
parser.add_option('--start-datetime',
help='start datetime with format: yyyy-mm-dd hh:mm:ss')
parser.add_option('--stop-datetime',
help='stop datetime with format: yyyy-mm-dd hh:mm:ss')
parser.add_option('-v', '--verbose', action='store_true')
parser.add_option('-r', '--regex')
parser.add_option('-q', '--quiet', action='store_true', default=False)
options, args = parser.parse_args()
if os.geteuid() != 0:
print 'You must be root to run', sys.argv[0]
return 1
if not options.interface or not options.port:
print 'Interface and MySQL server port must be set'
return 1
interface = options.interface
port = options.port
src_ip = options.src_ip
dst_ip = options.dst_ip
regex = options.regex or None
start_timpstamp = str_to_timestamp(options.start_datetime)
if options.start_datetime and not start_timpstamp:
print >>sys.stderr, 'Invalid start datetime: "%s"' % options.start_datetime
return 1
stop_timpstamp = str_to_timestamp(options.stop_datetime)
if options.stop_datetime and not stop_timpstamp:
print >>sys.stderr, 'Invalid stop datetime: "%s"' % options.stop_datetime
return 1
count = 0
start = time.time()
try:
for query in sniff(interface, port, src_ip=src_ip, dst_ip=dst_ip,
regex=regex,
start_timpstamp=start_timpstamp,
stop_timpstamp=stop_timpstamp):
(ptime, sql, src_ip, sport, dst_ip, dport) = query
sql = ' '.join([p.strip() for p in sql.splitlines()]) + ';'
count += 1
if options.verbose:
ptime = datetime.fromtimestamp(ptime)
print '%s %s:%s => %s:%s' % (ptime, src_ip, sport, dst_ip, dport), sql
else:
print sql
except KeyboardInterrupt:
duration = time.time() - start
qps = int(count / duration)
print '%d queries in %d seconds, %d/s' % (count, duration, qps)
print 'Done'
return 0
if __name__ == '__main__':
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment