#!/usr/bin/env python3
# Copyright 2017-present, Bill & Melinda Gates Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This script runs in three separate action steps to migrate a Synapse project from one storage location to another.
#
# 1. MOVE: Recursively walks a Synapse project and creates new versions of all files at the new storage location,
# creating an index of the moved file entities as it does so.
# 2. DELETE: Deletes all older file entity versions that aren't in the new storage location. Can either use
# previously created index file from a MOVE action, or can be run fresh in which case a new index file
# is created after an additional recursive walk that records the deleted entities.
# 3. PURGE: Removes the file handles for all file entities recorded by a DELETE step. This can only be run after
# a DELETE action using the same index file, because once the entities are deleted the file handles cannot
# be read from Synapse metadata any longer.
#
# Example usage:
# # make new versions of the files in project syn123456 to storage location 98765.
# # the storage location for project syn123456 should already have been set to 98765, otherwise a warning
# # and confirmation prompt is shown.
# synapse_storage_fixer.py syn123456 -s 98765 -a move -u <user> -p <pass> -i /tmp/migrate.db
#
# # delete the old version(s) of the entity at the previous storage location (previous to 98765).
# # this deletes the Synapse records only and does not remove the file objects from the underlying storage layer
# # (e.g. S3)
# synapse_storage_fixer.py syn123456 -s 98765 -a delete -u <user> -p <pass> -i /tmp/migrate.db
#
# # remove the file handles from a previous delete action. this must use the same -i index file that was created
# # from a delete action.
# synapse_storage_fixer.py syn123456 -s 98765 -a purge -u <user> -p <pass> -i /tmp/migrate.db
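#
# # credentials can also be supplied via the SYNAPSE_USERNAME / SYNAPSE_PASSWORD environment variables
# # instead of -u/-p (see synapse_login below), and any action can be previewed first with --dry-run, e.g.:
# synapse_storage_fixer.py syn123456 -s 98765 -a move -i /tmp/migrate.db --dry-run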
import concurrent.futures
import sys
import os
import argparse
import getpass
import logging
import pathlib
from datetime import datetime
import synapseclient
import synapseutils as syn_utils
import sqlite3
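# NOTE: this script relies on synapseclient internals ('_file_handle', '_GET_paginated', restDELETE),
# which may change between client versions; it is written against the client API available at the time
# of this gist and may need adjustment for newer synapseclient releases.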
class SynapseStorageFixer:
class Actions:
MOVE = 'move' # creates new versions of the entities in the specified storage location
DELETE = 'delete' # deletes the Synapse entities of the older versions
PURGE = 'purge' # deletes the file handles of all entities deleted by a previously run DELETE action
class MoveStatus:
UNMOVED = 0
MOVED = 1
DELETED = 2
class FileHandleStatus:
UNPURGED = 0
PURGED = 1
"""
Sets the storage location on a Synapse Project.
"""
def __init__(self, action, project_id, storage_location_id, username=None, password=None, synapse_cache=None,
max_threads=None, dry_run=False, index_db_path=None):
self._action = action
self._project_id = project_id
self._storage_location_id = storage_location_id
if not self._storage_location_id or str(self._storage_location_id).strip() == '':
raise Exception('storage_location_id is required!')
self._username = username
self._password = password
self._synapse_cache = synapse_cache
self._max_threads = max_threads
self._dry_run = dry_run
self._index_db_path = index_db_path
if not self._index_db_path:
self._index_db_path = '{0}_index.db'.format(self._project_id)
self._index_db_path = os.path.abspath(os.path.expandvars(os.path.expanduser(self._index_db_path)))
if self._synapse_cache:
self._synapse_cache = os.path.join(self._synapse_cache, '.synapseCache')
def _check_table(self, cursor, table_name):
cursor.execute("select name from sqlite_master where type = 'table' and name = ?", [table_name])
if not cursor.fetchone():
raise ValueError("{} table does not exist")
def run(self):
if not self.synapse_login():
return
with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_threads) as executor, \
sqlite3.connect(self._index_db_path) as conn:
if self._action == self.Actions.PURGE:
self._run_deleted_version_purge(executor, conn)
else:
self._run_storage_location_action(executor, conn)
def _get_max_workers(self, executor):
# accessing a private variable here, but useful to know how many
# workers there were even in the default case where we let ThreadPoolExecutor pick a default
return executor._max_workers
def _index_files(self, conn, cursor, project):
# initialize a sqlite db for collecting the changes. for mass changes involving hundreds of thousands
# of existing files it will be less taxing on Synapse if the changes are executed ordered by their Synapse id,
# so we collect the file entities into a sqlite index and process them in id order.
logging.info("Creating index of file entities at {}".format(self._index_db_path))
cursor.execute("""
create table if not exists file_entities (
id text primary key,
parent_path text not null,
parent_id text not null,
filename text not null,
status integer not null
);
""")
cursor.execute("""
create table if not exists deleted_version_file_handles (
id text,
version integer,
storage_location_id integer not null,
file_handle_id text not null,
status integer not null,
primary key(id, version),
foreign key (id) references file_entities(id)
);
""")
existing_count = 0
inserted_count = 0
walked_path = syn_utils.walk(self._synapse_client, project.id)
for dirpath, dirnames, filenames in walked_path:
parent_path, parent_id = dirpath
dir_existing_count, dir_inserted_count = self._insert_file_entities(
cursor, parent_path, parent_id, filenames
)
existing_count += dir_existing_count
inserted_count += dir_inserted_count
logging.info("Scanned {} files from {} ({}), {} new".format(
existing_count + inserted_count, parent_id, parent_path, inserted_count
))
conn.commit()
logging.info("Scanned {} total files, {} new".format(existing_count + inserted_count, inserted_count))
def _chunk(self, iterable, n):
l = len(iterable)
for ndx in range(0, l, n):
yield iterable[ndx:min(ndx + n, l)]
def _insert_file_entities(self, cursor, parent_path, parent_id, filenames):
existing_count = 0
insert_count = 0
for filename_chunk in self._chunk(filenames, 500):
files = {f[1]: f[0] for f in filename_chunk}
# check if entity is already indexed (in case we need to abort and resume
# during indexing of a large project).
file_ids = list(files.keys())
results = cursor.execute(
"select id from file_entities where id in ({})".format(
",".join(['?' for _ in range(len(files))])
),
file_ids
)
existing_file_ids = set(i[0] for i in results)
existing_count += len(existing_file_ids)
missing_file_ids = set(file_ids).difference(existing_file_ids)
if missing_file_ids:
insert_values = []
for missing_file_id in missing_file_ids:
insert_values.append((missing_file_id, parent_path, parent_id, files[missing_file_id], 0))
cursor.executemany(
"insert into file_entities (id, parent_path, parent_id, filename, status) values (?, ?, ?, ?, ?)",
insert_values
)
insert_count += len(missing_file_ids)
return existing_count, insert_count
def _run_storage_location_action(self, executor, conn):
logging.info('Executing action: {0}'.format(self._action))
if self._dry_run:
logging.info('Executing Dry Run Only!')
project = self._synapse_client.get(self._project_id)
logging.info('Fixing Storage Location on Project: {0}'.format(project.name))
logging.info('Storage Location ID: {0}'.format(self._storage_location_id))
if self._synapse_cache:
logging.info('Synapse Cache: {0}'.format(self._synapse_cache))
project_setting = self._synapse_client.getProjectSetting(project, 'upload')
if project_setting is None or self._storage_location_id not in project_setting['locations']:
logging.info('')
logging.warning('WARNING: Project storage location does not match intended location.')
uinput = input('Continue? [y/n]: ')
if uinput != 'y':
return
cursor = conn.cursor()
try:
self._check_table(cursor, 'file_entities')
logging.warning("Found existing file entity index file at {}".format(self._index_db_path))
uinput = input("Use existing file entity index ({})? [y/n]: ".format(self._index_db_path))
if uinput != 'y':
logging.info("Aborting, remove the database file at {} or use a different index_db_path".format(
self._index_db_path
))
return
except ValueError:
# no existing index yet, that's fine we'll build one as part of this action
self._index_files(conn, cursor, project)
# if we're moving files then we want any files that haven't been moved yet (status < MOVED).
# if we're deleting files then we want any files that haven't been deleted yet (status < DELETED), including moved files.
update_status = self.MoveStatus.DELETED if self._action == self.Actions.DELETE else self.MoveStatus.MOVED
futures = []
last_id = ''
total_finished_count = 0
total_errored_count = 0
if self._action == self.Actions.DELETE:
action_fn = self._perform_delete
finish_fn = self._index_deleted_versions_file_handles
elif self._action == self.Actions.MOVE:
action_fn = self._perform_move
finish_fn = None
else:
raise ValueError("Shouldn't reach here")
count = cursor.execute(
"""
select count(*)
from file_entities
where status < ?
""",
[update_status]
).fetchone()[0]
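# process the index in batches: keep the thread pool topped up with at most max_workers in-flight
# actions, paging through file_entities in id order with last_id as a keyset-pagination cursor.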
while True:
limit = self._get_max_workers(executor) - len(futures)
results = cursor.execute(
"""
select id, parent_path, parent_id, filename
from file_entities
where id > ? and status < ?
order by id asc
limit ?
""",
(last_id, update_status, limit)
)
rows = [r for r in results]
if not rows:
break
for r in rows:
syn_id = r[0]
dirpath = (r[1], r[2])
filename = r[3]
syn_path = '{0}/{1} ({2})'.format('/'.join(dirpath), filename, syn_id)
logging.info('Checking: {0}'.format(syn_path))
future = executor.submit(self._perform_storage_action(action_fn), syn_path, syn_id)
futures.append(future)
concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
futures, finished_count, errored_count = \
self._mark_finished(conn, cursor, futures, update_status, finish_fn)
total_finished_count += finished_count
total_errored_count += errored_count
last_id = rows[-1][0]
completed_count = total_finished_count + total_errored_count
logging.info("Completed {} of {}, ({}%)".format(completed_count, count, (completed_count * 100) // count))
# wait for any remaining actions to finish
concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
_, finished_count, errored_count = self._mark_finished(conn, cursor, futures, update_status, finish_fn)
total_finished_count += finished_count
total_errored_count += errored_count
if self._dry_run:
logging.info('')
logging.info('Dry Run Only, no changes committed.')
logging.info('')
logging.info(
"Done. Processed {} entities. Encountered errors on {}. See log output for details".format(
total_finished_count, total_errored_count
)
)
def _mark_finished(self, conn, cursor, futures, update_status, finish_fn):
# update the index records of all files on which the action has been performed.
# this is done from the primary thread to operate with a single threaded SQLite
unfinished = []
finished_ids = []
errored_count = 0
for future in futures:
if future.done():
if future.exception():
# for any errored Futures we rely on the execution itself to have logged useful info
errored_count += 1
else:
result = future.result()
syn_id = result if not finish_fn else finish_fn(cursor, result)
finished_ids.append(syn_id)
else:
unfinished.append(future)
if finished_ids and not self._dry_run:
cursor.executemany(
"update file_entities set status = ? where id = ?",
[(update_status, i,) for i in finished_ids]
)
conn.commit()
return unfinished, len(finished_ids), errored_count
def _perform_storage_action(self, fn):
def wrapped_fn(syn_path, syn_id):
try:
return fn(syn_path, syn_id)
except Exception:
logging.exception("Encountered an error during {} of {}".format(self._action, syn_id))
raise
return wrapped_fn
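# move: re-downloads the file and stores it again with forceVersion=True so the new version is written
# to the project's current (new) storage location, verifying the MD5 and the resulting storage location
# before reporting success.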
def _perform_move(self, syn_path, syn_id):
# Fix the storage location.
syn_file = self._synapse_client.get(syn_id, downloadFile=False)
syn_file_storage_location_id = syn_file['_file_handle']['storageLocationId']
if syn_file_storage_location_id != self._storage_location_id:
logging.info('Moving: {0}'.format(syn_path))
if not self._dry_run:
syn_file = self._synapse_client.get(syn_id, downloadFile=True)
old_md5 = syn_file['_file_handle']['contentMd5']
self._synapse_client.cache.remove(syn_file)
updated_syn_file = self._synapse_client.store(syn_file, forceVersion=True)
new_md5 = updated_syn_file['_file_handle']['contentMd5']
if new_md5 != old_md5: raise Exception('ERROR: MD5 does not match: {0}'.format(syn_path))
self._synapse_client.cache.remove(syn_file, delete=True)
if updated_syn_file['_file_handle']['storageLocationId'] != self._storage_location_id:
raise Exception('ERROR: Storage location does not match intended location: {0}'.format(syn_path))
else:
logging.info('Moved: {0}'.format(syn_path))
else:
logging.info('Skipping Move (Dry Run): {0}'.format(syn_path))
return syn_id
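# delete: once the current version is confirmed to be at the new storage location, removes every older
# version that still lives at a different storage location (skipping the trash can) and records its
# file handle for the later purge step.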
def _perform_delete(self, syn_path, syn_id):
# Delete the versions in the old bucket.
current_syn_file = self._synapse_client.get(syn_id, downloadFile=False)
current_syn_file_handle = current_syn_file['_file_handle']
current_syn_file_storage_location_id = current_syn_file_handle['storageLocationId']
deleted_version_file_handles = []
# Make sure the current version of the file is on the correct bucket before deleting anything.
if current_syn_file_storage_location_id == self._storage_location_id:
versions = list(self._synapse_client._GET_paginated('/entity/{0}/version'.format(syn_id)))
for version_data in versions:
version_number = version_data['versionNumber']
if version_number != current_syn_file.versionNumber:
check_file, file_handle_id, check_storage_location_id = \
self.get_syn_file_by_version(syn_id, version_number)
if check_storage_location_id != self._storage_location_id:
if not self._dry_run:
logging.info('Deleting: {0}, Version: {1}, Storage Location: {2}'.format(
syn_path,
version_number,
check_storage_location_id
))
self._synapse_client.restDELETE("/entity/{}/version/{}?skipTrashCan=true".format(
syn_id,
version_number
))
deleted_version_file_handles.append((
version_number,
check_storage_location_id,
file_handle_id,
))
else:
logging.info(
'Skipping Delete (Dry Run): {0}, Version: {1}, Storage Location: {2}'.format(
syn_path,
version_number,
check_storage_location_id
))
else:
logging.error('Storage location of current version does not match: {0}'.format(syn_path))
return syn_id, deleted_version_file_handles
def _index_deleted_versions_file_handles(self, cursor, result):
syn_id, version_file_handles = result
insert_values = []
for version_file_handle in version_file_handles:
insert_values.append((syn_id, *version_file_handle))
cursor.executemany("""
insert into deleted_version_file_handles (id, version, storage_location_id, file_handle_id, status)
values (?, ?, ?, ?, 0)""",
insert_values
)
return syn_id
def _run_deleted_version_purge(self, executor, conn):
logging.info('Executing action: {0}'.format(self._action))
# 1/7/2021 (per Jordan @ Synapse): file handle purging is disabled for now; instead, export the
# deletion index to CSV and hand it to Synapse support for purging.
logging.info('Purge not enabled. Exporting database to CSV for upload to Synapse support for purging.')
csv_filename = os.path.join(os.path.dirname(self._index_db_path),
'{0}_deleted_version_file_handles.csv'.format(self._project_id))
sys_cmd = "sqlite3 -header -csv {0} 'select id, version, storage_location_id, file_handle_id from deleted_version_file_handles' > {1}".format(
self._index_db_path, csv_filename)
logging.info('Executing: {0}'.format(sys_cmd))
os.system(sys_cmd)
logging.info('Upload file: {0} to Synapse and share with user: jordank'.format(csv_filename))
return False
# 1/7/2021: the original purge implementation below is unreachable until the early return above is removed.
if self._dry_run:
logging.info('Executing Dry Run Only!')
cursor = conn.cursor()
try:
self._check_table(cursor, 'deleted_version_file_handles')
except ValueError as ex:
raise ValueError("""
No existing deletion index found in {}.
The {} action can only be run after the {} action, using the same index_db_path
""".format(
self._index_db_path,
self.Actions.PURGE,
self.Actions.DELETE,
)) from ex
last_id = ''
last_version = -1
futures = []
total_finished_count = 0
total_errored_count = 0
count = cursor.execute(
"""
select count(*)
from deleted_version_file_handles
where status = ?
""",
[self.FileHandleStatus.UNPURGED]
).fetchone()[0]
if count == 0:
# there are no deletable records in the sqlite db.
# Either the delete action wasn't run first, there are no deletable versions in the project,
# or the purge action was already run and complete.
logging.warning("No deletable file versions found in {}. Was the delete action run first?".format(
self._index_db_path)
)
return
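# page through the recorded (id, version) pairs in order, using (last_id, last_version) as a
# keyset-pagination cursor so in-flight rows are not re-submitted while their futures are still pending.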
while True:
limit = self._get_max_workers(executor) - len(futures)
results = cursor.execute(
"""
select id, version, file_handle_id
from deleted_version_file_handles
where status = ? and (id > ? or (id = ? and version > ?))
order by id, version
limit ?
""",
[self.FileHandleStatus.UNPURGED, last_id, last_id, last_version, limit]
)
rows = [r for r in results]
if not rows:
break
for r in rows:
syn_id = r[0]
version = r[1]
file_handle_id = r[2]
future = executor.submit(self._perform_file_handle_purge, syn_id, version, file_handle_id)
futures.append(future)
concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
futures, finished_count, errored_count = self._handle_purge_futures(cursor, futures)
total_finished_count += finished_count
total_errored_count += errored_count
last_id = syn_id
last_version = version
completed_count = total_finished_count + total_errored_count
logging.info("Completed {} of {}, ({}%)".format(completed_count, count, (completed_count * 100) // count))
# wait for all remaining
concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
_, finished_count, errored_count = self._handle_purge_futures(cursor, futures)
total_finished_count += finished_count
total_errored_count += errored_count
logging.info("Successfully removed {} file handles. Encountered errors on {}, see the output for causes".format(
total_finished_count, total_errored_count
))
def _handle_purge_futures(self, cursor, futures):
unfinished = []
finished_versions = []
errored_count = 0
for future in futures:
if future.done():
if future.exception():
# for any errored Futures we rely on the execution itself to have logged useful info
errored_count += 1
else:
result = future.result()
finished_versions.append(result)
else:
unfinished.append(future)
if finished_versions and not self._dry_run:
cursor.executemany(
"update deleted_version_file_handles set status = 1 where id = ? and version = ?",
finished_versions
)
return unfinished, len(finished_versions), errored_count
def _perform_file_handle_purge(self, syn_id, version, file_handle_id):
# note that this will error if there are any remaining references to the file handle
# (e.g. another entity referencing the same file handle, the deleted entity still sitting in the trash can, etc.)
try:
if not self._dry_run:
self._synapse_client.restDELETE(
f"/fileHandle/{file_handle_id}",
endpoint=self._synapse_client.fileHandleEndpoint
)
logging.info("Removed file handle {0}, Version: {1}, File handle id: {2}".format(
syn_id,
version,
file_handle_id
))
else:
logging.info(
"Skipping File handle removal (Dry Run): {0}, Version: {1}, File handle id: {2}".format(
syn_id,
version,
file_handle_id
))
return syn_id, version
except Exception:
logging.exception(
"Unable to remove file handle {} for id {}. There may sill be Synapse references to it".format(
file_handle_id, syn_id
)
)
raise
def get_syn_file_by_version(self, syn_id, version):
syn_file = None
file_handle_id = None
check_storage_location_id = None
if version is None:
raise Exception('version is required.')
try:
syn_file = self._synapse_client.get(syn_id, downloadFile=False, version=version)
file_handle = syn_file['_file_handle']
file_handle_id = file_handle['id']
check_storage_location_id = file_handle['storageLocationId']
except Exception as ex:
if 'Cannot find a node with id' in str(ex):
# Version does not exist.
pass
else:
logging.exception(ex)
return syn_file, file_handle_id, check_storage_location_id
def synapse_login(self):
logging.info('Logging into Synapse...')
self._username = self._username or os.getenv('SYNAPSE_USERNAME')
self._password = self._password or os.getenv('SYNAPSE_PASSWORD')
if not self._username:
self._username = input('Synapse username: ')
if not self._password:
self._password = getpass.getpass(prompt='Synapse password: ')
try:
self._synapse_client = synapseclient.Synapse(skip_checks=True)
self._synapse_client.login(self._username, self._password, silent=True)
self._synapse_client.multi_threaded = True
if self._synapse_cache is not None:
self._synapse_client.cache.cache_root_dir = self._synapse_cache
except Exception as ex:
self._synapse_client = None
logging.error('Synapse login failed: {0}'.format(str(ex)))
return self._synapse_client is not None
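# suppresses noisy log lines (upload progress separators, connection pool warnings) from all loggers;
# attached to every known logger in main().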
class LogFilter(logging.Filter):
FILTERS = [
'##################################################',
'Uploading file to Synapse storage',
'Connection pool is full, discarding connection:'
]
def filter(self, record):
for filtered_msg in self.FILTERS:
if filtered_msg in str(record.msg):
return False
return True
def main():
parser = argparse.ArgumentParser(description="""
A script to migrate a project to a new Synapse storage location, with optional additional stages for deleting
the previous versions at the previous storage location and removing the associated file handles from the
underlying storage layer.
""")
parser.add_argument('project_id', metavar='project-id',
help='The Synapse Project ID to fix.')
parser.add_argument('-a', '--action',
choices=[
SynapseStorageFixer.Actions.MOVE,
SynapseStorageFixer.Actions.DELETE,
SynapseStorageFixer.Actions.PURGE,
],
default=SynapseStorageFixer.Actions.MOVE,
help="""The action to take: MOVE the files to a different storage location,
DELETE the versions not on the storage location,
or PURGE the file handles from project previously deleted with this script.""")
parser.add_argument('-s', '--storage-location', metavar='storage-location', type=int, required=True,
help='The storage location ID that should be set. (Gates: 33266, Test: 41104)')
parser.add_argument('-t', '--threads',
help='The maximum number of threads to use.',
type=int,
default=None)
parser.add_argument('-u', '--username',
help='Synapse username.', default=None)
parser.add_argument('-p', '--password',
help='Synapse password.', default=None)
parser.add_argument('-i', '--index_db_path', default=None,
help='Path to store a sqlite index of files to be moved/deleted. Defaults to <project-id>_index.db in the current directory.')
parser.add_argument('-ll', '--log-level',
help='Set the logging level.',
default='INFO')
parser.add_argument('-ld', '--log-dir',
help='Set the directory where the log file will be written.')
parser.add_argument('-sc', '--synapse-cache',
help='Set the directory where the synapse cache will reside.')
parser.add_argument('-d', '--dry-run', default=False,
action='store_true', help='Dry run only. Do not update or delete anything.')
args = parser.parse_args()
log_level = getattr(logging, args.log_level.upper())
timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
log_filename = '{0}.log'.format(timestamp)
if args.log_dir:
log_filename = os.path.join(expand_path(args.log_dir), log_filename)
else:
log_filename = os.path.join(app_log_dir(), log_filename)
ensure_dirs(os.path.dirname(log_filename))
logging.basicConfig(
filename=log_filename,
filemode='w',
format='%(asctime)s %(levelname)s: %(message)s',
level=log_level
)
# Add console logging.
console = logging.StreamHandler()
console.setLevel(log_level)
console.setFormatter(logging.Formatter('%(message)s'))
logging.getLogger().addHandler(console)
log_filter = LogFilter()
for logger in [logging.getLogger(name) for name in logging.root.manager.loggerDict]:
logger.addFilter(log_filter)
print('Logging output to: {0}'.format(log_filename))
syn_cache_path = expand_path(args.synapse_cache) if args.synapse_cache else None
try:
SynapseStorageFixer(
args.action,
args.project_id,
args.storage_location,
username=args.username,
password=args.password,
synapse_cache=syn_cache_path,
max_threads=args.threads,
dry_run=args.dry_run,
index_db_path=args.index_db_path,
).run()
except Exception as ex:
logging.exception(ex)
raise
finally:
print('Logged output to: {0}'.format(log_filename))
def expand_path(local_path):
var_path = os.path.expandvars(local_path)
expanded_path = os.path.expanduser(var_path)
return os.path.abspath(expanded_path)
def ensure_dirs(local_path):
if not os.path.isdir(local_path):
os.makedirs(local_path)
def app_dir():
return os.path.join(pathlib.Path.home(), '.syntools')
def app_log_dir():
return os.path.join(app_dir(), 'logs')
if __name__ == "__main__":
main()