Restore missing GridFS files in the CryoSPARC database using job directories
"""
Restore missing GridFS files for a given project (or job)
Usage:

    eval $(cryosparcm env)
    python -m restore_gridfs_files --project <puid> [--job <juid>]
"""
from io import BytesIO
import os
import os.path
import argparse
import pickle
from pathlib import Path
from time import sleep, time
from gridfs import GridFS
from cryosparc_compute import database_management
from cryosparc_compute.client import CommandClient
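
# Extension-to-MIME-type map for uploaded files; anything not listed here
# falls back to application/octet-stream in upload_file() below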
CONTENT_TYPE_MAP = {
    'png': 'image/png',
    'gif': 'image/gif',
    'jpeg': 'image/jpeg',
    'jpg': 'image/jpeg',
    'svg': 'image/svg+xml',
    'pdf': 'application/pdf',
    'txt': 'text/plain',
    'csv': 'text/csv',
    'json': 'application/json',
    'xml': 'application/xml',
}
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--project', help='Project UID to restore')
parser.add_argument('-j', '--job', help='Job UID to restore (optional)')
parser.add_argument('-d', '--dump', help='Dump each job to disk after successful restore (RESETS EXISTING gridfs_data DIRECTORY)', action='store_true')
args = parser.parse_args()
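
# Connect directly to CryoSPARC's MongoDB database and to the command_core
# server (used below for get_project_dir_abs and dump_job_database)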
mongo = database_management.get_pymongo_client('meteor')
db = mongo['meteor']
gridfs = GridFS(db)
cli = CommandClient(
    host=os.getenv('CRYOSPARC_MASTER_HOSTNAME', 'localhost'),
    port=int(os.getenv('CRYOSPARC_COMMAND_CORE_PORT', 39002))
)
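
# Re-upload a single file to GridFS, inferring its content type from the
# filename extension when not given, and tag the resulting fs.files document
# with its project and job/session UIDs so the CryoSPARC interface can find it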
def upload_file(
    filedata,
    filename=None,
    chunk_size=2096128,
    content_type=None,
    project_uid=None,
    job_or_session_uid=None
):
    assert project_uid, "project_uid not specified"
    if content_type is None:
        assert filename, "filename or content_type required"
        content_type = CONTENT_TYPE_MAP.get(
            filename.split('.')[-1], "application/octet-stream")
    res = gridfs.put(filedata, filename=filename,
                     chunk_size=chunk_size, content_type=content_type)
    db['fs.files'].update_one({'_id': res}, {
        '$set': {
            'project_uid': project_uid,
            'job_uid': job_or_session_uid
        }
    })
    return res
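
# Restore every file recorded in a job directory's gridfs_data/gridfsdata_*
# pickle dumps, then remap event images, UI tile images and output group
# images in the database to the newly uploaded GridFS file IDs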
def restore_job_dir(puid: str, job_dir: Path, dump: bool = False):
    juid = job_dir.name
    gridfs_data_dir = job_dir / "gridfs_data"
    job_doc = db.jobs.find_one({"project_uid": puid, "uid": juid})
    if not job_doc:
        print(f"Could not find job {puid}-{juid} in the database")
        return
    if not gridfs_data_dir.is_dir():
        print(f"Job {juid} does not have a gridfs_data directory, skipping")
        return
    print(f"Importing gridfs files for {juid}...")
    uploaded_files = {}
    main_tic = time()
    for gridfsdata_file in gridfs_data_dir.glob("gridfsdata_*"):
        tic = time()
        total_restored = 0
        print(f" Uploading files for {juid} from {gridfsdata_file} ...")
        with open(gridfsdata_file, 'rb') as openfile:
            try:
                gridfs_data = pickle.load(openfile)
            except UnicodeDecodeError:
                # In py3, we need to tell pickle.load() how to decode the
                # 8-bit string instances pickled by py2, using 'bytes' to
                # read these 8-bit string instances as bytes objects
                openfile.seek(0)
                gridfs_data = []
                for raw_image in pickle.load(openfile, encoding='bytes'):
                    image = {
                        '_id': raw_image[b'_id'].decode(),
                        'filename': raw_image[b'filename'].decode(),
                        'data': raw_image[b'data'],
                        'job_uid': raw_image[b'_job_uid'].decode(),
                    }
                    gridfs_data.append(image)
        for image in gridfs_data:
            # For each image in the list, re-upload and keep an
            # old_id: new_id dict of everything uploaded
            print(f" FILE {image['_id']} {image['job_uid']} {image['filename']}")
            rawdata = BytesIO(image['data'])
            fid = upload_file(
                rawdata,
                filename=image.get('filename', gridfsdata_file.name),
                project_uid=puid,
                job_or_session_uid=juid
            )
            uploaded_files[image['_id']] = str(fid)
            total_restored += 1
        print(f" Uploaded {total_restored} files for {juid} in {time() - tic:.2f}s")
    if not uploaded_files:
        print(f" No files uploaded for {juid}")
        return
    # Update event IDs
    print(" Updating database events ...")
    tic = time()
    old_file_ids = []
    for event in db.events.find({
        "project_uid": puid,
        "job_uid": juid,
        "imgfiles.0": {"$exists": True}
    }):
        imgfiles = event["imgfiles"]
        for imgfile in imgfiles:
            if imgfile["fileid"] not in uploaded_files:
                print(f" Warning: No file upload found for {juid} {imgfile['filename']} {imgfile['fileid']}")
                continue
            old_file_ids.append(imgfile["fileid"])
            imgfile["fileid"] = uploaded_files[imgfile["fileid"]]
        db.events.update_one({'_id': event['_id']}, {'$set': {'imgfiles': imgfiles}})
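    # The job document itself also references GridFS files (UI tile
    # thumbnails and output group images); remap those to the new uploads too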
    print(" Updating tile images...")
    for tile_image in job_doc['ui_tile_images']:
        old_file_ids.append(tile_image['fileid'])
        tile_image['fileid'] = uploaded_files.get(str(tile_image['fileid']), "")
    print(" Updating output group images...")
    for key, value in job_doc['output_group_images'].items():
        old_file_ids.append(str(value))
        job_doc['output_group_images'][key] = uploaded_files.get(str(value), "")
    db.jobs.update_one({'_id': job_doc['_id']}, {
        '$set': {
            'ui_tile_images': job_doc['ui_tile_images'],
            'output_group_images': job_doc['output_group_images'],
        }
    })
    print(f" Updated {len(old_file_ids)} event files for {juid} in {time() - tic:.2f}s")
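    # Every remaining reference now points at the new uploads, so the
    # superseded GridFS files can be deleted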
    if old_file_ids:
        print(" Attempting to clean up old files...")
        for fid in old_file_ids:
            gridfs.delete(fid)
        print(f" Deleted {len(old_file_ids)} old files")
    if dump:
        print(" Dumping job...")
        print(" WARNING: gridfs files in the job directory will be reset")
        cli.dump_job_database(project_uid=puid, job_uid=juid)
    else:
        print((
            " ** WARNING: do not attempt to restore this job again until you run the following command:\n\n"
            f" cryosparcm cli 'dump_job_database(\"{puid}\", \"{juid}\")'\n\n"
            " To avoid this warning, provide the --dump flag"
        ))
    print(f" Restored files for {juid} in {time() - main_tic:.2f}s")
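
# Script entry point: resolve the project directory and collect the job
# directories to restore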
assert args.project, "Project not specified"
project_dir = Path(cli.get_project_dir_abs(args.project))
assert project_dir.exists(), f"Could not find project directory for project {args.project}"
if isinstance(args.job, str):
    job_dirs = [project_dir / args.job]
else:
    job_dirs = list(project_dir.glob('J*'))
print(f"Found {len(job_dirs)} job directories to restore")
if args.dump:
    print("** WARNING: --dump is enabled, gridfs files in the job directory will be reset after successful restore.")
else:
    print((
        "** WARNING: do not attempt to re-run this command until you run the following command for each job:\n\n"
        f" cryosparcm cli 'dump_job_database(\"{args.project}\", \"<juid>\")'\n\n"
        "To avoid this warning, provide the --dump flag\n"
    ))
print("Continuing in 5 seconds...")
sleep(5)
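# Restore each job directory in turn; errors are printed and then re-raised
# so a partial restore does not go unnoticed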
for job_dir in job_dirs:
    if not job_dir.is_dir():
        print(f"{job_dir} is not a directory, skipping")
        continue
    try:
        restore_job_dir(args.project, job_dir, args.dump)
    except Exception as e:
        print(f" ERROR: Could not restore {job_dir.name}: {e}")
        raise
Updated for CryoSPARC v4. See previous revision for v3.
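
To sanity-check a restore, here is a minimal sketch (not part of the script above; it assumes the same eval $(cryosparcm env) shell environment and the meteor database used by the script, with 'P1' and 'J42' as hypothetical placeholder UIDs) that counts the GridFS files now tagged with a given project and job:

from cryosparc_compute import database_management

db = database_management.get_pymongo_client('meteor')['meteor']
# 'P1' and 'J42' are placeholders; substitute your own project and job UIDs
n = db['fs.files'].count_documents({'project_uid': 'P1', 'job_uid': 'J42'})
print(f"P1-J42 has {n} GridFS files")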
