Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Onkyo eISCP Test
from twisted.internet.protocol import ClientFactory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor
from struct import pack, unpack
# Maps the parameter of the input-selector ("SLI") command to a
# human-readable name for that input.
INPUTS = {
    '00': 'VCR/DVR',
    '01': 'CBL/SAT',
    '02': 'GAME',
    '03': 'AUX',
    '05': 'PC',
    '10': 'BD/DVD',
    # NOTE(review): unlike every other key this one is five characters long,
    # which does not look like a selector code -- confirm the correct code
    # for the tuner input against the receiver's ISCP documentation.
    '08750': 'Tuner',
    '23': 'TV/CD',
    '2B': 'Net',
    '29': 'USB',
}
class OnkyoReceiver(object):
    '''
    Holds the last known state of an Onkyo receiver.
    '''

    def __init__(self):
        self.status = None  # Power status (None until one has been stored)
        self.input = None  # Currently active input code (None until stored)

    def __repr__(self):
        '''
        Return a one-line summary of the power status and the active input.

        The input code is translated through INPUTS when it is listed
        there. An unset (None) or unlisted code is shown as-is instead of
        raising KeyError, so the object can be printed at any time.
        '''
        input_name = INPUTS.get(self.input, self.input)
        return '[OnkyoReceiver] status: {0}, input: {1}'.format(self.status, input_name)
class OnkyoISCPProtocol(LineReceiver):
    '''
    Twisted protocol that exchanges eISCP packets with an Onkyo receiver.

    A packet is a 16-byte header (magic 'ISCP', header size, data size,
    version and three padding bytes, all big-endian) followed by the ISCP
    message: '!' + unit type + command + end character.

    NOTE: this code relies on Python 2 semantics, where str is a byte string
    that can be passed to struct.pack and written to the transport as-is.
    '''
    HEADER = 'ISCP'
    HEADERSIZE = 16
    VERSION = 1
    UNITTYPE = 1  # Receiver
    HEADERFORMAT = '!4sIIBxxx'  # magic, header size, data size, version, 3 pad bytes

    def connectionMade(self):
        '''
        Called by Twisted once the connection is established.

        Selects the 'Net' input ('2B') as a quick check that commands reach
        the receiver.
        '''
        print('Connected to Onkyo receiver...')
        self.selectInput('2B')

    def lineReceived(self, line):
        '''
        Decode one received packet and update the shared receiver state.

        LineReceiver splits the stream on its delimiter (CR LF by default)
        and strips it, so line is the header followed by the ISCP message.
        Packets that are too short, carry the wrong magic or an unsupported
        version are reported and dropped.

        @param line: the raw packet without the line delimiter
        '''
        print('Received: {0}'.format(line))
        if len(line) < self.HEADERSIZE:
            # unpack() would raise struct.error on a truncated header.
            print('Received packet is too short to be an ISCP packet')
            return
        header, headersize, datasize, version = unpack(
            self.HEADERFORMAT, line[:self.HEADERSIZE])
        if header != self.HEADER:
            print("Received packet is not an ISCP packet")
            return
        if version != self.VERSION:
            print("ISCP version {0} not supported".format(version))
            return
        # Cut the message out of the packet and drop trailing EOF/CR/LF.
        message = line[headersize:headersize + datasize].rstrip('\x1a\r\n')
        # Layout: 2-char prefix ('!' + unit type), 3-char command, parameter.
        command = message[2:5]
        parameter = message[5:]
        print('Decoded response, raw: {0}, command: {1}, parameter: {2}'.format(
            message, command, parameter))
        receiver = self.factory.receiver
        if command == 'PWR':
            try:
                receiver.status = int(parameter)
            except ValueError:
                # A non-numeric reply must not kill the connection handler;
                # keep the previously stored status.
                print('Ignoring non-numeric power status: {0}'.format(parameter))
        elif command == 'SLI':
            receiver.input = parameter
        print(receiver)

    def sendCommand(self, command):
        '''
        Wrap an ISCP command in an eISCP packet and send it.

        @param command: the ISCP command including its parameter, e.g. 'PWR01'
        '''
        message = '!' + str(self.UNITTYPE) + command + '\x0d'
        # The data-size field counts the message bytes only, not the header.
        header = pack(self.HEADERFORMAT,
                      self.HEADER,
                      self.HEADERSIZE,
                      len(message),
                      self.VERSION)
        packet = header + message
        print("Sending: {0}".format(packet))
        # Written raw instead of via sendLine(): the message already ends
        # with its own end character, and an appended CR LF would not be
        # covered by the data-size field.
        self.transport.write(packet)

    def powerOff(self):
        '''
        Set the receiver to standby mode.
        '''
        self.sendCommand('PWR00')

    def powerOn(self):
        '''
        Power on the receiver.
        '''
        self.sendCommand('PWR01')

    def queryPowerStatus(self):
        '''
        Query the current power status of the receiver.
        '''
        self.sendCommand('PWRQSTN')

    def selectInput(self, value):
        '''
        Change input to selected value.

        A value that is not listed in INPUTS is reported and nothing is
        sent.

        @param value: the input value (see INPUTS dictionary)
        '''
        if value not in INPUTS:
            print("Unknown input: {0}".format(value))
            return
        print("Changing input to: {0}".format(INPUTS[value]))
        self.sendCommand("SLI{0}".format(value))
class OnkyoISCPFactory(ClientFactory):
    '''
    Client factory for OnkyoISCPProtocol connections.

    Keeps the receiver state object so that protocol instances can reach it
    through their factory attribute, and stops the reactor as soon as the
    connection fails or is lost.
    '''
    protocol = OnkyoISCPProtocol

    def __init__(self, receiver):
        '''
        @param receiver: the object that stores the receiver state
        '''
        self.receiver = receiver

    def clientConnectionFailed(self, connector, reason):
        '''
        Report why the connection could not be made, then stop the reactor.
        '''
        error_text = reason.getErrorMessage()
        print('Client connection failed: {0}'.format(error_text))
        reactor.stop()

    def clientConnectionLost(self, connector, reason):
        '''
        Report why the connection went away, then stop the reactor.
        '''
        error_text = reason.getErrorMessage()
        print('Client connection lost: {0}'.format(error_text))
        reactor.stop()
def main(host='192.168.1.101', port=60128):
    '''
    Connect to an Onkyo receiver and run the reactor until it is stopped.

    @param host: address of the receiver (defaults to the address that was
                 previously hard-coded here)
    @param port: TCP port to connect to (defaults to 60128)
    '''
    receiver = OnkyoReceiver()
    factory = OnkyoISCPFactory(receiver)
    reactor.connectTCP(host, port, factory)
    # Blocks here until something calls reactor.stop().
    reactor.run()
# Start the client only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment