Created
May 20, 2020 18:41
-
-
Save pdxjohnny/28d9bf64a9003a003bc66ae2a3f00b75 to your computer and use it in GitHub Desktop.
Original cve-bin-tool CVE db code (async)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Retrieval access and caching of NIST CVE database | |
''' | |
import os | |
import re | |
import gzip | |
import json | |
import glob | |
import asyncio | |
import hashlib | |
import logging | |
import aiohttp | |
from aiohttp import web | |
from bs4 import BeautifulSoup | |
from .log import LOGGER | |
class CVEDB(object): | |
CACHEDIR = os.path.join(os.path.expanduser('~'), '.cache') | |
FEED = 'https://nvd.nist.gov/vuln/data-feeds' | |
LOGGER = LOGGER.getChild('CVEDB') | |
def __init__(self, verify = True): | |
self.feed = self.FEED | |
self.cachedir = self.CACHEDIR | |
self.lock = asyncio.Lock() | |
self.dataset = {} | |
self.session = None | |
self.verify = verify | |
async def getmeta(self, session, metaurl): | |
async with session.get(metaurl) as response: | |
return metaurl.replace('.meta', '.json.gz'), \ | |
dict([line.split(':', maxsplit=1) \ | |
for line in (await response.text()).split('\r\n') \ | |
if ':' in line]) | |
async def nist_scrape(self, session, feed): | |
async with session.get(feed) as response: | |
page = await response.text() | |
data = BeautifulSoup(page, 'html.parser') | |
jsonmetalinks = [project.get('href') for project in \ | |
data.find_all(href=re.compile('/json/.*.meta'))] | |
return dict(await asyncio.gather(*[ | |
self.getmeta(session, metaurl) for metaurl in jsonmetalinks])) | |
async def refresh(self, session): | |
if not os.path.isdir(self.cachedir): | |
os.makedirs(self.cachedir) | |
update = await self.nist_scrape(session, self.feed) | |
await asyncio.gather(*[self.cache_update(session, url, meta['sha256']) \ | |
for url, meta in update.items()]) | |
async def cache_update(self, session, url, sha): | |
filename = url.split('/')[-1].replace('.gz', '') | |
filepath = os.path.join(self.cachedir, filename) | |
if os.path.isfile(filepath): | |
with open(filepath, 'rb') as handle: | |
if self.sha_validate(url, sha, handle.read()): | |
handle.seek(0) | |
await self.dataset_load(filename, handle.read().decode()) | |
return | |
self.LOGGER.info('Updating CVE cache for %s', filename) | |
async with session.get(url) as response: | |
jsondata = gzip.decompress(await response.read()) | |
if not self.sha_validate(url, sha, jsondata): | |
return | |
with open(filepath, 'wb') as handle: | |
handle.write(jsondata) | |
await self.dataset_load(filename, jsondata) | |
def sha_validate(self, key, sha, data): | |
sha = sha.upper() | |
gotsha = hashlib.sha256(data).hexdigest().upper() | |
if gotsha != sha: | |
self.LOGGER.critical('SHA mismatch for %s ' | |
'(have: %r, want: %r)', key, gotsha, sha) | |
return False | |
return True | |
async def dataset_load(self, key, jsondata): | |
data = json.loads(jsondata) | |
async with self.lock: | |
for cve in data['CVE_Items']: | |
self.dataset[cve['cve']['CVE_data_meta']['ID']] = cve | |
self.LOGGER.debug('Loaded %s(%d) into %s.dataset', key, | |
len(data['CVE_Items']), self.__class__.__qualname__) | |
async def load_no_verify(self): | |
for filepath in glob.glob(os.path.join(self.cachedir, '*')): | |
if os.path.isfile(filepath): | |
with open(filepath, 'rb') as handle: | |
await self.dataset_load(filepath, handle.read().decode()) | |
async def cve(self, cveid): | |
return self.dataset[cveid] | |
async def cves(self): | |
for cveID, cve in self.dataset.items(): | |
yield cveID, cve | |
async def __aenter__(self): | |
client = aiohttp.ClientSession(trust_env=True) | |
self.session = await client.__aenter__() | |
if not self.verify: | |
self.LOGGER.warning('Not verifying CVE DB cache') | |
await self.load_no_verify() | |
else: | |
await self.refresh(self.session) | |
return self | |
async def __aexit__(self, exc_type, exc_value, traceback): | |
await self.session.__aexit__(exc_type, exc_value, traceback) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment