Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save TheStaticTurtle/a9137d3af2cf00955acb832b7b1d59e6 to your computer and use it in GitHub Desktop.
Save TheStaticTurtle/a9137d3af2cf00955acb832b7b1d59e6 to your computer and use it in GitHub Desktop.
StormShield Firewall Minimal Syslog Server
# Address and UDP port the syslog server binds to (used in the __main__ block).
HOST, PORT = "0.0.0.0", 5514
import logging,coloredlogs,socketserver
# Configure coloredlogs: DEBUG level, message-only format, empty date format.
coloredlogs.install(level=logging.DEBUG, fmt='%(message)s', datefmt='')
# Drop filters: an entry is not logged when any string listed below is a
# substring of the corresponding field of that entry (see check_filter).
filter_proto = ["dns_udp", "dns_tcp", "ssdp", "bootps", "ntp"]
filter_dstname = []  # e.g. ["firefox", "github", "google", "msedge", "imgur", "cdn."]
filter_srcname = []
filter_dst = []
filter_src = []
# "Unknown" is the placeholder the handler assigns when an entry has no
# action field, so this drops entries that carry no action.
filter_action = ["Unknown"]
def check_filter(values, key, filter):
    """Return True when any pattern in *filter* occurs in ``values[key]``.

    Args:
        values: Mapping of field name to field text for one log entry.
        key: Name of the field to inspect.
        filter: Iterable of substrings; a single hit is enough to match.
            (The name shadows the builtin but is kept for existing callers.)

    Returns:
        True if at least one pattern is a substring of the field, else False.
        A field that is absent from *values* never matches.
    """
    # A missing field is treated as empty text so it cannot match (and
    # cannot raise KeyError) regardless of the configured patterns.
    field = values.get(key, "")
    return any(pattern in field for pattern in filter)
class SyslogUDPHandler(socketserver.BaseRequestHandler):
    """Handle one UDP syslog datagram and log firewall entries in columns.

    Only datagrams containing ``id=firewall`` are processed. The text after
    that marker is read as space-separated ``key=value`` tokens, missing
    fields receive placeholder values, the module-level ``filter_*`` lists
    are applied through ``check_filter``, and any entry that survives the
    filters is written with the ``logging`` module.
    """

    # Marker that identifies a firewall entry; the fields follow it.
    _MARKER = "id=firewall"

    # Placeholder used for every displayed field the datagram did not carry.
    _DEFAULTS = {
        "srcname": "Unknown",
        "src": "Unknown",
        "srcif": "Unknown",
        "srcifname": "Unknown",
        "srcport": "??????",
        "dstname": "Unknown",
        "dst": "Unknown",
        "dstport": "??????",
        "proto": "Unknown",
        "ipproto": "???",
        "action": "Unknown",
    }

    @classmethod
    def _parse_fields(cls, payload):
        """Turn space-separated ``key=value`` text into a dict with defaults.

        Args:
            payload: The text that follows the ``id=firewall`` marker.

        Returns:
            A new dict holding every placeholder from ``_DEFAULTS``,
            overridden by each ``key=value`` token found in *payload*.
            Tokens without ``=`` are ignored.
        """
        fields = dict(cls._DEFAULTS)
        for token in payload.split(" "):
            # Split on the first "=" only, so a value that itself contains
            # "=" is kept instead of being silently discarded.
            key, sep, val = token.partition("=")
            if sep:
                fields[key] = val
        return fields

    def handle(self):
        """Decode the datagram, filter it, and log it at a fitting level.

        Entries whose action is ``block`` are logged as errors, entries with
        the placeholder action ``Unknown`` as warnings, and all others as
        info. Returns None in every case.
        """
        # self.request[0] holds the datagram bytes. Undecodable bytes are
        # replaced rather than raising, so one bad packet cannot make the
        # handler fail.
        data = self.request[0].strip().decode("utf-8", errors="replace")

        # Check syslog facility is the firewall one
        if self._MARKER not in data:
            return

        # partition() never raises, even when nothing follows the marker.
        _, _, payload = data.partition(self._MARKER)
        values = self._parse_fields(payload)

        try:
            # Check filters (same order as the module-level configuration)
            filters = (
                ("proto", filter_proto),
                ("dstname", filter_dstname),
                ("srcname", filter_srcname),
                ("dst", filter_dst),
                ("src", filter_src),
                ("action", filter_action),
            )
            if any(check_filter(values, key, patterns) for key, patterns in filters):
                return

            # Log this thing
            action = values["action"]
            if action == "block":
                loglevelfn = logging.error
            elif action == "Unknown":
                # logging.warn is a deprecated alias (removed in Python 3.13).
                loglevelfn = logging.warning
            else:
                loglevelfn = logging.info
            loglevelfn(f" Src={values['src']: <15} SrcPort={values['srcport']: <5} SrcName={values['srcname']: <20} SrcIf={values['srcifname']: <10} IpProto={values['ipproto']: <4} Proto={values['proto']: <15} Dst={values['dst']: <15} DstName={values['dstname']: <55} DstPort={values['dstport']: <5} Action={values['action']: <8}")
        except Exception:
            # Top-level boundary for a single datagram: report the failure
            # and keep the server running.
            logging.exception("Failed to process firewall log entry")
if __name__ == "__main__":
    # Run the syslog server until interrupted. Any error other than Ctrl+C
    # (e.g. failing to bind the port) propagates to the caller unchanged.
    try:
        # The context manager closes the listening socket on every exit path.
        with socketserver.UDPServer((HOST, PORT), SyslogUDPHandler) as server:
            server.serve_forever(poll_interval=0.5)
    except KeyboardInterrupt:
        print ("Crtl+C Pressed. Shutting down.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment