Created
March 27, 2021 17:54
-
-
Save joydragon/830075b7da2b4a5c6a02913797f03e69 to your computer and use it in GitHub Desktop.
This is a script to listen to the ZMQ on your MISP project and validate the confidence level. Article about it on https://wordpress.com/post/finsin.cl/582
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
import json | |
import logging | |
import zmq | |
import time | |
import re | |
from pymisp import ExpandedPyMISP | |
#################################### | |
# Initialization of global variables | |
misp_url = "" | |
misp_key = "" | |
tag_25 = "25%" | |
tag_50 = "50%" | |
tag_75 = "75%" | |
tag_100 = "100%" | |
misp = ExpandedPyMISP(misp_url, misp_key, True) | |
events_verified = [] | |
filters = ["misp_json"] | |
loop = True | |
logging.basicConfig(filename='default.log', level=logging.INFO, filemode='w', format='%(name)s - %(levelname)s - %(message)s') | |
########################################### | |
# Initialization of variables for arguments | |
port = "50000" | |
host = "127.0.0.1" | |
context = zmq.Context() | |
socket = context.socket(zmq.SUB) | |
socket.connect("tcp://%s:%s" % (host, port)) | |
socket.setsockopt(zmq.SUBSCRIBE, b'') | |
poller = zmq.Poller() | |
poller.register(socket, zmq.POLLIN) | |
########### | |
# Functions | |
def loop_through_event(j, misp): | |
for o in j["Event"]["Object"]: | |
for a in o["Attribute"]: | |
check_and_update(a, misp) | |
for a in j["Event"]["Attribute"]: | |
check_and_update(a, misp) | |
misp.publish(j["Event"]["id"], alert=False) | |
def check_and_update(a, misp): | |
if "Tag" in a: | |
tags = a["Tag"] | |
else: | |
tags = [] | |
con = check_confidence(a["comment"]) | |
to_ids = False | |
if con is not False: | |
if int(con) == 100: | |
to_ids = True | |
if not check_for_tag(a,tag_100): | |
tags.append({"name":tag_100}) | |
elif int(con) >= 75: | |
to_ids = True | |
if not check_for_tag(a,tag_75): | |
tags.append({"name":tag_75}) | |
elif int(con) >= 50: | |
if not check_for_tag(a,tag_50): | |
tags.append({"name":tag_50}) | |
elif int(con) >= 25: | |
if not check_for_tag(a,tag_25): | |
tags.append({"name":tag_25}) | |
attr = {"uuid": a["uuid"], "to_ids": to_ids, "Tag":tags} | |
misp.update_attribute(attr) | |
logging.info('Attribute ' + a["value"] + ' was flaged with "to_ids" ' + str(to_ids) + " and confidence level " + str(con)) | |
def check_for_tag(attr, tag): | |
if "Tag" in attr: | |
for t in attr["Tag"]: | |
if t["name"] == tag: | |
return True | |
return False | |
def check_confidence(comment): | |
m = re.search(r'confidence\s*level\s*:\s*([0-9]+)%', comment) | |
if len(m.groups()) > 0: | |
return m.groups()[0] | |
return False | |
logging.info('Starting confidence level checker') | |
while loop: | |
socks = dict(poller.poll(timeout=None)) | |
if socket in socks and socks[socket] == zmq.POLLIN: | |
message = socket.recv() | |
topic, s, m = message.decode('utf-8').partition(" ") | |
if topic not in filters: | |
continue | |
logging.info('Receiving message') | |
j = json.loads(m) | |
if j["Event"]["id"] not in events_verified: | |
events_verified.append(j["Event"]["id"]) | |
logging.info("Starting loop through event.") | |
loop_through_event(j, misp) | |
else: | |
logging.info('Event ' + j["Event"]["id"] + ' already checked') | |
time.sleep(0.1) | |
logging.info('Finishing cleanly') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment