Skip to content

Instantly share code, notes, and snippets.

def print_report(self, timestamp):
if self.options.show_process_user:
self.printer ("Timestamp\tUSER\tPID\tprio\tpolicy\tCommand")
else:
self.printer ("Timestamp\tUID\tPID\tprio\tpolicy\tCommand")
processes = self.process_filter.filter_processes(self.process_priority.get_processes())
for process in processes:
if self.options.show_process_user:
self.printer("%s\t%s\t%s\t%s\t%s\t%s" % (timestamp,process.user_name(),process.pid(),process.priority(),process.policy(),process.process_name()))
else:
class TestUserCpuUsage(unittest.TestCase):
    # Sketches of unit tests for CpuUsage and CpuUsageReporter.
    # Comments are used instead of docstrings so that unittest's
    # shortDescription() output for these tests stays unchanged.

    def test_get_processes(self):
        # Builds a CpuUsage on top of a mocked metric repository whose
        # current_values() serves a fixed pid table. No assertion is made yet.
        repository = mock.Mock()
        cpu_usage = CpuUsage(repository)

        def pid_lookup(metric):
            # Only the pid metric is answered; anything else yields None.
            return {1: 1, 2: 2, 5: 5, 10: 10} if metric == 'proc.psinfo.pid' else None

        repository.current_values = mock.Mock(side_effect=pid_lookup)

    def test_print_report_filtering(self):
        # Drives CpuUsageReporter.print_report() with mocked collaborators and
        # checks the last line handed to the printer.
        usage_source = mock.Mock()
        pid_filter = mock.Mock()
        sink = mock.Mock()
        # TODO: implement interactions with the mocks
        CpuUsageReporter(usage_source, pid_filter, 1, sink).print_report(123, 4)
        sink.assert_called_with("123 1000 0.0 0.0 1.0 ...")
class ProcessFilter:
    """Select the processes that satisfy the configured report options."""

    def __init__(self, options):
        """Store the options object that the matching helpers consult."""
        self.options = options

    def filter_processes(self, processes):
        """Return a list of the processes accepted by the filter.

        The input order is preserved. The earlier version built the
        ``filter(...)`` result but never returned it, so every caller
        received ``None``.
        """
        return [process for process in processes if self.__predicate(process)]

    def __predicate(self, process):
        # A process is kept only when every individual matcher accepts it.
        # The matcher helpers are not defined in this snippet and must be
        # provided alongside it.
        # TODO: the original sketch ended this chain with a literal ``...``
        # as a stand-in for further matchers; add them here once they exist.
        return self.__matches_username(process) and self.__matches_pid(process)
# Example
#
# Existing:
class TestProcessPriority(unittest.TestCase):
    # The fixture is created in the constructor: a mocked metric repository
    # whose current_value() is routed through this test case's
    # metric_repo_current_value_side_effect (defined outside this snippet).

    def __init__(self, *args, **kwargs):
        super(TestProcessPriority, self).__init__(*args, **kwargs)
        repository = mock.Mock()
        repository.current_value = mock.Mock(
            side_effect=self.metric_repo_current_value_side_effect)
        self.__metric_repository = repository
# Change to:
class ProcessCpuUsage:
    """CPU usage of a single process instance, read from a metric repository."""

    def __init__(self, instance, delta_time, metrics_repository):
        """Remember the instance id, the sampling interval and the repository."""
        self.instance = instance
        self.__delta_time = delta_time
        self.__metric_repository = metrics_repository

    def user_percent(self):
        """Return the user-time percentage, rounded to two decimals.

        The value is the change in ``proc.psinfo.utime`` between the previous
        and the current sample, expressed as a percentage of
        ``1000 * delta_time``.
        """
        # Pulled out of CpuUsage class
        repository = self.__metric_repository
        current = repository.current_value('proc.psinfo.utime', self.instance)
        previous = repository.previous_value('proc.psinfo.utime', self.instance)
        # NOTE(review): the 1000 factor presumably converts the interval to
        # the unit of utime - confirm against the metric's definition.
        interval = float(1000 * self.__delta_time)
        percent_of_time = 100 * float(current - previous) / interval
        return float("%.2f" % percent_of_time)
class CpuUsage:
    """Build per-process CPU usage entries from a metric repository."""

    def __init__(self, metric_repository):
        """Keep the repository that the pid lookup is meant to query."""
        self.__metric_repository = metric_repository

    def get_processes(self, delta_time):
        """Return a list with one CPU usage entry per known pid.

        Each entry is produced by ``self.create_process_cpu_usage(pid)``,
        which is not defined in this snippet and has to be provided
        alongside it. ``delta_time`` is accepted but not yet forwarded
        anywhere.

        The earlier version iterated over the bound method ``self.__pids``
        instead of calling it, which raises ``TypeError``.
        """
        # TODO:
        return [self.create_process_cpu_usage(pid) for pid in self.__pids()]

    def __pids(self):
        # TODO: use ReportingMetricRepository to get all pids
        # Raising keeps the missing lookup visible instead of leaving the
        # method without a body, which is a syntax error.
        raise NotImplementedError("pid lookup is not implemented yet")
class ProcessCpuUsage:
    """Plain record of one process's CPU usage figures."""

    def __init__(self, user_percent, guest_percent, system_percent, pid, process_name, cpu_number):
        """Store the given figures and derive their total.

        ``total_percent`` is the sum of the user, guest and system
        percentages, computed once at construction time.
        """
        self.user_percent, self.guest_percent, self.system_percent = (
            user_percent, guest_percent, system_percent)
        self.total_percent = user_percent + guest_percent + system_percent
        self.pid, self.process_name, self.cpu_number = pid, process_name, cpu_number
class ProcessCpuUsage:
    """Plain record of user, guest and system CPU percentages."""

    def __init__(self, user_percent, guest_percent, system_percent):
        """Store the three percentages and their sum as ``total_percent``."""
        self.user_percent, self.guest_percent, self.system_percent = (
            user_percent, guest_percent, system_percent)
        self.total_percent = user_percent + guest_percent + system_percent
class CpuUsage:
    """Holder for the metric repository that CPU usage data is read from."""

    def __init__(self, metric_repository):
        """Keep a private reference to ``metric_repository``."""
        self.__metric_repository = metric_repository
# A new ReportingMetricsRepository should be used for each iteration of the report cycle.
# It wraps
class ReportingMetricsRepository:
def __init__(self, group):
self.group = group
self.__current = {}
self.__previous = {}
def current_value(self, metric, instance):