Last active
May 19, 2023 07:48
-
-
Save synctam/6db8f3604192f7b67ba3b973efe07d2a to your computer and use it in GitHub Desktop.
Raspberry Pi のCPU温度、CPU利用率、メモリー利用率を取得し、Ambientにアップロードする。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# Copyright (c) 2023 synctam@gmail.com | |
# Released under the MIT license | |
import os | |
import datetime | |
import psutil | |
import subprocess | |
import time | |
import logging | |
from datetime import datetime | |
import ambient | |
import requests | |
def GetCpuStat(stat_path='/proc/stat'):
    """Read cumulative CPU tick counters from a ``/proc/stat``-style file.

    Every line whose first field starts with ``cpu`` yields one entry, in
    file order. On Linux the first such line is normally the all-core
    aggregate, followed by one line per core.

    Args:
        stat_path: File to parse. Defaults to the kernel's ``/proc/stat``.

    Returns:
        A list of ``[busy, total]`` pairs, where ``busy`` is the sum of the
        first three counters after the label (user, nice, system) and
        ``total`` is ``busy`` plus the fourth counter (idle).

    Raises:
        OSError: If ``stat_path`` cannot be opened.
    """
    TckList = []
    # Read the file directly; no shell pipeline is needed for a text file.
    with open(stat_path) as StatFile:
        for Line in StatFile:
            ItemList = Line.split()
            # A usable line has the label plus at least four counters.
            if len(ItemList) < 5 or not ItemList[0].startswith('cpu'):
                continue
            TckBusy = sum(int(Item) for Item in ItemList[1:4])
            TckIdle = int(ItemList[4])
            TckList.append([TckBusy, TckBusy + TckIdle])
    return TckList
class CpuUsage:
    """Report CPU busy percentages between successive GetCpuStat() samples.

    The constructor takes the first sample; each call to get() takes a new
    one, compares it with the previous one, and keeps the new sample as the
    baseline for the next call.
    """

    def __init__(self):
        self._TckList = GetCpuStat()

    @staticmethod
    def _rates(TckListPre, TckListNow):
        """Convert two samples of ``[busy, total]`` pairs into percentages.

        Args:
            TckListPre: The earlier sample.
            TckListNow: The later sample.

        Returns:
            One truncated integer percentage per entry present in both
            samples. An entry whose total did not advance is reported as 0,
            because no rate can be computed without elapsed ticks.
        """
        CpuRateList = []
        for (BusyPre, AllPre), (BusyNow, AllNow) in zip(TckListPre, TckListNow):
            TckAll = AllNow - AllPre
            if TckAll <= 0:
                # Back-to-back samples: avoid dividing by zero.
                CpuRateList.append(0)
                continue
            TckBusy = BusyNow - BusyPre
            CpuRateList.append(int(TckBusy * 100 / TckAll))
        return CpuRateList

    def get(self):
        """Return the busy percentages accumulated since the previous sample.

        Returns:
            A list of integer percentages in the same order as the entries
            returned by GetCpuStat().
        """
        TckListPre = self._TckList
        TckListNow = GetCpuStat()
        self._TckList = TckListNow
        return self._rates(TckListPre, TckListNow)
def run_shell_command(command_str):
    """Run a shell command and return the value part of its ``key=value`` output.

    The command's standard output is split on ``=`` and the second piece is
    returned with all newline characters removed.

    Args:
        command_str: Command line passed to the shell as-is. It is executed
            with ``shell=True``, so only pass trusted strings.

    Returns:
        The text between the first and second ``=`` of the output (or up to
        the end of the output if there is only one), without newlines.

    Raises:
        IndexError: If the output contains no ``=``, e.g. because the
            command failed or printed nothing.
    """
    proc = subprocess.run(command_str, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
    fields = proc.stdout.split("=")
    if len(fields) < 2:
        # Same exception type as a bare fields[1], but with enough context
        # to see which command misbehaved and what it printed.
        raise IndexError(
            "no '=' in output of {!r} (exit code {}): {!r}".format(
                command_str, proc.returncode, proc.stdout
            )
        )
    return fields[1].replace('\n', '')
def main():
    """Periodically sample CPU temperature, CPU usage and memory usage and send them to Ambient.

    Configuration is read from environment variables:

    * ``AMBI_CHANNEL_ID`` / ``AMBI_WRITE_KEY``: passed to ``ambient.Ambient``.
    * ``AMBI_INTERVAL``: seconds to sleep between samples.
    * ``ADGUARD_WAIT_TIME``: seconds to sleep once before the first sample.

    Each sample is sent as fields ``d1`` (temperature), ``d2`` (CPU usage)
    and ``d3`` (memory usage). Progress and send failures are written to
    ``adpi.log`` in the current working directory. The function loops
    forever and does not return.

    Raises:
        KeyError: If a required environment variable is missing.
        ValueError: If a numeric environment variable is not an integer.
    """
    AmbiChannelID = os.environ['AMBI_CHANNEL_ID']
    AmbiWriteKey = os.environ['AMBI_WRITE_KEY']
    AmbiInterval = int(os.environ['AMBI_INTERVAL'])
    AdGuardWaitTime = int(os.environ['ADGUARD_WAIT_TIME'])

    logger = logging.getLogger("adpi")
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s:%(name)s - %(message)s')
    file_handler = logging.FileHandler('adpi.log')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # Wait until AdGuard DNS has had time to start.
    logger.info("Wait for AdGuard DNS to start.")
    time.sleep(AdGuardWaitTime)
    logger.info("adpi started.")

    gCpuUsage = CpuUsage()
    ambi = ambient.Ambient(AmbiChannelID, AmbiWriteKey)
    while True:
        # CPU temperature: strip the trailing 'C unit from the reported value.
        temp = run_shell_command("vcgencmd measure_temp")
        temp_f = float(temp.replace('\'C', ''))

        # CPU usage: only the first entry of the list is reported.
        CpuRate_i = int(gCpuUsage.get()[0])

        # Memory usage in percent.
        mem_per_f = float(psutil.virtual_memory().percent)

        # Send the sample; a failed upload is logged and the loop carries on.
        try:
            ambi.send({"d1": temp_f, "d2": CpuRate_i, "d3": mem_per_f})
        except requests.exceptions.RequestException as e:
            logger.error("request failed: %s", e)
        else:
            logger.info("temp: %s, CpuRate: %s, mem_per:%s", temp_f, CpuRate_i, mem_per_f)

        # Wait for the next sampling slot.
        time.sleep(AmbiInterval)
# Start the monitoring loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment