Skip to content

Instantly share code, notes, and snippets.

@boopdotpng
Created January 21, 2026 21:57
Show Gist options
  • Select an option

  • Save boopdotpng/1ebe5d5aa4f658240798e2b2253707fe to your computer and use it in GitHub Desktop.

Select an option

Save boopdotpng/1ebe5d5aa4f658240798e2b2253707fe to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse, ctypes, fcntl, mmap, os, time
# Short aliases for the ctypes base class and fixed-width integer types used
# by the ioctl structure definitions in this file.
S = ctypes.LittleEndianStructure
u8 = ctypes.c_uint8
u16 = ctypes.c_uint16
u32 = ctypes.c_uint32
u64 = ctypes.c_uint64

# ioctl numbering: the magic byte is combined with a command number by _IO().
TENSTORRENT_IOCTL_MAGIC = 0xFA
IOCTL_ALLOCATE_TLB, IOCTL_FREE_TLB, IOCTL_CONFIGURE_TLB = 11, 12, 13

# Default TLB window size in bytes (2 MiB).
TLB_2M = 2 * 1024 * 1024

# Tile coordinates and base address used when pointing a window at the ARC.
ARC_TILE = (8, 0)
ARC_NOC_BASE = 0x8000_0000

# Offsets (relative to ARC_NOC_BASE) of the reset-unit registers touched here.
RESET_UNIT_OFFSET = 0x3_0000
RESET_UNIT_ARC_MISC_CNTL = RESET_UNIT_OFFSET + 0x100
SCRATCH_RAM_11 = RESET_UNIT_OFFSET + 0x42C  # msg queue ctrl block ptr

# Message code passed to arc_msg() to force (or reset) the fan speed.
FAN_MSG_FORCE_SPEED = 0xAC
def _IO(nr: int) -> int:
    """Build an ioctl request code: the driver magic in bits 8 and up, *nr* in the low bits."""
    magic_bits = TENSTORRENT_IOCTL_MAGIC << 8
    return magic_bits | nr
def _align_down(value: int, alignment: int) -> tuple[int, int]:
    """Split *value* into ``(aligned_base, offset)`` for a power-of-two *alignment*.

    ``aligned_base + offset == value`` and ``offset`` holds the bits of
    *value* selected by ``alignment - 1``.
    """
    low_mask = alignment - 1
    offset = value & low_mask
    # Removing the low bits from value is the same as masking them off.
    return value - offset, offset
def _noc1(x: int, y: int) -> tuple[int, int]:
    """Mirror an ``(x, y)`` coordinate pair as ``(16 - x, 11 - y)``.

    Used by ``TLBWindow.configure`` when ``noc == 1``; the 16 / 11 constants
    are for Blackhole.
    """
    mirrored_x = 16 - x
    mirrored_y = 11 - y
    return mirrored_x, mirrored_y
class AllocateTlbIn(S):
    """Input half of the allocate-TLB ioctl buffer: the requested window size."""

    _fields_ = [
        ("size", u64),
        ("reserved", u64),
    ]
class AllocateTlbOut(S):
    """Output half of the allocate-TLB ioctl buffer.

    Carries the id of the allocated TLB and the two offsets that
    ``TLBWindow`` passes to ``mmap`` for its ``uc`` and ``wc`` mappings.
    """

    _fields_ = [
        ("tlb_id", u32),
        ("reserved0", u32),
        ("mmap_offset_uc", u64),
        ("mmap_offset_wc", u64),
        ("reserved1", u64),
    ]
class FreeTlbIn(S):
    """Argument of the free-TLB ioctl: the id of the TLB to release."""

    _fields_ = [
        ("tlb_id", u32),
    ]
class NocTlbConfig(S):
    """Target description for a TLB window, embedded in ``ConfigureTlbIn``.

    Field order and widths define the binary layout handed to the driver, so
    they must not be reordered.
    """

    _fields_ = [
        # Target address followed by the end/start tile coordinates.
        ("addr", u64),
        ("x_end", u16),
        ("y_end", u16),
        ("x_start", u16),
        ("y_start", u16),
        # Single-byte options.
        ("noc", u8),
        ("mcast", u8),
        ("ordering", u8),
        ("linked", u8),
        ("static_vc", u8),
        # Reserved padding.
        ("reserved0_0", u8),
        ("reserved0_1", u8),
        ("reserved0_2", u8),
        ("reserved1_0", u32),
        ("reserved1_1", u32),
    ]
class ConfigureTlbIn(S):
    """Argument of the configure-TLB ioctl: which TLB to program and its new config."""

    _fields_ = [
        ("tlb_id", u32),
        ("reserved", u32),
        ("config", NocTlbConfig),
    ]
class TLBWindow:
    """A driver-allocated TLB window mapped into this process.

    Construction allocates a TLB through the allocate-TLB ioctl and maps it
    twice using the two offsets the driver returns (``uc`` and ``wc``;
    presumably uncached and write-combined -- confirm against the driver
    header).  ``configure`` points the window at a tile/address, ``r32`` /
    ``w32`` access 32-bit little-endian words through the ``uc`` mapping, and
    ``close`` unmaps both views and frees the TLB.  The object is a context
    manager; leaving the ``with`` block calls ``close``.
    """

    def __init__(self, fd: int, *, size: int = TLB_2M):
        """Allocate a TLB of *size* bytes on device *fd* and map it.

        Raises:
            OSError: if the allocate ioctl or either ``mmap`` fails.  When a
                mapping fails, the already-allocated TLB is released before
                the error propagates.
        """
        self.fd, self.size = fd, size
        self.uc = None
        self.wc = None
        self._closed = False
        # The ioctl buffer is the input struct immediately followed by the
        # output struct; the driver fills it in place (mutate_flag=True).
        buf = bytearray(ctypes.sizeof(AllocateTlbIn) + ctypes.sizeof(AllocateTlbOut))
        AllocateTlbIn.from_buffer(buf).size = size
        fcntl.ioctl(fd, _IO(IOCTL_ALLOCATE_TLB), buf, True)
        out = AllocateTlbOut.from_buffer(buf, ctypes.sizeof(AllocateTlbIn))
        self.tlb_id = int(out.tlb_id)
        prot = mmap.PROT_READ | mmap.PROT_WRITE
        try:
            self.uc = mmap.mmap(fd, size, flags=mmap.MAP_SHARED, prot=prot, offset=int(out.mmap_offset_uc))
            self.wc = mmap.mmap(fd, size, flags=mmap.MAP_SHARED, prot=prot, offset=int(out.mmap_offset_wc))
        except BaseException:
            # The TLB already exists in the driver; do not leak it (or a
            # half-finished mapping) when mapping fails.
            self.close()
            raise

    def configure(self, *, addr: int, start: tuple[int, int], end: tuple[int, int], noc: int = 0, ordering: int = 1):
        """Point the window at *addr* on the tile range *start*..*end*.

        Args:
            addr: target address; must be a multiple of the window size.
            start, end: ``(x, y)`` tile coordinates.  With ``noc == 1`` both
                are translated through ``_noc1`` first.
            noc, ordering: written verbatim into the config struct.

        Raises:
            ValueError: if *addr* is not aligned to the window size.
            OSError: if the configure ioctl fails.
        """
        if addr & (self.size - 1):
            raise ValueError(f"tlb addr must be {self.size}-aligned")
        if noc == 1:
            start, end = _noc1(*start), _noc1(*end)
        cfg = NocTlbConfig()
        cfg.addr = addr
        cfg.x_start, cfg.y_start = start
        cfg.x_end, cfg.y_end = end
        cfg.noc, cfg.mcast, cfg.ordering, cfg.linked, cfg.static_vc = noc, 0, ordering, 0, 0
        buf = bytearray(ctypes.sizeof(ConfigureTlbIn))
        view = ConfigureTlbIn.from_buffer(buf)
        view.tlb_id = self.tlb_id
        view.config = cfg
        fcntl.ioctl(self.fd, _IO(IOCTL_CONFIGURE_TLB), buf, False)

    def _check_range(self, off: int) -> None:
        """Raise IndexError unless a 4-byte access at *off* lies inside the window."""
        if not 0 <= off <= self.size - 4:
            raise IndexError(f"32-bit access at offset {off:#x} is outside the {self.size}-byte window")

    def r32(self, off: int) -> int:
        """Read the little-endian 32-bit word at byte offset *off* of the ``uc`` mapping.

        Raises:
            IndexError: if the word is not fully inside the window.  Without
                this check an mmap slice past the end is silently truncated
                and would decode as a wrong (possibly zero) value.
        """
        self._check_range(off)
        # NOTE(review): slicing copies bytes; whether that is issued as one
        # 32-bit bus access is not guaranteed by the mmap module -- confirm
        # this is acceptable for the registers accessed.
        return int.from_bytes(self.uc[off:off + 4], "little")

    def w32(self, off: int, v: int) -> None:
        """Write the low 32 bits of *v*, little-endian, at byte offset *off* of the ``uc`` mapping.

        Raises:
            IndexError: if the word is not fully inside the window.
        """
        self._check_range(off)
        self.uc[off:off + 4] = int(v & 0xFFFFFFFF).to_bytes(4, "little")

    def close(self):
        """Unmap both views and free the TLB.  Safe to call more than once."""
        if self._closed:
            return
        self._closed = True
        try:
            for mapping in (self.uc, self.wc):
                if mapping is not None:
                    mapping.close()
        finally:
            # Always give the TLB back to the driver, even if unmapping failed.
            buf = bytearray(ctypes.sizeof(FreeTlbIn))
            FreeTlbIn.from_buffer(buf).tlb_id = self.tlb_id
            fcntl.ioctl(self.fd, _IO(IOCTL_FREE_TLB), buf, False)

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()
def _poll_until(poll, timeout_ms: int, interval_s: float = 0.001):
    """Call *poll* until it returns a non-None value or *timeout_ms* elapses.

    *poll* is always called at least once, and is called again after the last
    sleep before giving up, so a result that becomes available right at the
    deadline is not missed.  Returns the poll result, or ``None`` on timeout.
    """
    deadline = time.monotonic() + (timeout_ms / 1000.0)
    while True:
        result = poll()
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            return None
        time.sleep(interval_s)


def arc_msg(fd: int, msg: int, arg0: int = 0, arg1: int = 0, *, queue: int = 0, timeout_ms: int = 1000) -> list[int]:
    """Send one message through an ARC message queue and return the response words.

    Steps, all performed through a single temporary ``TLBWindow`` on *fd*:

    1. Read the control-block pointer from ``SCRATCH_RAM_11`` and, through
       it, the address of the queue array.
    2. Write an 8-word request (``msg & 0xFF``, *arg0*, *arg1*, zeros) into
       the slot selected by the request write pointer and advance it.
    3. Set ``IRQ0_TRIG_BIT`` in ``RESET_UNIT_ARC_MISC_CNTL``.
    4. Poll until the response write pointer differs from the response read
       pointer, read the 8 response words and advance the read pointer.

    Args:
        fd: open file descriptor of the device node.
        msg: message code; only the low 8 bits are sent.
        arg0, arg1: message arguments, truncated to 32 bits.
        queue: queue index, 0..3.
        timeout_ms: how long to wait for the response.  The response is
            checked at least once even when this is 0.

    Returns:
        The 8 response words.

    Raises:
        ValueError: if *queue* is out of range.
        RuntimeError: if a queue pointer read from the device is zero, or the
            selected queue does not fit inside the mapped window.
        TimeoutError: if no response arrives in time.
        OSError: if an ioctl or mmap on *fd* fails.
    """
    MSG_QUEUE_SIZE = 4
    MSG_QUEUE_POINTER_WRAP = 2 * MSG_QUEUE_SIZE
    REQUEST_MSG_LEN = 8
    RESPONSE_MSG_LEN = 8
    HEADER_BYTES = 8 * 4
    REQUEST_BYTES = REQUEST_MSG_LEN * 4
    RESPONSE_BYTES = RESPONSE_MSG_LEN * 4
    QUEUE_STRIDE = HEADER_BYTES + (MSG_QUEUE_SIZE * REQUEST_BYTES) + (MSG_QUEUE_SIZE * RESPONSE_BYTES)
    # Byte offsets, within a queue, of the header words this function uses.
    REQ_WPTR_OFF = 0
    RESP_RPTR_OFF = 4
    RESP_WPTR_OFF = 20
    # Response slots follow the header and all request slots.
    RESPONSES_OFF = HEADER_BYTES + (MSG_QUEUE_SIZE * REQUEST_BYTES)
    IRQ0_TRIG_BIT = 1 << 16

    if queue < 0 or queue >= 4:
        raise ValueError("queue must be 0..3")

    with TLBWindow(fd) as arc:
        # Locate the queue array by following two pointers, re-aiming the
        # window at the 2 MiB region containing each one.
        arc.configure(addr=ARC_NOC_BASE, start=ARC_TILE, end=ARC_TILE)
        info_ptr = arc.r32(SCRATCH_RAM_11)
        if info_ptr == 0:
            raise RuntimeError("msgqueue not initialized (SCRATCH_RAM_11 == 0)")
        info_base, info_off = _align_down(info_ptr, TLB_2M)
        arc.configure(addr=info_base, start=ARC_TILE, end=ARC_TILE)
        queues_ptr = arc.r32(info_off)
        if queues_ptr == 0:
            # Following a null pointer would write the request to address 0.
            raise RuntimeError("msgqueue not initialized (queues_ptr == 0)")
        q_base, q_off = _align_down(queues_ptr, TLB_2M)
        arc.configure(addr=q_base, start=ARC_TILE, end=ARC_TILE)
        q = q_off + queue * QUEUE_STRIDE
        if q + QUEUE_STRIDE > arc.size:
            # Every access below is relative to one window; a queue crossing
            # its end cannot be reached and must not be silently truncated.
            raise RuntimeError(f"msg queue {queue} at {queues_ptr:#x} does not fit in one TLB window")

        # Fill the request slot, then publish it by advancing the write pointer.
        # NOTE(review): nothing here checks that the request queue has a free
        # slot before writing; the header presumably also holds a request
        # read pointer that could be compared first -- confirm the layout.
        wptr = arc.r32(q + REQ_WPTR_OFF)
        req = q + HEADER_BYTES + (wptr % MSG_QUEUE_SIZE) * REQUEST_BYTES
        words = [msg & 0xFF, arg0 & 0xFFFFFFFF, arg1 & 0xFFFFFFFF] + [0] * (REQUEST_MSG_LEN - 3)
        for i, word in enumerate(words):
            arc.w32(req + i * 4, word)
        arc.w32(q + REQ_WPTR_OFF, (wptr + 1) % MSG_QUEUE_POINTER_WRAP)

        # Trigger the interrupt bit; the register lives in the reset unit, so
        # the window is moved there and back.
        arc.configure(addr=ARC_NOC_BASE, start=ARC_TILE, end=ARC_TILE)
        arc.w32(RESET_UNIT_ARC_MISC_CNTL, arc.r32(RESET_UNIT_ARC_MISC_CNTL) | IRQ0_TRIG_BIT)
        arc.configure(addr=q_base, start=ARC_TILE, end=ARC_TILE)

        rptr = arc.r32(q + RESP_RPTR_OFF)

        def poll_response():
            """Return the response words if one is pending, else None."""
            if arc.r32(q + RESP_WPTR_OFF) == rptr:
                return None
            resp = q + RESPONSES_OFF + (rptr % MSG_QUEUE_SIZE) * RESPONSE_BYTES
            out = [arc.r32(resp + i * 4) for i in range(RESPONSE_MSG_LEN)]
            # Consume the slot only after all of its words have been read.
            arc.w32(q + RESP_RPTR_OFF, (rptr + 1) % MSG_QUEUE_POINTER_WRAP)
            return out

        response = _poll_until(poll_response, timeout_ms)
        if response is None:
            raise TimeoutError(f"arc_msg timeout ({timeout_ms} ms)")
        return response
def main():
    """Command-line entry point: force the fan speed or reset it.

    ``--set PCT`` sends PCT (0..100) and ``--reset`` sends 0xFFFFFFFF as the
    first argument of a ``FAN_MSG_FORCE_SPEED`` message, then prints the
    response words in hex.
    """
    parser = argparse.ArgumentParser(description="Tenstorrent Blackhole fan control (direct ARC msgqueue)")
    parser.add_argument("--dev", default="/dev/tenstorrent/0")
    parser.add_argument("--timeout-ms", type=int, default=1000)
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument("--set", type=int, metavar="PCT", help="force fan speed (0..100)")
    mode.add_argument("--reset", action="store_true", help="reset to firmware curve")
    args = parser.parse_args()

    # Work out the raw argument before touching the device so that a bad
    # percentage is rejected without opening it.
    if args.reset:
        raw = 0xFFFFFFFF
    else:
        raw = int(args.set)
        if raw < 0 or raw > 100:
            raise SystemExit("--set must be 0..100")

    fd = os.open(args.dev, os.O_RDWR | os.O_CLOEXEC)
    try:
        resp = arc_msg(fd, FAN_MSG_FORCE_SPEED, raw, 0, timeout_ms=args.timeout_ms)
        hex_words = [f"{x:08x}" for x in resp]
        print("resp:", " ".join(hex_words))
    finally:
        os.close(fd)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment