Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#!/usr/bin/env python
import argparse
import logging
import re
import sys
import pprint
#-------------------------------------------------
UDEV_FILE = '/etc/udev/rules.d/70-persistent-net.rules'
NIC_ORDER = [
'igb', # Onboard NIC (Intel)
'e1000e', # GbE NIC (Intel)
'bnx2x', # 10GbE NIC (Broadcom)
]
# If you need to sort NICs with a special order based on
# mac address offset, please set the following list.
# If a order is not specified for a nic type, NICs are sorted
# based on MAC address in ascending order.
SORT_ORDER_PER_CARD = {
'e1000e': [1, 0, 3, 2],
'bnx2x': [0, 2],
}
# For test
#SORT_ORDER_PER_CARD = {
# 'e1000e': [3, 2, 1, 0],
# 'bnx2x': [3, 2, 1, 0],
#}
#-------------------------------------------------
def load_file(fn):
with open(fn) as f:
lines = f.readlines()
entries = {}
note = None
for line in lines:
line = line.strip()
if not line:
continue
if line.startswith('#'):
if line.startswith('# PCI device'):
note = line
continue
# udev entry
entry_dict = line2dict(line)
nic_type = get_nic_type_from_note(note) if note else '**NONE**'
entry = {'raw_data': line,
'note': note,
'mac_address': get_mac_addr(entry_dict)}
if nic_type not in entries:
entries[nic_type] = []
entries[nic_type].append(entry)
logging.debug('load data from "%(fn)s":\n%(data)s',
{'fn': fn, 'data': pprint.pformat(entries)})
return entries
def get_mac_addr(entry_dict):
mac_addr_str = entry_dict['ATTR{address}'].strip('"')
mac_addr_str = mac_addr_str.replace(':', '')
return int(mac_addr_str, 16)
def get_nic_type_from_note(line):
"""Extract a nic TYPE from '# PCI ..... (TYPE)'."""
return line.split('(')[1].replace(')', '')
def line2dict(line):
return dict([x.split('=') for x in line.replace('==', '=').split(', ')])
def check_unknown_nics(nic_types):
unknown_nic_types = set(nic_types) - set(NIC_ORDER)
if unknown_nic_types:
logging.error( '#################################')
logging.error( 'Unknown NIC type(s) found ---> %s',
', '.join(unknown_nic_types))
logging.error( 'Please add them to NIC_ORDER list.')
logging.error( '#################################')
sys.exit(1)
def sort_entries(nic_type, group_entries):
logging.debug('sort_entries: nic_type=%s', nic_type)
group_entries.sort(key=lambda x: x['mac_address'])
sort_special_order(nic_type, group_entries)
def sort_special_order(nic_type, group_entries):
"""Sort NICs with special order.
This method must be called after sort group entries with MAC address.
"""
order = SORT_ORDER_PER_CARD.get(nic_type)
if not order:
return
max_addr_diff = max(order)
base_addr = 0
stack_list = []
sorted_list = []
for e in group_entries:
logging.debug('sort_special_order: --> entry=%s', e)
addr_diff = e['mac_address'] - base_addr
if addr_diff <= max_addr_diff:
logging.debug('ADD')
stack_list.append((addr_diff, e))
if addr_diff == max_addr_diff:
logging.debug('FLUSH NOW')
flush_group_entry(sorted_list, stack_list, order)
else:
if stack_list:
logging.debug('FLUSH PREV')
flush_group_entry(sorted_list, stack_list, order)
logging.debug('INIT')
base_addr = e['mac_address']
stack_list.append((0, e))
if stack_list:
logging.debug('FLUSH LAST')
flush_group_entry(sorted_list, stack_list, order)
group_entries[:] = sorted_list
def flush_group_entry(sorted_list, stack_list, order):
logging.debug('(stack_list)\n%s', pprint.pformat(stack_list))
offset_dict = dict((v, i) for i, v in enumerate(order))
stack_list.sort(key=lambda x: offset_dict[x[0]])
sorted_list.extend([x[1] for x in stack_list])
stack_list[:] = []
logging.debug('(sorted_list)\n%s', pprint.pformat(sorted_list))
def sort_entries_group(entries):
for nic_type in entries:
sort_entries(nic_type, entries[nic_type])
def output_sorted_data(entries):
logging.debug('------- Final results -------')
index = 0
output_data = []
for nic_type in NIC_ORDER:
for entry in entries.get(nic_type, []):
data = re.sub('NAME="eth[0-9]+"', 'NAME="eth%d"' % index,
entry['raw_data'])
index += 1
output_data.extend(['', entry['note'], data])
for line in output_data:
logging.debug('%s', line)
output_str = '\n'.join(output_data)
print output_str
def count_entries(entries):
return sum([len(group) for group in entries.values()])
def check_final_result(entries, num_before):
num_after = count_entries(entries)
if num_before != num_after:
logging.critical('')
logging.critical('------------------------------------------------')
logging.critical('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
logging.critical('The number of udev entries does not match.'
' (before=%s, after=%s)' % (num_before, num_after))
logging.critical('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
sys.exit(2)
#---------------------------------------
def optparse():
parser = argparse.ArgumentParser()
parser.add_argument('--debug', action='store_true')
parser.add_argument('--udevfile', default=UDEV_FILE,
help='default:%s' % UDEV_FILE)
return parser.parse_args(sys.argv[1:])
def main():
conf = optparse()
level = logging.DEBUG if conf.debug else logging.WARNING
logging.basicConfig(level=level)
entries = load_file(conf.udevfile)
num_before = count_entries(entries)
check_unknown_nics(entries.keys())
sort_entries_group(entries)
check_final_result(entries, num_before)
output_sorted_data(entries)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment