Example of how to retrieve BDR history using the Cloudera Manager API
#!/usr/bin/python
## ********************************************************************************
## get-bdr-history.py
##
## Example of how to retrieve BDR command history using the Cloudera Manager API
##
## Usage: ./get-bdr-history.py <limit>
##
## <limit> is the maximum number of replication commands to retrieve
## history for per scheduled replication; the default is 20
##
## for example: ./get-bdr-history.py 100
##
## The rest of the values are hard-coded in the script; edit the settings
## below to connect to your Backup Cluster
##
## The script assumes one HDFS and one Hive Service exist on the Backup Cluster
##
## ********************************************************************************
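## Prerequisite: this script uses the legacy Cloudera Manager Python API
## client (cm_api), which can typically be installed with: pip install cm-api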
## ** imports *******************************
import sys
import time
from cm_api.api_client import ApiResource
## ** Settings to connect to Backup cluster **
cm_host = "BACKUP_CLUSTER_CM_HOST"
cm_port = "7180"
cm_login = "BACKUP_CLUSTER_CM_LOGIN"
cm_password = "BACKUP_CLUSTER_CM_PASSWORD"
backup_cluster_name = "BACKUP_CLUSTER_NAME"
## ** Get command line args ******************
if len(sys.argv) == 1:
    limit = 20
elif len(sys.argv) == 2 and sys.argv[1].isdigit():
    limit = int(sys.argv[1])
else:
    print "Usage: ./get-bdr-history.py <limit>"
    quit(1)
## Used for formatting dates
fmt = '%Y-%m-%d %H:%M:%S %Z'
print "\n\nCloudera BDR Replication History for Hive and HDFS -- " + time.strftime(fmt)
## Connect to CM
print "\nConnecting to Cloudera Manager at " + cm_host + ":" + cm_port
api = ApiResource(server_host=cm_host, server_port=cm_port, username=cm_login, password=cm_password)
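## Note: this connects over plain HTTP. If Cloudera Manager has TLS enabled,
## ApiResource also accepts a use_tls=True argument (usually with port 7183).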
## Get Backup Cluster
backup_cluster = None
clusters = api.get_all_clusters()
for cluster in clusters:
    if cluster.displayName == backup_cluster_name:
        backup_cluster = cluster
        break
if backup_cluster is None:
    print "\nError: Cluster '" + backup_cluster_name + "' not found"
    quit(1)
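## (cm_api also provides api.get_cluster(name) for a direct lookup; iterating
## over get_all_clusters() lets us match on the cluster's display name instead.)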
## Get HDFS Service
hdfs_service = None
service_list = backup_cluster.get_all_services()
for service in service_list:
    if service.type == "HDFS":
        hdfs_service = service
        break
if hdfs_service is None:
    print "Error: Could not locate HDFS Service"
    quit(1)
## Get Hive Service
hive_service = None
service_list = backup_cluster.get_all_services()
for service in service_list:
    if service.type == "HIVE":
        hive_service = service
        break
if hive_service is None:
    print "Error: Could not locate Hive Service"
    quit(1)
## Get Hive replication schedules
schedules = hive_service.get_replication_schedules()
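## Each schedule comes back as an ApiReplicationSchedule object that carries
## the schedule's timing fields and its Hive- or HDFS-specific arguments.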
## Iterate through the Hive replication schedules
for schedule in schedules:
    print "\n\n******************************************************************************"
    print "** Hive Replication Schedule (ID = " + str(schedule.id) + ") ****************************************"
    print "******************************************************************************"
    ## Get the Hive replication arguments in order to read Hive-specific schedule settings
    hive_args = schedule.hiveArguments
    ## True if the Hive replication is configured to also copy table data from HDFS
    replicate_data = hive_args.replicateData
    ## If applicable, get the HDFS replication arguments for the Hive job
    if replicate_data:
        hdfs_args = hive_args.hdfsArguments
    ## Print details of the replication schedule
    print "Start Time: " + schedule.startTime.strftime(fmt)
    if schedule.interval is not None and schedule.nextRun is not None:
        print "Recurrence: " + str(schedule.interval) + " " + schedule.intervalUnit
        print "Next Scheduled: " + schedule.nextRun.strftime(fmt)
    print "Source Cluster: " + hive_args.sourceService.peerName
    print "Destination Cluster: " + backup_cluster_name
    ## Print the list of tables selected in the replication schedule
    tableFilters = hive_args.tableFilters
    if tableFilters is None:
        print "\nTables Specified for Replication: All Tables in All Databases"
    else:
        print "\nTables Specified for Replication:"
        for table in tableFilters:
            print "    " + table.database + "." + table.tableName
    ## Get the history of replication commands for the Hive replication schedule
    ## (view='full' requests the detailed results for each command)
    command_history = hive_service.get_replication_command_history(schedule_id=schedule.id, limit=limit, view='full')
    print "\nReplications:"
    ## Print some details for each command
    for command in command_history:
        hive_result = command.hiveResult
        print "\n** Hive Replication (Command ID = " + str(command.id) + ") "
        print "Start Time: " + command.startTime.strftime(fmt)
        if command.active:
            ## A still-running command has no end time or results yet; move on to the next one
            print "Command is still running"
            continue
        print "End Time: " + command.endTime.strftime(fmt)
        if not command.success:
            print "Result Status: Failed"
        else:
            print "Result Status: Success"
        ## List the tables that were replicated
        print "  Tables Replicated:"
        tables = hive_result.tables
        for table in tables:
            print "    " + table.database + "." + table.tableName
        ## Print the tables' HDFS data replication details.
        ## Only a few attributes are shown here; the ApiHdfsReplicationResult class has many more.
        if replicate_data:
            hdfs_result = hive_result.dataReplicationResult
            print "  Table data -- HDFS replication"
            print "    JobId: " + hdfs_result.jobId
            print "    Number of Files Expected: " + str(hdfs_result.numFilesExpected)
            print "    Number of Files Copied: " + str(hdfs_result.numFilesCopied)
            print "    Number of Files Skipped: " + str(hdfs_result.numFilesSkipped)
            print "    Number of Files Copy Failed: " + str(hdfs_result.numFilesCopyFailed)
## Get top-level HDFS replication schedules
schedules = hdfs_service.get_replication_schedules()
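## Note: these are standalone HDFS replication schedules, distinct from the
## HDFS data copies performed as part of the Hive replications above.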
## Iterate through all HDFS replication schedules
for schedule in schedules:
    ## Get the HDFS replication arguments
    hdfs_args = schedule.hdfsArguments
    print "\n\n******************************************************************************"
    print "** HDFS Replication Schedule (ID = " + str(schedule.id) + ") ****************************************"
    print "******************************************************************************"
    ## Print details of the replication schedule
    print "Start Time: " + schedule.startTime.strftime(fmt)
    if schedule.interval is not None and schedule.nextRun is not None:
        print "Recurrence: " + str(schedule.interval) + " " + schedule.intervalUnit
        print "Next Scheduled: " + schedule.nextRun.strftime(fmt)
    print "Source Cluster: " + hdfs_args.sourceService.peerName
    print "Destination Cluster: " + backup_cluster_name
    print "  Source Path: " + hdfs_args.sourcePath
    print "  Destination Path: " + hdfs_args.destinationPath
    print "\nReplications:"
    ## Get the history of commands for the scheduled HDFS replication
    command_history = hdfs_service.get_replication_command_history(schedule_id=schedule.id, limit=limit, view='full')
    for command in command_history:
        hdfs_result = command.hdfsResult
        print "\n** HDFS Replication (Command ID = " + str(command.id) + ") "
        print "Start Time: " + command.startTime.strftime(fmt)
        if command.active:
            ## A still-running command has no end time or results yet; move on to the next one
            print "Command is still running"
            continue
        print "End Time: " + command.endTime.strftime(fmt)
        if not command.success:
            print "Result Status: Failed"
        else:
            print "Result Status: Success"
        ## Only a few attributes are shown here; the ApiHdfsReplicationResult class has many more.
        print "  JobId: " + hdfs_result.jobId
        print "  Source Path: " + hdfs_args.sourcePath
        print "  Destination Path: " + hdfs_args.destinationPath
        print "  Number of Files Expected: " + str(hdfs_result.numFilesExpected)
        print "  Number of Files Copied: " + str(hdfs_result.numFilesCopied)
        print "  Number of Files Skipped: " + str(hdfs_result.numFilesSkipped)
        print "  Number of Files Copy Failed: " + str(hdfs_result.numFilesCopyFailed)
print "\n\n\n** End of Report **"