Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Utility for synchronizing a list of indicators with a MineMeld local DB Miner (Python 2.7.9+)
#!/usr/bin/env python
# Copyright 2015-present Palo Alto Networks, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Script for synchronizing a MineMeld local DB Miner with indicators read from
local file using MineMeld API.
## EXAMPLES
To add the IPv4 indicators contained in the file my-ipv4-addresses to the Miner
IPv4ListMiner, without removing any indicator already stored in the Miner:
./minemeld-sync.py -m https://my-minemeld-address -u my-admin -p my-password -t IPv4 IPv4ListMiner my-ipv4-addresses
By default old indicators are not removed from the Miner. To synchronize
indicators in DomainListMiner with indicators contained in file my-domains,
adding new indicators and removing old indicators, use the --delete option:
./minemeld-sync.py -m https://my-minemeld-address -u my-admin -p my-password -t domain --delete DomainListMiner my-domains
Default share_level of the added indicators is red. To specify a different
share level use the --share-level option:
./minemeld-sync.py -m https://my-minemeld-address -u my-admin -p my-password -t IPv6 --share-level green IPv6ListMiner my-ipv6-addresses
## CERT VERIFICATION
By default remote MineMeld certificate is verified using certifi (if installed),
or using the CA bundle file or CA certs directory specified via the --ca-path option.
./minemeld-sync.py -m https://my-minemeld-address --ca-path /etc/ssl/certs -u my-admin -p my-password -t IPv6 IPv6ListMiner my-ipv6-addresses
To disable remote certificate verification use the option -k:
./minemeld-sync.py -m https://my-minemeld-address -k -u my-admin -p my-password -t IPv6 IPv6ListMiner my-ipv6-addresses
## INPUT FILE FORMAT
Version 0.1.4 adds support for input files in JSON format.
### JSON
MineMeld output feeds in JSON format. Example:
[
{
"indicator": "8.8.8.8",
"value": {
"comment": "Google DNS 1",
"confidence": 100,
"type": "IPv4",
"share_level": "green"
}
},
{
"indicator": "8.8.4.4",
"value": {
"comment": "Google DNS 2",
"confidence": 100,
"type": "IPv4",
"share_level": "green"
}
}
]
### PLAIN TEXT
Input file format is quite simple: a list of indicators. One per line. Example:
http://malicious1.example.com
https://malicious2.example.com
To add a comment to an indicator, put it on a line starting with the # character
right before the indicator:
# indicator provided by my cousin
http://malicious1.example.com
# indicator provided by SOC
https://malicious2.example.com
It is also possible to add custom attributes, using the format @<attribute name>: <attribute value>
# Google Public DNS (this is the comment)
# @direction: outbound
8.8.8.8
# Google Public DNS (this is the comment)
# @direction: outbound
8.8.4.4
"""
# Version of this script; printed by the --version command line option.
__version__ = '0.1.5'
import logging
import argparse
import urllib2
import json
import base64
import ssl
import os.path
from urlparse import urljoin
# certifi is an optional dependency: when it is installed its CA bundle is the
# default used for certificate verification, otherwise no default is available.
CERTIFI_PATH = None
try:
    import certifi
except ImportError:
    pass
else:
    CERTIFI_PATH = certifi.where()

# Module-level logger shared by every function in this script.
LOG = logging.getLogger(__name__)
# Maps the class of a MineMeld node to the data file type used when calling
# the config data API and to the indicator types that the node accepts.
_MINEMELD_CLASS_TO_TYPE = {
    'minemeld.ft.local.YamlIPv4FT': dict(
        data_file_type='yaml',
        types=['IPv4'],
    ),
    'minemeld.ft.local.YamlIPv6FT': dict(
        data_file_type='yaml',
        types=['IPv6'],
    ),
    'minemeld.ft.local.YamlDomainFT': dict(
        data_file_type='yaml',
        types=['domain'],
    ),
    'minemeld.ft.local.YamlURLFT': dict(
        data_file_type='yaml',
        types=['URL'],
    ),
    'minemeld.ft.localdb.Miner': dict(
        data_file_type='localdb',
        types=['IPv4', 'IPv6', 'domain', 'URL'],
    ),
}
class MineMeldAPIClient(object):
    """Small client for the MineMeld API calls needed by this script.

    Every request is authenticated with HTTP Basic authentication built from
    ``username`` and ``password``.
    """

    def __init__(self, url, username, password, capath):
        """Store the connection settings and prepare certificate verification.

        :param url: base URL of the MineMeld API
        :param username: user name for HTTP Basic authentication
        :param password: password for HTTP Basic authentication
        :param capath: path of a CA bundle file or of a CA certs directory;
            ``None`` disables verification of the remote certificate
        :raises RuntimeError: if ``capath`` is neither a file nor a directory
        """
        self.url = url
        self.username = username
        self.password = password
        self.cafile = None
        self.capath = None
        self.context = None
        # set by check() once the class of the Miner is known
        self.data_file_type = None

        if capath is None:
            # no CA path: build a context with hostname check and
            # certificate verification switched off
            self.context = ssl.create_default_context()
            self.context.check_hostname = False
            self.context.verify_mode = ssl.CERT_NONE
        elif os.path.isfile(capath):
            self.cafile = capath
        elif os.path.isdir(capath):
            self.capath = capath
        else:
            raise RuntimeError('CA path should be a file or a directory')

    def _call_api(self, uri, data=None, headers=None, method=None):
        """Perform an authenticated API request and return the raw body.

        :param uri: API path, joined to the base URL
        :param data: request body, or ``None``
        :param headers: optional dictionary of extra request headers
        :param method: HTTP method overriding the urllib2 default, or ``None``
        :returns: the response body as returned by ``read()``
        """
        if headers is None:
            headers = {}

        api_url = urljoin(self.url, uri)
        api_request = urllib2.Request(api_url, headers=headers)

        basic_authorization = base64.b64encode('{}:{}'.format(self.username, self.password))
        api_request.add_header(
            'Authorization',
            'Basic {}'.format(basic_authorization)
        )

        if method is not None:
            # urllib2 asks the request object for the method to use
            api_request.get_method = lambda: method

        LOG.debug('MineMeld API Request: {} {}'.format(
            method if method is not None else 'GET',
            api_url
        ))

        result = urllib2.urlopen(
            api_request,
            data=data,
            timeout=30,
            capath=self.capath,
            cafile=self.cafile,
            context=self.context
        )
        try:
            return result.read()
        finally:
            # release the connection even when read() fails
            result.close()

    def check(self, miner, type_):
        """Check that ``miner`` exists and accepts indicators of ``type_``.

        On success ``self.data_file_type`` is set from the class of the node.

        :param miner: name of the Miner node
        :param type_: indicator type (e.g. ``IPv4``)
        :returns: ``True`` if the Miner can be used, ``False`` if it was not
            found or does not support ``type_``
        :raises RuntimeError: if the class of the Miner is not handled
        """
        content = self._call_api('/status/minemeld')
        minemeld_status = json.loads(content)['result']

        for node in minemeld_status:
            if node['name'] != miner:
                continue

            if node['class'] not in _MINEMELD_CLASS_TO_TYPE:
                raise RuntimeError('Unhandled Miner class {}'.format(node['class']))

            class_info = _MINEMELD_CLASS_TO_TYPE[node['class']]
            if type_ not in class_info['types']:
                LOG.critical('Miner {} of class {} does not support {} indicators'.format(miner, node['class'], type_))
                return False

            self.data_file_type = class_info['data_file_type']
            return True

        LOG.critical('Miner {} not found'.format(miner))
        return False

    def retrieve_list(self, miner):
        """Return the list of indicators currently stored for ``miner``.

        An HTTP 400 answer is treated as an empty list (presumably the data
        file has not been created yet - TODO confirm); any other HTTP error
        is propagated.
        """
        try:
            content = self._call_api('/config/data/{}_indicators?t={}'.format(miner, self.data_file_type))
        except urllib2.HTTPError as e:
            if e.code != 400:
                raise
            content = '{"result":[]}'

        return json.loads(content)['result']

    def upload(self, miner, data):
        """Send the JSON encoded list of indicators ``data`` to ``miner``.

        For localdb Miners the entries are POSTed to the append endpoint,
        for the other Miners the list is PUT on the data endpoint.
        """
        if self.data_file_type == 'localdb':
            self._call_api(
                '/config/data/{}_indicators/append?h={}&t=localdb'.format(miner, miner),
                data=data,
                headers={'Content-Type': 'application/json'},
                method='POST'
            )
            return

        self._call_api(
            '/config/data/{}_indicators?h={}'.format(miner, miner),
            data=data,
            headers={'Content-Type': 'application/json'},
            method='PUT'
        )
def _iterate_over_json(listname):
    """Yield ``(indicator, attributes)`` pairs read from a JSON file.

    The file should contain a list of objects, each one with an ``indicator``
    key. Attributes are taken from the ``value`` key when present, otherwise
    from the remaining keys of the entry. ``first_seen``, ``last_seen`` and
    ``sources`` are dropped from the attributes.

    Invalid content is logged and ignored: a file that is not a list yields
    nothing, an entry that is not an object, has no indicator or has a
    ``value`` that is not an object is skipped.

    :param listname: path of the JSON file
    """
    with open(listname, 'r') as f:
        ilist = json.load(f)

    if not isinstance(ilist, list):
        LOG.error('List of indicators expected in {}'.format(listname))
        return

    for entry in ilist:
        if not isinstance(entry, dict):
            LOG.error('Invalid entry in {} - ignored'.format(listname))
            continue

        indicator = entry.pop('indicator', None)
        if indicator is None:
            LOG.error('Missing indicator in entry in {} - ignored'.format(listname))
            continue

        # attributes are either nested under "value" or stored in the entry
        attributes = entry.pop('value', entry)
        if not isinstance(attributes, dict):
            LOG.error('Invalid value for indicator {} in {} - ignored'.format(indicator, listname))
            continue

        for key in ('first_seen', 'last_seen', 'sources'):
            attributes.pop(key, None)

        yield indicator, attributes
def _iterate_over_list(listname):
    """Yield ``(indicator, attributes)`` pairs read from a plain text file.

    Each non empty line is an indicator. Lines starting with ``#`` describe
    the next indicator:

    - ``# <comment>`` sets the ``comment`` attribute
    - ``# @<attribute name>: <attribute value>`` sets a custom attribute

    ``confidence`` is converted to an integer. ``ttl`` is converted to an
    integer when possible, otherwise it is set to ``disabled``.

    :param listname: path of the text file
    :raises ValueError: if an attribute line has no ``:`` separator or if
        ``confidence`` is not an integer; the message reports file and line
    """
    with open(listname, 'r') as f:
        value = {}
        for lineno, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue

            if not line.startswith('#'):
                # indicator line: emit it with the attributes collected so far
                yield line, value
                value = {}
                continue

            text = line[1:].strip()
            if not text:
                continue

            if not text.startswith('@'):
                value['comment'] = text
                continue

            name, separator, raw = text[1:].partition(':')
            if not separator:
                raise ValueError(
                    "{}:{}: malformed attribute, expected '@<attribute name>: <attribute value>'".format(
                        listname, lineno
                    )
                )
            name = name.strip()
            raw = raw.strip()

            if name == 'confidence':
                try:
                    attribute = int(raw)
                except ValueError:
                    raise ValueError(
                        '{}:{}: confidence should be an integer, found {!r}'.format(
                            listname, lineno, raw
                        )
                    )
            elif name == 'ttl':
                try:
                    attribute = int(raw)
                except ValueError:
                    # any non numeric ttl disables the expiration
                    attribute = 'disabled'
            else:
                attribute = raw

            value[name] = attribute
def _merge_lists(lists, vdefault=None):
    """Build a single indicator -> attributes dictionary from several files.

    Files with the ``.json`` extension are read as JSON, all the others as
    plain text. The attributes of each indicator start from a copy of
    ``vdefault`` and are then overridden by the attributes found in the file.
    When an indicator appears more than once the last occurrence wins.
    """
    merged = {}

    for path in lists:
        if os.path.splitext(path)[1] == '.json':
            entries = _iterate_over_json(path)
        else:
            entries = _iterate_over_list(path)

        for indicator, attributes in entries:
            combined = {}
            if vdefault is not None:
                combined.update(vdefault)
            combined.update(attributes)
            merged[indicator] = combined

    return merged
def _compute_actions(current, new_indicators):
    """Compare current and new indicators and return the actions to perform.

    ``current`` maps each indicator to its JSON representation (keys sorted),
    ``new_indicators`` maps each indicator to its dictionary of attributes.

    Returns a list of ``[action, indicator]`` entries where action is ``add``
    for indicators only in ``new_indicators``, ``delete`` for indicators only
    in ``current`` and ``update`` for indicators in both whose JSON
    representation differs.
    """
    existing = set(current.keys())
    wanted = set(new_indicators.keys())

    actions = [['add', indicator] for indicator in (wanted - existing)]
    actions.extend(['delete', indicator] for indicator in (existing - wanted))

    for indicator in (existing & wanted):
        entry = dict(indicator=indicator, **new_indicators[indicator])
        if current[indicator] != json.dumps(entry, sort_keys=True):
            actions.append(['update', indicator])

    return actions
def _parse_args(argv=None):
    """Parse the command line and return the resulting namespace.

    :param argv: list of arguments to parse; when ``None`` (the default)
        argparse reads the arguments from ``sys.argv``
    :returns: the ``argparse.Namespace`` with the parsed options
    """
    parser = argparse.ArgumentParser(
        description="Upload indicators to a MineMeld list Miner"
    )
    parser.add_argument(
        '--version',
        action='version',
        version=__version__
    )
    parser.add_argument(
        '--verbose',
        action='store_true',
        help='verbose'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='do not perform any action, displays actions that would be taken instead'
    )
    parser.add_argument(
        '--ca-path',
        action='store',
        help='CA bundle or CA directory to be used for MineMeld cert verification'
    )
    parser.add_argument(
        '-k',
        action='store_true',
        help='disable MineMeld cert verification'
    )
    parser.add_argument(
        '-m', '--minemeld',
        action='store',
        required=True,
        help='URL of MineMeld API. Example: https://myminemeld.example.com (required)'
    )
    parser.add_argument(
        '-u', '--username',
        action='store',
        required=True,
        help='username for authenticating to the MineMeld instance (required)'
    )
    parser.add_argument(
        '-p', '--password',
        action='store',
        required=True,
        help='password for authenticating to the MineMeld instance (required)'
    )
    parser.add_argument(
        '-t', '--type',
        action='store',
        required=True,
        help='type of indicators (required)'
    )
    parser.add_argument(
        '--delete',
        action='store_true',
        help='delete old indicators'
    )
    parser.add_argument(
        '--update',
        action='store_true',
        help='update existing indicators if different'
    )
    parser.add_argument(
        '--share-level',
        action='store',
        default='red',
        choices=['white', 'green', 'amber', 'red'],
        help='share level of the added indicators (default: red)'
    )
    parser.add_argument(
        'miner',
        action='store',
        help='Miner name'
    )
    parser.add_argument(
        'list',
        action='store',
        nargs='+',
        help='path of the file with the list of indicators'
    )
    return parser.parse_args(argv)
def main():
    """Synchronize the Miner with the indicators read from the input files.

    Steps: parse the command line, select the CA path used for certificate
    verification, check the Miner, compute the add/update/delete actions
    against the indicators currently in the Miner and upload the result.

    :returns: ``0`` on success or dry-run, ``1`` if no CA path is available
        while verification is enabled or if the Miner check fails
    """
    args = _parse_args()

    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)

    # --ca-path wins over certifi, -k disables verification altogether
    capath = args.ca_path if args.ca_path else CERTIFI_PATH
    if args.k:
        LOG.warning('MineMeld cert verification disabled')
        capath = None
    elif capath is None:
        LOG.critical('MineMeld cert verification enabled but no CA path specified and certifi is not installed')
        return 1

    client = MineMeldAPIClient(
        url=args.minemeld,
        username=args.username,
        password=args.password,
        capath=capath
    )

    if not client.check(miner=args.miner, type_=args.type):
        return 1

    localdb = client.data_file_type == 'localdb'

    current_list = client.retrieve_list(miner=args.miner)
    # indicators are compared through their JSON representation
    current = {e['indicator']: json.dumps(e, sort_keys=True) for e in current_list}

    new_indicators = _merge_lists(args.list, {'share_level': args.share_level, 'type': args.type})

    actions = _compute_actions(current, new_indicators)
    if not args.update:
        actions = [a for a in actions if a[0] != 'update']
    if not args.delete:
        actions = [a for a in actions if a[0] != 'delete']

    # localdb Miners receive only the changed entries (append endpoint),
    # the other Miners receive the full list after the changes
    result = {} if localdb else dict(current)

    for aidx, (action, indicator) in enumerate(actions):
        if action == 'delete':
            LOG.info('A#{} - {} (delete)'.format(aidx, indicator))
            if localdb:
                # with localdb the entry is uploaded again with ttl set to 0
                # NOTE(review): the original comment mentioned a negative
                # ttl, the code has always used 0 - confirm expected value
                value = json.loads(current[indicator])
                value['ttl'] = 0
                result[indicator] = json.dumps(value, sort_keys=True)
            else:
                result.pop(indicator)

        elif action == 'update' or action == 'add':
            LOG.info('A#{} - {} ({})'.format(aidx, indicator, action))
            result[indicator] = json.dumps(dict(indicator=indicator, **new_indicators[indicator]), sort_keys=True)

        else:
            raise RuntimeError('Unknown action {}'.format(action))

    if args.dry_run:
        LOG.info('Dry-run active, actions not performed')
        return 0

    # "or actions" covers the full-list Miners when every indicator has been
    # deleted: the list is empty but it still has to be uploaded
    if result or actions:
        client.upload(args.miner, '[{}]'.format(','.join(result.values())))

    LOG.info('Done')
    return 0
if __name__ == "__main__":
    # propagate the status returned by main() to the shell, so that failures
    # (return value 1) are visible to the caller of the script
    import sys

    sys.exit(main())
@codyrat

This comment has been minimized.

Copy link

codyrat commented Dec 21, 2018

Could a feature to set the expiration of the indicator be added?

@codyrat

This comment has been minimized.

Copy link

codyrat commented Dec 21, 2018

My apologies as I determined setting # @Ttl: disabled in the attribute of the indicator disables the expiration.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.