Created
November 6, 2012 20:16
-
-
Save maartendamen/4027234 to your computer and use it in GitHub Desktop.
Onkyo eISCP Test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from twisted.internet.protocol import ClientFactory | |
from twisted.protocols.basic import LineReceiver | |
from twisted.internet import reactor | |
from struct import pack, unpack | |
INPUTS = {'00' : 'VCR/DVR', | |
'01' : 'CBL/SAT', | |
'02' : 'GAME', | |
'03' : 'AUX', | |
'05' : 'PC', | |
'10' : 'BD/DVD', | |
'08750' : 'Tuner', | |
'23' : 'TV/CD', | |
'2B' : 'Net', | |
'29' : 'USB'} | |
class OnkyoReceiver(object): | |
def __init__(self): | |
self.status = None # Power status | |
self.input = None # Currently active input | |
def __repr__(self): | |
return '[OnkyoReceiver] status: {0}, input: {1}'.format(self.status, INPUTS[self.input]) | |
class OnkyoISCPProtocol(LineReceiver): | |
HEADER = 'ISCP' | |
HEADERSIZE = 16 | |
VERSION = 1 | |
UNITTYPE = 1 # Receiver | |
def connectionMade(self): | |
print 'Connected to Onkyo receiver...' | |
#self.sendCommand('PWRQSTN') | |
#self.sendCommand('PWR01') | |
#self.queryPowerStatus() | |
self.selectInput('2B') | |
def lineReceived(self, line): | |
print 'Received: {0}'.format(line) | |
header, headersize, datasize, version = unpack('!4sIIBxxx', line[0:self.HEADERSIZE]) | |
if header != self.HEADER: | |
print "Received packet is not an ISCP packet" | |
return | |
if version != self.VERSION: | |
print "ISCP version {0} not supported".format(version) | |
return | |
message = line[headersize:headersize+datasize].rstrip('\x1a\r\n') | |
command = message[2:5] | |
parameter = message[5:] | |
print 'Decoded response, raw: {0}, command: {1}, parameter: {2}'.format(message, command, parameter) | |
if command == 'PWR': | |
self.factory.receiver.status = int(parameter) | |
elif command == 'SLI': | |
self.factory.receiver.input = parameter | |
print self.factory.receiver | |
def sendCommand(self, command): | |
message = '!' + str(self.UNITTYPE) + command + '\x0d' | |
datasize = self.HEADERSIZE + len(message) | |
line = pack('!4sIIBxxx', | |
self.HEADER, | |
self.HEADERSIZE, | |
datasize, | |
self.VERSION, | |
) + message | |
print "Sending: {0}".format(line) | |
self.sendLine(line) | |
def powerOff(self): | |
''' | |
Set the receiver to standby mode. | |
''' | |
self.sendCommand('PWR00') | |
def powerOn(self): | |
''' | |
Power on the receiver. | |
''' | |
self.sendCommand('PWR01') | |
def queryPowerStatus(self): | |
''' | |
Query the current power status of the receiver. | |
''' | |
self.sendCommand('PWRQSTN') | |
def selectInput(self, value): | |
''' | |
Change input to selected value. | |
@param value: the input value (see INPUTS dictionary) | |
''' | |
if not value in INPUTS: | |
return | |
print "Changing input to: {0}".format(INPUTS[value]) | |
self.sendCommand("SLI{0}".format(value)) | |
class OnkyoISCPFactory(ClientFactory): | |
def __init__(self, receiver): | |
self.receiver = receiver | |
protocol = OnkyoISCPProtocol | |
def clientConnectionFailed(self, connector, reason): | |
print 'Client connection failed: {0}'.format(reason.getErrorMessage()) | |
reactor.stop() | |
def clientConnectionLost(self, connector, reason): | |
print 'Client connection lost: {0}'.format(reason.getErrorMessage()) | |
reactor.stop() | |
def main(): | |
receiver = OnkyoReceiver() | |
factory = OnkyoISCPFactory(receiver) | |
reactor.connectTCP('192.168.1.101', 60128, factory) | |
reactor.run() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment