Last active
August 29, 2015 14:03
-
-
Save ergoithz/0e8f7e37d0950aeaf4a5 to your computer and use it in GitHub Desktop.
More sophisticated 3820TG EC tool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: UTF-8 -*- | |
''' | |
==== NOTES ==== | |
``` | |
3820TG LAYOUT | |
_________ | |
\+------+\ | |
\+------+\ | |
ETH > |#######º| < GPU | |
CPU > |##___###| < VGA + HDMI | |
USB > |__L_L___| < USB | |
^ | |
HDD | |
``` | |
''' | |
import os | |
import os.path | |
import time | |
import threading | |
import struct | |
import subprocess | |
import logging | |
import logging.handlers | |
# Logger shared by every class and function in this module.
# NOTE(review): the object is built by instantiating logging.Logger directly
# instead of calling logging.getLogger(), so it emits only through handlers
# attached to this very object (the __main__ section adds a rotating file
# handler) -- confirm that bypassing the logging hierarchy is intended.
logger = logging.Logger('ec.%s' % __name__)
class LocalSingleton(object):
    '''
    Base class giving every subclass one shared instance per thread.

    Instances are cached in thread-local storage, keyed by class, so calling
    a subclass repeatedly from the same thread always yields the same object.
    Python still invokes ``__init__`` on every call, so subclasses must keep
    their initializers safe to run more than once.
    '''
    local = threading.local()

    def __new__(cls):
        # The cache has to be looked up under the same attribute name it is
        # stored as; otherwise the dict (and therefore the instance) would be
        # recreated on every call and nothing would ever be shared.
        instances = getattr(cls.local, 'instances', None)
        if instances is None:
            instances = cls.local.instances = {}
        if cls not in instances:
            instances[cls] = object.__new__(cls)
        return instances[cls]


class IOPort(LocalSingleton):
    '''
    Byte-wise access to I/O ports through ``/dev/port``.

    ``port[pos]`` reads the byte at offset ``pos`` and returns it as an int;
    ``port[pos] = value`` writes one byte there. ``value`` may be an int in
    the range 0..255 or an already packed bytes object.
    '''
    char_struct = struct.Struct('B')
    fd = None

    def __init__(self):
        # __init__ runs on every IOPort() call even though the instance is
        # shared, so open the device only once to avoid leaking descriptors.
        if self.fd is None:
            self.fd = os.open('/dev/port', os.O_RDWR)

    def __del__(self):
        # Compare against None: 0 is a perfectly valid file descriptor.
        if self.fd is not None:
            os.close(self.fd)
            self.fd = None

    def __getitem__(self, pos):
        os.lseek(self.fd, pos, os.SEEK_SET)
        return self.char_struct.unpack(os.read(self.fd, 1))[0]

    def __setitem__(self, pos, value):
        os.lseek(self.fd, pos, os.SEEK_SET)
        # os.write() needs bytes; ``bytes`` is ``str`` on Python 2, so this
        # accepts the same pre-packed values as before on both versions.
        if not isinstance(value, bytes):
            value = self.char_struct.pack(value)
        os.write(self.fd, value)
class EC(LocalSingleton):
    '''
    Register-level access to the embedded controller through I/O ports.

    ``ec[pos]`` reads register ``pos`` and ``ec[pos] = value`` writes it.
    NOTE(review): the port numbers and command bytes look like the standard
    ACPI EC interface -- confirm against the hardware documentation.
    '''
    CONTROL = 0x66  # port that receives commands and is polled for status
    VALUE = 0x62  # port carrying register addresses and data bytes

    READ_READY = 0x01  # status bit that must be set before reading VALUE
    WRITE_WAIT = 0x02  # status bit that must be clear before writing

    READ = 0x80  # command byte starting a register read
    WRITE = 0x81  # command byte starting a register write

    def __init__(self):
        self.io = IOPort()

    def wait_write(self):
        '''
        Block (polling every 10 ms) while the WRITE_WAIT status bit is set.
        '''
        while self.io[self.CONTROL] & self.WRITE_WAIT:
            time.sleep(0.01)

    def wait_read(self):
        '''
        Block (polling every 10 ms) until the READ_READY status bit is set.
        '''
        while not self.io[self.CONTROL] & self.READ_READY:
            time.sleep(0.01)

    def __getitem__(self, pos):
        self.wait_write()
        self.io[self.CONTROL] = self.READ
        self.wait_write()
        self.io[self.VALUE] = pos
        self.wait_read()
        return self.io[self.VALUE]

    def __setitem__(self, pos, value):
        # Convert first so that floats and numeric strings are written too;
        # values that cannot be converted are logged and the write skipped.
        try:
            value = int(value)
        except (TypeError, ValueError, OverflowError) as e:
            logger.exception(e)
            return
        if not 0 <= value <= 255:
            # A register holds a single byte: report and clamp.
            logger.critical('Value %r outside range' % value)
            value = min(255, max(0, value))
        self.wait_write()
        self.io[self.CONTROL] = self.WRITE
        self.wait_write()
        self.io[self.VALUE] = pos
        self.wait_write()
        self.io[self.VALUE] = value
class Value(object):
    '''
    Namespace of value transforms.

    Terms:
    fix: from EC data to user data
    unfix: user data to EC data
    '''
    class original(object):
        '''
        Identity transform: values pass through unchanged.
        '''
        @staticmethod
        def fix(val):
            return val

        @staticmethod
        def unfix(val):
            return val

    class chain(object):
        '''
        Composition of several transforms.

        ``fix`` applies the transforms in the given order; ``unfix`` applies
        their ``unfix`` in the opposite order so that it undoes ``fix``.
        '''
        def __init__(self, *transforms):
            self.transforms = transforms

        def fix(self, val):
            for transform in self.transforms:
                val = transform.fix(val)
            return val

        def unfix(self, val):
            # Undo the last applied transform first, otherwise the result is
            # not the inverse of fix() for non-commuting transforms.
            for transform in reversed(self.transforms):
                val = transform.unfix(val)
            return val

    class reverse(object):
        '''
        Mirror a value around ``floor`` (``floor - val`` in both directions).
        '''
        def __init__(self, floor=0xFF):
            self.floor = floor

        def fix(self, val):
            return self.floor-val

        def unfix(self, val):
            return self.floor-val

    class relative(object):
        '''
        Scale the EC range ``min``..``max`` onto ``0``..``base``.

        ``fix`` never returns less than 0; ``unfix`` treats user values above
        ``base`` as ``base`` and returns an int.
        '''
        def __init__(self, min=0x00, max=0xFF, base=1):
            self.min = min
            self.max = max
            self.delta = self.max - self.min
            self.base = base

        def fix(self, val):
            return max(0, float(val-self.min)/self.delta)*self.base

        def unfix(self, val):
            raw = min(1, float(val)/self.base)*self.delta + self.min
            logger.debug("%s %s" % (val, raw))
            return int(raw)

    class corrected(object):
        '''
        Two-segment linear curve mapping the EC midpoint onto ``mid``.

        EC values below half of ``ec_base`` are scaled onto ``0``..``mid`` and
        the remaining ones onto ``mid``..``ec_base``; results are truncated
        to int in both directions.
        '''
        # TODO: x2 segments
        ec_base = 0xFF
        ec_half = ec_base/2.

        def __init__(self, mid=0x40):
            self.mid = mid
            self.m1 = mid/self.ec_half  # slope of the lower segment
            self.m2 = (self.ec_base-mid)/self.ec_half  # slope of the upper one

        def fix(self, val):
            if val < self.ec_half:
                return int(val*self.m1)
            return int((val-self.ec_half)*self.m2+self.mid)

        def unfix(self, val):
            if val < self.mid:
                return int(val/self.m1)
            return int((val-self.mid)/self.m2+self.ec_half)
class GetProperty(object):
    '''
    Descriptor exposing a single EC register as a read-only attribute.

    ``pos`` is the register address and ``transform`` an object whose
    ``fix`` converts the raw register value; without one the raw value is
    returned (``Value.original``). No ``__set__`` is defined and deletion is
    a no-op.
    '''
    def __init__(self, pos, transform=None):
        self.pos = pos
        self.transform = transform if transform else Value.original

    @property
    def ec(self):
        # Looked up on every access rather than stored on the descriptor.
        return EC()

    def __get__(self, obj, objtype=None):
        fix = self.transform.fix
        return fix(self.ec[self.pos])

    def __delete__(self, obj):
        # Deleting the attribute is deliberately ignored.
        pass
class SetProperty(GetProperty):
    '''
    GetProperty that can also be assigned to.

    The assigned value goes through the transform's ``unfix`` and the result
    is written to the EC register at ``pos``.
    '''
    def __set__(self, obj, val):
        raw = self.transform.unfix(val)
        self.ec[self.pos] = raw
class AutoProperty(SetProperty):
    '''
    SetProperty paired with a second EC register selecting automatic mode.

    Assigning ``None`` writes ``AUTO`` to the control register; any other
    value writes ``MANUAL`` there and then stores the value as SetProperty
    does.
    '''
    AUTO = 0x04
    MANUAL = 0x14

    def __init__(self, control, pos, transform=None):
        super(AutoProperty, self).__init__(pos, transform)
        self.pos_control = control

    def __set__(self, obj, val):
        if val is None:
            self.ec[self.pos_control] = self.AUTO
            return
        self.ec[self.pos_control] = self.MANUAL
        super(AutoProperty, self).__set__(obj, val)
class Device(LocalSingleton):
    '''
    Base class for the devices handled by App.
    '''
    # Sensor readings of the device (App averages them as temperatures);
    # empty unless a subclass overrides it.
    sensors = ()
class CPU(Device):
    '''
    EC registers related to the CPU: fan control, fan speed and two sensors.
    '''
    # Fan level: control register 0x93, value register 0x94. Raw values are
    # mirrored around 0xFF and then mapped through a two-segment curve.
    fan = AutoProperty(0x93, 0x94, transform=Value.chain(
        Value.reverse(),
        Value.corrected(mid=0xFF*0.7)
        ))
    # Register 0x95 scaled so that a raw 0x60 reads as 1.0.
    fan_speed = GetProperty(0x95, transform=Value.relative(max=0x60))
    # Raw registers 0xA8/0xA9 -- presumably thermal zone sensors, confirm.
    tzs0 = GetProperty(0xA8)
    tzs1 = GetProperty(0xA9)

    @property
    def sensors(self):
        '''
        Current values of both sensor registers as a tuple.
        '''
        return (self.tzs0, self.tzs1)
class GPU(Device):
    '''
    EC registers related to the GPU: fan control and fan speed. It defines no
    sensors of its own, so the empty default from Device applies.
    '''
    # Fan level: control register 0x96, value register 0x97. Raw values are
    # mirrored around 0xFF and then mapped through a two-segment curve.
    fan = AutoProperty(0x96, 0x97, transform=Value.chain(
        Value.reverse(),
        Value.corrected(mid=0xFF*0.75),
        ))
    # Register 0x98 scaled so that a raw 0x45 reads as 1.0.
    fan_speed = GetProperty(0x98, transform=Value.relative(max=0x45))
class App(object):
    '''
    Command-line application setting the fan level of every known device.
    '''
    MIN_TEMP = 35  # temperature mapped to fan level 0 by auto()
    MAX_TEMP = 55  # temperature mapped to fan level 0xFF by auto()
    TEMP_FILE = '/run/ec2/temp'  # persists the last temperature between runs
    LOG_FILE = '/run/ec2/log'
    LOG_FILESIZE = 524288  # maxBytes of the rotating log file
    TEMP_TIMEOUT = 2  # seconds between two daemon iterations

    def __init__(self):
        self.devices = (
            CPU(),
            GPU(),
            )

    @property
    def last_temp(self):
        '''
        Temperature stored by the previous auto() call.

        Falls back to MIN_TEMP when the file is missing, unreadable or does
        not contain an integer.
        '''
        try:
            with open(self.TEMP_FILE) as f:
                return int(f.read())
        except (IOError, ValueError):
            return self.MIN_TEMP

    @last_temp.setter
    def last_temp(self, v):
        dirname = os.path.dirname(self.TEMP_FILE)
        if not os.path.isdir(dirname):
            os.makedirs(dirname)
        with open(self.TEMP_FILE, 'w') as f:
            f.write('%d' % v)

    def daemon(self):
        '''
        Call auto() every TEMP_TIMEOUT seconds until interrupted, then hand
        every fan back to automatic control.
        '''
        # Main loop
        try:
            while True:
                self.auto()
                time.sleep(self.TEMP_TIMEOUT)
        except KeyboardInterrupt:
            pass
        except Exception as e:
            logger.exception(e)
        finally:
            # Cleanup: each device is restored on its own so that a failure
            # on one of them does not leave the others in manual mode.
            for device in self.devices:
                try:
                    device.fan = None
                except Exception as e:
                    logger.exception(e)

    def temp(self):
        '''
        Log the current reading of every sensor of every device.
        '''
        for device in self.devices:
            for n, sensor in enumerate(device.sensors):
                logger.info('TSZ%d %s' % (n, sensor))

    def run(self, args):
        '''
        Execute the command line ``args`` (``sys.argv`` style).

        The first argument naming a public method of this object is called
        and ends the run. Otherwise one of the presets ``max``, ``high``,
        ``medium``, ``low`` or ``auto`` is applied: the intel_powerclamp
        module is loaded or removed and the fan level is set on all devices.
        '''
        for name in args[1:]:
            # Private names are not commands, and run() itself requires
            # arguments, so dispatching to them could only fail.
            if name.startswith('_') or name == 'run':
                continue
            method = getattr(self, name, None)
            if callable(method):
                method()
                return
        # (argument, load intel_powerclamp?, fan level) in priority order.
        # NOTE(review): an 'auto' argument is normally consumed by the method
        # dispatch above (App.auto), so the 'auto' preset is hardly ever
        # reached -- confirm which of the two behaviours is intended.
        presets = (
            ('max', False, 0xFF),
            ('high', False, 0xFF*0.75),
            ('medium', True, 0xFF*0.5),
            ('low', True, 0xFF*0.25),
            ('auto', True, None),
            )
        for name, powerclamp, v in presets:
            if name in args:
                break
        else:
            logger.error('No param')
            return
        try:
            subprocess.check_output(
                ('modprobe' if powerclamp else 'rmmod', 'intel_powerclamp'))
        except subprocess.CalledProcessError as e:
            # rmmod is allowed to fail: the module may simply not be loaded.
            if powerclamp:
                logger.exception(e)
        except OSError as e:
            # The modprobe/rmmod executable is unavailable; fan control does
            # not depend on it, so report the problem and carry on.
            logger.exception(e)
        for device in self.devices:
            device.fan = v

    def auto(self):
        '''
        Set every fan according to the average sensor temperature.

        The average is smoothed against the previously stored temperature
        when they differ by more than 5 and then mapped linearly from
        MIN_TEMP..MAX_TEMP onto 0..0xFF. Nothing happens when no device
        reports a sensor.
        '''
        temps = [
            temp
            for device in self.devices
            for temp in device.sensors
            ]
        if not temps:
            # No readings: there is nothing to average.
            return
        # Explicit float division gives the same result on Python 2 and 3.
        avgtemp = float(sum(temps))/len(temps)
        oldtemp = self.last_temp
        if abs(avgtemp - oldtemp) > 5:
            avgtemp = (avgtemp + oldtemp) / 2
        self.last_temp = avgtemp
        delta = self.MAX_TEMP - self.MIN_TEMP
        speed = max(0, min(0xff, 0xff*(avgtemp-self.MIN_TEMP)/delta))
        for device in self.devices:
            device.fan = speed
        logger.info("Temperature: %dºC - Fan: %d%%" % (avgtemp, speed*100/255))
def test_value():
    '''
    Plot the ``fix`` curve of each transform over the raw range 0..0xFF and
    the ``unfix`` curve of those results, side by side, for visual checking.

    Requires matplotlib, which is imported only here.
    '''
    import matplotlib.pyplot as plt
    # Every curve gets its own 0x100-wide slot on the x axis.
    offsets = iter(range(0, 10000, 0x100))
    for t in (
      Value.original,
      Value.reverse(),
      Value.relative(base=100),
      Value.corrected(mid=0x20),
      ):
        fixed = [t.fix(raw) for raw in range(0x100)]
        unfixed = [t.unfix(val) for val in fixed]
        f = 0
        # Fixed order (fix first, then unfix) keeps the layout deterministic.
        for values in (fixed, unfixed):
            f = next(offsets)
            plt.plot(list(range(f, f+0x100)), values)
        # Vertical separator after the pair of curves of this transform.
        plt.plot((f+0x100, f+0x100), (0, 0xFF))
    plt.show()
if __name__ == '__main__':
    import sys
    # Logging setup: rotating file next to the temperature file. Failures are
    # reported but do not prevent the requested command from running.
    try:
        dirname = os.path.dirname(App.LOG_FILE)
        if not os.path.isdir(dirname):
            os.makedirs(dirname)
        handler = logging.handlers.RotatingFileHandler(App.LOG_FILE, maxBytes=App.LOG_FILESIZE)
        logger.setLevel(logging.DEBUG if 'debug' in sys.argv else logging.INFO)
        logger.addHandler(handler)
    except (IOError, OSError):
        # os.makedirs raises OSError, which is not an IOError on Python 2.
        logger.error('Script called without enough permissions.')
    except Exception as e:
        logger.exception(e)
    if 'test-value' in sys.argv:
        test_value()
    else:
        main = App()
        main.run(sys.argv)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment