Skip to content

Instantly share code, notes, and snippets.

@ibspoof
Last active February 22, 2019 16:04
Show Gist options
  • Save ibspoof/40edc3d7dc0b73e5c108a3728df7e10c to your computer and use it in GitHub Desktop.
Save ibspoof/40edc3d7dc0b73e5c108a3728df7e10c to your computer and use it in GitHub Desktop.
Restore a single node's SSTables from OpsCenter's S3 backup location using multi-threaded downloads
[s3]
# S3 bucket name
bucket_name = my_backups
download_threads = 6
# other s3 access is defined in the default aws cli settings file
[opscenter]
backup_job_uuid = # get this from s3 bucket
[node]
host_id = # get this from nodetool status
restore_dir = ./restore/
# should be the /var/lib/cassandra/data or similar parent dir
cassandra_data_dir = ./cassandra/
invalid_keyspaces = OpsCenter, system, system_traces
import json
import os
import subprocess
import ConfigParser
import time
import logging
import shutil
from glob import glob
import re
from multiprocessing.pool import ThreadPool
# --- logging & configuration bootstrap ---------------------------------
# Log INFO+ to the console and DEBUG+ to ./output.log.
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
file_out = logging.FileHandler("./output.log")
file_out.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
file_out.setLevel(logging.DEBUG)
logging.getLogger().addHandler(file_out)

Config = ConfigParser.ConfigParser()
Config.read("./restore_node_from_opscenter_backups.ini")

conf = {
    's3': {
        # s3:// prefix so the value can be passed straight to the aws cli
        'bucket_name': "s3://" + Config.get('s3', 'bucket_name'),
        'download_threads': Config.getint('s3', 'download_threads')
    },
    'opscenter': {
        'backup_job_uuid': Config.get('opscenter', 'backup_job_uuid')
    },
    'node': {
        'host_id': Config.get('node', 'host_id'),
        'restore_dir': Config.get('node', 'restore_dir'),
        'cassandra_data_dir': Config.get('node', 'cassandra_data_dir'),
        # BUG FIX: Config.items(section, ...) ignores option names -- the
        # second argument was silently consumed as the `raw` flag and the
        # whole [node] section's (key, value) tuples came back, so keyspace
        # names never matched and system keyspaces were NOT skipped.
        # Parse the comma-separated option value instead.
        'invalid_keyspaces': [ks.strip() for ks in
                              Config.get('node', 'invalid_keyspaces').split(',')]
    }
}
def aws_cmd(*args):
    """Run an ``aws s3`` subcommand and return its combined stdout/stderr.

    :param args: arguments appended after ``aws s3``
    :return: raw output of the command
    """
    command = ['aws', 's3'] + list(args)
    logging.debug("Command run: %s", " ".join(command))
    # argument list + shell=False keeps the invocation injection-safe
    return subprocess.check_output(command, shell=False, stderr=subprocess.STDOUT)
def bucket_path(trailing=False, *args):
    """Join path components with the S3 separator.

    :param trailing: append a trailing separator when True
    :param args: path components to join
    :return: joined S3 path string
    """
    sep = "/"
    joined = sep.join(args)
    return joined + sep if trailing else joined
def get_host_ids_s3_snapshot_path(trailing=True):
    """Return the S3 path "<bucket>/snapshots/<host_id>" for this node.

    :param trailing: append a trailing "/" when True (default)
    """
    components = [conf['s3']['bucket_name'], 'snapshots', conf['node']['host_id']]
    return bucket_path(trailing, *components)
def get_latest_backup_json():
    """Find the newest backup for the configured job in S3 and fetch its
    backup.json manifest into the local restore dir.

    :return: parsed backup.json as a dict
    :raises RuntimeError: if no backup matching the job uuid is listed
    """
    listing = aws_cmd('ls', get_host_ids_s3_snapshot_path())
    job_uuid = conf['opscenter']['backup_job_uuid']
    backup_names = []
    for line in listing.split("\n"):
        # `aws s3 ls` prints directory entries as "   PRE <name>/"
        # BUG FIX: was find(...) > 0, which missed a match at column 0
        if job_uuid not in line:
            continue
        name = line.strip()
        # drop the "PRE" marker and trailing "/" explicitly instead of the
        # old fragile character-set strip(' \rPRE/\n')
        if name.startswith("PRE"):
            name = name[len("PRE"):].strip()
        backup_names.append(name.rstrip("/"))
    if not backup_names:
        raise RuntimeError("No backups found for job uuid %s" % job_uuid)
    last_backup = backup_names[-1]
    logging.info("Last backup is: %s", last_backup)
    local_backup_file = conf['node']['restore_dir'] + 'backup.json'
    remote_backup_file = bucket_path(False, get_host_ids_s3_snapshot_path(False),
                                     last_backup, 'backup.json')
    aws_cmd('cp', remote_backup_file, local_backup_file, '--profile', 'cassandra')
    # close the file handle deterministically (was json.load(open(...)))
    with open(local_backup_file) as manifest:
        return json.load(manifest)
def get_keyspace_tables_to_restore(backup_json, invalid_keyspaces=None):
    """Map keyspace -> table -> list of SSTable file-name prefixes to restore.

    :param backup_json: parsed backup.json manifest; needs the 'keyspaces'
        ({keyspace: {table: ...}}) and 'sstables' (list of dicts with keys
        'type', 'keyspace', 'cf', 'name') entries
    :param invalid_keyspaces: iterable of keyspace names to skip; defaults to
        the configured ``[node] invalid_keyspaces`` list (backward compatible)
    :return: {keyspace: {table: [sstable_name_prefix, ...]}}
    """
    if invalid_keyspaces is None:
        invalid_keyspaces = conf['node']['invalid_keyspaces']
    keyspace_sstables = {}
    for keyspace in backup_json['keyspaces']:
        if keyspace in invalid_keyspaces:
            continue
        for table in backup_json['keyspaces'][keyspace]:
            if keyspace not in keyspace_sstables:
                keyspace_sstables[keyspace] = {}
            sstable_list = []
            for sstable in backup_json['sstables']:
                # only Data components identify an sstable generation
                if sstable['type'] != "Data":
                    continue
                if sstable['keyspace'] == keyspace and sstable['cf'] == table:
                    # drop the trailing component ("Data.db") to keep the
                    # generation prefix, e.g. "ks-table-ka-1-"
                    name_parts = sstable['name'].split("-")
                    name_parts.pop()
                    sstable_list.append("-".join(name_parts) + '-')
            if len(sstable_list) > 0:
                keyspace_sstables[keyspace][table] = sstable_list
    return keyspace_sstables
def create_dir(path):
    """
    Create the directory (and any missing parents) if it does not exist.
    :param path: directory path to create
    :return:
    """
    if os.path.exists(path):
        return
    os.makedirs(path)
def download_all_sstables_from_s3(keyspace_tables_to_restore):
    """
    Download every backed-up SSTable component into the local restore dir.

    Files land in <restore_dir>/<keyspace>/<table>/ and downloads run on a
    thread pool of conf['s3']['download_threads'] workers.

    :param keyspace_tables_to_restore: {keyspace: {table: [sstable_prefix, ...]}}
    :return:
    """
    base_sstable_s3_dir = bucket_path(True, get_host_ids_s3_snapshot_path(False), 'sstables')
    sstable_files = ['CompressionInfo.db', 'Data.db', 'Filter.db', 'Index.db', 'Statistics.db', 'Summary.db']
    all_downloads_start = time.time()
    # one pool reused for every keyspace (was re-created per keyspace)
    pool = ThreadPool(processes=conf['s3']['download_threads'])
    try:
        for keyspace in keyspace_tables_to_restore:
            if len(keyspace_tables_to_restore[keyspace]) < 1:
                logging.debug("Skipping keyspace %s since there are no tables to restore", keyspace)
                continue
            keyspace_dir = conf['node']['restore_dir'] + keyspace
            create_dir(keyspace_dir)
            keyspace_files_to_get = []
            for table in keyspace_tables_to_restore[keyspace]:
                table_dir = keyspace_dir + "/" + table + "/"
                create_dir(table_dir)
                if len(keyspace_tables_to_restore[keyspace][table]) < 1:
                    logging.debug("No SSTables found for %s:%s skipping download.", keyspace, table)
                    continue
                for sstable in keyspace_tables_to_restore[keyspace][table]:
                    # one (source url, destination dir) pair per component file
                    for component in sstable_files:
                        keyspace_files_to_get.append((base_sstable_s3_dir + sstable + component, table_dir))
            # BUG FIX: this timer used to be reset inside the table loop, so
            # the logged per-keyspace duration only covered the last table.
            keyspace_time_start = time.time()
            logging.info("Downloading %d files for keyspace %s using %d threads", len(keyspace_files_to_get),
                         keyspace, conf['s3']['download_threads'])
            pool.map(download, keyspace_files_to_get)
            logging.info("Completed downloading all SSTables for keyspace %s in %.2fs", keyspace,
                         (time.time() - keyspace_time_start))
    finally:
        pool.close()
        pool.join()
    logging.info("Complete downloading all backed up SSTables in %.2fs", time.time() - all_downloads_start)
def download(to_download):
    """Fetch one S3 object.

    :param to_download: (source_s3_url, destination_dir) tuple
    """
    source, destination = to_download
    logging.debug("Downloading %s to: %s", source, destination)
    aws_cmd('cp', source, destination, '--profile', 'cassandra')
def get_restore_table_paths_mapped_to_casssandra(cassandra_keyspace_dir):
    """Map bare table names to their on-disk "<table>-<uuid>" directory names.

    Scans the live Cassandra keyspace directory, whose table folders are
    named "<table>-<table_uuid>".

    :param cassandra_keyspace_dir: keyspace dir inside Cassandra's data dir
    :return: {table_name: "<table>-<uuid>" directory name}
    """
    table_to_cassandra_path = {}
    for entry in glob(cassandra_keyspace_dir + "/*"):
        # normalise Windows separators so the split below works everywhere
        dir_name = entry.replace("\\", "/").split("/")[-1]
        table_to_cassandra_path[dir_name.split("-")[0]] = dir_name
    return table_to_cassandra_path
def move_downloaded_sstables_to_cassandra_location(keyspace_tables_to_restore):
    """Copy the downloaded SSTable components into the live Cassandra data dirs.

    Downloaded files are named "<keyspace>-<table>-<version>-<gen>-<component>";
    inside Cassandra's own table directory the leading keyspace component is
    dropped from the file name.

    :param keyspace_tables_to_restore: {keyspace: {table: [sstable_prefix, ...]}}
    :return:
    """
    moving_start_time = time.time()
    cassandra_base_dir = conf['node']['cassandra_data_dir']
    logging.info("Moving downloaded files to Cassandra dir %s", cassandra_base_dir)
    sstable_files = ['CompressionInfo.db', 'Data.db', 'Filter.db', 'Index.db', 'Statistics.db', 'Summary.db']
    for keyspace, tables in keyspace_tables_to_restore.items():
        if not tables:
            logging.debug("Skipping keyspace %s since there are no tables to restore", keyspace)
            continue
        keyspace_dir = conf['node']['restore_dir'] + keyspace
        cassandra_keyspace_dir = cassandra_base_dir + keyspace + "/"
        path_mapping = get_restore_table_paths_mapped_to_casssandra(cassandra_keyspace_dir)
        for table, sstables in tables.items():
            table_dir = keyspace_dir + "/" + table + "/"
            cassandra_table_path = cassandra_keyspace_dir + path_mapping[table] + "/"
            for sstable in sstables:
                # drop the leading keyspace component for Cassandra's local name
                cassandra_filename_prefix = "-".join(sstable.split("-")[1:])
                for component in sstable_files:
                    src = table_dir + sstable + component
                    dst = cassandra_table_path + cassandra_filename_prefix + component
                    logging.debug("Copying downloaded file from %s to %s", src, dst)
                    shutil.copy(src, dst)
    logging.info("Complete moving SSTables to Cassandra dir in %.2fs", time.time() - moving_start_time)
# --- main flow: locate the latest backup, download its SSTables, then
# --- copy them into the live Cassandra data directories -------------------
backup_json = get_latest_backup_json()
keyspace_tables_to_restore = get_keyspace_tables_to_restore(backup_json)
download_all_sstables_from_s3(keyspace_tables_to_restore)
move_downloaded_sstables_to_cassandra_location(keyspace_tables_to_restore)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment