/synapse_storage_fixer.py Secret
Last active
November 19, 2020 20:09
-
-
Save jkiang13/5de5c062921916367defd8bf711b3bdc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Copyright 2017-present, Bill & Melinda Gates Foundation | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# This script runs in three separate action steps to migrate a Synapse project from one storage location to another. | |
# | |
# 1. MOVE: Recursively Walks a Synapse project and creates new versions of all files at the new storage location, | |
# creating an index of the moved file entities as it does so. | |
# 2. DELETE: Deletes all older file entity versions that aren't in the new storage location. Can either use | |
# previously created index file from a MOVE action, or can be run fresh in which case a new index file | |
# is created after an additional recursive walk that records the deleted entities. | |
# 3. PURGE: Removes the file handles for all file entities logged from a DELETE step. This can only be run after | |
# a DELETE action using the same index file, because once the entities are deleted the file handles cannot | |
# be read from Synapse meta data any longer. | |
# | |
# Example usage: | |
# # make new versions of the files in project syn123456 to storage location 98765. | |
# # the storage location for project syn123456 should have already been set to 98765 or an error is generated. | |
# synapse_storage_fixer.py syn123456 -s 98765 -a move -u <user> -p <pass> -i /tmp/migrate.db | |
# | |
# # delete the old version(s) of the entity at the previous storage location (previous to 98765). | |
# # this deletes the Synapse records only and does not remove the file objects from the underlying storage layer
# # (e.g. S3) | |
# synapse_storage_fixer.py syn123456 -s 98765 -a delete -u <user> -p <pass> -i /tmp/migrate.db | |
# | |
# # remove the file handles from a previous delete action. this must use the same -i index file that was created | |
# # from a delete action. | |
# synapse_storage_fixer.py syn123456 -s 98765 -a purge -u <user> -p <pass> -i /tmp/migrate.db | |
import concurrent | |
import concurrent.futures | |
import sys | |
import os | |
import argparse | |
import getpass | |
import logging | |
import pathlib | |
from datetime import datetime | |
import synapseclient | |
import synapseutils as syn_utils | |
import sqlite3 | |
class SynapseStorageFixer:
    """Migrates a Synapse project's files to a different storage location.

    Supports three actions (move / delete / purge) and tracks progress in a
    local sqlite index file so large migrations can be aborted and resumed.
    (Note: in the original, this docstring appeared after the nested classes
    and so was never the actual class docstring.)
    """

    class Actions:
        MOVE = 'move'      # creates new versions of the entities in the specified storage location
        DELETE = 'delete'  # deletes the Synapse entities of the older versions
        PURGE = 'purge'    # deletes the file handles of all entities deleted by a previously run DELETE action

    class MoveStatus:
        # progress states recorded per file entity in the index db
        UNMOVED = 0
        MOVED = 1
        DELETED = 2

    class FileHandleStatus:
        # progress states recorded per deleted-version file handle in the index db
        UNPURGED = 0
        PURGED = 1

    def __init__(self, action, project_id, storage_location_id, username=None, password=None, synapse_cache=None,
                 max_threads=None, dry_run=False, index_db_path=None):
        """
        :param action: one of Actions.MOVE / DELETE / PURGE
        :param project_id: Synapse project id (e.g. 'syn123456')
        :param storage_location_id: target storage location id (required)
        :param username: Synapse username (falls back to env var / prompt at login)
        :param password: Synapse password (falls back to env var / prompt at login)
        :param synapse_cache: optional base dir for the synapse cache
        :param max_threads: max worker threads (None lets ThreadPoolExecutor choose)
        :param dry_run: if True, log intended changes without performing them
        :param index_db_path: path of the sqlite index db used to track progress
        :raises ValueError: if storage_location_id is missing or blank
        """
        self._action = action
        self._project_id = project_id
        self._storage_location_id = storage_location_id
        if not self._storage_location_id or str(self._storage_location_id).strip() == '':
            # ValueError (a subclass of the previously raised Exception) better
            # reflects an invalid argument; existing `except Exception` callers still work
            raise ValueError('storage_location_id is required!')
        self._username = username
        self._password = password
        self._synapse_cache = synapse_cache
        self._max_threads = max_threads
        self._dry_run = dry_run
        self._index_db_path = index_db_path
        if self._synapse_cache:
            self._synapse_cache = os.path.join(self._synapse_cache, '.synapseCache')
def _check_table(self, cursor, table_name): | |
cursor.execute("select name from sqlite_master where type = 'table' and name = ?", [table_name]) | |
if not cursor.fetchone(): | |
raise ValueError("{} table does not exist") | |
def run(self): | |
self.synapse_login() | |
with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_threads) as executor, \ | |
sqlite3.connect(self._index_db_path) as conn: | |
if self._action == self.Actions.PURGE: | |
self._run_deleted_version_purge(executor, conn) | |
else: | |
self._run_storage_location_action(executor, conn) | |
def _get_max_workers(self, executor): | |
# accessing a private variable here, but useful to know how many | |
# workers there were even in the default case where we let ThreadPoolExecutor pick a default | |
return executor._max_workers | |
def _index_files(self, conn, cursor, project): | |
# initialize a sqlite db for collecting the changes. for mass changes involving hundreds of thousands | |
# of existing files it will be less taxing on Synapse if the changes are executed ordered by their Synapse id | |
# so we create an sqlite db to collect | |
logging.info("Creating index of file entities at {}".format(self._index_db_path)) | |
cursor.execute(""" | |
create table if not exists file_entities ( | |
id text primary key, | |
parent_path text not null, | |
parent_id text not null, | |
filename text not null, | |
status integer not null | |
); | |
""") | |
cursor.execute(""" | |
create table if not exists deleted_version_file_handles ( | |
id text, | |
version integer, | |
storage_location_id integer not null, | |
file_handle_id text not null, | |
status integer not null, | |
primary key(id, version), | |
foreign key (id) references file_entities(id) | |
); | |
""") | |
existing_count = 0 | |
inserted_count = 0 | |
walked_path = syn_utils.walk(self._synapse_client, project.id) | |
for dirpath, dirnames, filenames in walked_path: | |
parent_path, parent_id = dirpath | |
dir_existing_count, dir_inserted_count = self._insert_file_entities( | |
cursor, parent_path, parent_id, filenames | |
) | |
existing_count += dir_existing_count | |
inserted_count += dir_inserted_count | |
logging.info("Scanned {} files from {} ({}), {} new".format( | |
existing_count + inserted_count, parent_id, parent_path, inserted_count | |
)) | |
conn.commit() | |
logging.info("Scanned {} total files, {} new".format(existing_count + inserted_count, inserted_count)) | |
def _chunk(self, iterable, n): | |
l = len(iterable) | |
for ndx in range(0, l, n): | |
yield iterable[ndx:min(ndx + n, l)] | |
def _insert_file_entities(self, cursor, parent_path, parent_id, filenames): | |
existing_count = 0 | |
insert_count = 0 | |
for filename_chunk in self._chunk(filenames, 500): | |
files = {f[1]: f[0] for f in filename_chunk} | |
# check if entity is already indexed (in case we need to abort and resume | |
# during indexing of a large project). | |
file_ids = list(files.keys()) | |
results = cursor.execute( | |
"select id from file_entities where id in ({})".format( | |
",".join(['?' for _ in range(len(files))]) | |
), | |
file_ids | |
) | |
existing_file_ids = set(i[0] for i in results) | |
existing_count += len(existing_file_ids) | |
missing_file_ids = set(file_ids).difference(existing_file_ids) | |
if missing_file_ids: | |
insert_values = [] | |
for missing_file_id in missing_file_ids: | |
insert_values.append((missing_file_id, parent_path, parent_id, files[missing_file_id], 0)) | |
cursor.executemany( | |
"insert into file_entities (id, parent_path, parent_id, filename, status) values (?, ?, ?, ?, ?)", | |
insert_values | |
) | |
insert_count += len(missing_file_ids) | |
return existing_count, insert_count | |
def _run_storage_location_action(self, executor, conn): | |
logging.info('Executing action: {0}'.format(self._action)) | |
if self._dry_run: | |
logging.info('Executing Dry Run Only!') | |
project = self._synapse_client.get(self._project_id) | |
logging.info('Fixing Storage Location on Project: {0}'.format(project.name)) | |
logging.info('Storage Location ID: {0}'.format(self._storage_location_id)) | |
if self._synapse_cache: | |
logging.info('Synapse Cache: {0}'.format(self._synapse_cache)) | |
project_setting = self._synapse_client.getProjectSetting(project, 'upload') | |
if project_setting is None or self._storage_location_id not in project_setting['locations']: | |
logging.info('') | |
logging.warning('WARNING: Project storage location does not match intended location.') | |
uinput = input('{0} [y/n]: '.format('Continue?')) | |
if uinput != 'y': | |
return | |
cursor = conn.cursor() | |
try: | |
self._check_table(cursor, 'file_entities') | |
logging.warning("Found existing file entity index file at {}".format(self._index_db_path)) | |
uinput = input("Use existing file entity index ({})? [y/n]: ".format(self._index_db_path)) | |
if uinput != 'y': | |
logging.info("Aborting, remove the database file at {} or use a different index_db_path".format( | |
self._index_db_path | |
)) | |
return | |
except ValueError: | |
# no existing index yet, that's fine we'll build one as part of this action | |
self._index_files(conn, cursor, project) | |
# if we're moving files then we want any files that are scanned. | |
# if we're deleting files then we want any files that | |
update_status = self.MoveStatus.DELETED if self._action == self.Actions.DELETE else self.MoveStatus.MOVED | |
futures = [] | |
last_id = '' | |
total_finished_count = 0 | |
total_errored_count = 0 | |
if self._action == self.Actions.DELETE: | |
action_fn = self._perform_delete | |
finish_fn = self._index_deleted_versions_file_handles | |
elif self._action == self.Actions.MOVE: | |
action_fn = self._perform_move | |
finish_fn = None | |
else: | |
raise ValueError("Shouldn't reach here") | |
count = cursor.execute( | |
""" | |
select count(*) | |
from file_entities | |
where status < ? | |
""", | |
[update_status] | |
).fetchone()[0] | |
while True: | |
limit = self._get_max_workers(executor) - len(futures) | |
results = cursor.execute( | |
""" | |
select id, parent_path, parent_id, filename | |
from file_entities | |
where id > ? and status < ? | |
order by id asc | |
limit ? | |
""", | |
(last_id, update_status, limit) | |
) | |
rows = [r for r in results] | |
if not rows: | |
break | |
for r in rows: | |
syn_id = r[0] | |
dirpath = (r[1], r[2]) | |
filename = r[3] | |
syn_path = '{0}/{1} ({2})'.format('/'.join(dirpath), filename, syn_id) | |
logging.info('Checking: {0}'.format(syn_path)) | |
future = executor.submit(self._perform_storage_action(action_fn), syn_path, syn_id) | |
futures.append(future) | |
concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED) | |
futures, finished_count, errored_count =\ | |
self._mark_finished(conn, cursor, futures, update_status, finish_fn) | |
total_finished_count += finished_count | |
total_errored_count += errored_count | |
last_id = rows[-1][0] | |
completed_count = total_finished_count + total_errored_count | |
logging.info("Completed {} of {}, ({}%)".format(completed_count, count, (completed_count * 100) // count)) | |
# wait for any remaining actions to finish | |
concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED) | |
_, finished_count, errored_count = self._mark_finished(conn, cursor, futures, update_status, finish_fn) | |
total_finished_count += finished_count | |
total_errored_count += errored_count | |
if self._dry_run: | |
logging.info('') | |
logging.info('Dry Run Only, no changes committed.') | |
logging.info('') | |
logging.info( | |
"Done. Processed {} entities. Encountered errors on {}. See log output for details".format( | |
total_finished_count, total_errored_count | |
) | |
) | |
def _mark_finished(self, conn, cursor, futures, update_status, finish_fn): | |
# update the index records of all files on which the action has been performed. | |
# this is done from the primary thread to operate with a single threaded SQLite | |
unfinished = [] | |
finished_ids = [] | |
errored_count = 0 | |
for future in futures: | |
if future.done(): | |
if future.exception(): | |
errored_count += 1 | |
else: | |
# for any errored Futures we rely on the execution itself to have logged useful info | |
result = future.result() | |
syn_id = result if not finish_fn else finish_fn(cursor, result) | |
finished_ids.append(syn_id) | |
else: | |
unfinished.append(future) | |
if finished_ids and not self._dry_run: | |
cursor.executemany( | |
"update file_entities set status = ? where id = ?", | |
[(update_status, i,) for i in finished_ids] | |
) | |
conn.commit() | |
return unfinished, len(finished_ids), errored_count | |
def _perform_storage_action(self, fn): | |
def wrapped_fn(syn_path, syn_id): | |
try: | |
return fn(syn_path, syn_id) | |
except Exception: | |
logging.exception("Encountered an error during {} of {}".format(self._action, syn_id)) | |
raise | |
return wrapped_fn | |
def _perform_move(self, syn_path, syn_id): | |
# Fix the storage location. | |
syn_file = self._synapse_client.get(syn_id, downloadFile=False) | |
syn_file_storage_location_id = syn_file['_file_handle']['storageLocationId'] | |
if syn_file_storage_location_id != self._storage_location_id: | |
logging.info('Moving: {0}'.format(syn_path)) | |
if not self._dry_run: | |
syn_file = self._synapse_client.get(syn_id, downloadFile=True) | |
old_md5 = syn_file['_file_handle']['contentMd5'] | |
self._synapse_client.cache.remove(syn_file) | |
updated_syn_file = self._synapse_client.store(syn_file, forceVersion=True) | |
new_md5 = updated_syn_file['_file_handle']['contentMd5'] | |
if new_md5 != old_md5: raise Exception('ERROR: MD5 does not match: {0}'.format(syn_path)) | |
self._synapse_client.cache.remove(syn_file, delete=True) | |
if updated_syn_file['_file_handle']['storageLocationId'] != self._storage_location_id: | |
raise Exception('ERROR: Storage location does not match intended location: {0}'.format(syn_path)) | |
else: | |
logging.info('Moved: {0}'.format(syn_path)) | |
else: | |
logging.info('Skipping Move (Dry Run): {0}'.format(syn_path)) | |
return syn_id | |
def _perform_delete(self, syn_path, syn_id): | |
# Delete the versions in the old bucket. | |
current_syn_file = self._synapse_client.get(syn_id, downloadFile=False) | |
current_syn_file_handle = current_syn_file['_file_handle'] | |
current_syn_file_storage_location_id = current_syn_file_handle['storageLocationId'] | |
deleted_version_file_handles = [] | |
# Make sure the current version of the file is on the correct bucket before deleting anything. | |
if current_syn_file_storage_location_id == self._storage_location_id: | |
versions = list(self._synapse_client._GET_paginated('/entity/{0}/version'.format(syn_id))) | |
for version_data in versions: | |
version_number = version_data['versionNumber'] | |
if version_number != current_syn_file.versionNumber: | |
check_file, file_handle_id, check_storage_location_id =\ | |
self.get_syn_file_by_version(syn_id, version_number) | |
if check_storage_location_id != self._storage_location_id: | |
if not self._dry_run: | |
logging.info('Deleting: {0}, Version: {1}, Storage Location: {2}'.format( | |
syn_path, | |
version_number, | |
check_storage_location_id | |
)) | |
self._synapse_client.restDELETE("/entity/{}/version/{}?skipTrashCan=true".format( | |
syn_id, | |
version_number | |
)) | |
deleted_version_file_handles.append(( | |
version_number, | |
check_storage_location_id, | |
file_handle_id, | |
)) | |
else: | |
logging.info( | |
'Skipping Delete (Dry Run): {0}, Version: {1}, Storage Location: {2}'.format( | |
syn_path, | |
version_number, | |
check_storage_location_id | |
)) | |
else: | |
logging.error('Storage location of current version does not match: {0}'.format(syn_path)) | |
return syn_id, deleted_version_file_handles | |
def _index_deleted_versions_file_handles(self, cursor, result): | |
syn_id, version_file_handles = result | |
insert_values = [] | |
for version_file_handle in version_file_handles: | |
insert_values.append((syn_id, *version_file_handle)) | |
cursor.executemany(""" | |
insert into deleted_version_file_handles (id, version, storage_location_id, file_handle_id, status) | |
values (?, ?, ?, ?, 0)""", | |
insert_values | |
) | |
return syn_id | |
def _run_deleted_version_purge(self, executor, conn): | |
logging.info('Executing action: {0}'.format(self._action)) | |
if self._dry_run: | |
logging.info('Executing Dry Run Only!') | |
cursor = conn.cursor() | |
try: | |
self._check_table(cursor, 'deleted_version_file_handles') | |
except ValueError as ex: | |
raise ValueError(""" | |
No existing deletion index found in {}. | |
The {} action only be run after the {} acton, using the same index_db_path | |
""".format( | |
self._index_db_path, | |
self.Actions.PURGE, | |
self.Actions.DELETE, | |
)) from ex | |
last_id = '' | |
last_version = -1 | |
futures = [] | |
total_finished_count = 0 | |
total_errored_count = 0 | |
count = cursor.execute( | |
""" | |
select count(*) | |
from deleted_version_file_handles | |
where status = ? | |
""", | |
[self.FileHandleStatus.UNPURGED] | |
).fetchone()[0] | |
if count == 0: | |
# there are no deletable records in the sqlite db. | |
# Either the delete action wasn't run first, there are no deletable versions in the project, | |
# or the purge action was already run and complete. | |
logging.warning("No deletable file versions found in {}. Was the delete action run first?".format( | |
self._index_db_path) | |
) | |
return | |
while True: | |
limit = self._get_max_workers(executor) - len(futures) | |
results = cursor.execute( | |
""" | |
select id, version, file_handle_id | |
from deleted_version_file_handles | |
where status = ? and (id > ? or (id = ? and version > ?)) | |
order by id, version | |
limit ? | |
""", | |
[self.FileHandleStatus.UNPURGED, last_id, last_id, last_version, limit] | |
) | |
rows = [r for r in results] | |
if not rows: | |
break | |
for r in rows: | |
syn_id = r[0] | |
version = r[1] | |
file_handle_id = r[2] | |
future = executor.submit(self._perform_file_handle_purge, syn_id, version, file_handle_id) | |
futures.append(future) | |
concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED) | |
futures, finished_count, errored_count = self._handle_purge_futures(cursor, futures) | |
total_finished_count += finished_count | |
total_errored_count += errored_count | |
last_id = syn_id | |
last_version = version | |
completed_count = total_finished_count + total_errored_count | |
logging.info("Completed {} of {}, ({}%)".format(completed_count, count, (completed_count * 100) // count)) | |
# wait for all remaining | |
concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED) | |
_, finished_count, errored_count = self._handle_purge_futures(cursor, futures) | |
total_finished_count += finished_count | |
total_errored_count += errored_count | |
logging.info("Successfully removed {} file handles. Encountered errors on {}, see the output for causes".format( | |
total_finished_count, total_errored_count | |
)) | |
def _handle_purge_futures(self, cursor, futures): | |
unfinished = [] | |
finished_versions = [] | |
errored_count = 0 | |
for future in futures: | |
if future.done(): | |
if future.exception(): | |
# for any errored Futures we rely on the execution itself to have logged useful info | |
errored_count += 1 | |
else: | |
result = future.result() | |
finished_versions.append(result) | |
else: | |
unfinished.append(future) | |
if finished_versions and not self._dry_run: | |
cursor.executemany( | |
"update deleted_version_file_handles set status = 1 where id = ? and version = ?", | |
finished_versions | |
) | |
return unfinished, len(finished_versions), errored_count | |
def _perform_file_handle_purge(self, syn_id, version, file_handle_id): | |
# note that this will error if there are any remaining references to the Synapse entity | |
# (i.e. another entity referencing the same file handle, the entity still in the trash can, etc) | |
try: | |
if not self._dry_run: | |
self._synapse_client.restDELETE( | |
f"/fileHandle/{file_handle_id}", | |
endpoint=self._synapse_client.fileHandleEndpoint | |
) | |
logging.info("Removed file handle {0}, Version: {1}, File handle id: {2}".format( | |
syn_id, | |
version, | |
file_handle_id | |
)) | |
else: | |
logging.info( | |
"Skipping File handle removal (Dry Run): {0}, Version: {1}, File handle id: {2}".format( | |
syn_id, | |
version, | |
file_handle_id | |
)) | |
return syn_id, version | |
except Exception: | |
logging.exception( | |
"Unable to remove file handle {} for id {}. There may sill be Synapse references to it".format( | |
file_handle_id, syn_id | |
) | |
) | |
raise | |
def get_syn_file_by_version(self, syn_id, version): | |
syn_file = None | |
file_handle_id = None | |
check_storage_location_id = None | |
if version is None: | |
raise Exception('version is required.') | |
try: | |
syn_file = self._synapse_client.get(syn_id, downloadFile=False, version=version) | |
file_handle = syn_file['_file_handle'] | |
file_handle_id = file_handle['id'] | |
check_storage_location_id = file_handle['storageLocationId'] | |
except Exception as ex: | |
if 'Cannot find a node with id' in str(ex): | |
# Version does not exist. | |
pass | |
else: | |
logging.exception(ex) | |
return syn_file, file_handle_id, check_storage_location_id | |
def synapse_login(self): | |
logging.info('Logging into Synapse...') | |
self._username = self._username or os.getenv('SYNAPSE_USERNAME') | |
self._password = self._password or os.getenv('SYNAPSE_PASSWORD') | |
if not self._username: | |
self._username = input('Synapse username: ') | |
if not self._password: | |
self._password = getpass.getpass(prompt='Synapse password: ') | |
try: | |
self._synapse_client = synapseclient.Synapse(skip_checks=True) | |
self._synapse_client.login(self._username, self._password, silent=True) | |
self._synapse_client.multi_threaded = True | |
if self._synapse_cache is not None: | |
self._synapse_client.cache.cache_root_dir = self._synapse_cache | |
except Exception as ex: | |
self._synapse_client = None | |
logging.info('Synapse login failed: {0}'.format(str(ex))) | |
return self._synapse_client is not None | |
class LogFilter(logging.Filter):
    """Suppresses known-noisy log messages emitted by the synapse client."""

    FILTERS = [
        '##################################################',
        'Uploading file to Synapse storage',
        'Connection pool is full, discarding connection:'
    ]

    def filter(self, record):
        # drop the record when it contains any of the noisy substrings
        return not any(noise in record.msg for noise in self.FILTERS)
def main():
    """Command line entry point: parse arguments, set up logging, run the fixer."""
    parser = argparse.ArgumentParser(description="""
        A script to migrate a project to a new Synapse storage location, with optional additional stages for deleting
        the previous versions at the previous storage location and removing the associated file handles from the
        underlying storage layer.
        """)
    parser.add_argument('project_id', metavar='project-id',
                        help='The Synapse Project ID to fix.')
    parser.add_argument('-a', '--action',
                        choices=[
                            SynapseStorageFixer.Actions.MOVE,
                            SynapseStorageFixer.Actions.DELETE,
                            SynapseStorageFixer.Actions.PURGE,
                        ],
                        default=SynapseStorageFixer.Actions.MOVE,
                        help="""The action to take: MOVE the files to a different storage location,
                        DELETE the versions not on the storage location,
                        or PURGE the file handles from project previously deleted with this script.""")
    parser.add_argument('-s', '--storage-location', metavar='storage-location', type=int, required=True,
                        help='The storage location ID that should be set. (Gates: 33266, Test: 41104)')
    parser.add_argument('-t', '--threads',
                        help='The maximum number of threads to use.',
                        type=int,
                        default=None)
    parser.add_argument('-u', '--username',
                        help='Synapse username.', default=None)
    parser.add_argument('-p', '--password',
                        help='Synapse password.', default=None)
    parser.add_argument('-i', '--index_db_path', required=True,
                        help='Path to store a sqlite index of files to be moved/deleted')
    parser.add_argument('-ll', '--log-level',
                        help='Set the logging level.',
                        default='INFO')
    parser.add_argument('-ld', '--log-dir',
                        help='Set the directory where the log file will be written.')
    parser.add_argument('-sc', '--synapse-cache',
                        help='Set the directory where the synapse cache will reside.')
    parser.add_argument('-d', '--dry-run', default=False,
                        action='store_true', help='Dry run only. Do not update or delete anything.')
    args = parser.parse_args()

    # write the log to a timestamped file in the requested (or default) dir
    log_level = getattr(logging, args.log_level.upper())
    log_name = '{0}.log'.format(datetime.now().strftime("%Y%m%d%H%M%S%f"))
    log_base_dir = expand_path(args.log_dir) if args.log_dir else app_log_dir()
    log_filename = os.path.join(log_base_dir, log_name)
    ensure_dirs(os.path.dirname(log_filename))
    logging.basicConfig(
        filename=log_filename,
        filemode='w',
        format='%(asctime)s %(levelname)s: %(message)s',
        level=log_level
    )

    # Add console logging.
    console = logging.StreamHandler()
    console.setLevel(log_level)
    console.setFormatter(logging.Formatter('%(message)s'))
    logging.getLogger().addHandler(console)

    # quiet down known-noisy messages from the synapse client's loggers
    noise_filter = LogFilter()
    for logger in (logging.getLogger(name) for name in logging.root.manager.loggerDict):
        logger.addFilter(noise_filter)

    print('Logging output to: {0}'.format(log_filename))
    syn_cache_path = expand_path(args.synapse_cache) if args.synapse_cache else None
    try:
        SynapseStorageFixer(
            args.action,
            args.project_id,
            args.storage_location,
            username=args.username,
            password=args.password,
            synapse_cache=syn_cache_path,
            max_threads=args.threads,
            dry_run=args.dry_run,
            index_db_path=args.index_db_path,
        ).run()
    except Exception as ex:
        logging.exception(ex)
        raise
    finally:
        print('Logged output to: {0}'.format(log_filename))
def expand_path(local_path):
    """Expand env vars and ``~`` in ``local_path`` and return its absolute form."""
    return os.path.abspath(os.path.expanduser(os.path.expandvars(local_path)))
def ensure_dirs(local_path):
    """Create ``local_path`` (and any missing parents) if it doesn't exist.

    Like the isdir-guarded original, this still raises if the path exists but
    is not a directory: exist_ok only suppresses the existing-directory case.
    """
    os.makedirs(local_path, exist_ok=True)
def app_dir():
    """Return this tool's directory under the user's home."""
    return os.path.join(pathlib.Path.home(), '.syntools')


def app_log_dir():
    """Return the default directory for log files."""
    return os.path.join(app_dir(), 'logs')
if __name__ == "__main__":
    # run only when executed as a script, not when imported
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment