# Created September 4, 2022 08:41
import configparser
import csv
import json
import os
import re
import sys
import zipfile
from base64 import urlsafe_b64decode
from collections import defaultdict, namedtuple
from collections.abc import Mapping
from email.parser import HeaderParser
from email.policy import compat32
# For entry points
class CaseSensitiveConfigParser(configparser.ConfigParser):
    """ConfigParser variant that keeps option names exactly as written.

    ``ConfigParser`` passes every option name through ``optionxform``, which
    lower-cases it by default. Replacing it with ``str`` leaves names untouched.
    """

    optionxform = staticmethod(str)
# From PEP 566:
# It may be necessary to store metadata in a data structure which does not
# allow for multiple repeated keys, such as JSON.
# The canonical method to transform metadata fields into such a data structure
# is as follows:
# 1. The original key-value format should be read with email.parser.HeaderParser;
# 2. All transformed keys should be reduced to lower case. Hyphens should be
# replaced with underscores, but otherwise should retain all other characters;
# 3. The transformed value for any field marked with "(Multiple use)" should be a
# single list containing all the original values for the given key;
# 4. The Keywords field should be converted to a list by splitting the original
# value on whitespace characters;
# 5. The message body, if present, should be set to the value of the description key.
# 6. The result should be stored as a string-keyed dictionary.
# Lower-cased header names that may occur several times in a metadata file.
# NOTE(review): the original literal was truncated after the opening brace.
# The entries below are the "(Multiple use)" fields of the core metadata
# specification -- confirm against the originally intended list.
multiple_use = {
    'classifier',
    'dynamic',
    'license-file',
    'obsoletes-dist',
    'platform',
    'project-url',
    'provides-dist',
    'provides-extra',
    'requires-dist',
    'requires-external',
    'supported-platform',
}


def msg_to_json(msg):
    """Convert a parsed header message into a plain string-keyed dict.

    Keys are lower-cased with hyphens replaced by underscores. Fields listed
    in ``multiple_use`` are collected into a list, ``Keywords`` is split on
    whitespace, and every other field keeps its (last) string value. A
    non-empty message body is stored under ``'description'``.

    Args:
        msg: an ``email.message.Message``-like object providing ``items()``,
            ``is_multipart()`` and ``get_payload()``.

    Returns:
        dict: the transformed metadata.
    """
    d = defaultdict(list)
    for k, v in msg.items():
        field = k.lower()
        key = field.replace('-', '_')
        if field in multiple_use:
            d[key].append(v)
        elif field == 'keywords':
            d['keywords'] = v.split()
        else:
            d[key] = v

    # For a multipart message the description is the payload of the first part.
    if msg.is_multipart():
        desc = msg.get_payload(0).get_payload()
    else:
        desc = msg.get_payload()
    if desc:
        d['description'] = desc
    return dict(d)
def record_to_json(record):
    """Parse the CSV payload of a RECORD message into a list of row dicts.

    Each row is read as ``path,hash,size``. ``size`` becomes an ``int`` (or
    ``None`` when blank). ``hash`` is replaced by ``hash_algorithm`` and
    ``hash_value``; the value is URL-safe base64 decoded and hex encoded.
    A hash without an ``algorithm=`` prefix is treated as ``sha256``.

    Args:
        record: object whose ``get_payload()`` returns the RECORD text.

    Returns:
        list[dict]: one dict per RECORD line.
    """
    fields = ['path', 'hash', 'size']
    rows = []
    for row in csv.DictReader(record.get_payload().splitlines(), fieldnames=fields):
        size = row['size']
        row['size'] = int(size) if size and size.strip() else None

        # pop() both reads the raw value and drops the 'hash' column.
        raw_hash = row.pop('hash')
        if raw_hash:
            kind, sep, value = raw_hash.partition('=')
            if not sep:
                # No "algorithm=" prefix: the whole field is the digest.
                kind, value = 'sha256', raw_hash
            # The digest is stored without base64 padding; restore it.
            padded = value + '=' * (-len(value) % 4)
            row['hash_algorithm'] = kind
            row['hash_value'] = urlsafe_b64decode(padded).hex()
        else:
            row['hash_algorithm'] = None
            row['hash_value'] = None
        rows.append(row)
    return rows
# Decoders
def msg_decode(b):
    """Decode raw bytes to text and parse them as an RFC 822 style message.

    Args:
        b: the raw file content as ``bytes``.

    Returns:
        The message produced by ``HeaderParser`` under the ``compat32`` policy.
    """
    return HeaderParser(policy=compat32).parsestr(utf8_decode(b))
def utf8_decode(b):
    """Decode bytes to ``str``, trying progressively more lenient codecs.

    UTF-8 is tried first, then windows-1252, and finally latin1. latin1 maps
    every byte value to a character, so the last step cannot fail; no error
    path is needed after it.

    Args:
        b: the ``bytes`` to decode.

    Returns:
        str: the decoded text.
    """
    for codec in ('utf-8', 'windows-1252'):
        try:
            return b.decode(codec)
        except UnicodeDecodeError:
            continue
    return b.decode('latin1')
def ident(s):
    """Return ``s`` unchanged (no-op decoder/parser for the handler table)."""
    return s
# Convert a mapping to a dict (recursively)
def mapping2dict(mapping):
    """Recursively copy a mapping into plain dicts, dropping falsy values.

    Args:
        mapping: any object providing ``items()``.

    Returns:
        dict: entries whose value is truthy; values that are themselves
        ``Mapping`` instances are converted the same way.
    """
    return {
        k: mapping2dict(v) if isinstance(v, Mapping) else v
        for k, v in mapping.items() if v
    }
def parse_entry_points(s):
    """Parse INI-style entry point text into nested plain dicts.

    Args:
        s: the text of an ``entry_points.txt`` file.

    Returns:
        dict: section name -> {option name -> value}, with option-name case
        preserved and empty sections omitted.
    """
    cp = CaseSensitiveConfigParser()
    # Feed the text to the parser; without this the result is always empty.
    cp.read_string(s)
    return mapping2dict(cp)
def parse_true(s):
    """Return ``True`` regardless of the content; the file's presence is the flag."""
    return True
def parse_false(s):
    """Return ``False`` regardless of the content; the file's presence is the flag."""
    return False
# "Kinds" that are really just file extensions and carry no information.
excluded_kinds = {'txt', 'rst', 'md'}


def parse_kind(s, **kwargs):
    """Wrap ``s`` in a one-entry dict keyed by the matched ``kind``.

    Args:
        s: the decoded file content.
        **kwargs: matcher arguments; only ``kind`` is consulted.

    Returns:
        dict: ``{kind: s}``, where ``kind`` falls back to ``'default'`` when
        it is missing, empty, or one of ``excluded_kinds``.
    """
    kind = kwargs.get('kind')
    if not kind or kind in excluded_kinds:
        kind = 'default'
    return {kind: s}
def parse_lines(s):
    """Split text into a list of lines with the line endings removed."""
    return s.splitlines()
# Basic actions:
def action_insert(d, k, v):
    """Store ``v`` under ``k`` in ``d``, replacing any existing value."""
    d[k] = v
def action_append(d, k, v):
    """Append ``v`` to the list stored under ``k`` in ``d``, creating it if needed."""
    d.setdefault(k, []).append(v)
def action_update(d, k, v):
    """Merge the dict ``v`` into the dict stored under ``k`` in ``d``, creating it if needed."""
    d.setdefault(k, {}).update(v)
# Extended name matchers
def filter_kind_args(args):
    """Normalise the ``kind`` entry of a matcher's argument dict, in place.

    An empty kind, or one that is just an excluded extension, becomes
    ``'default'``; a trailing ``.<excluded extension>`` is stripped. Dicts
    without a ``kind`` key are returned untouched.

    Args:
        args: dict of named regex groups (mutated).

    Returns:
        dict: the same ``args`` object.
    """
    if 'kind' in args:
        kind = args['kind']
        if not kind or kind in excluded_kinds:
            kind = 'default'
        for ex in excluded_kinds:
            if kind.endswith("." + ex):
                kind = kind[:-(len(ex) + 1)]
        args['kind'] = kind
    return args
def regex_name(regex, debug_name):
    """Build a name matcher from a regular expression.

    Args:
        regex: pattern searched for anywhere in the candidate name.
        debug_name: label attached to the matcher as ``__debug_name__``.

    Returns:
        A function ``matcher(s)`` returning the named groups of the match,
        passed through ``filter_kind_args``, or ``None`` when there is no match.
    """
    regex_comp = re.compile(regex)

    def matcher(s):
        # search(), not match(): several patterns are anchored only at the end.
        m = regex_comp.search(s)
        if m:
            return filter_kind_args(m.groupdict())
        return None

    matcher.__debug_name__ = debug_name
    return matcher
def name_with_kind(name, kind):
    """Build a matcher for one exact file name that reports a fixed ``kind``.

    Args:
        name: the file name to match exactly.
        kind: the kind reported on a match.

    Returns:
        A function ``matcher(s)`` returning ``{'kind': kind}`` when
        ``s == name`` and ``None`` otherwise; ``name`` is attached to it as
        ``__debug_name__``.
    """
    def matcher(s):
        if s == name:
            return {'kind': kind}
        return None

    matcher.__debug_name__ = name
    return matcher
def parse_json(s):
    """Parse JSON text, treating empty or whitespace-only input as ``{}``.

    Raises:
        json.JSONDecodeError: if non-blank input is not valid JSON.
    """
    if s and s.strip():
        return json.loads(s)
    return {}
# Describe how to parse a wheel file
# If name matches a zip file entry, then we will do:
# Simple string match:
#     value = parser(decoder(data))
# Extended name match:
#     value = parser(decoder(data), **name_matcher(name))
# action(dict, key, value)
# One row of the handler table: how to recognise and process a .dist-info file.
WheelMeta = namedtuple('WheelMeta', [
    # Filename for matching. Can be either a simple string, or a function
    # that returns args to pass to the parser for a match, or None for no match.
    'name',
    'key',      # Key to put in the info dict. Can be None to ignore.
    'decoder',  # Function to decode the value
    'parser',   # Function to parse the value
    'action',   # How to handle the value
])
# Handler table. Entries whose first field is a plain string are matched by
# exact name; the others are matcher functions tried in the order listed here.
# A key of None means "recognised, but ignore".
handlers = [
    WheelMeta('METADATA', 'metadata', msg_decode, msg_to_json, action_insert),
    WheelMeta('WHEEL', 'wheel', msg_decode, msg_to_json, action_insert),
    WheelMeta('RECORD', 'record', msg_decode, record_to_json, action_insert),
    WheelMeta('RECORD.jws', 'record_signature', utf8_decode, parse_json, action_insert),
    WheelMeta('DESCRIPTION.rst', 'description', utf8_decode, ident, action_insert),
    WheelMeta('metadata.json', 'metadata_json', utf8_decode, parse_json, action_insert),
    WheelMeta('pbr.json', 'pbr', utf8_decode, parse_json, action_insert),
    WheelMeta('top_level.txt', 'top_level', utf8_decode, ident, action_insert),
    WheelMeta('top_level.txt.orig', 'top_level', utf8_decode, ident, action_insert),
    WheelMeta('entry_points.txt', 'entry_points', utf8_decode, parse_entry_points, action_insert),
    WheelMeta('zip-safe', 'zip_safe', ident, parse_true, action_insert),
    WheelMeta('not-zip-safe', 'zip_safe', ident, parse_false, action_insert),
    WheelMeta('namespace_packages.txt', 'namespace_packages', utf8_decode, ident, action_insert),
    WheelMeta('direct_url.json', 'direct_url', utf8_decode, parse_json, action_insert),
    WheelMeta('INSTALLER', 'installer', utf8_decode, ident, action_insert),
    WheelMeta('REQUESTED', 'requested', utf8_decode, ident, action_insert),
    WheelMeta('dependency_links.txt', 'dependency_links', utf8_decode, parse_lines, action_insert),
    WheelMeta('eager_resources.txt', 'eager_resources', utf8_decode, parse_lines, action_insert),
    WheelMeta('SOURCES.txt', 'sources', utf8_decode, parse_lines, action_insert),
    WheelMeta('SOURCES.txt.orig', 'sources', utf8_decode, parse_lines, action_insert),
    WheelMeta('top_list.txt', 'top_list', utf8_decode, ident, action_insert),
    WheelMeta('CHANGES.rst', 'changes', utf8_decode, ident, action_insert),
    WheelMeta('RELEASE-NOTES.rst', 'release_notes', utf8_decode, ident, action_insert),
    WheelMeta(name_with_kind('Apache-2.0.txt', 'Apache-2.0'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('BSD-3-Clause.txt', 'BSD-3-Clause'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('CC-BY-4.0.txt', 'CC-BY-4.0'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('MIT.txt', 'MIT'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('CC-PDDC.txt', 'CC-PDDC'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('dep5', 'dep5'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('LI_en.txt', 'default'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('legal.txt', 'default'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('open_source_license.txt', 'open_source'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('EULA.txt', 'EULA'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('REDIST.txt', 'REDIST'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('THIRDPARTY.txt', 'THIRDPARTY'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('HYPER_API_OSS_disclosure.txt', 'HYPER_API_OSS_disclosure'), 'license', utf8_decode, parse_kind, action_update),
    # NOTE(review): the file name below is an empty string, so this entry can
    # only match an empty entry name -- it looks truncated; confirm the
    # intended name.
    WheelMeta(name_with_kind('', 'authors'), 'authors', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'^\..*$', 'dotfiles'), None, None, None, None),  # Ignore dotfiles
    WheelMeta(regex_name(r'(~|\.bak)$', 'backups'), None, None, None, None),  # Ignore backup files
    WheelMeta(regex_name(r'conflicted copy', 'dropbox'), None, None, None, None),  # Ignore Dropbox conflict files
    WheelMeta(regex_name(r'/$', 'dirs'), None, None, None, None),  # Ignore directories
    WheelMeta(regex_name(r'WHEELe', 'spelling'), None, None, None, None),  # Ignore misspelled WHEEL
    WheelMeta('LICENSE.pdf', None, None, None, None),  # A PDF File?? Really???
    WheelMeta(regex_name(r'(?i)((?P<kind>[\w.-]+)[_-])?third-party-programs\.txt$', 'third_party_programs'), 'third_party_programs', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'(?i)(?P<kind>[\w.-]+)?\.ABOUT$', 'about'), 'about', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'(?i)NOTICES?(-WHEEL)?(\.txt|\.rst|\.md|)$', 'notice'), 'notice', utf8_decode, parse_kind, action_insert),
    # Debug label is 'readme' so it is distinguishable from the NOTICE matcher.
    WheelMeta(regex_name(r'(?i)(README|misc)(\.txt|\.rst|\.md|)$', 'readme'), 'readme', utf8_decode, ident, action_insert),
    WheelMeta(regex_name(r'(?i)(?P<kind>[\w.-]+)?AUTHORS(\.txt|\.rst|\.md|\.py|)$', 'authors'), 'authors', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'(?i)(?P<kind>[\w.-]+)[-._](LICEN[CS]ES?)?$', 'license_before'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'(?i)(LICEN[SC]ES?)[-._]?(?P<kind>[\w.-]+)?(\.txt|\.rst|)?$', 'license_after'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'(?i)COPYING[-._]?(?P<kind>[\w.-]+)?(\.txt|\.rst|\.md)?$', 'copying'), 'copying', utf8_decode, parse_kind, action_update),
]
# Handlers matched by exact file name, indexed by that name for direct lookup.
simple_handlers = {h.name: h for h in handlers if isinstance(h.name, str)}
# Handlers whose name is a matcher function; tried in table order.
extended_handlers = [h for h in handlers if not isinstance(h.name, str)]
# Captures the path of an entry relative to its .dist-info directory.
distinfo_re = re.compile(r'\.dist-info/(?P<name>.+)$')
def wheel_info(file):
    """Collect the contents of a wheel's ``.dist-info`` directory into a dict.

    Every zip entry under ``.dist-info/`` is looked up first in
    ``simple_handlers`` (exact name), then against ``extended_handlers`` in
    order. The first matching handler decodes, parses and stores the entry;
    handlers with a ``None`` key swallow it. Entries that nothing matches are
    reported on stderr.

    Args:
        file: path (or file object) accepted by ``zipfile.ZipFile``.

    Returns:
        dict: one entry per handler key; keys with no matching file are
        present with the value ``None``.
    """
    info = {}
    with zipfile.ZipFile(file) as zf:
        for name in zf.namelist():
            m = distinfo_re.search(name)
            if not m:
                continue
            data = zf.read(name)
            d_name = m.group('name')

            if d_name in simple_handlers:
                h = simple_handlers[d_name]
                if h.key is not None:
                    h.action(info, h.key, h.parser(h.decoder(data)))
                continue

            for h in extended_handlers:
                if (args := h.name(d_name)) is not None:
                    if h.key is not None:
                        value = h.parser(h.decoder(data), **args)
                        h.action(info, h.key, value)
                    # First matching handler wins, whether it stores or ignores.
                    break
            else:
                print(f'{file}: Unknown file in .dist-info: {name}', file=sys.stderr)

    # Give every known key a value so consumers see a stable set of keys.
    for h in handlers:
        if h.key and h.key not in info:
            info[h.key] = None
    return info
def debug_name_handler(name):
    """Print which handler would process the zip entry ``name``.

    Follows the same lookup order as ``wheel_info`` (exact-name handlers, then
    matcher functions in table order) but only reports the outcome on stdout.
    Names that are not inside a ``.dist-info`` directory print nothing.

    Args:
        name: full entry name as it appears in the wheel archive.
    """
    m = distinfo_re.search(name)
    if not m:
        return
    d_name = m.group('name')

    if d_name in simple_handlers:
        h = simple_handlers[d_name]
        if h.key is None:
            print(f'{d_name} -> simple: ignored')
        else:
            print(f"{d_name} -> simple: {h.key} {h.parser.__name__} {h.decoder.__name__} {h.action.__name__}")
        return

    for h in extended_handlers:
        if (args := h.name(d_name)) is not None:
            if h.key is None:
                print(f'{d_name} -> extended: ignored by {h.name.__debug_name__})')
            else:
                argstr = ', '.join(f'{k}={v}' for k, v in args.items())
                print(f"{d_name} -> extended: {h.name.__debug_name__}({argstr}) {h.key} {h.parser.__name__} {h.decoder.__name__} {h.action.__name__}")
            return

    print(f'{d_name} -> Unknown')
def main():
    """Write ``wheel_meta/<wheel file name>.json`` for each wheel on the command line."""
    out_dir = 'wheel_meta'
    # Create the output directory up front so open() below cannot fail on it.
    os.makedirs(out_dir, exist_ok=True)
    for file in sys.argv[1:]:
        print(f"Working on {file}", file=sys.stderr)
        info = wheel_info(file)
        out_name = os.path.join(out_dir, os.path.basename(file) + '.json')
        with open(out_name, 'w', encoding='utf-8') as out:
            json.dump(info, out)
def debug_main():
    """Report the handler chosen for each entry name read from stdin.

    NOTE(review): the loop body was missing from the file; calling
    ``debug_name_handler`` on each stripped line is the presumed intent --
    confirm.
    """
    for line in sys.stdin:
        debug_name_handler(line.strip())
# NOTE(review): the guarded call was missing from the file; main() is the
# presumed entry point (debug_main() is the alternative) -- confirm.
if __name__ == '__main__':
    main()