Created
May 6, 2016 19:52
-
-
Save chuckadams/bb4c1f0febfd59464bde5fbb0d6d0a8e to your computer and use it in GitHub Desktop.
read the npm database and shove it into sqlite. requires ijson (pip install --user ijson)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import print_function | |
import argparse | |
import logging | |
import os, os.path | |
import re | |
import sqlite3 | |
import sys | |
import urllib | |
import ijson | |
PROGRESS = 1 | |
BASE = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '..')) | |
logging.basicConfig(level=logging.DEBUG) | |
LOG = logging | |
#### DB stuff | |
DB_DSN = os.path.join(BASE, 'npm.db') | |
DB_SCHEMA = """ | |
create table npm_basic(name, description, homepage); | |
create unique index idx_name on npm_basic(name); | |
-- future-proofing | |
create view npm(name, description, homepage) AS SELECT * from npm_basic; | |
""" | |
def connect_db(dsn): | |
conn = sqlite3.connect(dsn) | |
cx = conn.cursor() | |
try: | |
cx.execute('select 1 from npm limit 1') | |
LOG.debug("Using existing database in %s", dsn) | |
except sqlite3.OperationalError: | |
LOG.info("Creating new database in %s", dsn) | |
cx.executescript(DB_SCHEMA); | |
return (conn,cx) | |
def add_name(cx, name): | |
cx.execute("insert or ignore into npm_basic(name) values (?)", (name,)) | |
def _setfield(field): | |
return "update npm_basic set {} = ? where name = ?".format(field) | |
def set_description(cx, name, desc): | |
cx.execute(_setfield('description'), (desc, name)) | |
def set_homepage(cx, name, url): | |
cx.execute(_setfield('homepage'), (url, name)) | |
# my poor excuse for unit tests | |
# add_name('foo-bar') | |
# set_description('foo-bar', "does foo things with bar. or vice versa") | |
# set_homepage('foo-bar', "http://aimless.info/foo-bar") | |
# conn.commit() | |
# conn.close() | |
# sys.exit(1) | |
#### main | |
IGNORED_EVENTS = "start_map end_map start_array end_array map_key".split() | |
# this is a regex | |
INTERESTING = re.compile(r'\.(?:description|homepage|(?<!item\.)(?<!author\.)name|keywords)$') | |
# this is just documentation | |
COMMON_FIELDS = "contributors users homepage bugs license readmeFilename description versions dist-tags time name author repository maintainers keywords".split() | |
def parse_json_stream(stream): | |
parser = ijson.parse(stream) | |
for prefix, event, value in parser: | |
if event in IGNORED_EVENTS: | |
continue | |
if not INTERESTING.search(prefix): | |
continue | |
# prefix = prefix.encode('utf-8') | |
if event == 'string': | |
value = re.sub(r'\s', ' ', value) | |
# value = value.encode('utf-8') | |
(name, field) = prefix.rsplit('.', 1) | |
yield (name, field, value) | |
def sanitize(s): | |
# newlines really screw us | |
s = s.replace('\r', ' ') | |
s = s.replace('\n', ' ') | |
return s | |
def sanitize_url(s): | |
# sanitize quotes too | |
s = sanitize(s) | |
s = s.replace(' ', '%20') | |
s = s.replace('"', '%22') | |
s = s.replace("'", '%27') | |
return s | |
def __main__(options, args): | |
if len(args) == 0: | |
url = 'http://registry.npmjs.org/-/all' | |
else: | |
url = args[0] | |
stream = urllib.urlopen(url) | |
(conn,cx) = connect_db(DB_DSN) | |
current_name = None | |
iterations = 0 | |
for (name, field, value) in parse_json_stream(stream): | |
if name != current_name: | |
name = sanitize(name) # I *hope* this isn't necessary... | |
iterations += 1 | |
if iterations % 1000 == 0: | |
if PROGRESS: | |
sys.stderr.write('\rprocessed: {0} {1:<80}'.format(iterations, name)) | |
conn.commit() | |
current_name = name | |
# LOG.debug(name) | |
add_name(cx, name) | |
if field == 'description': | |
value = sanitize(value) | |
set_description(cx, name, value) | |
elif field == 'homepage': | |
value = sanitize_url(value) | |
set_homepage(cx, name, value) | |
if PROGRESS: sys.stderr.write('\n') | |
conn.commit() | |
conn.close() | |
if __name__ == '__main__': | |
import sys | |
__main__({}, sys.argv[1:]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment