Last active
January 27, 2017 00:38
-
-
Save trishankkarthik/e5d8134a4052f712b7416c76787b077b to your computer and use it in GitHub Desktop.
Pseudocode for TAP 4
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fnmatch | |
import json | |
import os.path | |
import tuf.client.updater | |
import tuf.settings | |
def get_targetinfo(target_filename): | |
'''Return the TARGETINFO, if any, for a target with the given filename.''' | |
with open('map.json', 'rb') as map_file: | |
the_map = json.load(map_file) | |
# repository dirname: list of mirror URLs | |
dirname_to_mirrors = the_map['repositories'] | |
for dirname in dirname_to_mirrors: | |
# Each repository must cache its metadata in a separate location. | |
assert os.path.isdir(dirname) | |
# The latest known root metadata file must already be on disk. | |
assert os.path.isfile('{}/current/root.json'.format(dirname)) | |
# Iterate over mappings. | |
for mapping in the_map['mappings']: | |
# If this mapping is relevant to the target... | |
if paths_match_target(mapping['paths'], target_filename): | |
targetinfos = [] | |
# Use the *unmodified* TUF updater for a single repository to fetch the | |
# target info from each repository. | |
for dirname in mapping['repositories']: | |
targetinfo = update_from_repository(dirname, dirname_to_mirrors, | |
target_filename) | |
targetinfos.append(targetinfo) | |
# If the target info on each repository is equal to the others, | |
# and it is not empty, then return the target info. | |
if targets_are_equal_and_not_empty(targetinfos): return targetinfo | |
# If we are here, it means either the mapping is irrelevant to the target, | |
# or the targets were missing from all repositories in this mapping, or the | |
# targets on all repositories did not match. In that case, are we allowed to | |
# continue to the next mapping? | |
if mapping['terminating']: return None | |
# If we are here, it means either there were no mappings, or none of the | |
# mappings provided the target. | |
return None | |
def paths_match_target(paths, target_filename): | |
for path in paths: | |
if fnmatch.fnmatch(path, target_filename): return True | |
# If we are here, then none of the paths are relevant to the target. | |
return False | |
# A global dictionary mapping repositories to TUF updaters. | |
dirname_to_updaters = {} | |
def get_updater(dirname, dirname_to_mirrors): | |
# NOTE: Do not refresh metadata for a repository that has been visited. | |
global dirname_to_updaters | |
updater = dirname_to_updaters.get(dirname) | |
if not updater: | |
# Unimportant: some massaging for unmodified TUF client. | |
mirrors = {} | |
for url in dirname_to_mirrors[dirname]: | |
mirrors[url] = { | |
'url_prefix': url, | |
'metadata_path': 'metadata', | |
'targets_path': 'targets', | |
'confined_target_dirs': [''] | |
} | |
# NOTE: State (e.g., keys) should NOT be shared across different updater | |
# instances. | |
updater = tuf.client.updater.Updater(dirname, mirrors) | |
try: updater.refresh() | |
except: return None | |
else: dirname_to_updaters[dirname] = updater | |
return updater | |
def update_from_repository(dirname, dirname_to_mirrors, target_filename): | |
# Set the repository directory containing the metadata. | |
tuf.settings.repository_directory = dirname | |
updater = get_updater(dirname, dirname_to_mirrors) | |
try: return updater.get_one_valid_targetinfo(target_filename) | |
except: return None | |
# If not empty, check only that length and hashes are equal; ignore custom | |
# targets metadata. | |
def targets_are_equal_and_not_empty(targetinfos): | |
# Target is empty. | |
if len(targetinfos) == 0: return False | |
else: | |
prev_targetinfo = targetinfos[0] | |
# Target is empty. | |
if not prev_targetinfo: return False | |
else: | |
for curr_targetinfo in targetinfos[1:]: | |
# Target is empty. | |
if not curr_targetinfo: return False | |
else: | |
prev_length = prev_targetinfo['length'] | |
curr_length = curr_targetinfo['length'] | |
if prev_length != curr_length: return False | |
prev_hashes = prev_targetinfo['hashes'] | |
curr_hashes = curr_targetinfo['hashes'] | |
if prev_hashes.keys() != curr_hashes.keys(): return False | |
for function, prev_digest in prev_hashes.items(): | |
if prev_digest != curr_hashes[function]: return False | |
prev_targetinfo = curr_targetinfo | |
# If we are here, then all the targets are equal. | |
return True |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment