Last active
November 16, 2021 16:18
-
-
Save TheStaticTurtle/a9137d3af2cf00955acb832b7b1d59e6 to your computer and use it in GitHub Desktop.
StormShield Firewall Minimal Syslog Server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
import coloredlogs
import socketserver

# Address and UDP port the syslog server binds to.
HOST = "0.0.0.0"
PORT = 5514

# Configure coloredlogs at DEBUG level with a message-only format.
coloredlogs.install(level=logging.DEBUG, fmt='%(message)s', datefmt='')

# A log entry is skipped when the named field contains any of the substrings
# listed in the matching filter below.
filter_proto = ["dns_udp", "dns_tcp", "ssdp", "bootps", "ntp"]
# Example values: ["firefox", "github", "google", "msedge", "imgur", "cdn."]
filter_dstname = []
filter_srcname = []
filter_dst = []
filter_src = []
filter_action = ["Unknown"]
def check_filter(values, key, filter):
    """Return True when ``values[key]`` contains any entry of ``filter``.

    Args:
        values: Mapping of parsed log fields.
        key: Name of the field to inspect.
        filter: Iterable of substrings; a single hit is enough.

    Returns:
        True on the first substring found in the field, otherwise False
        (including when ``filter`` is empty).
    """
    # The lookup stays inside the generator so that an empty filter never
    # touches ``values`` at all.
    return any(needle in values[key] for needle in filter)
class SyslogUDPHandler(socketserver.BaseRequestHandler):
    """Parse one UDP syslog datagram and log its firewall fields.

    Only messages carrying the ``id=firewall `` marker are processed; the
    ``key=value`` pairs following the marker are turned into a dict, filled
    with placeholders for missing fields, run through the module-level
    filters and finally written out as one aligned log line.
    """

    # Placeholders for fields that the message did not provide.
    _DEFAULTS = {
        "srcname": "Unknown",
        "src": "Unknown",
        "srcif": "Unknown",
        "srcifname": "Unknown",
        "srcport": "??????",
        "dstname": "Unknown",
        "dst": "Unknown",
        "dstport": "??????",
        "proto": "Unknown",
        "ipproto": "???",
        "action": "Unknown",
    }

    def handle(self):
        """Process the datagram in ``self.request[0]``.

        Returns:
            None. Messages without the firewall marker, and entries matched
            by one of the filters, are silently ignored.
        """
        # Undecodable bytes are replaced instead of raising, so one malformed
        # datagram cannot abort the handler with UnicodeDecodeError.
        data = self.request[0].strip().decode("utf-8", errors="replace")

        # Keep only what follows the firewall marker. Using partition avoids
        # the IndexError the old split(...)[1] raised when "id=firewall" was
        # present without a trailing space (e.g. as the last token).
        _, marker, payload = data.partition("id=firewall ")
        if not marker:
            return

        # Build the field dict. Splitting on the first "=" only keeps values
        # that themselves contain "="; tokens without "=" are skipped.
        # NOTE: a quoted value containing spaces is still split across tokens,
        # so only its first fragment is stored.
        values = {}
        for token in payload.split(" "):
            key, sep, val = token.partition("=")
            if sep:
                values[key] = val

        for name, placeholder in self._DEFAULTS.items():
            values.setdefault(name, placeholder)

        try:
            # Skip the entry as soon as one filter matches.
            rules = (
                ("proto", filter_proto),
                ("dstname", filter_dstname),
                ("srcname", filter_srcname),
                ("dst", filter_dst),
                ("src", filter_src),
                ("action", filter_action),
            )
            if any(check_filter(values, key, blocked) for key, blocked in rules):
                return

            # Pick the severity from the firewall action.
            if values['action'] == "Unknown":
                loglevelfn = logging.warning
            elif values['action'] == "block":
                loglevelfn = logging.error
            else:
                loglevelfn = logging.info

            loglevelfn(
                f" Src={values['src']: <15} SrcPort={values['srcport']: <5}"
                f" SrcName={values['srcname']: <20} SrcIf={values['srcifname']: <10}"
                f" IpProto={values['ipproto']: <4} Proto={values['proto']: <15}"
                f" Dst={values['dst']: <15} DstName={values['dstname']: <55}"
                f" DstPort={values['dstport']: <5} Action={values['action']: <8}"
            )
        except Exception as e:
            # Best effort: report the problem and keep serving.
            logging.error(e)
if __name__ == "__main__":
    # Start the UDP syslog server and serve until interrupted.
    try:
        # The context manager closes the listening socket on every exit path,
        # including Ctrl+C; other exceptions still propagate unchanged.
        with socketserver.UDPServer((HOST, PORT), SyslogUDPHandler) as server:
            server.serve_forever(poll_interval=0.5)
    except KeyboardInterrupt:
        print("Crtl+C Pressed. Shutting down.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment