Skip to content

Instantly share code, notes, and snippets.

@maartendamen
Created November 6, 2012 20:16
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maartendamen/4027234 to your computer and use it in GitHub Desktop.
Save maartendamen/4027234 to your computer and use it in GitHub Desktop.
Onkyo eISCP Test
from twisted.internet.protocol import ClientFactory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor
from struct import pack, unpack
INPUTS = {'00' : 'VCR/DVR',
'01' : 'CBL/SAT',
'02' : 'GAME',
'03' : 'AUX',
'05' : 'PC',
'10' : 'BD/DVD',
'08750' : 'Tuner',
'23' : 'TV/CD',
'2B' : 'Net',
'29' : 'USB'}
class OnkyoReceiver(object):
def __init__(self):
self.status = None # Power status
self.input = None # Currently active input
def __repr__(self):
return '[OnkyoReceiver] status: {0}, input: {1}'.format(self.status, INPUTS[self.input])
class OnkyoISCPProtocol(LineReceiver):
HEADER = 'ISCP'
HEADERSIZE = 16
VERSION = 1
UNITTYPE = 1 # Receiver
def connectionMade(self):
print 'Connected to Onkyo receiver...'
#self.sendCommand('PWRQSTN')
#self.sendCommand('PWR01')
#self.queryPowerStatus()
self.selectInput('2B')
def lineReceived(self, line):
print 'Received: {0}'.format(line)
header, headersize, datasize, version = unpack('!4sIIBxxx', line[0:self.HEADERSIZE])
if header != self.HEADER:
print "Received packet is not an ISCP packet"
return
if version != self.VERSION:
print "ISCP version {0} not supported".format(version)
return
message = line[headersize:headersize+datasize].rstrip('\x1a\r\n')
command = message[2:5]
parameter = message[5:]
print 'Decoded response, raw: {0}, command: {1}, parameter: {2}'.format(message, command, parameter)
if command == 'PWR':
self.factory.receiver.status = int(parameter)
elif command == 'SLI':
self.factory.receiver.input = parameter
print self.factory.receiver
def sendCommand(self, command):
message = '!' + str(self.UNITTYPE) + command + '\x0d'
datasize = self.HEADERSIZE + len(message)
line = pack('!4sIIBxxx',
self.HEADER,
self.HEADERSIZE,
datasize,
self.VERSION,
) + message
print "Sending: {0}".format(line)
self.sendLine(line)
def powerOff(self):
'''
Set the receiver to standby mode.
'''
self.sendCommand('PWR00')
def powerOn(self):
'''
Power on the receiver.
'''
self.sendCommand('PWR01')
def queryPowerStatus(self):
'''
Query the current power status of the receiver.
'''
self.sendCommand('PWRQSTN')
def selectInput(self, value):
'''
Change input to selected value.
@param value: the input value (see INPUTS dictionary)
'''
if not value in INPUTS:
return
print "Changing input to: {0}".format(INPUTS[value])
self.sendCommand("SLI{0}".format(value))
class OnkyoISCPFactory(ClientFactory):
def __init__(self, receiver):
self.receiver = receiver
protocol = OnkyoISCPProtocol
def clientConnectionFailed(self, connector, reason):
print 'Client connection failed: {0}'.format(reason.getErrorMessage())
reactor.stop()
def clientConnectionLost(self, connector, reason):
print 'Client connection lost: {0}'.format(reason.getErrorMessage())
reactor.stop()
def main():
receiver = OnkyoReceiver()
factory = OnkyoISCPFactory(receiver)
reactor.connectTCP('192.168.1.101', 60128, factory)
reactor.run()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment