Skip to content

Instantly share code, notes, and snippets.

@moyix
Created September 4, 2022 08:41
Embed
What would you like to do?
import sys
import os
import re
import json
import zipfile
from collections import defaultdict, namedtuple
from collections.abc import Mapping
from email.parser import HeaderParser
from email.policy import compat32
from base64 import urlsafe_b64decode
import csv
import configparser
# Used when reading entry_points.txt
class CaseSensitiveConfigParser(configparser.ConfigParser):
    """ConfigParser variant that keeps option names exactly as written.

    The stock ConfigParser lower-cases option names via ``optionxform``;
    replacing that hook with ``str`` leaves the names unchanged.
    """

    optionxform = staticmethod(str)
# From PEP 566:
# It may be necessary to store metadata in a data structure which does not
# allow for multiple repeated keys, such as JSON.
#
# The canonical method to transform metadata fields into such a data structure
# is as follows:
#
# 1. The original key-value format should be read with email.parser.HeaderParser;
# 2. All transformed keys should be reduced to lower case. Hyphens should be
# replaced with underscores, but otherwise should retain all other characters;
# 3. The transformed value for any field marked with "(Multiple use)" should be a
# single list containing all the original values for the given key;
# 4. The Keywords field should be converted to a list by splitting the original
# value on whitespace characters;
# 5. The message body, if present, should be set to the value of the description key.
# 6. The result should be stored as a string-keyed dictionary.
# Lower-cased header names that may legitimately appear more than once in a
# key-value metadata file. Values of these headers are collected into a list
# instead of being overwritten by the last occurrence.
multiple_use = {
    "dynamic",
    "platform",
    "supported-platform",
    "classifier",
    "requires-dist",
    "requires-external",
    "project-url",
    "provides-extra",
    "provides-dist",  # previously misspelled "provides-pist" and never matched
    "obsoletes-dist",
    "license-file",
    # Not a core-metadata field: the WHEEL file repeats "Tag" once per
    # supported compatibility tag.
    "tag",
}
def msg_to_json(msg):
    """Turn a parsed header message into a plain, string-keyed dict.

    Header names are lower-cased and hyphens become underscores. Headers
    listed in ``multiple_use`` accumulate into a list, ``Keywords`` is split
    on whitespace, and any other header keeps its value as-is (a repeated
    header therefore keeps only its last value). A non-empty message body is
    stored under ``description``.
    """
    fields = defaultdict(list)
    for header, value in msg.items():
        lowered = header.lower()
        json_key = lowered.replace('-', '_')
        if lowered in multiple_use:
            fields[json_key].append(value)
        elif lowered == 'keywords':
            fields['keywords'] = value.split()
        else:
            fields[json_key] = value

    # For a multipart message the body is taken from its first part.
    body = msg.get_payload(0).get_payload() if msg.is_multipart() else msg.get_payload()
    if body:
        fields['description'] = body
    return dict(fields)
def record_to_json(record):
    """Parse the CSV payload of a RECORD message into a list of row dicts.

    Each row has ``path``, ``size`` (int, or None when blank),
    ``hash_algorithm`` and ``hash_value`` (hex string); both hash entries
    are None when the hash column is empty. A hash without an
    ``algorithm=`` prefix is reported as ``sha256``.
    """
    entries = []
    reader = csv.DictReader(
        record.get_payload().splitlines(),
        fieldnames=['path', 'hash', 'size'],
    )
    for entry in reader:
        raw_size = entry['size']
        entry['size'] = int(raw_size) if raw_size and raw_size.strip() else None

        digest = entry.pop('hash')
        if digest:
            algorithm, separator, encoded = digest.partition('=')
            if not separator:
                algorithm, encoded = 'sha256', digest
            entry['hash_algorithm'] = algorithm
            # RECORD hashes are unpadded urlsafe base64; append padding
            # so the decoder accepts them.
            entry['hash_value'] = urlsafe_b64decode(encoded + '==').hex()
        else:
            entry['hash_algorithm'] = None
            entry['hash_value'] = None
        entries.append(entry)
    return entries
# Decoders
def msg_decode(b):
    """Decode raw bytes to text and parse them as an RFC 822 style message.

    Returns the message object produced by ``HeaderParser`` using the
    ``compat32`` policy.
    """
    header_parser = HeaderParser(policy=compat32)
    text = utf8_decode(b)
    return header_parser.parsestr(text)
def utf8_decode(b, *, encodings=('utf-8', 'windows-1252', 'latin1')):
    """Decode bytes to text, trying several encodings in order.

    Args:
        b: The bytes to decode.
        encodings: Codec names to try, first to last. With the default
            list the final ``latin1`` attempt accepts any byte sequence,
            so the call cannot fail.

    Returns:
        The text produced by the first codec that decodes ``b`` cleanly.

    Raises:
        UnicodeDecodeError: If none of the codecs can decode ``b``.
    """
    encodings = tuple(encodings)
    for codec in encodings:
        try:
            return b.decode(codec)
        except UnicodeDecodeError:
            continue
    # UnicodeDecodeError needs (encoding, object, start, end, reason); the
    # single-argument form would itself fail with TypeError.
    raise UnicodeDecodeError(
        encodings[-1] if encodings else 'utf-8',
        bytes(b),
        0,
        len(b),
        'could not decode with any of: ' + ', '.join(encodings),
    )
def ident(s):
    """Identity function: hand the argument back unchanged."""
    return s
# Recursively copy a mapping into plain dicts
def mapping2dict(mapping):
    """Return a plain-dict copy of ``mapping``, converting nested mappings too.

    Entries whose value is falsy (empty mapping, empty string, 0, None, ...)
    are left out. The check is made on the original value, so a nested
    mapping that only becomes empty after filtering is still kept.
    """
    plain = {}
    for key, value in mapping.items():
        if not value:
            continue
        plain[key] = mapping2dict(value) if isinstance(value, Mapping) else value
    return plain
def parse_entry_points(s):
    """Parse INI-style entry point text into nested plain dicts.

    The text is read with the case-preserving config parser and the result
    is converted with ``mapping2dict``, which leaves out empty sections and
    empty values.
    """
    parser = CaseSensitiveConfigParser()
    parser.read_string(s)
    sections = mapping2dict(parser)
    return sections
def parse_true(s):
    """Ignore the content and report True (the file's presence is the signal)."""
    return True
def parse_false(s):
    """Ignore the content and report False (the file's presence is the signal)."""
    return False
# "Kinds" that are really just text-file extensions and carry no meaning
excluded_kinds = {'txt', 'rst', 'md'}


def parse_kind(s, **kwargs):
    """Wrap ``s`` in a one-entry dict keyed by its kind.

    The kind comes from the ``kind`` keyword argument; when it is missing,
    empty, or one of ``excluded_kinds`` the key is ``'default'``.
    """
    kind = kwargs.get('kind')
    if not kind or kind in excluded_kinds:
        kind = 'default'
    return {kind: s}
def parse_lines(s):
    """Split text into a list of lines with the line endings removed."""
    lines = s.splitlines()
    return lines
# Basic actions:
def action_insert(d, k, v):
    """Store ``v`` under ``k`` in ``d``, replacing any existing value."""
    d.__setitem__(k, v)
def action_append(d, k, v):
    """Append ``v`` to the list stored under ``k``, creating the list if needed."""
    d.setdefault(k, []).append(v)
def action_update(d, k, v):
    """Merge the mapping ``v`` into the dict stored under ``k``, creating it if needed."""
    d.setdefault(k, {}).update(v)
# Extended name matchers
def filter_kind_args(args):
    """Normalise the ``kind`` entry of a matcher's argument dict, in place.

    A missing ``kind`` key leaves ``args`` untouched. An empty kind, or one
    that is itself in ``excluded_kinds``, becomes ``'default'``. Otherwise a
    trailing ``.<ext>`` is stripped when ``<ext>`` is in ``excluded_kinds``.
    The same dict is returned.
    """
    if 'kind' not in args:
        return args

    kind = args['kind']
    if not kind or kind in excluded_kinds:
        args['kind'] = 'default'
        return args

    for extension in excluded_kinds:
        dotted = "." + extension
        if kind.endswith(dotted):
            kind = kind[:-len(dotted)]
            break
    args['kind'] = kind
    return args
def regex_name(regex, debug_name):
    """Build a name matcher backed by a regular expression.

    The returned function searches its argument with ``regex``. On a match
    it returns the named groups, passed through ``filter_kind_args``;
    otherwise it returns None. ``debug_name`` is attached to the matcher as
    ``__debug_name__``.
    """
    pattern = re.compile(regex)

    def matcher(s):
        found = pattern.search(s)
        return filter_kind_args(found.groupdict()) if found else None

    matcher.__debug_name__ = debug_name
    return matcher
def name_with_kind(name, kind):
    """Build a matcher for one exact file name that reports a fixed kind.

    The returned function yields ``{'kind': kind}`` when called with
    ``name`` and None for anything else. ``name`` is attached to the matcher
    as ``__debug_name__``.
    """
    def matcher(s):
        return {'kind': kind} if s == name else None

    matcher.__debug_name__ = name
    return matcher
def parse_json(s):
    """Parse JSON text; empty or whitespace-only input yields an empty dict."""
    if not s or not s.strip():
        return {}
    return json.loads(s)
# One rule describing how to handle a file found in a wheel.
#
# When a zip entry's name matches a rule:
#   simple string match:  value = parser(decoder(zf.read(name)))
#   extended name match:  value = parser(decoder(zf.read(name)), **name_matcher(name))
# and in both cases then: action(dict, key, value)
#
# Fields:
#   name    - Filename for matching. Either a simple string, or a function
#             that returns args to pass to the parser for a match, or None
#             for no match.
#   key     - Key to put in the info dict. Can be None to ignore.
#   decoder - Function to decode the value.
#   parser  - Function to parse the value.
#   action  - How to handle the value.
WheelMeta = namedtuple('WheelMeta', 'name key decoder parser action')
# Table of every recognised .dist-info entry.
#
# Entries whose name is a plain string are matched by exact file name.
# Entries whose name is a matcher function are tried in the order listed
# here and the first one that matches is used, so keep the specific and
# "ignore" rules ahead of the broad license/copying patterns.
# A key of None means "recognised, but deliberately ignored".
handlers = [
    # Standard metadata files
    WheelMeta('METADATA', 'metadata', msg_decode, msg_to_json, action_insert),
    WheelMeta('WHEEL', 'wheel', msg_decode, msg_to_json, action_insert),
    WheelMeta('RECORD', 'record', msg_decode, record_to_json, action_insert),
    WheelMeta('RECORD.jws', 'record_signature', utf8_decode, parse_json, action_insert),
    WheelMeta('DESCRIPTION.rst', 'description', utf8_decode, ident, action_insert),
    WheelMeta('metadata.json', 'metadata_json', utf8_decode, parse_json, action_insert),
    WheelMeta('pbr.json', 'pbr', utf8_decode, parse_json, action_insert),
    WheelMeta('top_level.txt', 'top_level', utf8_decode, ident, action_insert),
    WheelMeta('top_level.txt.orig', 'top_level', utf8_decode, ident, action_insert),
    WheelMeta('entry_points.txt', 'entry_points', utf8_decode, parse_entry_points, action_insert),
    WheelMeta('zip-safe', 'zip_safe', ident, parse_true, action_insert),
    WheelMeta('not-zip-safe', 'zip_safe', ident, parse_false, action_insert),
    WheelMeta('namespace_packages.txt', 'namespace_packages', utf8_decode, ident, action_insert),
    WheelMeta('direct_url.json', 'direct_url', utf8_decode, parse_json, action_insert),
    WheelMeta('INSTALLER', 'installer', utf8_decode, ident, action_insert),
    WheelMeta('REQUESTED', 'requested', utf8_decode, ident, action_insert),
    WheelMeta('dependency_links.txt', 'dependency_links', utf8_decode, parse_lines, action_insert),
    WheelMeta('eager_resources.txt', 'eager_resources', utf8_decode, parse_lines, action_insert),
    WheelMeta('SOURCES.txt', 'sources', utf8_decode, parse_lines, action_insert),
    WheelMeta('SOURCES.txt.orig', 'sources', utf8_decode, parse_lines, action_insert),
    WheelMeta('top_list.txt', 'top_list', utf8_decode, ident, action_insert),
    WheelMeta('CHANGES.rst', 'changes', utf8_decode, ident, action_insert),
    WheelMeta('RELEASE-NOTES.rst', 'release_notes', utf8_decode, ident, action_insert),
    # Exact-name license / author files with a fixed kind
    WheelMeta(name_with_kind('Apache-2.0.txt', 'Apache-2.0'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('BSD-3-Clause.txt', 'BSD-3-Clause'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('CC-BY-4.0.txt', 'CC-BY-4.0'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('MIT.txt', 'MIT'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('CC-PDDC.txt', 'CC-PDDC'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('dep5', 'dep5'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('LI_en.txt', 'default'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('legal.txt', 'default'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('open_source_license.txt', 'open_source'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('EULA.txt', 'EULA'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('REDIST.txt', 'REDIST'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('THIRDPARTY.txt', 'THIRDPARTY'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('HYPER_API_OSS_disclosure.txt', 'HYPER_API_OSS_disclosure'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('AUTHORS.google-crc32c', 'authors'), 'authors', utf8_decode, parse_kind, action_update),
    # Recognised but ignored
    WheelMeta(regex_name(r'^\..*$', 'dotfiles'), None, None, None, None),  # Ignore dotfiles
    WheelMeta(regex_name(r'(~|\.bak)$', 'backups'), None, None, None, None),  # Ignore backup files
    WheelMeta(regex_name(r'conflicted copy', 'dropbox'), None, None, None, None),  # Ignore Dropbox conflict files
    WheelMeta(regex_name(r'/$', 'dirs'), None, None, None, None),  # Ignore directories
    WheelMeta(regex_name(r'WHEELe', 'spelling'), None, None, None, None),  # Ignore misspelled WHEEL
    WheelMeta('LICENSE.pdf', None, None, None, None),  # A PDF File?? Really???
    # Pattern-based matchers; an optional named group "kind" is passed to the parser
    WheelMeta(regex_name(r'(?i)((?P<kind>[\w.-]+)[_-])?third-party-programs\.txt$', 'third_party_programs'), 'third_party_programs', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'(?i)(?P<kind>[\w.-]+)?\.ABOUT$', 'about'), 'about', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'(?i)NOTICES?(-WHEEL)?(\.txt|\.rst|\.md|)$', 'notice'), 'notice', utf8_decode, parse_kind, action_insert),
    # Debug name was a copy-pasted 'notice'; it only affects debug output.
    WheelMeta(regex_name(r'(?i)(README|misc)(\.txt|\.rst|\.md|)$', 'readme'), 'readme', utf8_decode, ident, action_insert),
    WheelMeta(regex_name(r'(?i)(?P<kind>[\w.-]+)?AUTHORS(\.txt|\.rst|\.md|\.py|)$', 'authors'), 'authors', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'(?i)(?P<kind>[\w.-]+)[-._](LICEN[CS]ES?)?$', 'license_before'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'(?i)(LICEN[SC]ES?)[-._]?(?P<kind>[\w.-]+)?(\.txt|\.rst|)?$', 'license_after'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'(?i)COPYING[-._]?(?P<kind>[\w.-]+)?(\.txt|\.rst|\.md)?$', 'copying'), 'copying', utf8_decode, parse_kind, action_update),
]
# Handlers matched by exact file name, indexed by that name. Matcher-function
# handlers are excluded so the two indexes partition the table.
simple_handlers = {h.name: h for h in handlers if isinstance(h.name, str)}
# Handlers matched by calling h.name(filename), kept in declaration order.
extended_handlers = [h for h in handlers if not isinstance(h.name, str)]
# Captures, as "name", everything after ".dist-info/" in a zip entry path.
distinfo_re = re.compile(r'\.dist-info/(?P<name>.+)$')
def wheel_info(file):
    """Collect the .dist-info contents of a wheel into a dict.

    Every zip entry under a ``.dist-info/`` directory is matched first
    against ``simple_handlers`` (exact name) and then against
    ``extended_handlers`` (first matcher that accepts the name). The matched
    handler decodes and parses the entry and stores the result in the
    returned dict via its action. Entries that no handler recognises are
    reported on stderr. Entries outside ``.dist-info`` are skipped.

    Args:
        file: Path or file object accepted by ``zipfile.ZipFile``.

    Returns:
        A dict holding every key declared in ``handlers``; keys for which
        nothing was found in the wheel are present with the value None.
    """
    info = {}
    with zipfile.ZipFile(file) as zf:
        for name in zf.namelist():
            m = distinfo_re.search(name)
            if m is None:
                continue
            d_name = m.group('name')

            if d_name in simple_handlers:
                h, args = simple_handlers[d_name], {}
            else:
                for h in extended_handlers:
                    if (args := h.name(d_name)) is not None:
                        break
                else:
                    print(f'{file}: Unknown file in .dist-info: {name}', file=sys.stderr)
                    continue

            if h.key is None:
                continue  # recognised but deliberately ignored

            # Read only once a handler wants the data, so ignored and
            # unknown entries are never decompressed.
            value = h.parser(h.decoder(zf.read(name)), **args)
            h.action(info, h.key, value)

    # Give the result a stable shape: every known key is present.
    for h in handlers:
        if h.key and h.key not in info:
            info[h.key] = None
    return info
def debug_name_handler(name):
    """Print which handler would process the zip entry ``name``.

    Mirrors the handler lookup used when reading a wheel: exact-name
    handlers first, then the matcher-based handlers in order. Names that are
    not under a ``.dist-info/`` directory produce no output.
    """
    m = distinfo_re.search(name)
    if m is None:
        return
    d_name = m.group('name')

    if d_name in simple_handlers:
        h = simple_handlers[d_name]
        if h.key is None:
            print(f'{d_name} -> simple: ignored')
        else:
            print(f"{d_name} -> simple: {h.key} {h.parser.__name__} {h.decoder.__name__} {h.action.__name__}")
        return

    for h in extended_handlers:
        args = h.name(d_name)
        if args is None:
            continue
        if h.key is None:
            # The original message ended with a stray ")".
            print(f'{d_name} -> extended: ignored by {h.name.__debug_name__}')
        else:
            argstr = ', '.join(f'{k}={v}' for k, v in args.items())
            print(f"{d_name} -> extended: {h.name.__debug_name__}({argstr}) {h.key} {h.parser.__name__} {h.decoder.__name__} {h.action.__name__}")
        break
    else:
        print(f'{d_name} -> Unknown')
def main():
    """Write ``wheel_meta/<wheel file name>.json`` for each wheel on the command line.

    Progress is reported on stderr. The ``wheel_meta`` output directory is
    created on demand.
    """
    out_dir = 'wheel_meta'
    for file in sys.argv[1:]:
        print(f"Working on {file}", file=sys.stderr)
        info = wheel_info(file)
        # Without this, open() fails with FileNotFoundError on a fresh checkout.
        os.makedirs(out_dir, exist_ok=True)
        out_name = os.path.join(out_dir, os.path.basename(file) + '.json')
        with open(out_name, 'w', encoding='utf-8') as out:
            json.dump(info, out)
            out.write('\n')
def debug_main():
    """Read entry names from stdin, one per line, and print how each is handled."""
    for raw_line in sys.stdin:
        entry_name = raw_line.strip()
        debug_name_handler(entry_name)
# Run the extractor only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment