Created
June 30, 2019 16:45
-
-
Save garethr/502d6e07632fd1535844854543205d65 to your computer and use it in GitHub Desktop.
An example using the new Python Snky API client to populate a sqlite database for use for datasette
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python3 | |
""" | |
This script provides a basic example of using the pysnyk Snyk API client | |
to populate a SQLite database, currently with a list of organizations, | |
members, projects, vulnerabilies and dependencies. | |
This is particularly useful combined with datasette, which provides a | |
handy interface for exploring the data. | |
The script should be fine to be run multiple times. It should update any | |
existing records if they have changed (for instance if a project has | |
new vulnerabilities) and also add any newly found records. | |
In order to run this script you will need to install pysnyk and sqlite_utils. | |
""" | |
import os | |
import sys | |
import snyk | |
import sqlite_utils | |
import sqlite3 | |
def enable_or_update_search_index(table, fields): | |
try: | |
table.enable_fts(fields) | |
except sqlite3.OperationalError: | |
table.populate_fts(fields) | |
try: | |
token = os.environ["SNYK_API_TOKEN"] | |
except KeyError: | |
sys.exit("You must provide a SNYK_API_TOKEN") | |
client = snyk.SnykClient(token) | |
db = sqlite_utils.Database("snyk.db") | |
organizations = client.organizations.all() | |
db["organizations"].upsert_all([x.to_dict() for x in organizations], pk="id") | |
projects = client.projects.all() | |
projects_data = [] | |
for project in projects: | |
project_data = project.to_dict() | |
project_data["organization_id"] = project.organization.id | |
project_data["mediumSeverityIssues"] = project.issueCountsBySeverity.medium | |
project_data["highSeverityIssues"] = project.issueCountsBySeverity.high | |
project_data["lowSeverityIssues"] = project.issueCountsBySeverity.low | |
for remove in ["organization", "issueCountsBySeverity"]: | |
project_data.pop(remove) | |
projects_data.append(project_data) | |
db["projects"].upsert_all( | |
projects_data, | |
pk="id", | |
foreign_keys=["organization_id"], | |
column_order=( | |
"id", | |
"name", | |
"origin", | |
"type", | |
"highSeverityIssues", | |
"mediumSeverityIssues", | |
"lowSeverityIssues", | |
), | |
) | |
for org in organizations: | |
integrations_data = [x.to_dict() for x in org.integrations.all()] | |
for integration in integrations_data: | |
integration["organization_id"] = integration["organization"]["id"] | |
integration.pop("organization") | |
db["integrations"].upsert_all( | |
integrations_data, pk="id", foreign_keys=["organization_id"] | |
) | |
for org in organizations: | |
dependency_data = [x.to_dict() for x in org.dependencies.all()] | |
for dep in dependency_data: | |
dep["organization_id"] = org.id | |
db["dependencies"].upsert_all( | |
dependency_data, pk="id", foreign_keys=["organization_id"] | |
) | |
for org in organizations: | |
members_data = [x.to_dict() for x in org.members.all()] | |
for member in members_data: | |
member["organization_id"] = org.id | |
db["members"].upsert_all(members_data, pk="id", foreign_keys=["organization_id"]) | |
for project in projects: | |
vuln_data = [x.to_dict() for x in project.vulnerabilities] | |
for vuln in vuln_data: | |
vuln["project_id"] = project.id | |
db["vulnerabilities"].upsert_all( | |
vuln_data, | |
pk="id", | |
foreign_keys=["project_id"], | |
column_order=("id", "title", "description", "url", "project_id"), | |
) | |
enable_or_update_search_index(db["vulnerabilities"], ["description", "title"]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment