Skip to content

Instantly share code, notes, and snippets.

@jianingy
Last active December 3, 2019 03:05
Show Gist options
  • Save jianingy/84ff1dd4887ce3b4db03 to your computer and use it in GitHub Desktop.
Save jianingy/84ff1dd4887ce3b4db03 to your computer and use it in GitHub Desktop.
Read Snort alerts from a Unix domain socket
#!/virtualenvs/snort/bin/python
#
# This piece of code is written by
# Jianing Yang <jianingy.yang@gmail.com>
# with love and passion!
#
# H A P P Y H A C K I N G !
# _____ ______
# ____==== ]OO|_n_n__][. | |
# [________]_|__|________)< |YANG|
# oo oo 'oo OOOO-| oo\\_ ~o~~o~
# +--+--+--+--+--+--+--+--+--+--+--+--+--+
# 9 Mar, 2016
#
from cStringIO import StringIO
from scapy.all import Ether, IP, TCP
from tornado.ioloop import IOLoop
import errno
import os
import scapy
import socket
import struct
import logging
# Module-level logger used by SnortClient for event-loop and parse diagnostics.
LOG = logging.getLogger('snort.client')
# Width in bytes of the fixed-size alert message field that opens every alert
# record (first field of the struct format built in SnortClient._parse).
_ALERTMSG_LENGTH = 256
# Width in bytes of the fixed-size raw packet field that closes the struct
# format built in SnortClient._parse.
# NOTE(review): both widths presumably mirror the constants snort was compiled
# with (ALERTMSG_LENGTH / SNAPLEN) -- confirm against the snort build in use.
_SNAP_LENGTH = 1514
# errno values for which a failed recv() on the non-blocking socket is treated
# as "no data available" rather than as an error.
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN)
class SnortClient(object):
    """Receive snort alerts from a unix datagram socket on a tornado IOLoop.

    Typical use::

        client = SnortClient('/var/log/snort/snort_alert')
        client.bind()
        client.start(on_alert)
        IOLoop.instance().start()

    The socket is ``SOCK_DGRAM``, so every ``recv()`` yields exactly one
    datagram.  Each datagram is handed to the read callback on its own;
    datagrams are never concatenated, because the parser decodes one alert
    record per call.
    """

    def __init__(self, socket_name):
        """Create the non-blocking datagram socket (it is not bound yet).

        :param socket_name: filesystem path that :meth:`bind` binds to.
        """
        self.io_loop = IOLoop.instance()
        # Bitmask of IOLoop events currently registered for the socket;
        # None means no handler has been registered yet.
        self._state = None
        # Upper bound passed to recv(); a longer datagram is truncated.
        self._read_chunk_size = 4096
        # Initialised here so that an event arriving before read()/start()
        # cannot fail with AttributeError.
        self._read_callback = None
        self._on_alert = None
        self._socket_name = socket_name
        self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        self._socket.setblocking(0)

    def _add_io_state(self, state):
        """Make sure the socket is registered on the IOLoop for ``state``.

        :param state: IOLoop event bitmask to add; ``None`` or ``0`` only
            ensures the handler is registered for ERROR events.
        """
        state = state or 0
        if self._state is None:
            # First registration: always watch for errors, plus whatever
            # the caller asked for.
            self._state = IOLoop.ERROR | state
            self.io_loop.add_handler(
                self._socket.fileno(), self._handle_events, self._state)
        elif (self._state | state) != self._state:
            # Only touch the IOLoop when new bits are actually requested.
            self._state |= state
            self.io_loop.update_handler(self._socket.fileno(), self._state)

    def _handle_events(self, fd, events):
        """IOLoop handler: dispatch READ events and log ERROR events."""
        if events & self.io_loop.READ:
            self._handle_read()
        if events & self.io_loop.ERROR:
            LOG.error('%s event error', self)

    def _read_from_fd(self):
        """Read one datagram from the socket without blocking.

        :returns: the datagram payload, or ``None`` when no data is
            available or the datagram is empty.
        :raises socket.error: for any failure other than "would block".
        """
        try:
            chunk = self._socket.recv(self._read_chunk_size)
        except socket.error as e:
            if e.args[0] in _ERRNO_WOULDBLOCK:
                return None
            raise
        if not chunk:
            return None
        return chunk

    def _handle_read(self):
        """Drain the socket, passing each datagram to the read callback.

        Datagrams are delivered one at a time.  With no callback installed
        they are still drained, and discarded.
        """
        while True:
            datagram = self._read_from_fd()
            if datagram is None:
                break
            if self._read_callback:
                self._read_callback(datagram)

    def _parse(self, data):
        """Decode one alert datagram and report it through ``on_alert``.

        The record is unpacked as a little-endian struct: a fixed-width
        message, nine 32-bit integers and a fixed-width raw packet.
        NOTE(review): the layout presumably mirrors snort's unix-socket
        alert record -- confirm against the snort version in use.

        The callback receives ``(msg, src, sport, dst, dport)``.  Address
        and port values are ``None`` when the captured frame has no IP
        layer or no port fields.  Datagrams shorter than one record are
        logged and dropped.

        :param data: payload of a single datagram.
        """
        fmt = '<%ds9l%ds' % (_ALERTMSG_LENGTH, _SNAP_LENGTH)
        size = struct.calcsize(fmt)
        if len(data) < size:
            LOG.warning('dropping short alert datagram (%d of %d bytes)',
                        len(data), size)
            return
        # Only msg and pkt are used below; the integer fields are named to
        # document the record layout.
        (msg,
         ts_sec, ts_usec,
         caplen, pktlen,
         dlthdr, nethdr, transhdr,
         data_off, val, pkt) = struct.unpack_from(fmt, data)
        if not self._on_alert:
            return
        # The message field is NUL padded; keep the text before the first NUL.
        msg = msg.partition(b'\x00')[0]
        ether = Ether(pkt)
        if ether.haslayer(IP):
            ip = ether[IP]
            src, dst = ip.src, ip.dst
            # Ports are looked up through the IP layer's payload and are
            # missing for protocols without them.
            sport = getattr(ip, 'sport', None)
            dport = getattr(ip, 'dport', None)
        else:
            src = dst = sport = dport = None
        self._on_alert(msg, src, sport, dst, dport)

    def bind(self):
        """Bind the socket to its path and register it for ERROR events."""
        self._socket.bind(self._socket_name)
        self._add_io_state(0)

    def read(self, callback=None):
        """Start delivering raw datagrams to ``callback``.

        :param callback: callable invoked as ``callback(datagram)`` once
            per received datagram; ``None`` drains and discards them.
        """
        self._read_callback = callback
        self._add_io_state(IOLoop.READ)

    def start(self, on_alert=None):
        """Start parsing alerts and reporting them to ``on_alert``.

        :param on_alert: callable invoked as
            ``on_alert(msg, src, sport, dst, dport)`` for every alert.
        """
        self._on_alert = on_alert
        self.read(self._parse)
def on_alert(msg, src, sport, dst, dport):
    """Example alert callback: print the message, source address and port.

    :param msg: alert message text.
    :param src: source address of the offending packet.
    :param sport: source port of the offending packet.
    :param dst: destination address (accepted but not printed).
    :param dport: destination port (accepted but not printed).
    """
    # A single pre-formatted argument prints the same text under the
    # Python 2 print statement and the Python 3 print function.
    print('%s %s %s' % (msg, src, sport))
if __name__ == '__main__':
    # Path of the unix socket this script listens on.
    socket_path = '/var/log/snort/snort_alert'
    # A socket file left over from a previous run would make bind() fail,
    # so remove it first.  Only "file does not exist" is ignored; any other
    # failure (e.g. permission denied) is raised here instead of surfacing
    # later as a confusing bind() error.
    try:
        os.remove(socket_path)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
    client = SnortClient(socket_path)
    client.bind()
    client.start(on_alert)
    # Run the event loop; alerts are reported through on_alert.
    IOLoop.instance().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment