Skip to content

Instantly share code, notes, and snippets.

@garethr
Created June 30, 2019 16:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save garethr/502d6e07632fd1535844854543205d65 to your computer and use it in GitHub Desktop.
Save garethr/502d6e07632fd1535844854543205d65 to your computer and use it in GitHub Desktop.
An example using the new Python Snky API client to populate a sqlite database for use for datasette
#!/bin/env python3
"""
This script provides a basic example of using the pysnyk Snyk API client
to populate a SQLite database, currently with a list of organizations,
members, projects, vulnerabilies and dependencies.
This is particularly useful combined with datasette, which provides a
handy interface for exploring the data.
The script should be fine to be run multiple times. It should update any
existing records if they have changed (for instance if a project has
new vulnerabilities) and also add any newly found records.
In order to run this script you will need to install pysnyk and sqlite_utils.
"""
import os
import sys
import snyk
import sqlite_utils
import sqlite3
def enable_or_update_search_index(table, fields):
try:
table.enable_fts(fields)
except sqlite3.OperationalError:
table.populate_fts(fields)
try:
token = os.environ["SNYK_API_TOKEN"]
except KeyError:
sys.exit("You must provide a SNYK_API_TOKEN")
client = snyk.SnykClient(token)
db = sqlite_utils.Database("snyk.db")
organizations = client.organizations.all()
db["organizations"].upsert_all([x.to_dict() for x in organizations], pk="id")
projects = client.projects.all()
projects_data = []
for project in projects:
project_data = project.to_dict()
project_data["organization_id"] = project.organization.id
project_data["mediumSeverityIssues"] = project.issueCountsBySeverity.medium
project_data["highSeverityIssues"] = project.issueCountsBySeverity.high
project_data["lowSeverityIssues"] = project.issueCountsBySeverity.low
for remove in ["organization", "issueCountsBySeverity"]:
project_data.pop(remove)
projects_data.append(project_data)
db["projects"].upsert_all(
projects_data,
pk="id",
foreign_keys=["organization_id"],
column_order=(
"id",
"name",
"origin",
"type",
"highSeverityIssues",
"mediumSeverityIssues",
"lowSeverityIssues",
),
)
for org in organizations:
integrations_data = [x.to_dict() for x in org.integrations.all()]
for integration in integrations_data:
integration["organization_id"] = integration["organization"]["id"]
integration.pop("organization")
db["integrations"].upsert_all(
integrations_data, pk="id", foreign_keys=["organization_id"]
)
for org in organizations:
dependency_data = [x.to_dict() for x in org.dependencies.all()]
for dep in dependency_data:
dep["organization_id"] = org.id
db["dependencies"].upsert_all(
dependency_data, pk="id", foreign_keys=["organization_id"]
)
for org in organizations:
members_data = [x.to_dict() for x in org.members.all()]
for member in members_data:
member["organization_id"] = org.id
db["members"].upsert_all(members_data, pk="id", foreign_keys=["organization_id"])
for project in projects:
vuln_data = [x.to_dict() for x in project.vulnerabilities]
for vuln in vuln_data:
vuln["project_id"] = project.id
db["vulnerabilities"].upsert_all(
vuln_data,
pk="id",
foreign_keys=["project_id"],
column_order=("id", "title", "description", "url", "project_id"),
)
enable_or_update_search_index(db["vulnerabilities"], ["description", "title"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment