Skip to content

Instantly share code, notes, and snippets.

@povik
Last active September 21, 2021 20:21
Show Gist options
  • Save povik/66f69f539905bcde6f4cacbae1d24ad6 to your computer and use it in GitHub Desktop.
Save povik/66f69f539905bcde6f4cacbae1d24ad6 to your computer and use it in GitHub Desktop.
Play audio through the embedded speaker on Mac mini
# speaker_amp.py -- play audio through the embedded speaker on Mac mini
#
# tested with m1n1 64dab9482
#
# sample usage with sox:
#
# sox INPUT_FILE -t raw -r 48000 -c 1 -e signed-int -b 32 -L - gain -63 | python3 ./speaker_amp.py
#
# (expects mono, 24-bit signed samples padded to 32 bits on the msb side)
import argparse
import os.path
import code
import re
import sys
from m1n1.setup import *
from m1n1.hw.dart import DART, DARTRegs
#from m1n1.hw.admac import ADMAC, ADMACRegs
#from m1n1.hw.i2c import I2C
class R_FIFO_TX(Register32):
    # Word pushed into the I2C transmit FIFO.  The I2C class in this file
    # puts the payload (address, data byte or read count) in the low bits
    # and sets the flags below on top of it.
    READ = 10   # set together with STOP on the read-count word (I2C.read_reg)
    STOP = 9    # set on the final word of a transfer
    START = 8   # set on the address word that opens a transfer
class R_FIFO_RX(Register32):
    # Word popped from the I2C receive FIFO.  I2C._fifo_read polls EMPTY
    # and keeps the low 8 bits of the word as the received byte.
    EMPTY = 8
class R_STATUS(Register32):
    # I2C status register.  I2C._fifo_write polls XFER_READY after queueing
    # a word with STOP set; I2C.clear_status writes all-ones to the register.
    XFER_READY = 27
class R_CONTROL(Register32):
    # I2C control register.  The I2C class sets ENABLE=1 (with CLOCK=0x4)
    # before each transfer and ENABLE=0 afterwards; clear_fifos() sets both
    # CLEAR bits.
    ENABLE = 11
    CLEAR_RX = 10
    CLEAR_TX = 9
    CLOCK = 7, 0    # presumably a clock divider -- TODO confirm
class I2CRegs(RegMap):
    # Register layout of the I2C controller, as offsets from the base
    # address that I2C.__init__ looks up in the ADT.
    FIFO_TX = 0x00, R_FIFO_TX
    FIFO_RX = 0x04, R_FIFO_RX
    STATUS = 0x14, R_STATUS
    CONTROL = 0x1c, R_CONTROL
class I2C:
    """Minimal polled driver for an I2C controller reached through m1n1.

    All transfers are done by pushing command words into the TX FIFO and
    busy-polling the status / RX FIFO registers; nothing here uses
    interrupts.

    Parameters
    ----------
    u:
        m1n1 utility object; ``u.proxy``, ``u.iface`` and ``u.adt`` are used.
    adt_path:
        ADT path of the controller node (e.g. ``"/arm-io/i2c1"``); its first
        ``reg`` entry gives the register base.
    """

    # Number of extra register polls allowed before a transfer is
    # considered stuck.
    POLL_LIMIT = 1000

    def __init__(self, u, adt_path):
        self.u = u
        self.p = u.proxy
        self.iface = u.iface
        self.base = u.adt[adt_path].get_reg(0)[0]
        self.regs = I2CRegs(u, self.base)

    def clear_fifos(self):
        """Set the CLEAR_TX and CLEAR_RX bits in the control register."""
        self.regs.CONTROL.set(CLEAR_TX=1, CLEAR_RX=1)

    def clear_status(self):
        """Write all-ones to the status register."""
        self.regs.STATUS.val = 0xffffffff

    def _fifo_read(self, nbytes):
        """Pop ``nbytes`` bytes from the RX FIFO and return them as ``bytes``.

        Raises ``TimeoutError`` if the FIFO is still empty after the initial
        read plus ``POLL_LIMIT`` further polls for any single byte.
        """
        received = bytearray()
        for _ in range(nbytes):
            word = self.regs.FIFO_RX.reg
            polls_left = self.POLL_LIMIT
            while word.EMPTY and polls_left > 0:
                word = self.regs.FIFO_RX.reg
                polls_left -= 1
            # Decide on the FIFO state itself rather than on the counter:
            # a byte that shows up on the very last poll is still valid.
            if word.EMPTY:
                raise TimeoutError("timeout")
            received.append(int(word) & 0xff)
        return bytes(received)

    def _fifo_write(self, buf, stop=False):
        """Push every byte of ``buf`` into the TX FIFO.

        With ``stop=True`` the last byte is flagged with STOP and the call
        then waits for XFER_READY in the status register, raising
        ``TimeoutError`` if it does not come up within the poll budget.
        """
        last = len(buf) - 1
        for no, byte in enumerate(buf):
            fifo_val = R_FIFO_TX(byte)
            if stop and no == last:
                fifo_val.STOP = 1
            self.regs.FIFO_TX.reg = fifo_val

        if not stop:
            return

        polls_left = self.POLL_LIMIT
        while not self.regs.STATUS.reg.XFER_READY:
            # Only give up after a poll that actually saw "not ready".
            if polls_left == 0:
                raise TimeoutError("timeout")
            polls_left -= 1

    def write_reg(self, addr, reg, data):
        """Write ``data`` (iterable of byte values) to register ``reg`` of
        the device at bus address ``addr``."""
        self.clear_fifos()
        self.regs.CONTROL.set(ENABLE=1, CLOCK=0x4)
        # address word: device address shifted left, low bit clear
        self.regs.FIFO_TX.reg = R_FIFO_TX(addr << 1, START=1)
        self._fifo_write(bytes([reg]) + bytes(data), stop=True)
        self.regs.CONTROL.set(ENABLE=0, CLOCK=0x4)

    def read_reg(self, addr, reg, nbytes):
        """Read ``nbytes`` bytes starting at register ``reg`` of the device
        at bus address ``addr`` and return them as ``bytes``."""
        self.clear_fifos()
        self.regs.CONTROL.set(ENABLE=1, CLOCK=0x4)
        # first transfer: select the register
        self.regs.FIFO_TX.reg = R_FIFO_TX(addr << 1, START=1)
        self._fifo_write(bytes([reg]), stop=True)
        # second transfer: address word with the low bit set, followed by
        # a word carrying the byte count with READ and STOP
        self.regs.FIFO_TX.reg = R_FIFO_TX((addr << 1) | 1, START=1)
        self.regs.FIFO_TX.reg = R_FIFO_TX(nbytes, STOP=1, READ=1)
        data = self._fifo_read(nbytes)
        self.regs.CONTROL.set(ENABLE=0, CLOCK=0x4)
        return data
class R_UNK_CONTROL(Register32):
    # Bit names for an ADMAC control register whose function is not known;
    # the names record guesses.  Not referenced by ADMACRegs in this file
    # (UNK_CONTROL is mapped there as a plain Register32).
    UNK1_STOPCOUNT = 0
    UNK2_RESETCOUNT = 1
    UNK3 = 3
    UNK4 = 8
class R_COUNTER_HI(Register32):
    # Upper word of the ADMAC counter (mapped as COUNTER_HI in ADMACRegs).
    FLAG = 31   # meaning unknown
class R_DESC_RING(Register32):
    # State of a channel's descriptor ring (TX_DESC_RING in ADMACRegs).
    UNDERFLOW = 31, 16

    # when READ_SLOT==WRITE_SLOT one of the two is set
    EMPTY = 8
    FULL = 9

    ERR = 10
    UNK1 = 6

    # next slot to read
    READ_SLOT = 5, 4

    # next slot to be written to
    WRITE_SLOT = 1, 0
class R_REPORT_RING(Register32):
    # State of a channel's report ring (TX_REPORT_RING in ADMACRegs).
    OVERFLOW = 31, 16

    # goes through 0, 1, 2, 3 as the pieces of a report
    # are being read through REPORT_READ
    READOUT_PROGRESS = 13, 12

    # when READ_SLOT==WRITE_SLOT one of the two is set
    EMPTY = 8
    FULL = 9

    ERR = 10

    # next slot to read
    READ_SLOT = 5, 4

    # next slot to be written to
    WRITE_SLOT = 1, 0
class R_TX_STATUS1(Register32):
    # Per-channel TX status bits; none of them has a known meaning yet.
    # ADMAC.tx_status reads this register and then writes all-ones to it.
    UNK1 = 1
    UNK2 = 4
    UNK3 = 8
    UNK4 = 9
    UNK5 = 10
class R_TX_CONTROL(Register32):
    # Per-channel TX control.  ADMAC.tx_reset pulses bit 0 by writing the
    # raw values 1 and then 0.
    RESET_RINGS = 0
class ADMACRegs(RegMap):
    # Register layout of the ADMAC block.  Most names are placeholders for
    # registers whose purpose is not known.

    # --- global registers ---
    TX_FLAGS = 0x0, Register32  # one bit per channel
    TX_FLAGS_CLEAR = 0x4, Register32
    RX_FLAGS = 0x8, Register32
    RX_FLAGS_CLEAR = 0xc, Register32
    UNK_CONTROL = 0x10, Register32
    STATUS0 = 0x34, Register32  # bit per channel, exports 0x10 from TX_UNK4 of channels
    STATUS1 = 0x44, Register32
    STATUS2 = 0x54, Register32

    # a 24 MHz always-running counter
    COUNTER_LO = 0x70, Register32
    COUNTER_HI = 0x74, R_COUNTER_HI

    # --- per-channel TX registers: 12 entries, 0x400 apart ---
    TX_CONTROL = irange(0x8000, 12, 0x400), R_TX_CONTROL
    TX_UNK_STATUS0 = irange(0x8010, 12, 0x400), Register32
    TX_STATUS1 = irange(0x8014, 12, 0x400), R_TX_STATUS1
    TX_UNK_STATUS2 = irange(0x8018, 12, 0x400), Register32
    TX_UNK_STATUS3 = irange(0x801c, 12, 0x400), Register32
    # NOTE(review): same offset (0x801c) as TX_UNK_STATUS3 above -- possibly
    # meant to be a different register; confirm against hardware traces.
    TX_UNK_STATUS4 = irange(0x801c, 12, 0x400), Register32
    TX_UNK3 = irange(0x8060, 12, 0x400), Register32
    TX_UNK4 = irange(0x8024, 12, 0x400), Register32
    TX_DESC_RING = irange(0x8070, 12, 0x400), R_DESC_RING
    TX_REPORT_RING = irange(0x8074, 12, 0x400), R_REPORT_RING

    # two-dimensional ranges: (channel, word within the channel's block)
    TX_UNK2 = (irange(0x8000, 12, 0x400), irange(0x78, (0x200 - 0x78) // 4, 4)), Register32
    RX_UNK = (irange(0x8200, 12, 0x400), irange(0, 0x200 // 4, 4)), Register32

    # per-channel ports through which descriptors are written and reports
    # are read, one 32-bit piece at a time (see ADMAC.tx_submit and
    # ADMAC.tx_read_report)
    DESC_WRITE = irange(0x10000, 12, 4), Register32
    REPORT_READ = irange(0x10100, 12, 4), Register32
class ADMADescriptorFlags(Register32):
    # Fourth word of a DMA descriptor (see ADMADescriptor.ser).

    # macos always writes descriptors in pairs,
    # the second descriptor has this bit set
    UNK_LAST = 16

    # identifier chosen by the submitter
    DESC_ID = 7, 0
class ADMADescriptor(Reloadable):
    """One DMA descriptor: a buffer address, its length and a flags word.

    ``ser()`` / ``deser()`` convert to and from the four 32-bit pieces that
    are pushed through the DESC_WRITE register.
    """

    def __init__(self, addr, length, flags):
        self.addr = addr
        self.length = length
        self.flags = ADMADescriptorFlags(flags)

    def __repr__(self):
        return f"<descriptor: addr=0x{self.addr:x} len=0x{self.length:x} flags={self.flags}>"

    def ser(self):
        """Return the descriptor as a list of four 32-bit integers:
        address low, address high, length, flags."""
        mask32 = (1 << 32) - 1
        addr_lo = self.addr & mask32
        addr_hi = (self.addr >> 32) & mask32
        return [addr_lo, addr_hi, self.length & mask32, int(self.flags)]

    @classmethod
    def deser(cls, seq):
        """Build a descriptor from four 32-bit pieces (inverse of ``ser``).

        Raises ``ValueError`` when ``seq`` does not hold exactly four items.
        """
        if len(seq) != 4:
            raise ValueError
        addr = seq[0] | (seq[1] << 32)
        length = seq[2]  # in bytes
        flags = seq[3]
        return ADMADescriptor(addr, length, flags)
class ADMAReportFlags(Register32):
    # Fourth word of a report read back from the report ring
    # (see ADMAReport.deser).
    UNK1 = 24
    UNK2 = 25
    UNK3 = 27
    # NOTE(review): ADMADescriptorFlags.DESC_ID spans bits 7..0 while this
    # field spans 8..0 -- confirm which width is intended.
    DESC_ID = 8, 0
class ADMAReport(Reloadable):
    """One report read back from a channel's report ring: a 64-bit counter
    value, an unknown word and a flags word."""

    def __init__(self, countval, unk1, flags):
        self.countval = countval
        self.unk1 = unk1
        self.flags = ADMAReportFlags(flags)

    def __repr__(self):
        return f"<report: countval=0x{self.countval:x} unk1=0x{self.unk1:x} flags={self.flags}>"

    def ser(self):
        """Return the report as a list of four 32-bit integers:
        counter low, counter high, unk1, flags."""
        mask32 = (1 << 32) - 1
        count_lo = self.countval & mask32
        count_hi = (self.countval >> 32) & mask32
        return [count_lo, count_hi, self.unk1 & mask32, int(self.flags)]

    @classmethod
    def deser(cls, seq):
        """Build a report from four 32-bit pieces (inverse of ``ser``).

        Raises ``ValueError`` when ``seq`` does not hold exactly four items.
        """
        if len(seq) != 4:
            raise ValueError
        countval = seq[0] | (seq[1] << 32)
        unk1 = seq[2]
        flags = seq[3]
        return ADMAReport(countval, unk1, flags)
class ADMAC(Reloadable):
    """Thin helper around the ADMAC register block for driving TX channels.

    Parameters
    ----------
    u:
        m1n1 utility object (``u.proxy`` and ``u.adt`` are used).
    devpath:
        ADT path of the ADMAC node; its first ``reg`` entry gives the base.
    dart:
        Optional DART object; only stored on the instance here.
    """

    def __init__(self, u, devpath, dart=None):
        self.u = u
        self.p = u.proxy
        self.base, _ = u.adt[devpath].get_reg(0)
        self.regs = ADMACRegs(u, self.base)
        self.dart = dart

    def tx_enable(self, channo):
        """Write channel ``channo``'s bit to TX_FLAGS."""
        self.regs.TX_FLAGS.val = 1 << channo

    def tx_disable(self, channo):
        """Write channel ``channo``'s bit to TX_FLAGS_CLEAR."""
        self.regs.TX_FLAGS_CLEAR.val = 1 << channo

    def tx_reset(self, channo):
        """Pulse bit 0 of the channel's TX_CONTROL (write 1, then 0)."""
        for value in (1, 0):
            self.regs.TX_CONTROL[channo].val = value

    def tx_submit(self, channo, addr, length, flags):
        """Push one descriptor into the channel's descriptor ring.

        Raises ``Exception`` if the ring reports FULL.
        """
        desc = ADMADescriptor(addr, length, flags)
        print(f"submitting: {desc}")

        if not self.tx_can_submit(channo):
            raise Exception("descriptor ring full")

        # the descriptor goes in as four consecutive 32-bit writes
        for piece in desc.ser():
            self.regs.DESC_WRITE[channo].val = piece

    def tx_can_submit(self, channo):
        """Return True while the descriptor ring is not FULL."""
        ring = self.regs.TX_DESC_RING[channo].reg
        return not ring.FULL

    def tx_have_report(self, channo):
        """Return True while the report ring is not EMPTY."""
        ring = self.regs.TX_REPORT_RING[channo].reg
        return not ring.EMPTY

    def tx_read_report(self, channo):
        """Pop one report from the channel's report ring.

        Raises ``Exception`` if the ring reports EMPTY.
        """
        if not self.tx_have_report(channo):
            raise Exception("report ring empty")

        # a report comes out as four consecutive 32-bit reads
        pieces = [self.regs.REPORT_READ[channo].val for _ in range(4)]
        return ADMAReport.deser(pieces)

    def tx_status(self, channo):
        """Return the channel's TX_STATUS1 value, then write all-ones to
        the register."""
        status = self.regs.TX_STATUS1[channo].reg
        self.regs.TX_STATUS1[channo].val = 0xffffffff
        return status
class PollingConsole(code.InteractiveConsole):
    """Interactive console that is serviced by calling ``poll()``.

    A helper thread blocks in the prompt and hands finished lines over
    through a queue; ``poll()`` picks them up on the calling thread and
    executes them there, so the caller's loop keeps running between lines.
    """

    def __init__(self, locals=None, filename="<console>"):
        # Imported lazily (and published as module globals) so that these
        # packages are only needed when a console is actually created.
        global patch_stdout, PromptSession, FileHistory
        global Thread, Queue, Empty
        from prompt_toolkit import PromptSession
        from prompt_toolkit.history import FileHistory
        from prompt_toolkit.patch_stdout import patch_stdout
        from threading import Thread
        from queue import Queue, Empty

        super().__init__(locals, filename)

        self._qu_input = Queue()    # prompt thread -> poll(): lines, None on EOF
        self._qu_result = Queue()   # poll() -> prompt thread: push() results
        self._should_exit = False

        history = FileHistory(os.path.expanduser("~/.m1n1-history"))
        self.session = PromptSession(history=history)

        self._other_thread = Thread(target=self._other_thread_main, daemon=False)
        self._other_thread.start()

    def __enter__(self):
        self._patch = patch_stdout()
        self._patch.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._patch.__exit__(exc_type, exc_val, exc_tb)

    def _other_thread_main(self):
        """Prompt-thread body: read a line, hand it over, wait for the
        result of executing it, repeat; ends on EOF."""
        more_input = False
        while True:
            try:
                line = self.session.prompt("... " if more_input else "(♫♫) ")
            except EOFError:
                self._qu_input.put(None)
                return
            self._qu_input.put(line)
            # push() returns whether the statement is still incomplete,
            # which selects the continuation prompt for the next line
            more_input = self._qu_result.get()

    def poll(self):
        """Execute at most one pending input line.

        Waits up to 10 ms for input.  Returns False once the prompt thread
        has signalled EOF, True otherwise.
        """
        if self._should_exit:
            return False

        try:
            line = self._qu_input.get(timeout=0.01)
        except Empty:
            return True

        if line is not None:
            self._qu_result.put(self.push(line))
            return True

        self._should_exit = True
        return False
class NoConsole:
    """Stand-in for PollingConsole when no interactive console is wanted.

    Offers the same ``poll()`` / context-manager surface; ``poll()`` just
    sleeps for 10 ms and always reports that the loop should continue.
    """

    def poll(self):
        """Sleep for 10 ms and return True."""
        # Imported here because this file never imports ``time`` itself;
        # the name would otherwise only exist if a star-import provided it.
        import time

        time.sleep(0.01)
        return True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
# --- command line -----------------------------------------------------------
argparser = argparse.ArgumentParser()
argparser.add_argument("--console", action="store_true")
argparser.add_argument(
    "-f", "--file", "--input", "--samples",
    type=str,
    default=None,
    help="input filename to take samples from (default: standard input)",
)
args = argparser.parse_args()

# with --console the samples cannot come from standard input, so a file
# must be named
if args.file is None and args.console:
    print("Specify file with samples (option -f) if using console")
    sys.exit(1)

# source of raw samples, read in binary mode
if args.file is None:
    inp = sys.stdin.buffer
else:
    inp = open(args.file, "rb")
# enable the clocks of every block touched below
for clocked_node in (
    "/arm-io/gpio",
    "/arm-io/i2c1",
    "/arm-io/admac-sio",
    "/arm-io/dart-sio",
    "/arm-io/mca-switch",
):
    p.pmgr_adt_clocks_enable(clocked_node)

# TX channel used for playback
channo = 2

admac = ADMAC(u, "/arm-io/admac-sio")
admac.tx_disable(channo)
admac.tx_reset(channo)

# pulse bit 0 of UNK_CONTROL
for pulse in (1, 0):
    admac.regs.UNK_CONTROL.val = pulse

# drain whatever reports are still sitting in the ring
while admac.tx_have_report(channo):
    print("stale report: ", admac.tx_read_report(channo))
def pmgr_reset():
    """Write the fixed 0/3/0/3 sequence to the four words at 0x23d10c000."""
    # pmgr-related, unknown meaning,
    # needs to be written for the speaker-amp IC to respond over I2C
    base = 0x23d10c000
    for offset, value in ((0x0, 0), (0x4, 3), (0x8, 0), (0xc, 3)):
        p.write32(base + offset, value)
pmgr_reset()
# Each of the following registers is written with 0x0 and then 0x2.
# The addresses lie at offsets from 0x238400000, the value assigned to
# mca_switch_base further down.  Meaning of the values is not known.
p.write32(0x238400000, 0x0)
p.write32(0x238400000, 0x2)
p.write32(0x238400100, 0x0)
p.write32(0x238400100, 0x2)
p.write32(0x238400300, 0x0)
p.write32(0x238400300, 0x2)
p.write32(0x238404300, 0x0)
p.write32(0x238404300, 0x2)
p.write32(0x238404100, 0x0)
p.write32(0x238404100, 0x2)
p.write32(0x238404000, 0x0)
p.write32(0x238404000, 0x2)
# registers of unknown function; 0x238208854 is written twice with the
# same value
p.write32(0x238208840, 0x22)
p.write32(0x238208854, 0xc00060)
p.write32(0x238208854, 0xc00060)
# base address used by mca_switch_unk_disable()/mca_switch_unk_enable();
# the writes in this section spell the absolute addresses out instead
mca_switch_base = 0x2_3840_0000
p.write32(0x238404004, 0x100)
p.write32(0x238404104, 0x200)
p.write32(0x238404108, 0x0)
p.write32(0x23840410c, 0xfe)
p.write32(0x238408004, 0x100)
p.write32(0x23840c004, 0x100)
p.write32(0x238308000, 0x102048)
# bits 0x0000e0 influence clock
# 0x00000f influence sample serialization
p.write32(0x23b0400d8, 0x06000000) # 48 ksps, zero-out for ~96 ksps
p.write32(0x238400600, 0xe) # 0x8 or have zeroed samples, 0x6 or have no clock
p.write32(0x238400604, 0x200) # sensitive in mask 0xf00, any other value disables clock
p.write32(0x238400608, 0x4) # 0x4 or zeroed samples
# Sample buffer: a window of 16 chunks starting at heap_start, used as a
# ring by fill_data().  `heap` is the address the next chunk goes to.
chunk_size = 0x10000
heap_start = 0x220000
heap_size = 16*chunk_size
heap_end = heap_start + heap_size
heap = heap_start
dart_base, _ = u.adt["/arm-io/dart-sio"].get_reg(0) # stream index 2
dart = DART(iface, DARTRegs(u, dart_base), util=u)
dart.initialize()
# map the window for stream 2; the literals repeat heap_start and heap_size
# from above (0x8_0100_0000 is presumably the backing physical address --
# TODO confirm)
dart.iomap_at(2, 0x220000, 0x8_0100_0000, 16*chunk_size)
dart.invalidate_streams()
# the counter isn't necessary for anything, really, it just
# demonstrates how descriptor IDs propagate into reports
chunk_counter = 1
def fill_data():
    """Queue the next chunk of samples from ``inp`` on the TX channel.

    Reads up to ``chunk_size`` bytes, writes them at ``heap`` through the
    DART (stream 2), submits a descriptor covering the whole chunk and
    advances ``heap`` and ``chunk_counter``.  The buffer window is used as
    a ring: ``heap`` wraps to ``heap_start`` once the next chunk would run
    past ``heap_end``.
    """
    global heap, chunk_counter

    if heap + chunk_size > heap_end:
        heap = heap_start

    samples = inp.read(chunk_size)
    if len(samples) < chunk_size:
        # Short or empty read (end of input).  The descriptor below always
        # covers a full chunk, so pad with zero samples; otherwise the tail
        # of the chunk would replay whatever an earlier pass through the
        # ring left at this address.
        samples = samples.ljust(chunk_size, b"\0")

    dart.iowrite(2, heap, samples)
    dart.invalidate_streams()

    # 0x100 plus the low 8 bits of the running counter as descriptor ID
    admac.tx_submit(channo, heap, chunk_size, 0x100 | (chunk_counter & 0xff))

    chunk_counter += 1
    heap += chunk_size
# toggle the GPIO line driving the speaker-amp IC reset
p.write32(0x23c1002d4, 0x76a02) # invoke reset
p.write32(0x23c1002d4, 0x76a03) # take out of reset
i2c1 = I2C(u, "/arm-io/i2c1")
# submit one chunk before enabling the channel (presumably so the
# descriptor ring is not empty when DMA starts -- TODO confirm)
fill_data()
admac.tx_enable(channo)
# accesses to 0x100-sized blocks in the +0x4000 region require
# the associated enable bit cleared, or they cause SErrors
def mca_switch_unk_disable():
    """Write 0 to the registers at +0x4000, +0x4100 and +0x4300 of the
    MCA switch."""
    for block_off in (0x4000, 0x4100, 0x4300):
        p.write32(mca_switch_base + block_off, 0x0)
def mca_switch_unk_enable():
    """Write 1 to the registers at +0x4000, +0x4100 and +0x4300 of the
    MCA switch."""
    for block_off in (0x4000, 0x4100, 0x4300):
        p.write32(mca_switch_base + block_off, 0x1)
# two more writes into the +0x4000 region of the MCA switch, done before
# the enable bits are set again (values of unknown meaning)
p.write32(0x238404104, 0x202)
p.write32(0x238404208, 0x3107)
mca_switch_unk_enable()
# by ADT and leaked schematic, i2c1 contains TAS5770L,
# which is not a public part. but there's e.g. TAS2110
# with similar registers
#
# https://www.ti.com/product/TAS2110
#
# if the speaker-amp IC loses clock on the serial sample input,
# it automatically switches to software shutdown.
#
# Configuration of the amplifier at I2C address 0x31, one register per
# write; the values are not explained here.
i2c1.write_reg(0x31, 0x08, [0x40])
i2c1.write_reg(0x31, 0x0a, [0x06])
i2c1.write_reg(0x31, 0x0b, [0x00])
i2c1.write_reg(0x31, 0x0c, [0x1a])
i2c1.write_reg(0x31, 0x1c, [0x82])
i2c1.write_reg(0x31, 0x1d, [0x06])
i2c1.write_reg(0x31, 0x16, [0x50])
i2c1.write_reg(0x31, 0x17, [0x04])
i2c1.write_reg(0x31, 0x1b, [0x01])
i2c1.write_reg(0x31, 0x0d, [0x00])
#i2c1.write_reg(0x31, 0x03, [0x14])
# amplifier gain, presumably this is the lowest setting
i2c1.write_reg(0x31, 0x03, [0x0])
# take the IC out of software shutdown
i2c1.write_reg(0x31, 0x02, [0x0c])
# Playback loop.  Whatever ends it -- the console session finishing,
# Ctrl-C, or an error such as an I2C timeout or a full descriptor ring --
# the amplifier is muted and shut down and the DMA channel is disabled,
# because the shutdown sequence runs from ``finally``.
try:
    with (PollingConsole(locals()) if args.console else NoConsole()) as cons:
        try:
            while cons.poll():
                # wait for the DMA engine to post a report, still serving
                # the console in the meantime
                while (not admac.tx_have_report(channo)) and cons.poll():
                    pass

                while admac.tx_have_report(channo):
                    print("report: ", admac.tx_read_report(channo))

                # top the descriptor ring back up
                while admac.tx_can_submit(channo):
                    fill_data()
        except KeyboardInterrupt:
            pass
finally:
    try:
        # mute
        i2c1.write_reg(0x31, 0x02, [0x0d])
        # software shutdown
        i2c1.write_reg(0x31, 0x02, [0x0e])
    finally:
        # stop DMA even if the amplifier could not be reached
        admac.tx_disable(channo)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment