@pdxjohnny
Created May 20, 2020 18:41
Original cve-bin-tool CVE db code (async)
'''
Retrieval access and caching of NIST CVE database
'''
import os
import re
import gzip
import json
import glob
import asyncio
import hashlib

import aiohttp
from bs4 import BeautifulSoup

from .log import LOGGER

class CVEDB(object):
    '''
    Downloads the NVD JSON vulnerability feeds, caches them on disk, and
    indexes the CVE items in memory by CVE ID.
    '''

    CACHEDIR = os.path.join(os.path.expanduser('~'), '.cache')
    FEED = 'https://nvd.nist.gov/vuln/data-feeds'
    LOGGER = LOGGER.getChild('CVEDB')

    def __init__(self, verify=True):
        self.feed = self.FEED
        self.cachedir = self.CACHEDIR
        self.lock = asyncio.Lock()
        self.dataset = {}
        self.session = None
        self.verify = verify

    async def getmeta(self, session, metaurl):
        # Fetch a feed's .meta file and parse its CRLF separated key:value
        # lines, returning the URL of the matching .json.gz feed along with
        # the parsed metadata
        async with session.get(metaurl) as response:
            return (
                metaurl.replace('.meta', '.json.gz'),
                dict(line.split(':', maxsplit=1)
                     for line in (await response.text()).split('\r\n')
                     if ':' in line),
            )
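
    # For reference, the body of an NVD .meta file that getmeta() parses is
    # a short CRLF separated list of key:value pairs, roughly like this (the
    # values here are illustrative, not real):
    #
    #   lastModifiedDate:2020-05-20T03:00:10-04:00
    #   size:1234567
    #   zipSize:123456
    #   gzSize:123456
    #   sha256:8C5...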

    async def nist_scrape(self, session, feed):
        # Scrape the NVD data feeds page for links to the JSON feed .meta
        # files, then fetch and parse each one concurrently
        async with session.get(feed) as response:
            page = await response.text()
            data = BeautifulSoup(page, 'html.parser')
            jsonmetalinks = [project.get('href') for project in
                             data.find_all(href=re.compile('/json/.*.meta'))]
            return dict(await asyncio.gather(*[
                self.getmeta(session, metaurl)
                for metaurl in jsonmetalinks]))
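
    # The hrefs matched above point at per-year feed metadata; around the
    # time of this gist they looked roughly like the following (illustrative,
    # the exact listing depends on the live page):
    #
    #   https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-2019.meta
    #   https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-2020.meta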

    async def refresh(self, session):
        # Ensure the cache directory exists, then validate or download every
        # feed concurrently
        if not os.path.isdir(self.cachedir):
            os.makedirs(self.cachedir)
        update = await self.nist_scrape(session, self.feed)
        await asyncio.gather(*[
            self.cache_update(session, url, meta['sha256'])
            for url, meta in update.items()])

    async def cache_update(self, session, url, sha):
        filename = url.split('/')[-1].replace('.gz', '')
        filepath = os.path.join(self.cachedir, filename)
        # If a cached copy exists and matches the published SHA-256, load it
        # and skip the download
        if os.path.isfile(filepath):
            with open(filepath, 'rb') as handle:
                if self.sha_validate(url, sha, handle.read()):
                    handle.seek(0)
                    await self.dataset_load(filename, handle.read().decode())
                    return
        self.LOGGER.info('Updating CVE cache for %s', filename)
        async with session.get(url) as response:
            jsondata = gzip.decompress(await response.read())
        if not self.sha_validate(url, sha, jsondata):
            return
        with open(filepath, 'wb') as handle:
            handle.write(jsondata)
        await self.dataset_load(filename, jsondata)

    def sha_validate(self, key, sha, data):
        # Compare the SHA-256 of data against the hash published in the
        # feed's .meta file
        sha = sha.upper()
        gotsha = hashlib.sha256(data).hexdigest().upper()
        if gotsha != sha:
            self.LOGGER.critical('SHA mismatch for %s '
                                 '(have: %r, want: %r)', key, gotsha, sha)
            return False
        return True

    async def dataset_load(self, key, jsondata):
        # Parse a feed file and index each CVE item by its CVE ID
        data = json.loads(jsondata)
        async with self.lock:
            for cve in data['CVE_Items']:
                self.dataset[cve['cve']['CVE_data_meta']['ID']] = cve
        self.LOGGER.debug('Loaded %s(%d) into %s.dataset', key,
                          len(data['CVE_Items']),
                          self.__class__.__qualname__)
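
    # The feed JSON walked above is shaped roughly like this, trimmed to the
    # keys the loop actually touches (everything else elided):
    #
    #   {"CVE_Items": [
    #       {"cve": {"CVE_data_meta": {"ID": "CVE-2020-0001"}, ...}, ...},
    #       ...
    #   ]}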

    async def load_no_verify(self):
        # Load whatever is already in the cache directory without checking
        # hashes against NIST
        for filepath in glob.glob(os.path.join(self.cachedir, '*')):
            if os.path.isfile(filepath):
                with open(filepath, 'rb') as handle:
                    await self.dataset_load(filepath, handle.read().decode())

    async def cve(self, cveid):
        # Look up a single CVE item by ID
        return self.dataset[cveid]

    async def cves(self):
        # Async generator yielding (CVE ID, CVE item) pairs
        for cveID, cve in self.dataset.items():
            yield cveID, cve

    async def __aenter__(self):
        client = aiohttp.ClientSession(trust_env=True)
        self.session = await client.__aenter__()
        if not self.verify:
            self.LOGGER.warning('Not verifying CVE DB cache')
            await self.load_no_verify()
        else:
            await self.refresh(self.session)
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.__aexit__(exc_type, exc_value, traceback)
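

# A minimal usage sketch, not part of the original gist: entering the context
# manager downloads or re-validates the cached feeds (or loads the cache
# as-is when verify=False), after which the dataset can be iterated. Because
# of the relative `from .log import LOGGER` import, this module has to live
# inside its package for the sketch to run.
if __name__ == '__main__':
    async def main():
        async with CVEDB() as cvedb:
            async for cveid, cve in cvedb.cves():
                print(cveid)
                break  # just show the first loaded CVE ID

    asyncio.run(main())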