Last active
October 24, 2022 20:13
-
-
Save nfrasser/9618e87512cf3bdc7f3af20e8bce5544 to your computer and use it in GitHub Desktop.
Restore missing GridFS files in CryoSPARC database using job directories
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Restore missing GridFS files for a given project (or job).

Usage:
    eval $(cryosparcm env)
    python -m restore_gridfs_files --project <puid> [ --job <juid> ]
"""
from io import BytesIO | |
import os | |
import os.path | |
import argparse | |
import pickle | |
from pathlib import Path | |
from time import sleep, time | |
from gridfs import GridFS | |
from cryosparc_compute import database_management | |
from cryosparc_compute.client import CommandClient | |
# Map of lowercase file extensions to MIME content types used when
# re-uploading files into GridFS.
CONTENT_TYPE_MAP = {
    'png': 'image/png',
    'gif': 'image/gif',
    'jpeg': 'image/jpeg',
    'jpg': 'image/jpeg',
    'svg': 'image/svg+xml',
    'pdf': 'application/pdf',
    'txt': 'text/plain',
    'csv': 'text/csv',
    'json': 'application/json',
    'xml': 'application/xml',
}

parser = argparse.ArgumentParser()
parser.add_argument('-p', '--project', help='Project UID to restore')
parser.add_argument('-j', '--job', help='Job UID to restore (optional)')
parser.add_argument('-d', '--dump', help='Dump each job to disk after successful restore (RESETS EXISTING gridfs_data DIRECTORY)', action='store_true')
args = parser.parse_args()

# Connect to the CryoSPARC MongoDB instance ('meteor' database) and to the
# command_core server for job dump requests.
mongo = database_management.get_pymongo_client('meteor')
db = mongo['meteor']
gridfs = GridFS(db)
cli = CommandClient(
    host=os.getenv('CRYOSPARC_MASTER_HOSTNAME', 'localhost'),
    port=int(os.getenv('CRYOSPARC_COMMAND_CORE_PORT', 39002))
)
def upload_file(
    filedata,
    filename=None,
    chunk_size=2096128,
    content_type=None,
    project_uid=None,
    job_or_session_uid=None
):
    """Store a file in GridFS and tag it with its CryoSPARC ownership.

    :param filedata: bytes or file-like object with the file contents
    :param filename: original file name; used to infer the content type
        when ``content_type`` is not given
    :param chunk_size: GridFS chunk size in bytes
    :param content_type: explicit MIME type; when omitted it is inferred
        from the filename extension (case-insensitive), falling back to
        ``application/octet-stream``
    :param project_uid: project UID recorded on the fs.files document (required)
    :param job_or_session_uid: job or session UID recorded on the document
    :return: the ``_id`` of the newly-stored GridFS file
    """
    assert project_uid, "project_uid not specified"
    if content_type is None:
        assert filename, "filename or content_type required"
        # lowercase so e.g. "IMAGE.PNG" still resolves to image/png
        ext = filename.split('.')[-1].lower()
        content_type = CONTENT_TYPE_MAP.get(ext, "application/octet-stream")
    res = gridfs.put(filedata, filename=filename,
                     chunk_size=chunk_size, content_type=content_type)
    # Tag the stored fs.files document with the CryoSPARC project/job fields
    # that the application uses to look files up
    db['fs.files'].update_one({'_id': res}, {
        '$set': {
            'project_uid': project_uid,
            'job_uid': job_or_session_uid
        }
    })
    return res
def restore_job_dir(puid: str, job_dir: Path, dump: bool = False):
    """Re-upload a job's dumped GridFS files and repoint database records.

    Reads the pickled ``gridfsdata_*`` dumps in ``<job_dir>/gridfs_data``,
    uploads each stored image back into GridFS, then rewrites the file IDs
    referenced by the job's events, UI tile images and output group images.
    The stale (missing) file IDs are deleted from GridFS afterwards.

    :param puid: project UID the job belongs to
    :param job_dir: path to the job directory (its name is the job UID)
    :param dump: when True, re-dump the job database to disk after a
        successful restore (resets the existing gridfs_data directory)
    """
    juid = job_dir.name
    gridfs_data_dir = job_dir / "gridfs_data"
    job_doc = db.jobs.find_one({"project_uid": puid, "uid": juid})
    if not job_doc:
        print(f"Could not find job {puid}-{juid} in the database")
        return
    if not gridfs_data_dir.is_dir():
        print(f"Job {juid} does not have a gridfs_data directory, skipping")
        return
    print(f"Importing gridfs files for {juid}...")
    uploaded_files = {}  # old file ID -> new file ID (string form)
    main_tic = time()
    for gridfsdata_file in gridfs_data_dir.glob("gridfsdata_*"):
        tic = time()
        total_restored = 0
        print(f" Uploading files for {juid} from {gridfsdata_file} ...")
        with open(gridfsdata_file, 'rb') as openfile:
            # NOTE: pickle.load is acceptable here only because gridfsdata_*
            # files are written by CryoSPARC itself into the job directory
            try:
                gridfs_data = pickle.load(openfile)
            except UnicodeDecodeError:
                # in py3, we need to tell pickle.load() how to decode the
                # 8-bit string instances pickled by py2, using 'bytes' to
                # read these 8-bit string instances as bytes objects
                openfile.seek(0)
                gridfs_data = []
                for raw_image in pickle.load(openfile, encoding='bytes'):
                    image = {
                        '_id': raw_image[b'_id'].decode(),
                        'filename': raw_image[b'filename'].decode(),
                        'data': raw_image[b'data'],
                        'job_uid': raw_image[b'_job_uid'].decode(),
                    }
                    gridfs_data.append(image)
        for image in gridfs_data:
            # for each image in the list, reupload and keep an
            # old_id: new_id dict of everything uploaded
            print(f" FILE {image['_id']} {image['job_uid']} {image['filename']}")
            rawdata = BytesIO(image['data'])
            fid = upload_file(
                rawdata,
                filename=image.get('filename', gridfsdata_file.name),
                project_uid=puid,
                job_or_session_uid=juid
            )
            uploaded_files[image['_id']] = str(fid)
            total_restored += 1
        print(f" Uploaded {total_restored} files for {juid} in {time() - tic:.2f}s")
    if not uploaded_files:
        print(f" No files uploaded for {juid}")
        return
    # Update event IDs
    print(" Updating database events ...")
    tic = time()
    old_file_ids = []
    for event in db.events.find({
        "project_uid": puid,
        "job_uid": juid,
        "imgfiles.0": {"$exists": True}
    }):
        imgfiles = event["imgfiles"]
        for imgfile in imgfiles:
            if imgfile["fileid"] not in uploaded_files:
                print(f" Warning: No file upload found for {juid} {imgfile['filename']} {imgfile['fileid']}")
                continue
            old_file_ids.append(imgfile["fileid"])
            imgfile["fileid"] = uploaded_files[imgfile["fileid"]]
        db.events.update_one({'_id': event['_id']}, {'$set': {'imgfiles': imgfiles}})
    print(" Updating tile images...")
    # .get() guards against job documents that lack these fields, which would
    # otherwise raise KeyError and abort the restore mid-way
    tile_images = job_doc.get('ui_tile_images', [])
    output_group_images = job_doc.get('output_group_images', {})
    for tile_image in tile_images:
        old_file_ids.append(tile_image['fileid'])
        tile_image['fileid'] = uploaded_files.get(str(tile_image['fileid']), "")
    print(" Updating output group images...")
    for key, value in output_group_images.items():
        old_file_ids.append(str(value))
        output_group_images[key] = uploaded_files.get(str(value), "")
    db.jobs.update_one({'_id': job_doc['_id']}, {
        '$set': {
            'ui_tile_images': tile_images,
            'output_group_images': output_group_images,
        }
    })
    print(f" Updated {len(old_file_ids)} event files for {juid} in {time() - tic:.2f}s")
    if old_file_ids:
        print(" Attempting to clean up old files...")
        for fid in old_file_ids:
            # NOTE(review): IDs collected above may be strings while GridFS
            # stores ObjectId _ids; deleting an unknown ID is harmless here
            # since the original files are missing anyway — confirm ID types
            # against the CryoSPARC schema if strict cleanup is required
            gridfs.delete(fid)
        print(f" Deleted {len(old_file_ids)} old files")
    if dump:
        print(" Dumping job...")
        print(" WARNING: gridfs files in the job directory will be reset")
        cli.dump_job_database(project_uid=puid, job_uid=juid)
    else:
        print((
            " ** WARNING: do not attempt to restore this job again until you run the following command:\n\n"
            f" cryosparcm cli 'dump_job_database(\"{puid}\", \"{juid}\")'\n\n"
            " To avoid this warning, provide the --dump flag "
        ))
    print(f" Restored files for {juid} in {time() - main_tic:.2f}s")
# --- Main script: resolve the project directory, pick the job directories to
# restore, warn about re-running, then restore each job in turn.
assert args.project, "Project not specified"
project_dir = Path(cli.get_project_dir_abs(args.project))
assert project_dir and project_dir.exists(
), f"Could not find project directory for project {args.project}"
if isinstance(args.job, str):
    # a specific job was requested; restore only that one
    job_dirs = [project_dir / args.job]
else:
    # restore every job directory in the project (J1, J2, ...)
    job_dirs = list(project_dir.glob('J*'))
print(f"Found {len(job_dirs)} to restore")
if args.dump:
    print("** WARNING: --dump is enabled, gridfs files in the job directory will be reset after successful restore.")
else:
    print((
        "** WARNING: do not attempt re-run this command until you run the following command for each job:\n\n"
        f"    cryosparcm cli 'dump_job_database(\"{args.project}\", \"<juid>\")'\n\n"
        "To avoid this warning, provide the --dump flag\n"
    ))
# give the operator a chance to abort before touching the database
print("Continuing in 5 seconds...")
sleep(5)
for job_dir in job_dirs:
    if not job_dir.is_dir():
        print(f"Job directory {job_dir} is not a directory, skipping")
        continue
    try:
        restore_job_dir(args.project, job_dir, args.dump)
    except Exception as e:
        # report which job failed, then re-raise to abort on the first error
        print(f" ERROR: Could not restore {job_dir.name}: {e}")
        raise
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Updated for CryoSPARC v4. See previous revision for v3.