Skip to content

Instantly share code, notes, and snippets.

@Ivlyth
Last active June 17, 2020 07:19
Show Gist options
  • Save Ivlyth/b00d2d66b074bca19522d45fe7d90975 to your computer and use it in GitHub Desktop.
Save Ivlyth/b00d2d66b074bca19522d45fe7d90975 to your computer and use it in GitHub Desktop.
network work card interface stats
# -*- coding:utf8 -*-
"""
Author : Myth
Date : 2020/6/16
Email : email4myth at gmail.com
"""
from __future__ import unicode_literals
import argparse
import math
import signal
import sys
import time
from collections import namedtuple
import psutil
from datetime import datetime
RUNNING = True
options = None
def quit(sig, handler):
global RUNNING
RUNNING = False
signal.signal(signal.SIGINT, quit)
NicCounter = namedtuple('NicCounter', ('ts',
'packets', 'packets_dropped', 'bytes'))
NicStats = namedtuple('NicStats', ('ts',
'packets', 'packets_dropped', 'pps', 'max_pps', 'bytes', 'bandwidth',
'max_bandwidth'))
class ArgumentDefaultsHelpFormatter(argparse.HelpFormatter):
"""Help message formatter which adds default values to argument help.
Only the name of this class is considered a public API. All the methods
provided by the class are considered an implementation detail.
"""
def _get_help_string(self, action):
help = action.help
if '%(default)' not in action.help:
if action.default is not argparse.SUPPRESS:
defaulting_nargs = [argparse.OPTIONAL, argparse.ZERO_OR_MORE]
if action.option_strings or action.nargs in defaulting_nargs:
if action.default is not None:
help += ' (default: %(default)s)'
return help
def define_and_parse_args():
global options
parser = argparse.ArgumentParser(prog="PROG",
formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('-i', '--ifname', required=True, help=u'network card interface name')
parser.add_argument('-R', '--no-rx', action='store_true', help='no rx stats')
parser.add_argument('-T', '--no-tx', action='store_true', help='no tx stats')
parser.add_argument('-d', '--plus-dropped', action='store_true', help='plus dropped to total packets')
parser.add_argument('-u', '--unit-base', default=1000.0, type=float, help='unit base')
parser.add_argument('-m', '--max-history', default=1000, type=int, help='max history count')
parser.add_argument('-k', '--keep-old-data', action='store_true', help='keep old data')
parser.add_argument('-N', '--no-print-when-continue-zero', default=3, type=int, help='no print when continue zero')
parser.add_argument('-f', '--date-format', default='%H:%M:%S', help='date format')
parser.add_argument('-r', '--reset-counter-when-continue-zero', action='store_true',
help='reset old counter when continue zero')
options = parser.parse_args()
if options.ifname not in psutil.net_io_counters(pernic=True):
parser.error('NIC %s not found!' % options.ifname)
try:
datetime.now().strftime(options.date_format)
except Exception:
parser.error("invalid date format: %s" % options.date_format)
options.old_rx_counter, options.old_tx_counter = get_counter(options.ifname, True)
return options
def get_counter(ifname, keep_old_data=False):
counter = psutil.net_io_counters(pernic=True)[ifname]
ts = time.time()
rx_counter = NicCounter(ts,
counter.packets_recv, counter.dropin, counter.bytes_recv)
tx_counter = NicCounter(ts,
counter.packets_sent, counter.dropout, counter.bytes_sent)
if not keep_old_data:
old_rx_counter = options.old_rx_counter
old_tx_counter = options.old_tx_counter
rx_counter = NicCounter(rx_counter.ts,
rx_counter.packets - old_rx_counter.packets,
rx_counter.packets_dropped - old_rx_counter.packets_dropped,
rx_counter.bytes - old_rx_counter.bytes)
tx_counter = NicCounter(tx_counter.ts,
tx_counter.packets - old_tx_counter.packets,
tx_counter.packets_dropped - old_tx_counter.packets_dropped,
tx_counter.bytes - old_tx_counter.bytes)
return rx_counter, tx_counter
def main():
options = define_and_parse_args()
last_rx_counter = None
last_tx_counter = None
last_rx_stats = None
last_tx_stats = None
rx_continue_zero_count = 0
tx_continue_zero_count = 0
reset_rx = reset_tx = False
while RUNNING:
rx_counter, tx_counter = get_counter(options.ifname, options.keep_old_data)
if last_rx_counter:
time_diff = rx_counter.ts - last_rx_counter.ts
rx_packets = (
rx_counter.packets + rx_counter.packets_dropped) if options.plus_dropped else rx_counter.packets
rx_packets_dropped = rx_counter.packets_dropped
if options.plus_dropped:
rx_pps = math.ceil(((rx_counter.packets + rx_counter.packets_dropped) - (
last_rx_counter.packets + last_rx_counter.packets_dropped)) * 1.0 / time_diff)
else:
rx_pps = math.ceil((rx_counter.packets - last_rx_counter.packets) * 1.0 / time_diff)
if last_rx_stats:
rx_max_pps = max(rx_pps, last_rx_stats.max_pps)
else:
rx_max_pps = rx_pps
rx_bytes = rx_counter.bytes
rx_bandwidth = round((rx_counter.bytes - last_rx_counter.bytes) * 1.0 / time_diff, 2)
if last_rx_stats:
rx_max_bandwidth = max(rx_bandwidth, last_rx_stats.max_bandwidth)
else:
rx_max_bandwidth = rx_bandwidth
rx_stats = NicStats(rx_counter.ts,
rx_packets, rx_packets_dropped, rx_pps, rx_max_pps, rx_bytes, rx_bandwidth,
rx_max_bandwidth)
last_rx_stats = rx_stats
if not options.no_rx:
if rx_stats.pps == 0:
rx_continue_zero_count += 1
else:
rx_continue_zero_count = 0
if options.no_print_when_continue_zero > 0 and rx_continue_zero_count > options.no_print_when_continue_zero:
if options.reset_counter_when_continue_zero:
reset_rx = True
if rx_continue_zero_count == options.no_print_when_continue_zero + 1:
print '[%s] RX - [will continue when non-zero stats coming ... %s]' % (
datetime.fromtimestamp(rx_stats.ts).strftime(options.date_format),
'counter reset' if options.reset_counter_when_continue_zero else ''
)
else:
print '[%s] RX - Packets: %10d(dropped: %8d), PPS: %8d(max: %8d), Traffic: %10.3fMB, Bandwidth: %8.3fMbps(max: %8.3fMbps)' % (
datetime.fromtimestamp(rx_stats.ts).strftime(options.date_format),
rx_stats.packets, rx_stats.packets_dropped, rx_pps, rx_max_pps,
rx_bytes * 1.0 / options.unit_base / options.unit_base,
rx_bandwidth * 8.0 / options.unit_base / options.unit_base,
rx_max_bandwidth * 8.0 / options.unit_base / options.unit_base
)
sys.stdout.flush()
# -------------------------------------
if last_tx_counter:
tx_packets = (
tx_counter.packets + tx_counter.packets_dropped) if options.plus_dropped else tx_counter.packets
tx_packets_dropped = tx_counter.packets_dropped
if options.plus_dropped:
tx_pps = math.ceil(((tx_counter.packets + tx_counter.packets_dropped) - (
last_tx_counter.packets + last_tx_counter.packets_dropped)) * 1.0 / time_diff)
else:
tx_pps = math.ceil((tx_counter.packets - last_tx_counter.packets) * 1.0 / time_diff)
if last_tx_stats:
tx_max_pps = max(tx_pps, last_tx_stats.max_pps)
else:
tx_max_pps = tx_pps
tx_bytes = tx_counter.bytes
tx_bandwidth = round((tx_counter.bytes - last_tx_counter.bytes) * 1.0 / time_diff, 2)
if last_tx_stats:
tx_max_bandwidth = max(tx_bandwidth, last_tx_stats.max_bandwidth)
else:
tx_max_bandwidth = tx_bandwidth
tx_stats = NicStats(tx_counter.ts,
tx_packets, tx_packets_dropped, tx_pps, tx_max_pps, tx_bytes, tx_bandwidth,
tx_max_bandwidth)
last_tx_stats = tx_stats
if not options.no_tx:
if tx_stats.pps == 0:
tx_continue_zero_count += 1
else:
tx_continue_zero_count = 0
if options.no_print_when_continue_zero > 0 and tx_continue_zero_count > options.no_print_when_continue_zero:
if options.reset_counter_when_continue_zero:
reset_tx = True
if tx_continue_zero_count == options.no_print_when_continue_zero + 1:
print '[%s] TX - [will continue when non-zero stats coming ... %s]' % (
datetime.fromtimestamp(tx_stats.ts).strftime(options.date_format),
'counter reset' if options.reset_counter_when_continue_zero else ''
)
else:
print '[%s] TX - Packets: %10d(dropped: %8d), PPS: %8d(max: %8d), Traffic: %10.3fMB, Bandwidth: %8.3fMbps(max: %8.3fMbps)' % (
datetime.fromtimestamp(tx_stats.ts).strftime(options.date_format),
tx_stats.packets, tx_stats.packets_dropped, tx_pps, tx_max_pps,
tx_bytes * 1.0 / options.unit_base / options.unit_base,
tx_bandwidth * 8.0 / options.unit_base / options.unit_base,
tx_max_bandwidth * 8.0 / options.unit_base / options.unit_base
)
sys.stdout.flush()
if reset_rx:
options.old_rx_counter, _ = get_counter(options.ifname, keep_old_data=True)
reset_rx = False
last_rx_counter = None
last_rx_stats = None
else:
last_rx_counter = rx_counter
if reset_tx:
_, options.old_tx_counter = get_counter(options.ifname, keep_old_data=True)
reset_tx = False
last_tx_counter = None
last_tx_stats = None
else:
last_tx_counter = tx_counter
time.sleep(0.999)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment