Skip to content

Instantly share code, notes, and snippets.

@s-fujimoto
Last active October 21, 2018 10:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save s-fujimoto/02a690cf6302f1299efe to your computer and use it in GitHub Desktop.
Save s-fujimoto/02a690cf6302f1299efe to your computer and use it in GitHub Desktop.
collect vulnerability for AWS Lambda
#####################################################################################
### Select from low, middle, high. If you choice middle, script collect middle and high.
SEVERITY = "middle"
### Select from daily, weekly, monthly
INTERVAL = "daily"
### Specify Publish Topic Arn for SNS
TOPIC_ARN = "arn:aws:sns:*******:************:**********"
### Specify notify email subject
NOTIFY_SUBJECT = "[ALERT] Found Vulnerability Information"
### Select from product name from result to call getProductList API (http://jvndb.jvn.jp/myjvn?method=getProductList)
PRODUCTS = [
"Linux Kernel",
"OpenSSL",
"Apache HTTP Server",
"Apache Tomcat"
]
#####################################################################################
import datetime
from dateutil import relativedelta
import boto3
import urllib2
from xml.dom import minidom
URL_PATH = "http://jvndb.jvn.jp/myjvn"
PRODUCT_LIST_METHOD = "getProductList"
VULN_OVERVIEW_LIST_METHOD = "getVulnOverviewList"
NS_STATUS = "http://jvndb.jvn.jp/myjvn/Status"
NS_SEC = "http://jvn.jp/rss/mod_sec/"
NS = "http://purl.org/rss/1.0/"
NS_PRODUCT = "http://jvndb.jvn.jp/myjvn/Results"
def lambda_handler(event, context):
product_ids = get_product_ids(PRODUCTS)
print product_ids
vulnerabilities = []
for product in product_ids:
vuls = get_vulnerability_list(product, SEVERITY, INTERVAL)
if vuls:
vulnerabilities.extend(vuls)
if not vulnerabilities:
print "don't exist vulnerability."
return
notify(vulnerabilities, TOPIC_ARN)
def get_product_ids(product_names):
product_ids = set()
for name in product_names:
url = URL_PATH + "?method=" + PRODUCT_LIST_METHOD + "&keyword=" + name
res = urllib2.urlopen(url).read().encode("utf-8")
dom = minidom.parseString(res)
for product in dom.getElementsByTagNameNS(NS_PRODUCT, "Product"):
if product.getAttribute("pname") == name:
product_ids.add(product.getAttribute("pid"))
return product_ids
def get_vulnerability_list(product, severity, interval):
vulnerabilities = []
if severity == "low":
severity = "l"
elif severity == "middle":
severity = "m"
elif severity == "high":
severity = "h"
start = None
now = datetime.datetime.now()
end = now - datetime.timedelta(days=1)
if interval == "daily":
start = end
elif interval == "weekly":
start = now - datetime.timedelta(days=7)
elif interval == "monthly":
start = now - relativedelta.relativedelta(months=1)
url = URL_PATH + "?method=" + VULN_OVERVIEW_LIST_METHOD \
+ "&productId=" + product \
+ "&severity=" + severity \
+ "&dateFirstPublishedStartY=" + str(start.year) \
+ "&dateFirstPublishedStartM=" + str(start.month) \
+ "&dateFirstPublishedStartD=" + str(start.day) \
+ "&dateFirstPublishedEndY=" + str(end.year) \
+ "&dateFirstPublishedEndM=" + str(end.month) \
+ "&dateFirstPublishedEndD=" + str(end.day) \
+ "&rangeDatePublished=n&rangeDatePublic=n"
res = urllib2.urlopen(url).read().encode("utf-8")
dom = minidom.parseString(res)
total = dom.getElementsByTagNameNS(NS_STATUS, "Status")[0].getAttribute("totalRes")
for num in range(1, int(total), 50):
url = url + "&startItem=" + str(num)
res = urllib2.urlopen(url)
dom = minidom.parseString(res.read().encode("utf-8"))
for item in dom.getElementsByTagNameNS(NS, "item"):
vul = {}
if len(item.getElementsByTagNameNS(NS_SEC, "identifier")) == 1:
vul["id"] = item.getElementsByTagNameNS(NS_SEC, "identifier")[0].childNodes[0].nodeValue
if len(item.getElementsByTagNameNS(NS, "link")) == 1:
vul["link"] = item.getElementsByTagNameNS(NS, "link")[0].childNodes[0].nodeValue
if len(item.getElementsByTagNameNS(NS, "title")) == 1:
vul["title"] = item.getElementsByTagNameNS(NS, "title")[0].childNodes[0].nodeValue
if len(item.getElementsByTagNameNS(NS, "description")) == 1:
vul["description"] = item.getElementsByTagNameNS(NS, "description")[0].childNodes[0].nodeValue
if len(item.getElementsByTagNameNS(NS_SEC, "cvss")) == 1:
cvss = item.getElementsByTagNameNS(NS_SEC, "cvss")[0]
vul["cvss_score"] = cvss.getAttribute("score")
vul["cvss_severity"] = cvss.getAttribute("severity")
vulnerabilities.append(vul)
return vulnerabilities
def notify(vulnerabilities, topic_arn):
message = ""
for vul in vulnerabilities:
if vul.get("title"):
message += "Title: " + vul["title"] + "\n"
if vul.get("id"):
message += "ID: " + vul["id"] + "\n"
if vul.get("cvss_score"):
message += "CVSS Score: " + vul["cvss_score"] + ", Severity: " + vul["cvss_severity"] + "\n"
if vul.get("description"):
message += "Detail: " + vul["description"] + "\n"
if vul.get("link"):
message += "URL: " + vul["link"] + "\n"
message += "\n"
client = boto3.client("sns")
print(
client.publish(TopicArn=topic_arn,
Subject=NOTIFY_SUBJECT,
Message=message
))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment