Skip to content

Instantly share code, notes, and snippets.

@mpurzynski
Created April 16, 2019 17:44
Show Gist options
  • Save mpurzynski/1b835b8e92bc3ce6662c42d43c0b8216 to your computer and use it in GitHub Desktop.
Save mpurzynski/1b835b8e92bc3ce6662c42d43c0b8216 to your computer and use it in GitHub Desktop.
otx-misp.py
---
otx_api_key: {{ otx_api_key }}
misp_api_key: {{ misp_api_key }}
misp_api_url: {{ misp_api_url }}
#!/usr/bin/env python3
import argparse
import logging
from yaml import Loader, load, dump
from sys import argv, stderr
from os import environ, fsync, stat, rename
from logging.handlers import SysLogHandler
from OTXv2 import OTXv2, IndicatorTypes
from pandas.io.json import json_normalize
from datetime import datetime, timedelta
from dateutil import parser as date_parser
from pymisp import ExpandedPyMISP, MISPEvent
def setup_logging(stream=stderr, level=logging.INFO):
formatstr = (
"[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s"
)
logging.basicConfig(format=formatstr, datefmt="%H:%M:%S", stream=stream)
logger = logging.getLogger(__name__)
logger.setLevel(level)
return logger
def main():
global logger
environ["TZ"] = "UTC" # Override timezone so we know where we're at
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", help="Specify a configuration file")
parser.add_argument("-d", "--debug", help="Print debug messages")
args = parser.parse_args()
with open(argv[0].replace(".py", ".yml"), "r") as configyaml:
config = load(configyaml, Loader=Loader)
otx_api_key = config.get("otx_api_key", "<OTXAPIKEY>")
misp_api_key = config.get("misp_api_key", "<MISPAPIKEY>")
misp_api_url = config.get("misp_api_url", "<APIKEY>")
if args.debug:
logger = setup_logging(level=logging.DEBUG)
else:
logger = setup_logging(level=logging.INFO)
logger.level = logging.DEBUG
logger.debug("Started and initialized")
pulses = []
otx = OTXv2(otx_api_key)
pulses = otx.getall(modified_since=datetime.today() - timedelta(days=3))
print(len(pulses))
misp = ExpandedPyMISP(misp_api_url, misp_api_key, True)
for pulse in pulses:
event = MISPEvent()
event.distribution = 0
event.threat_level_id = 1
event.analysis = 2
if "name" in pulse:
event.info = pulse["name"]
if "author_name" in pulse:
event.info = pulse["author_name"] + " | " + pulse["name"]
try:
dt = date_parser.parse(pulse["created"])
except (ValueError, OverflowError):
logger.error("Cannot parse Pulse 'created' date")
dt = datetime.utcnow()
event["date"] = dt
event_obj = misp.add_event(event)
event_id = event_obj.id
print("Event id: %s" % event_id)
for indicator in pulse["indicators"]:
indicator_kwargs = {"to_ids": True}
indicator_kwargs["comment"] = indicator["description"]
if indicator["type"] == "FileHash-SHA256":
misp.add_hashes(
event_id, sha256=indicator["indicator"], **indicator_kwargs
)
if indicator["type"] == "FileHash-SHA1":
misp.add_hashes(
event_id, sha1=indicator["indicator"], **indicator_kwargs
)
if indicator["type"] == "FileHash-MD5":
misp.add_hashes(
event_id, md5=indicator["indicator"], **indicator_kwargs
)
if "description" in indicator:
indicator_description = indicator["description"]
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment