Skip to content

Instantly share code, notes, and snippets.

@zed
Last active March 30, 2023 18:28
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save zed/6c8dde3216c005232934 to your computer and use it in GitHub Desktop.
Save zed/6c8dde3216c005232934 to your computer and use it in GitHub Desktop.
React to snort alert
#!/usr/bin/env python
"""Run `snort -A console` command using a pipe.
Warning! Alerts are delayed until snort's stdout buffer is flushed.
"""
from __future__ import print_function
from subprocess import Popen, PIPE, STDOUT
# Launch snort with console alerts; its stderr is merged into the piped stdout.
snort_process = Popen(
    ['snort', '-A', 'console', '-c', 'snort.conf'],
    stdout=PIPE,
    stderr=STDOUT,
    bufsize=1,
    universal_newlines=True,
    close_fds=True,
)
with snort_process.stdout as snort_output:
    # readline() returns '' only at EOF, i.e. once snort closes its stdout.
    while True:
        line = snort_output.readline()
        if line == '':
            break
        # XXX run python script here:
        # `subprocess.call([sys.executable or 'python', '-m', 'your_module'])`
        print(line, end='')
# Reap the child and keep its exit status.
rc = snort_process.wait()
#!/usr/bin/env python
"""Run `snort -A console` and print each alert as soon as it is generated using pty."""
import os
import pty
from subprocess import Popen, STDOUT
# start snort process
# Provide a tty as snort's stdout to enable line-buffering on snort's side.
master_fd, slave_fd = pty.openpty()
snort_process = Popen(['snort', '-A', 'console', '-c', 'snort.conf'],
                      stdout=slave_fd, stderr=STDOUT, close_fds=True)
os.close(slave_fd)  # the child holds its own copy of the slave end
try:
    # 'U' (universal newlines) is kept where the interpreter still accepts
    # it; Python 3.11+ rejects the flag with ValueError, and plain text mode
    # there already translates newlines.
    try:
        pipe = os.fdopen(master_fd, 'Ur')
    except ValueError:
        pipe = os.fdopen(master_fd, 'r')
    with pipe:
        for line in iter(pipe.readline, ''):
            # detect alert based on the output and run the script here
            print('got ' + repr(line))
except KeyboardInterrupt:
    pass
finally:
    # Same cleanup as the Unix-socket variant: do not leave snort behind.
    if snort_process.poll() is None:  # the process is still running
        snort_process.kill()
    snort_process.wait()  # wait for snort process to exit
# NOTE: EIO is deliberately not handled -- the script assumes there is no EOF
# (snort runs forever)
#!/usr/bin/env python
"""Run snort and print each alert as soon as it is generated using Unix domain sockets."""
import ctypes
import os
import socket
from datetime import datetime
from subprocess import Popen
from snort import Alertpkt
# Bind a datagram Unix domain socket (UDS) on which alerts are received.
UNSOCK_FILE = 'snort_alert'
snort_log_dir = os.getcwd()
server_address = os.path.join(snort_log_dir, UNSOCK_FILE)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
    os.remove(server_address)
except OSError:
    # The socket file could not be removed (e.g. nothing left over from an
    # earlier run); carry on and let bind() decide.
    pass
sock.bind(server_address)

# Start snort in `unsock` alert mode, logging into the socket's directory.
snort_process = Popen(
    ['snort', '-A', 'unsock', '-l', snort_log_dir, '-c', 'snort.conf'],
    close_fds=True,
)

# Receive alerts: each datagram is read straight into the ctypes struct.
alert = Alertpkt()
try:
    # A datagram whose size differs from the struct's size is treated as EOF.
    while sock.recv_into(alert) == ctypes.sizeof(alert):
        #run python script here `subprocess.call([sys.executable or 'python', '-m', 'your_module'])`
        print("{:03d} {:%H:%M:%S}".format(alert.val, datetime.fromtimestamp(alert.data)))
except KeyboardInterrupt:
    pass
finally:
    sock.close()
    os.remove(server_address)
    if snort_process.poll() is None:
        # snort is still running: stop it
        snort_process.kill()
    # reap the snort process
    snort_process.wait()
#!/usr/bin/env python
"""A dummy snort-like program (for testing)."""
from __future__ import print_function
import argparse
import os
import random
import socket
import sys
import time
from contextlib import closing
from snort import Alertpkt
def generate_alerts():
    """Yield an endless stream of dummy alerts.

    One ``Alertpkt`` instance is created up front and the same object is
    yielded on every iteration with updated ``val`` and ``data`` fields.
    """
    # Alertpkt field layout, for reference:
    #   ('alertmsg', (c_uint8 * ALERTMSG_LENGTH)),
    #   ('pkth', pcap_pkthdr),
    #   ('dlthdr', c_uint32),
    #   ('nethdr', c_uint32),
    #   ('transhdr', c_uint32),
    #   ('data', c_uint32),
    #   ('val', c_uint32),
    #   ('pkt', (c_uint8 * SNAPLEN)),
    #   ('event', Event)
    packet = Alertpkt()
    packet.val = 0
    while True:
        # Pause 0, 1 or 2 seconds between alerts.
        time.sleep(random.randrange(3))
        packet.val += 1  # running alert counter
        packet.data = int(time.time())  # wall-clock time of this alert
        yield packet
def main():
    """Emit dummy alerts in the mode selected on the command line.

    Recognised options (anything else is ignored, so real snort command
    lines can be passed unchanged):

    -A unsock   send each alert as a datagram to the Unix domain socket
                ``<log_dir>/snort_alert``; requires ``-l``.
    -A console  print each alert to stdout.
    -l LOG_DIR  directory that contains the ``snort_alert`` socket.

    Without ``-A`` nothing is generated and the function returns at once.
    Exits through ``parser.error`` (SystemExit, status 2) when ``-A unsock``
    is given without ``-l``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-A', dest='alert', choices=['unsock', 'console'])
    parser.add_argument('-l', dest='log_dir')
    args, _ = parser.parse_known_args()
    if args.alert == 'unsock':
        if args.log_dir is None:
            # Report a usage error instead of failing inside os.path.join.
            parser.error('-A unsock requires -l LOG_DIR')
        # Connect inside the `with` so the socket is closed even if
        # connect() fails.
        with closing(socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)) as sock:
            sock.connect(os.path.join(args.log_dir, 'snort_alert'))
            for alert in generate_alerts():
                sock.sendall(alert)
    elif args.alert == 'console':
        for alert in generate_alerts():
            print(Alertpkt.str_format.format(alert))


if __name__ == "__main__":
    try:
        print('Press Ctrl+C to exit..', file=sys.stderr)
        sys.stderr.flush()
        main()
    except KeyboardInterrupt:
        print('Exiting..', file=sys.stderr)
"""Provide ctypes' struct Alertpkt definition from spo_alert_unixsock.h"""
from ctypes import *
ALERTMSG_LENGTH = 256 # decode.h
SNAPLEN = 1514 # decode.h
"""
struct timeval {
time_t tv_sec; /* seconds */
suseconds_t tv_usec; /* microseconds */
};
"""
# LONGWORD -- type of __WORDSIZE bits, traditionally long
time_t = c_long
suseconds_t = c_long


class timeval(Structure):
    """ctypes mirror of the C ``struct timeval``."""

    _fields_ = [
        ('tv_sec', time_t),        # seconds
        ('tv_usec', suseconds_t),  # microseconds
    ]
"""
00037 typedef struct _Event
00038 {
00039 u_int32_t sig_generator; /* which part of snort generated the alert? */
00040 u_int32_t sig_id; /* sig id for this generator */
00041 u_int32_t sig_rev; /* sig revision for this id */
00042 u_int32_t classification; /* event classification */
00043 u_int32_t priority; /* event priority */
00044 u_int32_t event_id; /* event ID */
00045 u_int32_t event_reference; /* reference to other events that have gone off,
00046 * such as in the case of tagged packets...
00047 */
00048 struct timeval ref_time; /* reference time for the event reference */
00049
00050 /* Don't add to this structure because this is the serialized data
00051 * struct for unified logging.
00052 */
00053 } Event;
"""
class Event(Structure):
    """ctypes mirror of snort's ``Event`` struct."""

    _fields_ = [
        ('sig_generator', c_uint32),    # which part of snort generated the alert
        ('sig_id', c_uint32),           # sig id for this generator
        ('sig_rev', c_uint32),          # sig revision for this id
        ('classification', c_uint32),   # event classification
        ('priority', c_uint32),         # event priority
        ('event_id', c_uint32),         # event ID
        ('event_reference', c_uint32),  # reference to other events
        ('ref_time', timeval),          # reference time for the event reference
    ]
"""
/*
00143 * Each packet in the dump file is prepended with this generic header.
00144 * This gets around the problem of different headers for different
00145 * packet interfaces.
00146 */
00147 struct pcap_pkthdr {
00148 struct timeval ts; /* time stamp */
00149 bpf_u_int32 caplen; /* length of portion present */
00150 bpf_u_int32 len; /* length this packet (off wire) */
00151 };
"""
class pcap_pkthdr(Structure):
    """ctypes mirror of the generic per-packet header ``struct pcap_pkthdr``."""

    _fields_ = [
        ('ts', timeval),       # time stamp
        ('caplen', c_uint32),  # length of portion present
        ('len', c_uint32),     # length of this packet (off wire)
    ]
"""
/* this struct is for the alert socket code.... */
00035 typedef struct _Alertpkt
00036 {
00037 u_int8_t alertmsg[ALERTMSG_LENGTH]; /* variable.. */
00038 struct pcap_pkthdr pkth;
00039 u_int32_t dlthdr; /* datalink header offset. (ethernet, etc.. ) */
00040 u_int32_t nethdr; /* network header offset. (ip etc...) */
00041 u_int32_t transhdr; /* transport header offset (tcp/udp/icmp ..) */
00042 u_int32_t data;
00043 u_int32_t val; /* which fields are valid. (NULL could be
00044 * valids also)
00045 * */
00046 /* Packet struct --> was null */
00047 #define NOPACKET_STRUCT 0x1
00048 /* no transport headers in packet */
00049 #define NO_TRANSHDR 0x2
00050 u_int8_t pkt[SNAPLEN];
00051 Event event;
00052 } Alertpkt;
"""
class Alertpkt(Structure):
    """ctypes mirror of snort's ``Alertpkt`` struct used by the alert socket code.

    ``str_format`` is a ``str.format`` template of the form
    ``"[{0.alertmsg}, {0.pkth}, ...]"`` with one placeholder per field, meant
    to be applied to an instance: ``Alertpkt.str_format.format(alert)``.
    """

    _fields_ = [
        ('alertmsg', (c_uint8 * ALERTMSG_LENGTH)),
        ('pkth', pcap_pkthdr),
        ('dlthdr', c_uint32),    # datalink header offset
        ('nethdr', c_uint32),    # network header offset
        ('transhdr', c_uint32),  # transport header offset
        ('data', c_uint32),
        ('val', c_uint32),       # which fields are valid
        ('pkt', (c_uint8 * SNAPLEN)),
        ('event', Event),
    ]
    # One "{0.<field>}" placeholder per declared field, in declaration order.
    str_format = "[%s]" % ', '.join('{0.' + name + '}' for name, _ in _fields_)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment