Last active
December 3, 2019 03:05
-
-
Save jianingy/84ff1dd4887ce3b4db03 to your computer and use it in GitHub Desktop.
Read Snort alerts from a Unix domain socket
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/virtualenvs/snort/bin/python | |
# | |
# This piece of code is written by | |
# Jianing Yang <jianingy.yang@gmail.com> | |
# with love and passion! | |
# | |
# H A P P Y H A C K I N G ! | |
# _____ ______ | |
# ____==== ]OO|_n_n__][. | | | |
# [________]_|__|________)< |YANG| | |
# oo oo 'oo OOOO-| oo\\_ ~o~~o~ | |
# +--+--+--+--+--+--+--+--+--+--+--+--+--+ | |
# 9 Mar, 2016 | |
# | |
from cStringIO import StringIO | |
from scapy.all import Ether, IP, TCP | |
from tornado.ioloop import IOLoop | |
import errno | |
import os | |
import scapy | |
import socket | |
import struct | |
import logging | |
# Module-wide logger.
LOG = logging.getLogger('snort.client')

# Byte widths of the two fixed-size fields in an alert record: the alert
# message that opens the record and the captured frame that closes it.
# SnortClient._parse builds its struct format from these.
_ALERTMSG_LENGTH = 256
_SNAP_LENGTH = 1514

# errno values that SnortClient._read_from_fd treats as "nothing to read
# right now" on the non-blocking socket, rather than as failures.
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN)
class SnortClient(object):
    """Non-blocking reader of Snort alerts from a unix datagram socket.

    Typical use: construct with the socket path, call ``bind()``, then
    ``start(on_alert)`` and run the Tornado IOLoop.  ``on_alert`` is called
    as ``on_alert(msg, src, sport, dst, dport)`` for every alert received.
    """

    def __init__(self, socket_name):
        """Create the non-blocking AF_UNIX/SOCK_DGRAM socket (not yet bound).

        :param socket_name: filesystem path the socket will be bound to.
        """
        self.io_loop = IOLoop.instance()
        self._state = None
        self._read_chunk_size = 4096
        # Set up front so event handlers never hit an AttributeError when
        # read()/start() have not been called yet.
        self._read_callback = None
        self._on_alert = None
        self._socket_name = socket_name
        self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        self._socket.setblocking(0)

    def _add_io_state(self, state):
        """Make sure the IOLoop watches the socket for ``state``.

        :param state: IOLoop event mask to add, or None to only register
            the handler for error events.
        """
        wanted = state or 0
        if self._state is None:
            # First registration: always watch for errors, plus whatever
            # the caller asked for.
            self._state = IOLoop.ERROR | wanted
            self.io_loop.add_handler(
                self._socket.fileno(), self._handle_events, self._state)
        elif (self._state & wanted) != wanted:
            self._state |= wanted
            self.io_loop.update_handler(self._socket.fileno(), self._state)

    def _handle_events(self, fd, events):
        """IOLoop callback: dispatch readiness events for the socket."""
        if events & self.io_loop.READ:
            self._handle_read()
        if events & self.io_loop.ERROR:
            LOG.error('%s event error', self)

    def _read_from_fd(self):
        """Receive one datagram.

        :returns: the datagram, or None when nothing is pending or the
            datagram is empty.
        :raises socket.error: for any error other than would-block.
        """
        try:
            chunk = self._socket.recv(self._read_chunk_size)
        except socket.error as e:
            if e.args[0] in _ERRNO_WOULDBLOCK:
                return None
            raise
        return chunk or None

    def _handle_read(self):
        """Drain the socket, handing each datagram to the read callback.

        Datagrams are delivered one at a time.  Concatenating them would
        merge several alert records into one buffer of which only the
        first record is ever decoded.
        """
        while True:
            datagram = self._read_from_fd()
            if datagram is None:
                break
            if self._read_callback:
                self._read_callback(datagram)

    def _parse(self, data):
        """Decode one alert record and report it through ``on_alert``.

        Record layout as unpacked here (little-endian): alert message,
        nine 32-bit integers (ts_sec, ts_usec, caplen, pktlen, dlthdr,
        nethdr, transhdr, data offset, val), then the captured frame.

        :param data: raw bytes of a single alert datagram.
        """
        fmt = '<%ds9l%ds' % (_ALERTMSG_LENGTH, _SNAP_LENGTH)
        size = struct.calcsize(fmt)
        if len(data) < size:
            # struct.unpack would raise on a truncated record; skip it.
            LOG.warning('dropping short alert datagram (%d of %d bytes)',
                        len(data), size)
            return
        fields = struct.unpack(fmt, data[:size])
        msg = fields[0].strip(b'\x00')
        pkt = fields[-1]
        if not self._on_alert:
            return
        ether = Ether(pkt)
        if ether.haslayer(IP):
            ip = ether[IP]
            src, dst = ip.src, ip.dst
            # Ports live on the transport layer; protocols without ports
            # (e.g. ICMP) report None instead of raising.
            sport = getattr(ip, 'sport', None)
            dport = getattr(ip, 'dport', None)
        else:
            # Frame without an IP layer: still deliver the alert message.
            src = dst = sport = dport = None
        self._on_alert(msg, src, sport, dst, dport)

    def bind(self):
        """Bind the socket to its path and register it with the IOLoop."""
        self._socket.bind(self._socket_name)
        self._add_io_state(None)

    def read(self, callback=None):
        """Start watching for readable events.

        :param callback: called with the raw bytes of each datagram.
        """
        self._read_callback = callback
        self._add_io_state(IOLoop.READ)

    def start(self, on_alert=None):
        """Begin reading and decoding alerts.

        :param on_alert: callable ``(msg, src, sport, dst, dport)`` invoked
            once per decoded alert.
        """
        self._on_alert = on_alert
        self.read(self._parse)
def on_alert(msg, src, sport, dst, dport):
    """Example alert callback: print the message, source address and port.

    ``dst`` and ``dport`` are accepted to match the callback signature but
    are not printed.
    """
    # Single pre-formatted argument: same output as the Python 2 print
    # statement, and also valid syntax on Python 3.
    print('%s %s %s' % (msg, src, sport))
if __name__ == '__main__':
    socket_path = '/var/log/snort/snort_alert'
    # Remove a stale socket file left by a previous run so bind() can
    # succeed.  Only "does not exist" is ignored; other failures (for
    # example permissions) are raised instead of surfacing later as a
    # confusing bind error.
    try:
        os.remove(socket_path)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
    client = SnortClient(socket_path)
    client.bind()
    client.start(on_alert)
    io_loop = IOLoop.instance()
    io_loop.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment