Create a gist now

Instantly share code, notes, and snippets.

Embed
Onkyo eISCP Test
from twisted.internet.protocol import ClientFactory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor
from struct import pack, unpack
# Input-selector codes (the parameter of the ISCP 'SLI' command) mapped to
# the human-readable source names used in log output.
INPUTS = {
    '00': 'VCR/DVR',
    '01': 'CBL/SAT',
    '02': 'GAME',
    '03': 'AUX',
    '05': 'PC',
    '10': 'BD/DVD',
    '23': 'TV/CD',
    # The tuner used to be keyed as '08750'. That is a five-character value
    # (it looks like a tuner frequency, not a selector) and could never
    # match a two-character 'SLI' parameter; the selector code for the
    # tuner is '26'.
    '26': 'Tuner',
    '29': 'USB',
    '2B': 'Net',
}
class OnkyoReceiver(object):
    '''
    Holds the receiver's power status and currently active input.
    Both attributes start out as None until a value is assigned.
    '''

    def __init__(self):
        self.status = None  # Power status
        self.input = None   # Currently active input

    def __repr__(self):
        '''
        Return a one-line summary of the status and the input name.
        '''
        # self.input is None until an input has been recorded, and it may
        # hold a code that INPUTS does not list. Indexing INPUTS directly
        # raised KeyError in both cases, so fall back to the raw value.
        input_name = INPUTS.get(self.input, self.input)
        return '[OnkyoReceiver] status: {0}, input: {1}'.format(
            self.status, input_name)
class OnkyoISCPProtocol(LineReceiver):
    '''
    Twisted protocol that exchanges eISCP packets with an Onkyo receiver.

    A packet is a 16-byte header (the 4-byte tag 'ISCP', header size, data
    size, version byte and three pad bytes, all in network byte order)
    followed by the ISCP message. Decoded power and input responses are
    stored on self.factory.receiver.
    '''

    HEADER = 'ISCP'
    HEADERSIZE = 16
    VERSION = 1
    UNITTYPE = 1  # Receiver

    def connectionMade(self):
        '''
        Announce the connection and switch the receiver to input '2B'.
        '''
        print('Connected to Onkyo receiver...')
        # queryPowerStatus(), powerOn() and powerOff() can be called here
        # instead while experimenting with other commands.
        self.selectInput('2B')

    def lineReceived(self, line):
        '''
        Decode one received packet and update the receiver state.
        @param line: the raw packet (header plus message) as delivered by
            LineReceiver, i.e. without the trailing delimiter
        '''
        print('Received: {0}'.format(line))
        # A fragment shorter than the header cannot be unpacked (unpack
        # would raise struct.error), so reject it before decoding.
        if len(line) < self.HEADERSIZE:
            print("Received packet is too short to be an ISCP packet")
            return
        header, headersize, datasize, version = unpack(
            '!4sIIBxxx', line[0:self.HEADERSIZE])
        if header != self.HEADER:
            print("Received packet is not an ISCP packet")
            return
        if version != self.VERSION:
            print("ISCP version {0} not supported".format(version))
            return
        payload_end = headersize + datasize
        message = line[headersize:payload_end].rstrip('\x1a\r\n')
        # Message layout mirrors sendCommand: start character, unit type
        # character, three-letter command, then the parameter.
        command = message[2:5]
        parameter = message[5:]
        print('Decoded response, raw: {0}, command: {1}, parameter: {2}'.format(
            message, command, parameter))
        if command == 'PWR':
            # A non-numeric parameter cannot be converted; keep the previous
            # status instead of letting ValueError escape the callback.
            try:
                self.factory.receiver.status = int(parameter)
            except ValueError:
                print("Unexpected power status: {0}".format(parameter))
        elif command == 'SLI':
            self.factory.receiver.input = parameter
        print(self.factory.receiver)

    def sendCommand(self, command):
        '''
        Wrap an ISCP command in an eISCP packet and send it.
        @param command: command name plus parameter, e.g. 'PWR01'
        '''
        message = '!' + str(self.UNITTYPE) + command + '\x0d'
        # The data-size field describes only the bytes that follow the
        # header (lineReceived slices incoming packets the same way). It
        # used to include HEADERSIZE, overstating the payload by 16 bytes.
        # sendLine() appends self.delimiter, so those bytes are counted too.
        datasize = len(message) + len(self.delimiter)
        header = pack(
            '!4sIIBxxx',
            self.HEADER,
            self.HEADERSIZE,
            datasize,
            self.VERSION,
        )
        line = header + message
        print("Sending: {0}".format(line))
        self.sendLine(line)

    def powerOff(self):
        '''
        Set the receiver to standby mode.
        '''
        self.sendCommand('PWR00')

    def powerOn(self):
        '''
        Power on the receiver.
        '''
        self.sendCommand('PWR01')

    def queryPowerStatus(self):
        '''
        Query the current power status of the receiver.
        '''
        self.sendCommand('PWRQSTN')

    def selectInput(self, value):
        '''
        Change input to selected value. A value that is not a key of
        INPUTS is reported and nothing is sent.
        @param value: the input value (see INPUTS dictionary)
        '''
        if value not in INPUTS:
            print("Unknown input: {0}".format(value))
            return
        print("Changing input to: {0}".format(INPUTS[value]))
        self.sendCommand("SLI{0}".format(value))
class OnkyoISCPFactory(ClientFactory):
    '''
    Client factory that builds OnkyoISCPProtocol connections and stops the
    reactor when the connection fails or is lost.
    '''

    protocol = OnkyoISCPProtocol

    def __init__(self, receiver):
        '''
        @param receiver: state object kept on the factory as self.receiver
        '''
        self.receiver = receiver

    def clientConnectionFailed(self, connector, reason):
        '''
        Report a failed connection attempt and stop the reactor.
        '''
        self._reportAndStop('Client connection failed: {0}', reason)

    def clientConnectionLost(self, connector, reason):
        '''
        Report a lost connection and stop the reactor.
        '''
        self._reportAndStop('Client connection lost: {0}', reason)

    def _reportAndStop(self, template, reason):
        # Shared tail of both callbacks: print the error text, then stop.
        print(template.format(reason.getErrorMessage()))
        reactor.stop()
def main(host='192.168.1.101', port=60128):
    '''
    Connect to an Onkyo receiver and run the reactor until it is stopped.
    @param host: address of the receiver; the default is the address that
        was previously hard-coded here
    @param port: TCP port to connect to; the default is the port that was
        previously hard-coded here
    '''
    receiver = OnkyoReceiver()
    factory = OnkyoISCPFactory(receiver)
    reactor.connectTCP(host, port, factory)
    reactor.run()


# Run the client only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment