Skip to content

Instantly share code, notes, and snippets.

@ergoithz
Last active August 29, 2015 14:03
Show Gist options
  • Save ergoithz/0e8f7e37d0950aeaf4a5 to your computer and use it in GitHub Desktop.
Save ergoithz/0e8f7e37d0950aeaf4a5 to your computer and use it in GitHub Desktop.
More sophisticated 3820TG EC tool
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
==== NOTES ====
```
3820TG LAYOUT
_________
\+------+\
\+------+\
ETH > |#######º| < GPU
CPU > |##___###| < VGA + HDMI
USB > |__L_L___| < USB
^
HDD
```
'''
import os
import os.path
import time
import threading
import struct
import subprocess
import logging
import logging.handlers
logger = logging.Logger('ec.%s' % __name__)
class LocalSingleton(object):
local = threading.local()
def __new__(cls):
if not hasattr(cls.local, 'instance'):
cls.local.instances = {}
if not cls in cls.local.instances:
cls.local.instances[cls] = object.__new__(cls)
return cls.local.instances[cls]
class IOPort(LocalSingleton):
char_struct = struct.Struct('B')
fd = None
def __init__(self):
self.fd = os.open('/dev/port', os.O_RDWR)
def __del__(self):
if self.fd:
os.close(self.fd)
def __getitem__(self, pos):
os.lseek(self.fd, pos, os.SEEK_SET)
return self.char_struct.unpack(os.read(self.fd, 1))[0]
def __setitem__(self, pos, value):
os.lseek(self.fd, pos, os.SEEK_SET)
os.write(self.fd, value if isinstance(value, str) else self.char_struct.pack(value))
class EC(LocalSingleton):
CONTROL = 0x66
VALUE = 0x62
READ_READY = 0x01
WRITE_WAIT = 0x02
READ = 0x80
WRITE = 0x81
def __init__(self):
self.io = IOPort()
def wait_write(self):
while self.io[self.CONTROL] & self.WRITE_WAIT:
time.sleep(0.01)
def wait_read(self):
while not self.io[self.CONTROL] & self.READ_READY:
time.sleep(0.01)
def __getitem__(self, pos):
self.wait_write()
self.io[self.CONTROL] = self.READ
self.wait_write()
self.io[self.VALUE] = pos
self.wait_read()
return self.io[self.VALUE]
def __setitem__(self, pos, value):
try:
if not 0 <= value <= 255:
logger.critical('Value %r outside range' % value)
value = min(255, max(0, value))
if not isinstance(value, int):
value = int(vale)
except BaseException as e:
logger.exception(e)
return
self.wait_write()
self.io[self.CONTROL] = self.WRITE
self.wait_write()
self.io[self.VALUE] = pos
self.wait_write()
self.io[self.VALUE] = value
class Value:
'''
Terms:
fix: from EC data to user data
unfix: user data to EC data
'''
class original(object):
@staticmethod
def fix(val):
return val
@staticmethod
def unfix(val):
return val
class chain(object):
def __init__(self, *transforms):
self.transforms = transforms
def fix(self, val):
for transform in self.transforms:
val = transform.fix(val)
return val
def unfix(self, val):
for transform in self.transforms:
val = transform.unfix(val)
return val
class reverse(object):
def __init__(self, floor=0xFF):
self.floor = floor
def fix(self, val):
return self.floor-val
def unfix(self, val):
return self.floor-val
class relative(object):
def __init__(self, min=0x00, max=0xFF, base=1):
self.min = min
self.max = max
self.delta = self.max - self.min
self.base = base
def fix(self, val):
return max(0, float(val-self.min)/self.delta)*self.base
def unfix(self, val):
logger.debug("%s %s" % (val, min(1, float(val)/self.base)*self.delta + self.min))
return int(min(1, float(val)/self.base)*self.delta + self.min)
class corrected(object):
# TODO: x2 segments
ec_base = 0xFF
ec_half = ec_base/2.
def __init__(self, mid=0x40):
self.mid = mid
self.m1 = mid/self.ec_half
self.m2 = (self.ec_base-mid)/self.ec_half
def fix(self, val):
if val < self.ec_half:
return int(val*self.m1)
return int((val-self.ec_half)*self.m2+self.mid)
def unfix(self, val):
if val < self.mid:
return int(val/self.m1)
return int((val-self.mid)/self.m2+self.ec_half)
class GetProperty(object):
'''
Read-only property for getting an EC register value
'''
@property
def ec(self):
return EC()
def __init__(self, pos, transform=None):
self.pos = pos
self.transform = transform or Value.original
def __get__(self, obj, objtype=None):
return self.transform.fix(self.ec[self.pos])
def __delete__(self, obj):
pass
class SetProperty(GetProperty):
'''
Writable GetProperty
'''
def __set__(self, obj, val):
self.ec[self.pos] = self.transform.unfix(val)
class AutoProperty(SetProperty):
'''
SetProperty with another EC register for automatic mode.
'''
AUTO = 0x04
MANUAL = 0x14
def __init__(self, control, pos, transform=None):
SetProperty.__init__(self, pos, transform)
self.pos_control = control
def __set__(self, obj, val):
if val is None:
self.ec[self.pos_control] = self.AUTO
else:
self.ec[self.pos_control] = self.MANUAL
SetProperty.__set__(self, obj, val)
class Device(LocalSingleton):
sensors = ()
class CPU(Device):
fan = AutoProperty(0x93, 0x94, transform=Value.chain(
Value.reverse(),
Value.corrected(mid=0xFF*0.7)
))
fan_speed = GetProperty(0x95, transform=Value.relative(max=0x60))
tzs0 = GetProperty(0xA8)
tzs1 = GetProperty(0xA9)
@property
def sensors(self):
return (self.tzs0, self.tzs1)
class GPU(Device):
fan = AutoProperty(0x96, 0x97, transform=Value.chain(
Value.reverse(),
Value.corrected(mid=0xFF*0.75),
))
fan_speed = GetProperty(0x98, transform=Value.relative(max=0x45))
class App(object):
MIN_TEMP = 35
MAX_TEMP = 55
TEMP_FILE = '/run/ec2/temp'
LOG_FILE = '/run/ec2/log'
LOG_FILESIZE = 524288
TEMP_TIMEOUT = 2
def __init__(self):
self.devices = (
CPU(),
GPU(),
)
@property
def last_temp(self):
try:
with open(self.TEMP_FILE) as f:
return int(f.read())
except (IOError, ValueError):
return self.MIN_TEMP
@last_temp.setter
def last_temp(self, v):
dirname = os.path.dirname(self.TEMP_FILE)
if not os.path.isdir(dirname):
os.makedirs(dirname)
with open(self.TEMP_FILE, 'w') as f:
f.write('%d' % v)
def daemon(self):
# Main loop
try:
while True:
self.auto()
time.sleep(self.TEMP_TIMEOUT)
except KeyboardInterrupt:
pass
except BaseException as e:
logger.exception(e)
# Cleanup
try:
for device in self.devices:
device.fan = None
except BaseException as e:
logger.exception(e)
def temp(self):
for device in self.devices:
for n, sensor in enumerate(device.sensors):
logger.info('TSZ%d %s' % (n, sensor))
def run(self, args):
for name in args[1:]:
method = getattr(self, name, None)
if callable(method):
method()
return
if 'max' in args:
powerclamp = False
v = 0xFF
elif 'high' in args:
powerclamp = False
v = 0xFF*0.75
elif 'medium' in args:
powerclamp = True
v = 0xFF*0.5
elif 'low' in args:
powerclamp = True
v = 0xFF*0.25
elif 'auto' in args:
powerclamp = True
v = None
else:
logger.error('No param')
return
try:
subprocess.check_output(
('modprobe' if powerclamp else 'rmmod', 'intel_powerclamp'))
except subprocess.CalledProcessError as e:
if powerclamp:
logger.exception(e)
for device in self.devices:
device.fan = v
def auto(self):
temps = [
temp
for device in self.devices
for temp in device.sensors
]
avgtemp = sum(temps)/len(temps)
oldtemp = self.last_temp
if abs(avgtemp - oldtemp) > 5:
avgtemp = (avgtemp + oldtemp) / 2
self.last_temp = avgtemp
delta = self.MAX_TEMP - self.MIN_TEMP
speed = max(0, min(0xff, 0xff*(avgtemp-self.MIN_TEMP)/delta))
for device in self.devices:
device.fan = speed
logger.info("Temperature: %dºC - Fan: %d%%" % (avgtemp, speed*100/255))
def test_value():
import matplotlib.pyplot as plt
a = iter(xrange(0, 10000, 0x100))
for t in (
Value.original,
Value.reverse(),
Value.relative(base=100),
Value.corrected(mid=0x20),
):
fixed = map(t.fix, xrange(0x100))
r = {
'fix': fixed,
'unfix': map(t.unfix, fixed),
}
f = 0
for k, v in r.iteritems():
f = next(a)
plt.plot(xrange(f, f+0x100), v)
plt.plot((f+0x100, f+0x100), (0, 0xFF))
plt.show()
if __name__ == '__main__':
import sys
try:
dirname = os.path.dirname(App.LOG_FILE)
if not os.path.isdir(dirname):
os.makedirs(dirname)
handler = logging.handlers.RotatingFileHandler(App.LOG_FILE, maxBytes=App.LOG_FILESIZE)
logger.setLevel(logging.DEBUG if 'debug' in sys.argv else logging.INFO)
logger.addHandler(handler)
except IOError:
logger.error('Script called without enough permissions.')
except BaseException as e:
logger.exception(e)
if 'test-value' in sys.argv:
test_value()
else:
main = App()
main.run(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment