Created
February 8, 2020 10:09
-
-
Save Leetroch/00e857c1c9948d2197765a099fa0f548 to your computer and use it in GitHub Desktop.
Personal RAM program
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import psutil | |
import math | |
import time | |
from apscheduler.schedulers.background import BackgroundScheduler | |
import winsound | |
#import sys | |
import numpy as np | |
import copy | |
def main():
    """Read console commands until the user asks to quit.

    Recognised commands (exact, case-sensitive match):

    * ``exit``   -- leave the program by raising ``SystemExit``.
    * ``pause``  -- call ``pause()`` on the module-level ``scheduler``.
    * ``resume`` -- call ``resume()`` on the module-level ``scheduler``.

    Any other input is ignored and the loop waits for the next line.
    End-of-file on stdin is handled the same way as ``exit``.

    Raises:
        SystemExit: when ``exit`` is entered or stdin reaches end-of-file.
    """
    while True:
        try:
            content = input()
        except EOFError:
            # stdin is closed, so no further command can ever arrive;
            # leave cleanly instead of crashing with a traceback.
            raise SystemExit from None

        if content == 'exit':
            # Raise directly rather than calling exit(): that helper is
            # injected by the ``site`` module for interactive use and is
            # not guaranteed to exist in every run mode.
            raise SystemExit
        if content == 'pause':
            # Suspend execution of the background jobs.
            scheduler.pause()
        elif content == 'resume':
            # Continue executing the background jobs.
            scheduler.resume()
def occ_rate() -> float:
    """Return the memory occupancy as a percentage (used / total * 100).

    Both figures come from a single ``psutil.virtual_memory()`` snapshot.
    """
    snapshot = psutil.virtual_memory()
    used_bytes = float(snapshot.used)
    total_bytes = float(snapshot.total)
    return used_bytes / total_bytes * 100
def time_details() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    # The epoch time is rounded to whole milliseconds and then converted
    # back to seconds before being broken down into local time.
    epoch_millis = int(round(time.time() * 1000))
    local_parts = time.localtime(epoch_millis / 1000)
    return time.strftime('%Y-%m-%d %H:%M:%S', local_parts)
# Occupancy samples (percent) remembered between calls, so the reported
# max / min / average describe the monitoring run rather than one sample.
_OCC_HISTORY = []
# Cap on remembered samples so a long-running monitor cannot grow without
# bound; the oldest samples are discarded first.
_OCC_HISTORY_LIMIT = 10000


def statistics(sample=None, history=None):
    """Record one memory-occupancy sample and print max / min / average.

    Args:
        sample: Occupancy percentage to record.  When omitted, the value
            is obtained from ``occ_rate()``.
        history: List that accumulates the samples and is modified in
            place.  When omitted, a module-level list shared by every
            call is used, which is what makes the statistics cumulative.

    Returns:
        Tuple ``(maximum, minimum, average)`` computed over the samples
        currently held in the history.
    """
    if history is None:
        history = _OCC_HISTORY

    occ = occ_rate() if sample is None else float(sample)
    history.append(occ)
    # Trim from the front once the cap is exceeded (no-op while under it).
    del history[:-_OCC_HISTORY_LIMIT]

    # Echo only the newest sample, as a one-element list, instead of
    # dumping the whole accumulated history on every call.
    print(history[-1:])

    max_occ_rate = max(history)
    min_occ_rate = min(history)
    avg_occ_rate = sum(history) / len(history)

    # Two decimal places, matching the other percentage output in the file.
    print("max = : %.2f" % max_occ_rate)
    print("minum = : %.2f" % min_occ_rate)
    print("avgerage =: %.2f" % avg_occ_rate)
    return max_occ_rate, min_occ_rate, avg_occ_rate
def memory_panel():
    """Print a snapshot of current memory usage to the console.

    Output, in order: used and total memory in GB, the occupancy
    percentage preceded by a bar of one "¦" per full 10 percent, the
    ``statistics()`` report, the ``psutil.swap_memory()`` figures, the
    current time, a hint on how to exit, and a separator line.
    """
    snapshot = psutil.virtual_memory()
    bytes_per_gb = math.pow(1024, 3)  # divisor turning bytes into GB
    used_gb = snapshot.used / bytes_per_gb
    total_gb = snapshot.total / bytes_per_gb

    print('已使用内存: %.2f GB' % used_gb)
    # Note carried over from the original author: these figures should feed
    # statistics for the maximum, the minimum, and a count of how often a
    # configured threshold is exceeded.
    print('总内存: %.2f GB' % total_gb)

    # Memory occupancy rate, shown with two decimal places.
    percent = occ_rate()
    bar = "¦" * int(percent / 10)
    label = '%.2f%%' % percent
    print("内存占用率: ")
    print('{0}{1}'.format(bar, label))

    statistics()
    print(psutil.swap_memory())
    print("时间: %s" % time_details())
    print("退出请输入exit(请区分大小写): ")
    print('==========================')
def occ_warning_recorder(threshold=90):
    """Beep and append a log line when memory occupancy is too high.

    The current occupancy is read from ``occ_rate()``.  If it is at or
    above ``threshold``, a 440 Hz tone is played for one second and a line
    of the form ``<timestamp>—<rate>%`` is appended to the log file.

    Args:
        threshold: Occupancy percentage at or above which the warning is
            triggered.  Defaults to 90.

    Returns:
        ``False`` if the log file could not be written; ``None`` otherwise
        (including when the occupancy is below the threshold).
    """
    rate = occ_rate()
    # Written as "not >=" so a NaN reading also counts as "no warning".
    # There is deliberately no sleep here: the job already runs on a fixed
    # interval, and sleeping would only tie up a scheduler worker thread.
    if not rate >= threshold:
        return None

    duration = 1000  # milliseconds
    freq = 440  # Hz
    winsound.Beep(freq, duration)

    now2 = time_details()
    occ_warning = str(rate) + '%'
    # join() returns a new string; its result is what must be written.
    log_line = "—".join([now2, occ_warning])

    try:
        # Append mode keeps earlier warnings; 'w' would overwrite the file
        # on every write.  The context manager guarantees the handle is
        # closed even if a write fails.
        # NOTE(review): the leading '//' makes this an absolute path and
        # may be parsed as a UNC prefix on Windows -- confirm the intended
        # location of the log file.
        with open('//memory_float_log.txt', 'a', encoding='utf-8') as file:
            file.write(log_line + '\n')
    except OSError:
        print('save failed: unable to write to memory_float_log.txt')
        return False
    return None
if __name__ == '__main__':
    # Script entry point: run the memory panel and the high-occupancy
    # recorder as background jobs while main() handles console commands.
    # ``scheduler`` is a module-level name because main() refers to it.
    scheduler = BackgroundScheduler()
    try:
        # Print the memory panel every 8 seconds.
        scheduler.add_job(memory_panel, 'interval', seconds=8)
        # Check for high occupancy every 1.1 minutes (66 seconds).
        scheduler.add_job(occ_warning_recorder, 'interval', minutes=1.1)
        scheduler.start()
        main()
    except (KeyboardInterrupt, SystemExit):
        # Ctrl+C and the "exit" command are the normal ways to stop, so
        # end quietly without a traceback.
        pass
    finally:
        # Stop the scheduler explicitly.  wait=False avoids blocking the
        # shutdown call on a job that is still running.
        if scheduler.running:
            scheduler.shutdown(wait=False)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment