Created
January 21, 2026 21:57
-
-
Save boopdotpng/1ebe5d5aa4f658240798e2b2253707fe to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import argparse, ctypes, fcntl, mmap, os, time | |
| S = ctypes.LittleEndianStructure | |
| u8, u16, u32, u64 = ctypes.c_uint8, ctypes.c_uint16, ctypes.c_uint32, ctypes.c_uint64 | |
| TENSTORRENT_IOCTL_MAGIC = 0xFA | |
| IOCTL_ALLOCATE_TLB = 11 | |
| IOCTL_FREE_TLB = 12 | |
| IOCTL_CONFIGURE_TLB = 13 | |
| TLB_2M = 1 << 21 | |
| ARC_TILE = (8, 0) | |
| ARC_NOC_BASE = 0x80000000 | |
| RESET_UNIT_OFFSET = 0x30000 | |
| SCRATCH_RAM_11 = RESET_UNIT_OFFSET + 0x42C # msg queue ctrl block ptr | |
| RESET_UNIT_ARC_MISC_CNTL = RESET_UNIT_OFFSET + 0x100 | |
| FAN_MSG_FORCE_SPEED = 0xAC | |
| def _IO(nr: int) -> int: return (TENSTORRENT_IOCTL_MAGIC << 8) | nr | |
| def _align_down(value: int, alignment: int) -> tuple[int, int]: return value & ~(alignment - 1), value & (alignment - 1) | |
| def _noc1(x: int, y: int) -> tuple[int, int]: return (16 - x, 11 - y) # Blackhole | |
| class AllocateTlbIn(S): _fields_ = [("size", u64), ("reserved", u64)] | |
| class AllocateTlbOut(S): _fields_ = [("tlb_id", u32), ("reserved0", u32), ("mmap_offset_uc", u64), ("mmap_offset_wc", u64), ("reserved1", u64)] | |
| class FreeTlbIn(S): _fields_ = [("tlb_id", u32)] | |
| class NocTlbConfig(S): | |
| _fields_ = [ | |
| ("addr", u64), ("x_end", u16), ("y_end", u16), ("x_start", u16), ("y_start", u16), | |
| ("noc", u8), ("mcast", u8), ("ordering", u8), ("linked", u8), ("static_vc", u8), | |
| ("reserved0_0", u8), ("reserved0_1", u8), ("reserved0_2", u8), ("reserved1_0", u32), ("reserved1_1", u32), | |
| ] | |
| class ConfigureTlbIn(S): _fields_ = [("tlb_id", u32), ("reserved", u32), ("config", NocTlbConfig)] | |
| class TLBWindow: | |
| def __init__(self, fd: int, *, size: int = TLB_2M): | |
| self.fd, self.size = fd, size | |
| buf = bytearray(ctypes.sizeof(AllocateTlbIn) + ctypes.sizeof(AllocateTlbOut)) | |
| AllocateTlbIn.from_buffer(buf).size = size | |
| fcntl.ioctl(fd, _IO(IOCTL_ALLOCATE_TLB), buf, True) | |
| out = AllocateTlbOut.from_buffer(buf, ctypes.sizeof(AllocateTlbIn)) | |
| self.tlb_id = int(out.tlb_id) | |
| self.uc = mmap.mmap(fd, size, flags=mmap.MAP_SHARED, prot=mmap.PROT_READ | mmap.PROT_WRITE, offset=int(out.mmap_offset_uc)) | |
| self.wc = mmap.mmap(fd, size, flags=mmap.MAP_SHARED, prot=mmap.PROT_READ | mmap.PROT_WRITE, offset=int(out.mmap_offset_wc)) | |
| def configure(self, *, addr: int, start: tuple[int, int], end: tuple[int, int], noc: int = 0, ordering: int = 1): | |
| if addr & (self.size - 1): raise ValueError(f"tlb addr must be {self.size}-aligned") | |
| if noc == 1: start, end = _noc1(*start), _noc1(*end) | |
| cfg = NocTlbConfig() | |
| cfg.addr = addr | |
| cfg.x_start, cfg.y_start = start | |
| cfg.x_end, cfg.y_end = end | |
| cfg.noc, cfg.mcast, cfg.ordering, cfg.linked, cfg.static_vc = noc, 0, ordering, 0, 0 | |
| buf = bytearray(ctypes.sizeof(ConfigureTlbIn)) | |
| view = ConfigureTlbIn.from_buffer(buf) | |
| view.tlb_id = self.tlb_id | |
| view.config = cfg | |
| fcntl.ioctl(self.fd, _IO(IOCTL_CONFIGURE_TLB), buf, False) | |
| def r32(self, off: int) -> int: return int.from_bytes(self.uc[off:off + 4], "little") | |
| def w32(self, off: int, v: int): self.uc[off:off + 4] = int(v & 0xFFFFFFFF).to_bytes(4, "little") | |
| def close(self): | |
| self.uc.close() | |
| self.wc.close() | |
| buf = bytearray(ctypes.sizeof(FreeTlbIn)) | |
| FreeTlbIn.from_buffer(buf).tlb_id = self.tlb_id | |
| fcntl.ioctl(self.fd, _IO(IOCTL_FREE_TLB), buf, False) | |
| def __enter__(self): return self | |
| def __exit__(self, *_): self.close() | |
| def arc_msg(fd: int, msg: int, arg0: int = 0, arg1: int = 0, *, queue: int = 0, timeout_ms: int = 1000) -> list[int]: | |
| MSG_QUEUE_SIZE = 4 | |
| MSG_QUEUE_POINTER_WRAP = 2 * MSG_QUEUE_SIZE | |
| REQUEST_MSG_LEN = 8 | |
| RESPONSE_MSG_LEN = 8 | |
| HEADER_BYTES = 8 * 4 | |
| REQUEST_BYTES = REQUEST_MSG_LEN * 4 | |
| RESPONSE_BYTES = RESPONSE_MSG_LEN * 4 | |
| QUEUE_STRIDE = HEADER_BYTES + (MSG_QUEUE_SIZE * REQUEST_BYTES) + (MSG_QUEUE_SIZE * RESPONSE_BYTES) | |
| IRQ0_TRIG_BIT = 1 << 16 | |
| if queue < 0 or queue >= 4: raise ValueError("queue must be 0..3") | |
| with TLBWindow(fd) as arc: | |
| arc.configure(addr=ARC_NOC_BASE, start=ARC_TILE, end=ARC_TILE) | |
| info_ptr = arc.r32(SCRATCH_RAM_11) | |
| if info_ptr == 0: raise RuntimeError("msgqueue not initialized (SCRATCH_RAM_11 == 0)") | |
| info_base, info_off = _align_down(info_ptr, TLB_2M) | |
| arc.configure(addr=info_base, start=ARC_TILE, end=ARC_TILE) | |
| queues_ptr = arc.r32(info_off) | |
| q_base, q_off = _align_down(queues_ptr, TLB_2M) | |
| arc.configure(addr=q_base, start=ARC_TILE, end=ARC_TILE) | |
| q = q_off + queue * QUEUE_STRIDE | |
| wptr = arc.r32(q + 0) | |
| req = q + HEADER_BYTES + (wptr % MSG_QUEUE_SIZE) * REQUEST_BYTES | |
| words = [msg & 0xFF, arg0 & 0xFFFFFFFF, arg1 & 0xFFFFFFFF] + [0] * (REQUEST_MSG_LEN - 3) | |
| for i, word in enumerate(words): arc.w32(req + i * 4, word) | |
| arc.w32(q + 0, (wptr + 1) % MSG_QUEUE_POINTER_WRAP) | |
| arc.configure(addr=ARC_NOC_BASE, start=ARC_TILE, end=ARC_TILE) | |
| arc.w32(RESET_UNIT_ARC_MISC_CNTL, arc.r32(RESET_UNIT_ARC_MISC_CNTL) | IRQ0_TRIG_BIT) | |
| arc.configure(addr=q_base, start=ARC_TILE, end=ARC_TILE) | |
| rptr = arc.r32(q + 4) | |
| deadline = time.monotonic() + (timeout_ms / 1000.0) | |
| while time.monotonic() < deadline: | |
| resp_wptr = arc.r32(q + 20) | |
| if resp_wptr != rptr: | |
| resp = q + HEADER_BYTES + (MSG_QUEUE_SIZE * REQUEST_BYTES) + (rptr % MSG_QUEUE_SIZE) * RESPONSE_BYTES | |
| out = [arc.r32(resp + i * 4) for i in range(RESPONSE_MSG_LEN)] | |
| arc.w32(q + 4, (rptr + 1) % MSG_QUEUE_POINTER_WRAP) | |
| return out | |
| time.sleep(0.001) | |
| raise TimeoutError(f"arc_msg timeout ({timeout_ms} ms)") | |
| def main(): | |
| ap = argparse.ArgumentParser(description="Tenstorrent Blackhole fan control (direct ARC msgqueue)") | |
| ap.add_argument("--dev", default="/dev/tenstorrent/0") | |
| ap.add_argument("--timeout-ms", type=int, default=1000) | |
| mx = ap.add_mutually_exclusive_group(required=True) | |
| mx.add_argument("--set", type=int, metavar="PCT", help="force fan speed (0..100)") | |
| mx.add_argument("--reset", action="store_true", help="reset to firmware curve") | |
| args = ap.parse_args() | |
| pct = None if args.reset else int(args.set) | |
| if pct is not None and not (0 <= pct <= 100): raise SystemExit("--set must be 0..100") | |
| fd = os.open(args.dev, os.O_RDWR | os.O_CLOEXEC) | |
| try: | |
| raw = 0xFFFFFFFF if pct is None else pct | |
| resp = arc_msg(fd, FAN_MSG_FORCE_SPEED, raw, 0, timeout_ms=args.timeout_ms) | |
| print("resp:", " ".join(f"{x:08x}" for x in resp)) | |
| finally: | |
| os.close(fd) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment