Skip to content

Instantly share code, notes, and snippets.

@synctam
Last active May 19, 2023 07:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save synctam/6db8f3604192f7b67ba3b973efe07d2a to your computer and use it in GitHub Desktop.
Save synctam/6db8f3604192f7b67ba3b973efe07d2a to your computer and use it in GitHub Desktop.
Raspberry Pi のCPU温度、CPU利用率、メモリー利用率を取得し、Ambientにアップロードする。
#!/usr/bin/python3
# Copyright (c) 2023 synctam@gmail.com
# Released under the MIT license
import os
import datetime
import psutil
import subprocess
import time
import logging
from datetime import datetime
import ambient
import requests
def GetCpuStat(stat_path='/proc/stat'):
    """Read the CPU tick counters from the kernel statistics file.

    Each line that starts with ``cpu`` (the aggregate ``cpu`` line followed
    by the per-core ``cpu0``, ``cpu1``, ... lines) is converted into a
    ``[busy, total]`` pair, where ``busy`` is the sum of columns 1-3
    (user, nice and system time per proc(5)) and ``total`` is ``busy`` plus
    column 4 (idle time).

    The file is read directly instead of spawning a ``cat | grep`` shell
    pipeline, so no child processes are created for every sample and a
    missing file surfaces as an ``OSError`` rather than as an empty list.

    Args:
        stat_path: Path of the statistics file to parse. Defaults to
            ``/proc/stat``; other paths are mainly useful for testing.

    Returns:
        A list of ``[busy, total]`` integer pairs, in file order.

    Raises:
        OSError: If ``stat_path`` cannot be opened.
    """
    samples = []
    with open(stat_path) as stat_file:
        for line in stat_file:
            # Only the "cpu" / "cpuN" rows carry tick counters.
            if not line.startswith('cpu'):
                continue
            fields = line.split()
            busy = int(fields[1]) + int(fields[2]) + int(fields[3])
            total = busy + int(fields[4])
            samples.append([busy, total])
    return samples
class CpuUsage:
    """Compute CPU utilisation percentages between successive tick samples.

    A sample is a list of ``[busy, total]`` tick pairs, one per CPU row, as
    produced by ``GetCpuStat``. The instance remembers the latest sample and
    ``get()`` reports the utilisation since the previous one.
    """

    def __init__(self, reader=None):
        """Take the initial sample.

        Args:
            reader: Optional zero-argument callable returning a sample.
                Defaults to the module-level ``GetCpuStat``.
        """
        # The default is resolved here (not in the signature) so the class
        # does not require GetCpuStat to exist when a reader is supplied.
        self._read = GetCpuStat if reader is None else reader
        self._TckList = self._read()

    def get(self):
        """Return the utilisation (integer percent) for each CPU row.

        The result covers the interval since the previous call (or since
        construction for the first call). A row whose total tick count did
        not advance is reported as 0 instead of raising
        ``ZeroDivisionError``, which can otherwise happen when two samples
        are taken in quick succession.
        """
        previous = self._TckList
        current = self._read()
        self._TckList = current

        rates = []
        for (busy_now, total_now), (busy_pre, total_pre) in zip(current, previous):
            busy = busy_now - busy_pre
            total = total_now - total_pre
            # No elapsed ticks means there is nothing to measure.
            rates.append(int(busy * 100 / total) if total > 0 else 0)
        return rates
# Helper that runs a shell command.
def run_shell_command(command_str):
    """Run ``command_str`` through the shell and return its ``key=value`` value.

    The command's standard output is split on ``=`` and the second piece is
    returned with every newline character removed. An ``IndexError`` is
    raised when the output contains no ``=``.
    """
    completed = subprocess.run(
        command_str,
        shell=True,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    )
    pieces = completed.stdout.split("=")
    value = pieces[1]
    return "".join(value.split('\n'))
def main():
    """Periodically upload CPU temperature, CPU usage and memory usage to Ambient.

    Configuration is read from environment variables:

    * ``AMBI_CHANNEL_ID`` / ``AMBI_WRITE_KEY`` - passed to ``ambient.Ambient``.
    * ``AMBI_INTERVAL`` - seconds to sleep between uploads.
    * ``ADGUARD_WAIT_TIME`` - seconds to sleep once at start-up.

    Progress and upload failures are written to ``adpi.log``. The function
    loops forever and never returns normally.

    Raises:
        KeyError: If a required environment variable is missing.
        ValueError: If a numeric environment variable is not an integer.
    """
    channel_id = os.environ['AMBI_CHANNEL_ID']
    write_key = os.environ['AMBI_WRITE_KEY']
    interval = int(os.environ['AMBI_INTERVAL'])
    adguard_wait = int(os.environ['ADGUARD_WAIT_TIME'])

    logger = logging.getLogger("adpi")
    logger.setLevel(logging.INFO)
    # Attach the file handler only once so a repeated call does not
    # duplicate every log line.
    if not logger.handlers:
        formatter = logging.Formatter('%(asctime)s - %(levelname)s:%(name)s - %(message)s')
        file_handler = logging.FileHandler('adpi.log')
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    # Wait until AdGuard DNS has started.
    logger.info("Wait for AdGuard DNS to start.")
    time.sleep(adguard_wait)
    logger.info("adpi started.")

    cpu_usage = CpuUsage()
    ambi = ambient.Ambient(channel_id, write_key)
    while True:
        # CPU temperature: the value is followed by a 'C suffix that is stripped here.
        temp = run_shell_command("vcgencmd measure_temp")
        temp_f = float(temp.replace('\'C', ''))

        # CPU usage: the first entry belongs to the first "cpu" row of the sample.
        CpuRate_i = int(cpu_usage.get()[0])

        # Memory usage in percent.
        mem_per_f = float(psutil.virtual_memory().percent)

        # Send the result; a failed upload is logged and retried on the
        # next cycle instead of terminating the loop.
        try:
            ambi.send({"d1": temp_f, "d2": CpuRate_i, "d3": mem_per_f})
        except requests.exceptions.RequestException as e:
            logger.warning("request failed: %s", e)
        else:
            logger.info("temp: %s, CpuRate: %s, mem_per:%s", temp_f, CpuRate_i, mem_per_f)

        # Wait for the next cycle.
        time.sleep(interval)
# Start the upload loop only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment