Skip to content

Instantly share code, notes, and snippets.

@arpruss
Last active February 27, 2023 14:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arpruss/cb55ffbe0c9713b80fcf1ab8e378e2ca to your computer and use it in GitHub Desktop.
Save arpruss/cb55ffbe0c9713b80fcf1ab8e378e2ca to your computer and use it in GitHub Desktop.
from __future__ import print_function
import nintaco
import math
import bike
# Connect to the Nintaco remote API server on localhost:9999 and fetch the
# API object that the listener registrations at the bottom of the file use.
nintaco.initRemoteAPI("localhost", 9999)
api = nintaco.getAPI()
# RX: indices of the values this script feeds to the game through reads of
# 0x4016 (they are the keys of rxFeedValues and the entries of rxFeed).
SPEED = 1
POWER = 2
HEART = 3
RPM = 6
# Bit 11 is OR'ed into every cadence value by setCadence().
RPM_MASK = (1<<11)
# Not referenced anywhere else in the visible code.
START = 0xF
# TX: indices of the values decoded from the game's writes to 0x4016
# (they are the keys of lastTXData, filled in by parseTX()).
GRADE = 1
WIND = 2
WEIGHT = 3
PULSE_TARGET_LOW = 4
PULSE_TARGET_HIGH = 5
# Not referenced anywhere else in the visible code.
START_RACE = 0xF
# Most recent 12-bit data word decoded for each TX index.
lastTXData = {}
# Bit 0 of the latest write to 0x4016 that PostRead() has not consumed yet;
# None when there is nothing pending.
lastWrote = None
# Set by Frame(); PostRead() clears it once it has checked the first bit
# written in the frame.
newFrame = False
# True when the first bit written in a frame did not have the expected value;
# written bits are then not accumulated until Frame() clears the flag.
skipFrame = False
# Number of written bits accumulated in txData so far.
txPos = 0
# Number of pending writes PostRead() has consumed since the last Frame() call.
posInFrame = 0
# Accumulated written bits; the bit stored at count txPos lands at bit txPos.
txData = 0
# Order in which the RX values are cycled through by PostRead().
rxFeed = [SPEED,HEART,RPM,POWER]
# Current encoded value for each RX index. These initial values are
# overwritten by the setSpeed/setCadence/setHeart/setPower calls at start-up.
rxFeedValues = { SPEED:25*50, HEART:74, RPM:65|RPM_MASK, POWER:500 }
# Position in rxFeed of the value currently being sent.
rxFeedPos = 0
# Last raw value passed to setPower(); read by updateRX() for the speed model.
power = 0
def clamp(low,high,x):
    """Return x limited to the range [low, high].

    Note the argument order: the bounds come first, the value last.
    If low > high the result is high.
    """
    return min(max(x,low),high)
def setSpeed(v):
    """Store speed v in the RX feed.

    The value is encoded as v*50 rounded to the nearest integer and limited
    to 12 bits (0..0xFFF). updateRX() passes the result of
    bike.calculateSpeedMPH() here, so v is in mph.
    """
    # int(): math.floor returns a float on Python 2, and a float stored here
    # would break the bitwise operations in getRXValue().
    s = int(math.floor(v*50+0.5))
    rxFeedValues[SPEED] = clamp(0,0xFFF,s)
def setHeart(h):
    """Store heart rate h in the RX feed, limited to 8 bits (0..0xFF).

    h may be an int or a float; it is rounded to the nearest integer so the
    stored value is always usable in the bitwise encoding done by
    getRXValue().
    """
    rxFeedValues[HEART] = clamp(0,0xFF,int(math.floor(h+0.5)))
def setCadence(rpm):
    """Store cadence rpm in the RX feed.

    The value is limited to 11 bits (0..0x7FF) and bit 11 (RPM_MASK) is
    always set in the stored word. rpm may be an int or a float; it is
    rounded to the nearest integer first, because a float cannot be OR'ed
    with the mask.
    """
    rxFeedValues[RPM] = clamp(0,0x7FF,int(math.floor(rpm+0.5)))|RPM_MASK
def setPower(_power):
    """Record the rider's power and store its encoded form in the RX feed.

    The raw value is kept in the module-level `power` (used by updateRX()
    for the speed calculation). The fed value is scaled so that a power of
    3054 maps to the 12-bit full scale 0xFFF, then limited to 0..0xFFF.
    """
    global power
    power = _power
    # 3054.0 forces true division on Python 2 (an int _power would otherwise
    # floor to 0 for anything below 3054); int() keeps the stored word
    # usable in bitwise operations.
    p = int(math.floor(_power / 3054.0 * 0xFFF + 0.5))
    rxFeedValues[POWER] = clamp(0,0xFFF,p)
def signFix(v):
    """Interpret the 12-bit word v as a two's-complement signed integer.

    Values with bit 11 set are negative (0xFFF -> -1, 0x800 -> -2048);
    all others are returned unchanged.
    """
    if v & 0x800:
        return -((v^0xFFF)+1)
    return v
# percent
def getGrade():
    """Return the last grade received, in percent, or 0 if none was seen.

    The received word is a signed 12-bit value in tenths of a percent.
    """
    if GRADE not in lastTXData:
        return 0
    return signFix(lastTXData[GRADE]) / 10.
# mph
def getWind():
    """Return the last wind speed received (signed, mph), or 0 if none was seen."""
    if WIND not in lastTXData:
        return 0
    return signFix(lastTXData[WIND])
# lbs
def getWeight():
    """Return the last weight received (lbs), or 0 if none was seen.

    The word is decoded with signFix() like the other values, so a raw
    value with bit 11 set comes back negative.
    """
    if WEIGHT not in lastTXData:
        return 0
    return signFix(lastTXData[WEIGHT])
def getPulseTargetLow():
    """Return the lower pulse target, or 0 when it is absent.

    The low 11 bits of the received word carry the target; it is only
    returned when bit 11 of the word is set. Otherwise, or if no word has
    been received yet, the result is 0.
    """
    if PULSE_TARGET_LOW not in lastTXData:
        return 0
    p = lastTXData[PULSE_TARGET_LOW]
    if p & 0x800:
        return 0x7FF & p
    return 0
def getPulseTargetHigh():
    """Return the upper pulse target, or 0 when it is absent.

    The low 11 bits of the received word carry the target; it is only
    returned when bit 11 of the word is set. Otherwise, or if no word has
    been received yet, the result is 0.
    """
    if PULSE_TARGET_HIGH not in lastTXData:
        return 0
    p = lastTXData[PULSE_TARGET_HIGH]
    if p & 0x800:
        return 0x7FF & p
    return 0
def updateRX(index):
    """Refresh the RX value for `index` just before it starts being sent.

    Only SPEED is recomputed: it is derived by bike.calculateSpeedMPH()
    from the last grade, wind and weight received and the current power.
    The other RX values keep whatever their setters stored last.
    """
    if index == SPEED:
        setSpeed(bike.calculateSpeedMPH(getGrade(),getWind(),getWeight(),power))
def showData(changedIndex,changedValue):
    """Print the TX index/value that just changed, then a summary line of
    every decoded TX value (grade, wind, weight and both pulse targets)."""
    print(changedIndex,changedValue)
    print("grade=%0.1f%% wind=%d mph weight=%d lbs pulseTargets=[%d,%d]" %
          (getGrade(), getWind(), getWeight(), getPulseTargetLow(), getPulseTargetHigh()))
def getRXValue(index,value,bitPos):
    """Return the bit (0 or 1) to present at position bitPos of an RX word.

    Positions 12-15 carry the fixed pattern 0,1,0,1. Positions 16-47 carry
    the 16-bit payload `value | (index << 12)`, least-significant bit
    first, two positions per payload bit: the even position holds the
    inverted bit and the odd position the bit itself.

    bitPos must be at least 12; smaller values end in a negative shift
    count (ValueError). PostRead() only calls this with bitPos >= 12.
    """
    if bitPos == 12:
        return 0 # todo
    elif bitPos == 13:
        return 1 # todo
    elif bitPos == 14:
        return 0 # todo
    elif bitPos == 15:
        return 1 # todo
    bitPos -= 16
    data = value | ((index & 0xFFF) << 12)
    bitValue = ( data >> (bitPos // 2) ) & 1
    if bitPos % 2 == 0:
        return bitValue^1
    return bitValue
def PostRead(accessPointType, address, value):
    """Access-point listener invoked after each read of 0x4016.

    Does two things per call:

    1. RX: once at least one write has been consumed in the current frame,
       replaces bit 0 of the value being read with the next bit of the
       current RX word (positions 12 and up; earlier positions are left
       alone). After position 47 the feed advances to the next entry of
       rxFeed and that entry is refreshed via updateRX().
    2. TX: consumes the pending written bit recorded by PreWrite(), if any.
       The first bit written in a frame must be 0 when a new 48-bit word
       starts (txPos == 0) and 1 when its second half starts
       (txPos == 24); otherwise the frame is marked as skipped. Accepted
       bits are appended to txData, except the one arriving at
       posInFrame == 24.

    Returns the replacement value for the read, or -1 when nothing is to be
    substituted (-1 is assumed to mean "leave unchanged" to the Nintaco
    API; confirm against its documentation).

    NOTE(review): the nesting below was reconstructed from a paste that had
    lost its indentation. In particular, `posInFrame += 1` is assumed to
    run for every consumed write, not only for accepted bits; confirm.
    """
    global lastWrote,newFrame,skipFrame,txPos,txData,posInFrame,rxFeedPos
    out = -1
    if posInFrame != 0:
        # The first half of a word covers RX positions 0-23, the second
        # half positions 24-47.
        if txPos <= 24:
            rxPos = posInFrame - 1
        else:
            rxPos = posInFrame + 23
        if rxPos >= 12:
            feedIndex = rxFeed[rxFeedPos]
            out = ( value & ~1 ) | getRXValue(feedIndex,rxFeedValues[feedIndex],rxPos)
        rxPos += 1
        if rxPos == 48 and not skipFrame:
            # Whole word delivered: move on to the next value in the cycle.
            rxFeedPos = (rxFeedPos+1) % len(rxFeed)
            updateRX(rxFeed[rxFeedPos])
    if lastWrote is not None:
        if newFrame:
            if txPos == 0:
                if lastWrote != 0:
                    skipFrame = True
                    print("skip")
            elif txPos == 24:
                if lastWrote != 1:
                    skipFrame = True
                    print("skip")
            newFrame = False
        if not skipFrame and posInFrame != 24:
            txData |= lastWrote << txPos
            txPos += 1
        posInFrame += 1
        lastWrote = None
    return out
def PreWrite(accessPointType, address, value):
    """Access-point listener invoked before each write to 0x4016.

    Remembers bit 0 of the value being written in `lastWrote` so that the
    next PostRead() call can consume it. A later write before that read
    overwrites the remembered bit. Always returns -1, i.e. the written
    value itself is not altered (assumed Nintaco convention; confirm).
    """
    global lastWrote
    lastWrote = value & 1
    return -1
def parseTX(bits):
    """Decode a 48-bit word accumulated from writes to 0x4016.

    Only the even-numbered bits of `bits` are used; they form a 24-bit
    word laid out as follows (bit 0 = least significant):

        bits 0-6    fixed pattern 0b0010110
        bits 7-11   data bits 0-4
        bit  12     must be 1
        bits 13-19  data bits 5-11
        bits 20-23  index

    Returns None if the fixed pattern or bit 12 does not match. Otherwise
    returns (index, data); when the data differs from the last value
    stored for that index, lastTXData is updated and showData() is called.
    """
    out = 0
    for i in range(24):
        b0 = (bits>>(2*i))&1
        out |= (b0 << i)
    if (out & 0x7F) != 0b0010110 or ((out >> 12) & 1) != 1:
        return None
    out >>= 7
    bits0_4 = out & 0b11111
    # Shift past the five data bits and the marker bit checked above.
    out >>= 6
    bits5_11 = out & 0b1111111
    data = bits0_4 | (bits5_11 << 5)
    out >>= 7
    index = out
    if index not in lastTXData or data != lastTXData[index]:
        lastTXData[index] = data
        showData(index,data)
    return index, data
def Frame():
    """Frame listener: start a new frame of the bit-level state machine.

    Marks the frame as new and resets the per-frame position. A complete
    48-bit word is handed to parseTX(). The accumulator is kept only when
    exactly the first half of a word (24 bits) has been collected and the
    frame was not skipped; in every other case it is reset so the next
    frame starts a fresh word. The skip flag is always cleared.
    """
    global newFrame,txData,txPos,skipFrame,posInFrame
    newFrame = True
    posInFrame = 0
    if txPos == 48:
        parseTX(txData)
    if txPos != 24 or skipFrame:
        txData = 0
        txPos = 0
    skipFrame = False
# Seed the RX feed with fixed rider values; these replace the defaults in
# rxFeedValues. Speed is recomputed by updateRX() each time it comes up in
# the feed cycle.
setSpeed(12.5)
setCadence(67)
setHeart(121)
setPower(320)
# Log when the API becomes active.
api.addActivateListener(lambda : print("Active"))
# Hook writes to and reads of 0x4016 (presumably the NES controller-port
# register the trainer protocol is bit-banged through; confirm).
api.addAccessPointListener(PreWrite, nintaco.PreWrite, 0x4016)
#api.addAccessPointListener(PreRead, nintaco.PreRead, 0x4016)
api.addAccessPointListener(PostRead, nintaco.PostRead, 0x4016)
# Frame() resets the per-frame state and decodes completed words.
api.addFrameListener(Frame)
# Start the API; the listeners registered above are called from here on.
api.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment