Skip to content

Instantly share code, notes, and snippets.

@joydragon
Created March 27, 2021 17:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joydragon/830075b7da2b4a5c6a02913797f03e69 to your computer and use it in GitHub Desktop.
Save joydragon/830075b7da2b4a5c6a02913797f03e69 to your computer and use it in GitHub Desktop.
This script listens to the ZMQ feed of your MISP instance and validates the confidence level declared in attribute comments. An article about it is available at https://wordpress.com/post/finsin.cl/582
#! /usr/bin/env python3
import json
import logging
import zmq
import time
import re
from pymisp import ExpandedPyMISP
####################################
# Initialization of global variables
# MISP connection settings. Both are empty placeholders here and have to be
# filled in before the client below can reach a MISP instance.
misp_url = ""
misp_key = ""
# Names of the tags attached to attributes, one per confidence bracket.
tag_25 = "25%"
tag_50 = "50%"
tag_75 = "75%"
tag_100 = "100%"
# NOTE(review): the third positional argument (True) is presumably PyMISP's
# certificate-verification flag -- confirm against the installed PyMISP version.
misp = ExpandedPyMISP(misp_url, misp_key, True)
# Ids of the events already processed; the main loop skips any id found here.
events_verified = []
# ZMQ topics the main loop accepts; messages on any other topic are dropped.
filters = ["misp_json"]
# Run flag tested by the main `while` loop.
loop = True
# Log to default.log; filemode='w' truncates the file on every start.
logging.basicConfig(filename='default.log', level=logging.INFO, filemode='w', format='%(name)s - %(levelname)s - %(message)s')
###########################################
# Initialization of variables for arguments
# Address of the ZMQ publisher the subscriber socket connects to.
port = "50000"
host = "127.0.0.1"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://%s:%s" % (host, port))
# An empty prefix subscribes to every topic; the actual topic filtering is
# done in the main loop against `filters`.
socket.setsockopt(zmq.SUBSCRIBE, b'')
# The poller lets the main loop wait for readable data on the socket.
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
###########
# Functions
def loop_through_event(j, misp):
    """Check every attribute of an event, then publish the event.

    Attributes nested inside the event's objects are visited first, followed
    by the attributes attached directly to the event. Each one is handed to
    ``check_and_update``. Finally the event is published with ``alert=False``.

    Args:
        j: Decoded message; must contain ``j["Event"]`` with an ``"id"``.
            The ``"Object"`` and ``"Attribute"`` collections (and the
            ``"Attribute"`` list of each object) may be missing, empty or
            ``None``; they are then treated as empty.
        misp: Client exposing ``publish`` and whatever ``check_and_update``
            needs.

    Raises:
        KeyError: If ``j`` has no ``"Event"`` or the event has no ``"id"``.
    """
    event = j["Event"]

    # `or []` covers both an absent key and an explicit null, so an event
    # without objects or attributes no longer aborts the listener.
    for obj in event.get("Object") or []:
        for attribute in obj.get("Attribute") or []:
            check_and_update(attribute, misp)

    for attribute in event.get("Attribute") or []:
        check_and_update(attribute, misp)

    misp.publish(event["id"], alert=False)
def check_and_update(a, misp):
    """Tag one attribute by its declared confidence and set its IDS flag.

    The confidence is read from the attribute's comment through
    ``check_confidence``. When none is found the attribute is left untouched.
    Otherwise the matching bracket tag is appended (unless already present),
    ``to_ids`` is enabled for a confidence of 75 or more, and the attribute is
    sent to ``misp.update_attribute``.

    Args:
        a: Attribute mapping with ``"comment"``, ``"uuid"`` and ``"value"``;
            ``"Tag"`` is optional and, when present, is extended in place.
        misp: Client exposing ``update_attribute``.
    """
    tags = a["Tag"] if "Tag" in a else []
    con = check_confidence(a["comment"])
    if con is False:
        return

    level = int(con)

    # Pick the bracket tag. Anything that is not exactly 100 but at least 75
    # (values above 100 included) lands in the 75% bracket; below 25 gets no tag.
    if level == 100:
        bracket = tag_100
    elif level >= 75:
        bracket = tag_75
    elif level >= 50:
        bracket = tag_50
    elif level >= 25:
        bracket = tag_25
    else:
        bracket = None

    # Only the two upper brackets switch the IDS flag on.
    to_ids = level >= 75

    if bracket is not None and not check_for_tag(a, bracket):
        tags.append({"name": bracket})

    misp.update_attribute({"uuid": a["uuid"], "to_ids": to_ids, "Tag": tags})

    message = (
        'Attribute ' + a["value"]
        + ' was flaged with "to_ids" ' + str(to_ids)
        + " and confidence level " + str(con)
    )
    logging.info(message)
def check_for_tag(attr, tag):
    """Report whether an attribute already carries a tag with a given name.

    Args:
        attr: Attribute mapping; its optional ``"Tag"`` entry is a list of
            mappings that each have a ``"name"``.
        tag: Tag name to look for.

    Returns:
        True if some entry of ``attr["Tag"]`` is named ``tag``, else False
        (including when the attribute has no ``"Tag"`` entry at all).
    """
    if "Tag" not in attr:
        return False
    return any(entry["name"] == tag for entry in attr["Tag"])
def check_confidence(comment):
    """Extract the confidence percentage declared in an attribute comment.

    The comment is searched for text of the form ``confidence level: NN%``
    (whitespace around ``level`` and the colon is optional; the match is
    case-sensitive).

    Args:
        comment: Free-text comment of an attribute. May be empty or ``None``.

    Returns:
        The digits of the first match as a string (e.g. ``"75"``), or
        ``False`` when the comment declares no confidence level.
    """
    if not comment:
        return False

    m = re.search(r'confidence\s*level\s*:\s*([0-9]+)%', comment)
    # re.search yields None when nothing matches; calling .groups() on it
    # would raise AttributeError instead of reporting "no confidence".
    if m is None:
        return False
    return m.group(1)
# Main receive loop: wait for messages from the publisher, keep only the
# topics listed in `filters`, and run the confidence check once per event id.
logging.info('Starting confidence level checker')
try:
    while loop:
        # Block until the subscriber socket has something to read.
        socks = dict(poller.poll(timeout=None))
        if socket in socks and socks[socket] == zmq.POLLIN:
            message = socket.recv()
            # A message is "<topic> <json payload>"; split on the first space.
            topic, _, payload = message.decode('utf-8').partition(" ")
            if topic not in filters:
                continue
            logging.info('Receiving message')
            try:
                j = json.loads(payload)
                event_id = j["Event"]["id"]
            except (ValueError, KeyError, TypeError):
                # A malformed payload must not take the whole listener down.
                logging.warning('Ignoring message without a usable event on topic %s', topic)
                continue
            if event_id not in events_verified:
                # Remember the id first: the publish done at the end of the
                # check would otherwise be picked up and processed again.
                events_verified.append(event_id)
                logging.info("Starting loop through event.")
                loop_through_event(j, misp)
            else:
                logging.info('Event ' + str(event_id) + ' already checked')
        time.sleep(0.1)
except KeyboardInterrupt:
    # `loop` is never cleared, so Ctrl-C is the normal way to stop; treat it
    # as a regular shutdown rather than a crash.
    logging.info('Interrupted, shutting down')
finally:
    # Release the ZMQ resources whichever way the loop ended.
    socket.close(linger=0)
    context.term()
logging.info('Finishing cleanly')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment