Skip to content

Instantly share code, notes, and snippets.

@aaronlake
Created February 13, 2015 20:25
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aaronlake/a081e5a8280f043abc8f to your computer and use it in GitHub Desktop.
Save aaronlake/a081e5a8280f043abc8f to your computer and use it in GitHub Desktop.
Solaris Disk Info
#!/usr/bin/python -tt
'''
dskinfo - Solaris utility for listing disk information
dskinfo collects and summarizes information for local, fiber-channel and iSCSI
disks in Solaris. It can display size of disks, use in imported or exported
zpools, mounted slices, LUN id and product-vendor data for disks.
While it was primarily written to handle systems using ZFS it can be useful in
any Solaris environment. It especially makes it easier handling large amounts
of SAN attached disks using the MPxIO naming scheme.
Given the lack of disk managing commands and interfaces in Solaris dskinfo
currently relies heavily on parsing output from various other commands. This is
not a stable interface but dskinfo has been tested on Solaris 10 update 5
through 9, OpenSolaris up to and including build 146, Solaris 11 Express,
OpenIndiana and NexentaCore 3.0.
The regular expression to exclude CD-ROM/DVD devices partly relies on parsing
a string that varies between systems so this might fail on some systems.
It has been tested on the following hardware:
ThinkPad T61, various generic X86 servers, Ultra 2, M3000, M4000, M5000,
M8000, T5240, T5240, v240, 245, v440, v490. These servers have had a mixture
of local disks and several were connected to a SAN. Some testing has also
been done in VirtualBox and VMWare.
It does not require root permission to run but FC/iSCSI data and path dedup
requires running as a privileged user.
CDDL HEADER START
The contents of this file are subject to the terms of the
Common Development and Distribution License, Version 1.0 only
(the "License"). You may not use this file except in compliance
with the License.
You can obtain a copy of the license at Docs/cddl1.txt
or http://www.opensolaris.org/os/licensing.
See the License for the specific language governing permissions
and limitations under the License.
CDDL HEADER END
Copyright (c) 2010, 2011, Henrik Johansson, henrikj (at) gigamega.se.
Changelog:
20100907 0.5 Initial version.
20110426 0.6 Small fixes and minor readability changes.
20110617 0.8 Fixed LUN dec 0 False bug, cached exec data for scalability.
20110620 0.9 Fixed iscsi even without -s and pylint fixes, added help.
20110710 1.0 Warning when SVM used or VxVM installed, use prtconf for serial,
stricter regexps, quiet option, added instance names.
'''
import getopt
import os
import re
import subprocess
import sys
import signal
class ExecuteException(Exception):
    ''' Raised when a command cannot be started or exits with a non-zero
    return code. '''
class DskInfo(object):
    ''' Collect and cache data for local, FC and iSCSI disks.

    The constructor runs the data-gathering commands once and keeps their
    output; most query methods only parse that cached text. Lookup methods
    return False when a value cannot be determined.
    '''

    def __init__(self, scan, summary):
        ''' Gather the system-wide data used by the query methods.

        scan    -- when true (and running as root) also run "zpool import" so
                   disks belonging to exported pools can be identified.
        summary -- when true get_disk_dict() only fills in the cheap fields.

        Exits the process with status 1 if iostat or cfgadm fail. A failing
        zpool command is not handled here and raises ExecuteException.
        '''
        self.summary = summary
        try:
            self.iostat_disks = self.get_iostat_disks()
        except ExecuteException:
            # sys.exc_info() instead of "except ... as" keeps this valid on
            # both old and new Python interpreters.
            sys.stderr.write("Failed to list disks with iostat: %s\n"
                             % (sys.exc_info()[1],))
            sys.exit(1)
        try:
            self.iostat_en = execute("/usr/bin/iostat -En")
            self.cfgadm_data = execute("/usr/sbin/cfgadm -al")
        except ExecuteException:
            sys.stderr.write("Failed collect system data: %s\n"
                             % (sys.exc_info()[1],))
            sys.exit(1)

        # First tab separated column of "zpool list -H" is the pool name.
        r_pools = re.compile(r"^(.+?)\t.*$", re.MULTILINE)
        data = execute("/usr/sbin/zpool list -H")
        self.zpools = r_pools.findall(data)
        self.zpool_data = dict()
        for pool in self.get_zpools():
            self.zpool_data[pool] = execute("/usr/sbin/zpool status %s"
                                            % (pool))

        # Defaults for the data that is only collected when running as root,
        # so every attribute exists regardless of privileges.
        self.zimport_data = list()
        self.luxadm_disks = False
        self.host_adapters = False
        self.iscsiadm_data = False
        if os.getuid() != 0:
            return

        try:
            self.luxadm_disks = self.get_luxadm_disks()
            self.host_adapters = self.get_hba_ports()
        except ExecuteException:
            self.luxadm_disks = False
            self.host_adapters = False
        try:
            self.iscsiadm_data = execute("/usr/sbin/iscsiadm list target -S")
        except ExecuteException:
            self.iscsiadm_data = False
        if scan:
            try:
                self.zimport_data = execute("/usr/sbin/zpool import")
            except ExecuteException:
                self.zimport_data = list()

    def _name_re(self, disk):
        ''' Return a regex fragment matching exactly the disk name.

        The name is escaped and followed by a word boundary so that e.g.
        c1t1d1 does not match the entry for c1t1d10.
        '''
        return re.escape(disk) + r"\b"

    def get_luxadm_disks(self):
        ''' Get disks as seen by luxadm probe '''
        r_rdsk = re.compile(r"/rdsk/(.+)s2")
        lux_rdata = execute("/usr/sbin/luxadm probe")
        return r_rdsk.findall(lux_rdata)

    def get_iostat_disks(self):
        ''' Get disks as seen by iostat '''
        r_iostat = re.compile(r"c\d.*d\d+")
        io_rdata = execute("/usr/bin/iostat -en")
        return r_iostat.findall(io_rdata)

    def get_hba_ports(self):
        ''' Return a dict mapping HBA port WWN to its current speed, or to
        False when fcinfo reports no speed for the port '''
        hbas = dict()
        r_port = re.compile(r"HBA Port WWN: (.+)")
        r_speed = re.compile(r"Current Speed: (.+Gb)")
        data = execute("/usr/sbin/fcinfo hba-port")
        for port in r_port.findall(data):
            data = execute("/usr/sbin/fcinfo hba-port %s" % (port))
            match = r_speed.search(data)
            if match:
                hbas[port] = match.group(1)
            else:
                hbas[port] = False
        return hbas

    def get_hba_speed(self, wwn):
        ''' Return the speed of the HBA port with the given WWN, or False if
        no HBA data was collected or the port is unknown '''
        if not self.host_adapters:
            # fcinfo failed or was never run; there is nothing to look up.
            return False
        return self.host_adapters.get(wwn, False)

    def get_disk_type(self, disk):
        ''' Get disk type from cfgadm, False if the disk is not listed '''
        r_type = re.compile(r".*/%s\s+(.+?)\s" % re.escape(disk))
        match = r_type.search(self.cfgadm_data)
        if match:
            return match.group(1)
        return False

    def get_disk_bytes(self, disk):
        ''' Get disk size in bytes (as a string of digits) from iostat, False
        if no size is listed for the disk '''
        r_size = re.compile(r"^%s.*?Size:.*?<(\d+) bytes>"
                            % self._name_re(disk), re.M | re.S)
        match = r_size.search(self.iostat_en)
        # Only touch the match object once we know the search succeeded.
        if match:
            return match.group(1)
        return False

    def get_luxadm_data(self, disk):
        ''' Get luxadm display data for disk.

        Returns a dict with the keys 'port', 'serial', 'lunid' and 'numpath'.
        Values that could not be parsed are False. Raises ExecuteException if
        luxadm fails.
        '''
        lunid = False
        luxdata = dict()
        numpath = 0
        data = execute("/usr/sbin/luxadm disp /dev/rdsk/%ss2" % (disk))

        r_port = re.compile(r"Host controller port WWN.*\t+?(.+)")
        match = r_port.search(data)
        if match:
            luxdata['port'] = match.group(1)
        else:
            luxdata['port'] = False

        r_serial = re.compile(r"Serial Num:\s+(\S+)\s+")
        match = r_serial.search(data)
        if match:
            luxdata['serial'] = match.group(1)
        else:
            luxdata['serial'] = False

        r_ids = re.compile(r"Device Address.+,(.+)")
        devids = r_ids.findall(data)
        if not devids:
            # No "Device Address" lines; fall back to the /devices paths.
            r_ids = re.compile(r"/devices.+,(.+):.,raw")
            devids = r_ids.findall(data)

        # Every path must report the same LUN id; if they disagree neither the
        # id nor the path count is reported.
        for devid in devids:
            if lunid and lunid != devid:
                lunid = False
                numpath = False
                break
            numpath = numpath + 1
            lunid = devid
        luxdata['lunid'] = lunid
        luxdata['numpath'] = numpath
        return luxdata

    def get_zpools(self):
        ''' Returns a list of zpools on system '''
        return self.zpools

    def get_disk_zpool(self, disk):
        ''' Check if disk is used in any ZFS pool.

        Returns the pool name for an imported pool. If data from "zpool
        import" is available, disks in exported pools are reported as the
        pool name followed by "*". Returns False when the disk is unused.
        '''
        r_pool = re.compile(r"pool: (.+)")
        r_disk = re.compile(r"\W%s(s\d)*\W" % re.escape(disk))
        for pool in self.zpool_data.keys():
            if r_disk.search(self.zpool_data[pool]):
                return pool

        # zimport_data is only ever filled in when running as root, so an
        # empty value covers the unprivileged case as well.
        if not self.zimport_data:
            return False

        # The command output is one string; it has to be split into lines
        # (newlines kept so the trailing \W can match) before scanning.
        lines = self.zimport_data
        if not isinstance(lines, (list, tuple)):
            lines = lines.splitlines(True)
        exported_pool = None
        for line in lines:
            match = r_pool.search(line)
            if match:
                exported_pool = match.group(1)
            if exported_pool is not None and r_disk.search(line):
                return "%s*" % (exported_pool)
        return False

    def get_disk_driver(self, disk):
        ''' Returns the name and instance of the driver used for the disk.

        Raises ExecuteException if iostat fails or its output contains no
        driver instance name.
        '''
        r_driver = re.compile(r"([a-z]+)(\d+)")
        data = execute("/usr/bin/iostat -e %s" % (disk))
        match = r_driver.search(data)
        if not match:
            raise ExecuteException("No driver instance found for %s in "
                                   "iostat output" % (disk,))
        return match.group(1), match.group(2)

    def get_disk_serial(self, disk):
        ''' Returns the drive serial using prtconf, False if unavailable '''
        r_serial = re.compile(r"inquiry-serial-no.*\n.+value='(.+?)'", re.M)
        try:
            data = execute("/usr/sbin/prtconf -v /dev/dsk/%ss2" % (disk))
        except ExecuteException:
            return False
        match = r_serial.search(data)
        if match:
            return match.group(1)
        return False

    def is_disk_iscsi(self, disk):
        ''' Check if disk is on iSCSI transport '''
        if not self.iscsiadm_data:
            return False
        r_disk = re.compile(r"OS Device Name: /dev/rdsk/%ss2"
                            % re.escape(disk))
        return bool(r_disk.search(self.iscsiadm_data))

    def get_disk_dict(self, disk):
        ''' Collect disk data and return it in a dictionary.

        In summary mode only name, size, zpool, driver, instance and type are
        filled in (no 'label' key is set). Without root privileges the
        luxadm based fields keep their False defaults.
        '''
        is_root = os.getuid() == 0
        data = dict()
        driver, instance = self.get_disk_driver(disk)
        data['name'] = disk
        data['size'] = self.get_disk_bytes(disk)
        data['zpool'] = self.get_disk_zpool(disk)
        data['driver'] = driver
        data['lunhex'] = False
        data['lundec'] = False
        data['fcspeed'] = False
        data['numpath'] = False
        data['product'] = False
        data['vendor'] = False
        data['nopatch'] = False
        data['serial'] = False
        data['port'] = False
        data['instance'] = driver + instance

        if is_root and self.is_disk_iscsi(disk):
            disktype = "iscsi"
        elif data['driver'] == "ssd":
            disktype = "fc"
        else:
            disktype = "disk"
        data['type'] = disktype
        if self.summary:
            return data

        data['vendor'] = self.get_disk_vendor(disk)
        data['product'] = self.get_disk_product(disk)
        if not is_root:
            # Label and luxadm data need privileges.
            data['label'] = "-"
            return data
        if self.is_disk_labeled(disk):
            data['label'] = 'y'
        else:
            data['label'] = 'n'

        try:
            luxdata = self.get_luxadm_data(disk)
        except ExecuteException:
            logger("luxadm display failed for %s" % disk)
            data['serial'] = self.get_disk_serial(disk)
            return data

        # we make both values strings otherwise a 0 would be false for int.
        if luxdata['lunid']:
            data['lunhex'] = "0x%s" % (luxdata['lunid'])
            data['lundec'] = "%s" % int(luxdata['lunid'], 16)
        if luxdata['port']:
            data['fcspeed'] = self.get_hba_speed(luxdata['port'])
            data['port'] = luxdata['port']
        if luxdata['serial']:
            data['serial'] = luxdata['serial']
        data['numpath'] = luxdata['numpath']
        return data

    def get_disk_product(self, disk):
        ''' Return the product part of the disk data from iostat '''
        r_prod = re.compile(
            r"^%s.+\n(?:.+\n){0,2}.+Product:\s+?(.+?)\s+?Revision:"
            % self._name_re(disk), re.M)
        match = r_prod.search(self.iostat_en)
        if match:
            return match.group(1)
        return False

    def get_disk_vendor(self, disk):
        ''' Return the vendor part of the disk data from iostat '''
        r_vendor = re.compile(
            r"^%s.*\n(?:.+\n){0,2}Vendor:\s+(\S+)\s+Product:"
            % self._name_re(disk), re.M)
        match = r_vendor.search(self.iostat_en)
        if match:
            return match.group(1)
        return False

    def is_disk_dvd(self, disk):
        ''' Try to determine if the disk is a CD/DVD-ROM, first from the
        cfgadm type and then from the iostat product string '''
        if self.get_disk_type(disk) == "CD-ROM":
            return True
        r_dvd = re.compile(
            r"^%s.*\n.*Product: .*(DVD|CDROM|CD-ROM).* Revision:"
            % self._name_re(disk), re.M)
        return bool(r_dvd.search(self.iostat_en))

    def is_disk_labeled(self, disk):
        ''' Check if disk is labeled, i.e. prtvtoc succeeds on it '''
        try:
            execute("/usr/sbin/prtvtoc /dev/rdsk/%ss2" % (disk))
        except ExecuteException:
            return False
        return True

    def is_disk_available(self, disk):
        ''' Check if the disk name is available on the host '''
        return disk in self.iostat_disks
def sigint(signum, frame):
    ''' SIGINT handler: leave the program quietly with exit status 0.

    Both handler arguments are ignored.
    '''
    raise SystemExit(0)
def logger(message):
    ''' Log message to stderr if option verbose is set.

    The verbose flag is the module global opt_verbose, which main() defines.
    If it has not been defined yet (for example when the module is imported
    and used without running main()) logging is treated as disabled instead
    of failing with a NameError.
    '''
    if globals().get('opt_verbose', False):
        sys.stderr.write("%s\n" % (message,))
def print_warning(message):
    ''' Log message to stderr unless quiet is set.

    The quiet flag is the module global opt_quiet, which main() defines. If
    it has not been defined yet the warning is printed rather than failing
    with a NameError.
    '''
    if not globals().get('opt_quiet', False):
        sys.stderr.write("%s\n" % (message,))
def execute(cmd, return_zero=True):
    ''' Execute command and return its stdout.

    cmd         -- command line as a single string; it is split into
                   arguments with shell-like quoting rules but no shell is
                   involved in running it.
    return_zero -- when true (the default) a non-zero exit status is treated
                   as an error.

    Raises ExecuteException if the command line is empty, the command cannot
    be started, or it exits with a non-zero status while return_zero is true.
    '''
    # Local import: only this function needs shlex.
    import shlex

    # shlex strips the quotes from quoted arguments and keeps their content
    # together, which a plain regex split does not do reliably.
    args = shlex.split(cmd)
    if not args:
        raise ExecuteException("Empty command")
    logger("Executing: %s" % (cmd,))
    try:
        proc = subprocess.Popen(args, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        stdout, stderr = proc.communicate()
    except OSError:
        # sys.exc_info() instead of "except ... as" keeps this valid on both
        # old and new Python interpreters.
        raise ExecuteException(sys.exc_info()[1])
    if proc.returncode != 0 and return_zero:
        raise ExecuteException("Execute returned !0 for %s: %s" %
                               (args[0], stderr))
    return stdout
def volume_manager_warning():
    ''' Warn when a volume manager this tool cannot inspect is in use (SVM,
    detected by a successful metadb run) or installed (VxVM, detected by the
    presence of vxdisk) '''
    svm_notices = (
        "Notice: Metadb exist, SVM disk usage not available in listing.",
        "Notice: Try using metastat to view additional usage of disks.",
    )
    vxvm_notices = (
        "Notice: VxVM installed, VxVM disk usage not available in listing.",
        "Notice: Try using vxdisk to view additional usage of disks.",
    )

    try:
        execute("/usr/sbin/metadb")
        for notice in svm_notices:
            print_warning(notice)
    except (ExecuteException, OSError):
        # metadb missing or failing simply means there is nothing to report.
        pass

    if not os.path.exists("/usr/sbin/vxdisk"):
        return
    for notice in vxvm_notices:
        print_warning(notice)
def get_pretty_size(size_bytes):
    ''' Convert bytes of data into pretty string with unit.

    size_bytes -- number of bytes as a number or numeric string.

    Returns a string such as "500G" or "1.5T" using binary multiples (M, G,
    T, P, E); anything not above one megabyte is reported in bytes ("B").
    Returns False when the size is unknown (None or False) or cannot be
    converted to a number, so callers can substitute a placeholder.
    '''
    # Largest unit first so the first factor the size exceeds is the one used.
    units = (
        ('E', 2 ** 60),
        ('P', 2 ** 50),
        ('T', 2 ** 40),
        ('G', 2 ** 30),
        ('M', 2 ** 20),
    )
    # False is what the size lookup yields for "unknown"; float(False) would
    # silently turn it into a zero byte disk.
    if size_bytes is None or size_bytes is False:
        return False
    try:
        size_bytes = float(size_bytes)
    except (TypeError, ValueError):
        return False

    unit = 'B'
    value = size_bytes
    for symbol, factor in units:
        if size_bytes / factor > 1:
            unit = symbol
            value = size_bytes / factor
            break

    # Since we want a pretty value, we do not show decimals for numbers
    # over 10.
    if value >= 10 or value == 0:
        return "%s%s" % (int(value), unit)
    return "%s%s" % (round(value, 1), unit)
def print_short_header():
    ''' Print the column titles matching the rows of print_short_info() '''
    titles = ('disk', 'inst', 'size', 'use', 'type')
    print("%-38s %-7s %-4s %-21s %-5s" % titles)
def print_long_header():
    ''' Print header for print_long_info().

    The column widths mirror the row format of print_long_info(); the type
    column is five wide there, so the header uses the same width to keep the
    "lb" title above the label column.
    '''
    titles = ('disk', 'size', 'lun', 'use', 'p', 'spd', 'type', 'lb')
    print("%-38s %-4s %-5s %-13s %-3s %-3s %-5s %-2s" % titles)
def print_full_header():
    ''' Print the two header lines matching the two-line rows written by
    print_full_info() '''
    first_line = ('disk', 'size', 'hex', 'dec', 'p', 'spd', 'type', 'lb')
    second_line = ("use", "vendor", "product", "serial")
    print("%-38s %-4s %-5s %-3s %-3s %-3s %-5s %-2s" % first_line)
    print(" %-24s %-16s %-16s %-16s" % second_line)
def print_machine_parsable(disk):
    ''' Print a colon separated list of all data collected.

    disk -- dictionary as returned by DskInfo.get_disk_dict() in non-summary
            mode.

    Field order: name, instance, size, lun (hex), lun (dec), zpool, number
    of paths, FC speed, type, label, vendor, product, serial. Every optional
    value that is missing is printed as "-"; this includes vendor, product
    and serial so that no literal "False" ends up in the output.
    '''
    fields = (
        disk['name'],
        disk['instance'],
        get_pretty_size(disk['size']) or "-",
        disk['lunhex'] or "-",
        disk['lundec'] or "-",
        disk['zpool'] or "-",
        disk['numpath'] or "-",
        disk['fcspeed'] or "-",
        disk['type'],
        disk['label'],
        disk['vendor'] or "-",
        disk['product'] or "-",
        disk['serial'] or "-",
    )
    print(":".join(["%s" % (field,) for field in fields]))
def print_full_info(disk):
    ''' Print everything collected for a disk on two lines: the main fields
    first, then usage, vendor, product and serial (each cut to 16 chars) '''
    def clip(text):
        ''' Cut text to the 16 character column; falsy values pass through '''
        if text and len(text) > 16:
            return text[0:16]
        return text

    product = clip(disk['product'])
    vendor = clip(disk['vendor'])
    serial = clip(disk['serial'])

    main_fields = (
        disk['name'],
        get_pretty_size(disk['size']) or "-",
        disk['lunhex'] or "-",
        disk['lundec'] or "-",
        disk['numpath'] or "-",
        disk['fcspeed'] or "-",
        disk['type'],
        disk['label'],
    )
    print("%-38s %-4s %-5s %-3s %-3s %-3s %-5s %-2s" % main_fields)

    detail_fields = (
        disk['zpool'] or "-",
        vendor or "-",
        product or "-",
        serial or "-",
    )
    print(" %-24s %-16s %-16s %-16s" % detail_fields)
def print_short_info(disk):
    ''' Print the short one-line row for a disk: name, instance, size, usage
    and type '''
    size = get_pretty_size(disk['size']) or "-"
    usage_name = disk['zpool'] or "-"
    row = (disk['name'], disk['instance'], size, usage_name, disk['type'])
    print("%-38s %-7s %-4s %-21s %-5s" % row)
def print_long_info(disk, dec):
    ''' Print the long one-line row for a disk, including the FC related
    data. The LUN is shown in decimal when dec is true, otherwise in hex. '''
    pool = disk['zpool']
    if pool and len(pool) > 13:
        # Keep the usage column at its fixed width.
        pool = pool[0:13]

    lun_key = 'lunhex'
    if dec:
        lun_key = 'lundec'
    lunid = disk[lun_key]

    row = (
        disk['name'],
        get_pretty_size(disk['size']) or "-",
        lunid or "-",
        pool or "-",
        disk['numpath'] or "-",
        disk['fcspeed'] or "-",
        disk['type'],
        disk['label'],
    )
    print("%-38s %-4s %-5s %-13s %-3s %-3s %-5s %-1s" % row)
def usage(ret=1):
    ''' Write the usage summary to stderr and terminate with status ret '''
    text = """Usage:
dskinfo -h
dskinfo <subcommand> [-pusq] [disk]...
commands:
dskinfo list
dskinfo list-long [-d]
dskinfo list-full
dskinfo list-parsable
"""
    sys.stderr.write(text + "\n")
    sys.exit(ret)
def print_help():
''' Print help text and exit '''
print '''SYNOPSIS
dskinfo list [-p] [-s] [-u] [-q]
dskinfo list-long [-p] [-s] [-u] [-d] [-q]
dskinfo list-full [-p] [-s] [-u] [-q]
dskinfo list-parsable [-p] [-s] [-u] [-q]
dskinfo -h
SUBCOMMANDS
list: disk, instance, size, use and type on single line
list-long: list disk, size, lun (hex or dec), paths, spd, type and if
labeled on one line
list-full: Lists all available fields
list-parsable: All available fields in parsable format.
OPTIONS
-p
Path deduplication, show only disks with unique serial numbers. Used to
eliminate disks with multiple paths without MPxIO.
-s
Scan exported zpools. With this option all disks on the system will
be scanned to see if they are part of an active ZFS pool.
This can take a while on large systems since it executes "zpool import".
-u
Only show disks not currently used by any imported ZFS pool.
-d
Only for list-long, show logical units in decimal instead of hex.
-h
Prints this help and exits.
-q
Quiet, do not print warnings.
DATA FIELDS
1 disk: Solaris name of disk
2 instance: driver and instance number
3 size: Size of disk
4 hex: Logical Unit Number in hex
5 dec: Logical Unit Number in dec
6 use: Used in imported, exported pool (Marked with "*")
7 p: Number of paths to disk
8 spd: Speed of Fibre Channel link to disk
9 type: Driver type
10 lb: Is the disk labeled (y/n)
11 vendor: The disk vendor field from the disk
12 product: The product field from the disk
13 serial: The disk serial number if availabe
'''
sys.exit(0)
def main():
    ''' Main function: process arguments, iterate over the disks and print
    their information.

    The first argument selects the subcommand (or -h for help), followed by
    options and an optional list of disk names. Exits with status 1 for an
    unknown command, a missing privilege or an unknown disk, and with
    status 2 for invalid options. All diagnostics go to stderr.
    '''
    global opt_verbose
    global opt_quiet
    opt_verbose = False
    opt_quiet = False
    options = {
        'suppress_used': False,
        'dedup': False,
        'scan': False,
        'decimal': False,
        'summary': False,
    }
    # Maps the simple on/off option letters to the key they enable.
    option_keys = {
        '-s': 'scan',
        '-p': 'dedup',
        '-u': 'suppress_used',
        '-d': 'decimal',
    }
    commands = ['list', 'list-long', 'list-full', 'list-parsable', '-h']

    if len(sys.argv) == 1:
        usage(1)
    cmd = sys.argv[1]
    if cmd not in commands:
        sys.stderr.write("Unknown command: %s\n" % (cmd,))
        usage(1)
    if cmd == "-h":
        print_help()

    # -d (decimal LUN) only makes sense for list-long.
    if cmd == "list-long":
        valid_options = "vspduq"
    else:
        valid_options = "vspuq"
    try:
        opts, args = getopt.getopt(sys.argv[2:], valid_options)
    except getopt.GetoptError:
        # sys.exc_info() instead of "except ... as" keeps this valid on both
        # old and new Python interpreters.
        sys.stderr.write("%s\n" % (sys.exc_info()[1],))
        usage(2)

    for opt, _arg in opts:
        if opt == "-v":
            opt_verbose = True
        elif opt == "-q":
            opt_quiet = True
        elif opt in option_keys:
            options[option_keys[opt]] = True
        else:
            # getopt only returns validated options, so this is a safety net
            # that also works when assertions are disabled.
            sys.stderr.write("Unhandled option: %s\n" % (opt,))
            usage(2)

    if os.getuid() != 0 and options['dedup']:
        sys.stderr.write("ERROR: Must be root to used dedup.\n")
        sys.exit(1)
    signal.signal(signal.SIGINT, sigint)

    # We can only use the faster "summary" method of collecting data if we do
    # a simple list without using disk dedup since it requires the serial.
    if cmd == "list" and not options['dedup']:
        options['summary'] = True

    dskinfo = DskInfo(options['scan'], options['summary'])

    # If specific disks have been selected, check that they exist.
    if args:
        for disk in args:
            if not dskinfo.is_disk_available(disk):
                sys.stderr.write("Disk %s not available on system.\n"
                                 % (disk,))
                sys.exit(1)
        disks = sorted(args)
    else:
        # Reuse the list collected by the constructor instead of running
        # iostat a second time; sorted() leaves the cached list untouched.
        disks = sorted(dskinfo.iostat_disks)

    # NOTE(review): this pattern is a character class, so it accepts any
    # driver name containing one of the characters s, d, |, c, m or k rather
    # than only sd, ssd and cmdk. It is left unchanged because tightening it
    # would hide disks on other drivers that are listed today; confirm the
    # intended driver list before changing it.
    r_drivers = re.compile("[sd|ssd|cmdk]")

    if cmd != "list" and os.getuid() != 0:
        print_warning("Notice: Not run as root, unable to display all "
                      "fields.")

    # Print headers for all but the machine parsable output.
    if cmd == "list-long":
        print_long_header()
    elif cmd == "list-full":
        print_full_header()
    elif cmd == "list":
        print_short_header()

    # Iterate through all available disks and print them according to
    # command if not deselected by any option.
    listed_serials = set()
    for disk in disks:
        # CD/DVD drives are skipped unless explicitly asked for.
        if dskinfo.is_disk_dvd(disk) and not args:
            continue
        info = dskinfo.get_disk_dict(disk)
        if options['dedup']:
            serial = info['serial']
            if serial and serial in listed_serials:
                continue
            listed_serials.add(serial)
        if not r_drivers.search(info['driver']):
            continue
        if options['suppress_used'] and info['zpool']:
            continue
        if cmd == "list-long":
            print_long_info(info, dec=options['decimal'])
        elif cmd == "list-full":
            print_full_info(info)
        elif cmd == "list":
            print_short_info(info)
        elif cmd == "list-parsable":
            print_machine_parsable(info)
    volume_manager_warning()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment