Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@mcchae
Last active August 29, 2018 06:00
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mcchae/fc928f8de526031cbf665f924f402487 to your computer and use it in GitHub Desktop.
Save mcchae/fc928f8de526031cbf665f924f402487 to your computer and use it in GitHub Desktop.
CVE search engine using python Whoosh
#!/usr/bin/env python
# coding=utf-8
"""
Import CVE entries from "http://cve.mitre.org/data/downloads/allitems.txt"
(or a local copy of that file) and build a Whoosh full-text index from them.
"""
################################################################################
import os
import urllib2
import getopt
import datetime
from whoosh.index import create_in
from whoosh.fields import *
from termcolor import colored
INDEX_DIR="/opt/scap/cve"
# ################################################################################
# def get_unique_fields(txt=None,
# url="http://cve.mitre.org/data/downloads/allitems.txt"):
# """
# :param txt:
# :return:
# {Name} hits 102065
# {Status} hits 102065
# {URL} hits 99012
# {Phase} hits 99012
# {Category} hits 99012
# {Reference} hits 874535
# """
################################################################################
def cve_generator(txt=None,
                  url="http://cve.mitre.org/data/downloads/allitems.txt"):
    """
    Parse a CVE text dump and yield one dict per record.

    The input is read line by line. A line starting with ``======`` separates
    records. After a separator, non-empty lines are parsed as
    ``Field: value`` headers until the first blank line; every following
    non-empty line (up to the next separator) is collected as body text.
    Everything before the first separator is treated as a preamble and is
    never yielded.

    :param txt: path of a local text file; takes precedence over ``url``
    :param url: URL to download the text from when ``txt`` is not given
    :return: generator of dicts mapping the lower-cased header name (and
        ``'body'``) to the list of values found for it, in file order
    :raises ValueError: if neither ``txt`` nor ``url`` is given (raised when
        the generator is first advanced)
    """
    # Imported here so the function does not depend on ``sys`` leaking into
    # the module namespace through a wildcard import.
    import sys

    if txt:
        ifp = open(txt)
    elif url:
        ifp = urllib2.urlopen(url)
    else:
        raise ValueError('source text or url is not specified!')
    try:
        # b_header stays True until the first separator has been seen, i.e.
        # while we are still inside the file preamble.
        b_header = True
        # b_body is True while collecting body lines, False while parsing
        # the "Field: value" header lines that follow a separator.
        b_body = True
        cve = {}
        for i, line in enumerate(ifp):
            line = line.strip()
            if b_body:
                if line.startswith('======'):
                    # A separator closes the current record (but not the
                    # preamble) and starts the header of the next one.
                    if cve and not b_header:
                        yield cve
                    cve = {}
                    b_header = False
                    b_body = False
                    continue
                if not line:
                    continue
                cve.setdefault('body', []).append(line)
            else:
                if not line:
                    # First blank line after the headers: body starts.
                    b_body = True
                    continue
                ndx = line.find(':')
                if ndx < 0:
                    sys.stderr.write('[%d] %s : Invalid colon format\n' %
                                     (i+1, line))
                else:
                    field = line[:ndx].lower()
                    cve.setdefault(field, []).append(line[ndx+1:].strip())
        # Flush the last record. The preamble guard is applied here as well,
        # so an input without any separator yields nothing instead of a
        # pseudo-record that has no 'name'.
        if cve and not b_header:
            yield cve
    finally:
        ifp.close()
################################################################################
def do_index(index_folder=INDEX_DIR, txt=None, url=None,
             yes=False, verbose=False):
    """
    Rebuild the Whoosh index of CVE records inside ``index_folder``.

    Existing contents of the folder are removed, a fresh index is created
    there, every record produced by ``cve_generator`` is added as one
    document, and the index is committed with ``optimize=True``.

    :param index_folder: directory that holds the index (created if missing)
    :param txt: path of a local CVE text file, passed to ``cve_generator``
    :param url: URL of the CVE text file, passed to ``cve_generator``
    :param yes: when true, skip the interactive confirmation prompt
    :param verbose: when true, print progress messages
    :return: ``False`` if the user declined the confirmation, ``True`` once
        the index has been committed
    :raises ValueError: if neither ``txt`` nor ``url`` is given
    """
    # Imported here so the function does not depend on names leaking into the
    # module namespace through a wildcard import.
    import shutil
    import sys

    # Fail before anything is deleted when there is nothing to index from.
    if not txt and not url:
        raise ValueError('source text or url is not specified!')

    # Define the document schema first, much like DDL in SQL.
    schema = Schema(
        name=ID(stored=True),
        status=KEYWORD(stored=True),
        url=STORED(),
        phase=KEYWORD(stored=True),
        category=KEYWORD(stored=True),
        reference=STORED(),
        body=TEXT(stored=True),
    )
    if not yes:
        confirm = raw_input("Want to delete and reindexing at <%s>? [y/N] "
                            % index_folder)
        if confirm.lower() not in ('y', 'yes'):
            return False
    if os.path.isdir(index_folder):
        # Remove the folder's contents directly instead of shelling out:
        # a quoted "dir/*" is never glob-expanded by the shell, and building
        # a shell command from a path is unsafe.
        for entry in os.listdir(index_folder):
            path = os.path.join(index_folder, entry)
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path)
            else:
                os.remove(path)
        if verbose:
            sys.stdout.write('delete all contents at <%s>\n' % index_folder)
    else:
        os.makedirs(index_folder)
    s_ts = datetime.datetime.now()
    # Create the index in the requested folder from the schema.
    ix = create_in(index_folder, schema)
    # Writer used to fill the inverted index.
    writer = ix.writer()
    sys.stdout.write('start indexing... from ')
    sys.stdout.write('%s\n' % (txt if txt else url))
    count = 0  # stays 0 when the source yields no record
    try:
        for count, cve in enumerate(cve_generator(txt=txt, url=url), 1):
            # Each parsed field is a list of strings; flatten it to a single
            # string ('' when the record does not have the field).
            fields = {
                'name': ''.join(cve['name']),
                'status': ' '.join(cve.get('status', ())),
                'url': '\n'.join(cve.get('url', ())),
                'phase': ' '.join(cve.get('phase', ())),
                'category': ' '.join(cve.get('category', ())),
                'reference': '\n'.join(cve.get('reference', ())),
                'body': '\n'.join(cve.get('body', ())),
            }
            for f in list(fields):
                try:
                    fields[f] = unicode(fields[f], 'latin1')
                except Exception as err:
                    sys.stderr.write('fields["%s"]=%s unicode error: %s' % (
                        f, fields[f], str(err)
                    ))
            writer.add_document(**fields)
            # Progress line for the 1st, 1001st, 2001st, ... record.
            if verbose and (count - 1) % 1000 == 0:
                sys.stdout.write('<<<%d>>> indexing...\n' % (count,))
    except Exception:
        # Release the writer (and its lock) instead of leaving it dangling.
        writer.cancel()
        raise
    sys.stdout.write('<<<%d>>> indexing... done\n' % (count,))
    sys.stdout.write('commit and optimizing...')
    sys.stdout.flush()
    writer.commit(optimize=True)
    e_ts = datetime.datetime.now()
    sys.stdout.write('done! [It takes %s]\n' % (e_ts - s_ts))
    return True
################################################################################
def usage(msg=None):
    """
    Print the command-line help of the indexing script and exit.

    :param msg: optional message printed in red before the help text
    :return: never returns; the process exits with status 1
    """
    # NOTE(review): the help text mentions "query_string", but the main block
    # of this script does not read positional arguments - confirm wording.
    help_text = '''
usage: {0} [options] query_string
query and search result from CVE
options are:
-h, --help : show this message
-i, --index_folder : index folder (default is "/opt/scap/cve")
-t, --txt : use text cve file instead url
-u, --url : url to process
(default is "http://cve.mitre.org/data/downloads/allitems.txt")
-y, --yes : do not confirm to rebuild index
-v, --verbose : verbose print
'''.format(sys.argv[0])
    if msg:
        error_text = str(msg)
        print(colored(error_text, 'red'))
    print(colored(help_text, 'green'))
    sys.exit(1)
################################################################################
if __name__ == '__main__':
    # Command-line entry point of the indexer: parse the options into the
    # keyword arguments of do_index() and run it. Any error (including a bad
    # option) is reported through usage(), which exits with status 1.
    import sys

    kwargs = {
        "index_folder": INDEX_DIR,
        "txt": None,
        "url": "http://cve.mitre.org/data/downloads/allitems.txt",
        "yes": False,
        "verbose": False,
    }
    try:
        opts, args = getopt.getopt(
            sys.argv[1:], "hi:t:u:yv",
            ["help", "index_folder=", "txt=", "url=", "yes", "verbose"]
        )
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
            elif o in ("-i", "--index_folder"):
                kwargs['index_folder'] = a
            elif o in ("-t", "--txt"):
                kwargs['txt'] = a
            elif o in ("-u", "--url"):
                # The URL is a plain string; converting it with int() made
                # every use of -u/--url fail with a ValueError.
                kwargs['url'] = a
            elif o in ("-y", "--yes"):
                kwargs['yes'] = True
            elif o in ("-v", "--verbose"):
                kwargs['verbose'] = True
        do_index(**kwargs)
    except Exception as e:
        usage(str(e))
#!/usr/bin/env python
# coding=utf-8
"""
Full-text search over the Whoosh index built from
"http://cve.mitre.org/data/downloads/allitems.txt"
"""
################################################################################
import os
import getopt
from whoosh.index import open_dir
from whoosh.fields import *
from whoosh.qparser import QueryParser
from pprint import pprint
from termcolor import colored
INDEX_DIR="/opt/scap/cve"
################################################################################
def print_lexicon(index_folder=INDEX_DIR, field='body', offset=0, limit=10):
    """
    Print the number of indexed terms of ``field`` and a window of them.

    :param index_folder: directory that holds the index
    :param field: name of the field whose terms are listed
    :param offset: number of leading terms to skip (0-based)
    :param limit: maximum number of terms to print; 0 or less means no limit
    :return: None
    """
    ix = open_dir(index_folder)
    with ix.searcher() as searcher:
        # Materialize the terms while the searcher is still open.
        terms = list(searcher.lexicon(field))
    pprint('Number of lexicon of %s = %d' % (field, len(terms)))
    if limit > 0:
        # The window is ``limit`` terms starting at ``offset``; slicing with
        # [offset:limit] would shrink (or empty) it as offset grows.
        terms = terms[offset:offset + limit]
    else:
        terms = terms[offset:]
    pprint(terms)
################################################################################
def search(ix, qstr, field='body', offset=0):
    """
    Run a query against the index and yield the hits as plain dicts.

    :param ix: opened index object providing ``searcher()`` and ``schema``
    :param qstr: query string; a non-unicode value is decoded as latin1
    :param field: default field the query is parsed against
    :param offset: number of leading hits to skip (0-based)
    :return: generator of dicts with the stored fields of each hit plus
        ``n_order`` (1-based position among the yielded hits, counted after
        ``offset`` is applied) and ``n_length`` (``len()`` of the result set)
    """
    if not isinstance(qstr, unicode):
        qstr = unicode(qstr, 'latin1')
    with ix.searcher() as searcher:
        query = QueryParser(field, ix.schema).parse(qstr)
        # limit=None asks for every hit. With the default limit of 10 only
        # the top ten hits are available, so any offset of 10 or more yielded
        # nothing and a caller's "no limit" could never exceed ten results.
        results = searcher.search(query, limit=None)
        length = len(results)
        for i, r in enumerate(results[offset:]):
            yield {
                'n_order': i+1,
                'n_length': length,
                'name': r['name'],
                'status': r['status'],
                'url': r['url'],
                'phase': r['phase'],
                'category': r['category'],
                'reference': r['reference'],
                'body': r['body'],
            }
################################################################################
def do_search(index_folder=INDEX_DIR, qstr=None, field='body',
              offset=0, limit=10):
    """
    Search the index and pretty-print each hit under a numbered separator.

    :param index_folder: directory that holds the index
    :param qstr: query string (required)
    :param field: default field the query is parsed against
    :param offset: number of leading hits to skip, forwarded to ``search``
    :param limit: stop after this many printed hits; 0 or less prints all
        hits that ``search`` yields
    :return: None
    :raises ValueError: if ``qstr`` is empty or None
    """
    if not qstr:
        raise ValueError("Invalid query string")
    index = open_dir(index_folder)
    separator = '=' * 80
    # ``shown`` is the 1-based count of hits printed so far.
    for shown, hit in enumerate(search(index, qstr, field, offset), 1):
        print("[%d]%s" % (shown, separator))
        pprint(hit)
        if limit > 0 and shown >= limit:
            break
################################################################################
def usage(msg=None):
    """
    Print the command-line help of the search script and exit.

    :param msg: optional message printed in red before the help text
    :return: never returns; the process exits with status 1
    """
    help_text = '''
usage: {0} [options] query_string
query and search result from CVE
options are:
-h, --help : show this message
-i, --index_folder : index folder (default is "/opt/scap/cve")
-f, --field : set search field (default is "body")
search filed may one of {{"status", "phase", "category"}}
-o, --offset : offset to skip (0-based)
-l, --limit : to N print (default is 10, 0 means no limit)
-x, --lexicon : print lexicon (from field get all terms)
'''.format(sys.argv[0])
    if msg:
        error_text = str(msg)
        print(colored(error_text, 'red'))
    print(colored(help_text, 'green'))
    sys.exit(1)
################################################################################
if __name__ == '__main__':
    # Command-line entry point of the search script. The options are
    # collected as keyword arguments; 'lexicon' only selects the mode and is
    # removed before the arguments are passed on. Any error is reported
    # through usage(), which exits with status 1.
    kwargs = dict(
        index_folder=INDEX_DIR,
        field="body",
        qstr=None,
        offset=0,
        limit=10,
        lexicon=False,
    )
    try:
        opts, args = getopt.getopt(
            sys.argv[1:], "hi:f:o:l:x",
            ["help", "index_folder=", "field=", "offset=", "limit=", "lexicon"]
        )
        for flag, value in opts:
            if flag in ("-h", "--help"):
                usage()
            elif flag in ("-i", "--index_folder"):
                kwargs['index_folder'] = value
            elif flag in ("-f", "--field"):
                kwargs['field'] = value
            elif flag in ("-o", "--offset"):
                kwargs['offset'] = int(value)
            elif flag in ("-l", "--limit"):
                kwargs['limit'] = int(value)
            elif flag in ("-x", "--lexicon"):
                kwargs['lexicon'] = True
        lexicon_mode = kwargs.pop('lexicon')
        if lexicon_mode:
            # Lexicon listing takes no query string.
            kwargs.pop('qstr')
            print_lexicon(**kwargs)
        else:
            # All positional arguments together form the query.
            kwargs["qstr"] = ' '.join(args)
            do_search(**kwargs)
    except Exception as e:
        usage(str(e))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment