Skip to content

Instantly share code, notes, and snippets.

@sparky3387
Created August 17, 2019 03:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sparky3387/482f40f6b248ecd6c3d0d08e16a512e0 to your computer and use it in GitHub Desktop.
Save sparky3387/482f40f6b248ecd6c3d0d08e16a512e0 to your computer and use it in GitHub Desktop.
NVIDIA HDMI Control Device (Does not work properly)
#!/usr/bin/env python
import asyncio
import websockets
import logging
import json
# Surface the websockets library's own log records (INFO and above) through
# a stream handler so connection activity is visible on the console.
logger = logging.getLogger('websockets')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
import cec
from time import sleep
def log_info(message):
    """Print *message* to standard output.

    Used for this script's own status lines, separate from the
    ``websockets`` library logger configured above.
    """
    print(message)
# Lookup table: (JSON-RPC method, action discriminator, CEC code as hex text).
#
# * The middle column is '' for methods that need no discriminator, an int
#   (1 or 2) for the two Player.PlayPause states, or the value of
#   params['action'] for Input.ExecuteAction.
# * 'UNUSED' marks methods that are listed but have no CEC code assigned.
JSONMETHODTOCEC = [
    # Navigation and menu keys
    ('Input.Select', '', '00'),
    ('Input.Left', '', '03'),
    ('Input.Right', '', '04'),
    ('Input.Up', '', '01'),
    ('Input.Down', '', '02'),
    ('Input.ShowOSD', '', 'UNUSED'),
    ('Input.Info', '', 'UNUSED'),
    ('Input.Back', '', '0D'),
    ('Input.Home', '', '09'),
    ('Input.SendText', '', 'UNUSED'),
    ('VideoLibrary.Scan', '', 'UNUSED'),
    ('Input.ContextMenu', '', 'UNUSED'),
    ('Player.GetActivePlayers', '', 'UNUSED'),

    # Playback: state 1 is pause, state 2 is play
    ('Player.PlayPause', 1, '46'),
    ('Player.PlayPause', 2, '44'),
    ('Player.Stop', '', '45'),

    # Input.ExecuteAction variants, keyed by params['action']
    ('Input.ExecuteAction', 'play', '44'),
    ('Input.ExecuteAction', 'mute', '43'),
    ('Input.ExecuteAction', 'stepback', '48'),
    ('Input.ExecuteAction', 'stepforward', '49'),
    ('Input.ExecuteAction', 'skipprevious', '4C'),
    ('Input.ExecuteAction', 'skipnext', '4B'),
    ('Input.ExecuteAction', 'volumeup', '41'),
    ('Input.ExecuteAction', 'volumedown', '42'),
]
cec.init()  # use default adapter

# CEC device handle for the SHIELD; json_recieve transmits key codes through it.
shieldnumber = None
# Never assigned anywhere in this script; kept so the module-level name exists.
tvnumber = None

# Block until a device whose OSD name is "SHIELD" is found.
while shieldnumber is None:
    log_info("Initializing CEC-Control detecting devices")
    # Re-scan on every attempt: a device list captured once before the loop
    # would never show a SHIELD that only becomes visible later, and the
    # loop would then spin forever on stale data.
    devices = cec.list_devices()
    for device in devices:
        candidate = cec.Device(device)
        if candidate.osd_string == "SHIELD":
            shieldnumber = candidate
            break
    else:
        # No match in this scan; wait before trying again.
        sleep(5)
def _lookup_opcode(method, action=None):
    """Return the CEC code bytes mapped to a JSON-RPC method, or None.

    Scans JSONMETHODTOCEC for the first row whose method name equals
    *method*.  When *action* is not None the row's action column must equal
    it as well.  Returns None when no row matches or when the matching row's
    code is not valid hex (the 'UNUSED' placeholders).
    """
    for name, variant, code in JSONMETHODTOCEC:
        if name != method:
            continue
        if action is not None and variant != action:
            continue
        try:
            return bytes.fromhex(code)
        except ValueError:
            # Placeholder such as 'UNUSED': the method is known but unmapped.
            return None
    return None


async def json_recieve(websocket, path):
    """Serve one websocket client, translating JSON-RPC calls to CEC keys.

    Only connections to the '/jsonrpc' path are served; for any other path
    the handler returns immediately without reading from the socket.  Each
    received message is parsed as JSON.  'Player.GetActivePlayers' flips the
    play/pause state and is answered with a fixed single-video-player
    result.  Every other method is looked up in JSONMETHODTOCEC and, when a
    code is mapped, transmitted through the module-level ``shieldnumber``
    device and acknowledged with "OK".  Methods without a mapped code are
    answered with a JSON-RPC "Method not found" error instead of crashing
    the handler.  The handler returns when the client closes the connection.

    Args:
        websocket: connection object providing awaitable ``recv``/``send``.
        path: request path of the websocket connection.
    """
    # Security through obscurity, unfortunately: the path is the only gate.
    if path != '/jsonrpc':
        return

    # Starts at 2 so the first GetActivePlayers flips it to 1 (pause) and
    # the second flips it back to 2 (play).
    playerpause = 2

    try:
        while True:
            json_string = await websocket.recv()
            try:
                request = json.loads(json_string)
            except ValueError:
                log_info("Ignoring message that is not valid JSON")
                continue

            # Only JSON objects carrying a 'method' plus at least one other
            # member are treated as calls; anything else is skipped silently.
            if not isinstance(request, dict):
                continue
            if len(request) <= 1 or 'method' not in request:
                continue
            method = request['method']

            if method == 'Player.GetActivePlayers':
                playerpause = 1 if playerpause == 2 else 2
                await websocket.send('{"id": 1, "jsonrpc": "2.0", "result": [ { "playerid": 1, "type": "video" } ]}')
                continue

            if method == 'Player.PlayPause':
                opcode = _lookup_opcode(method, playerpause)
            elif method == 'Input.ExecuteAction':
                params = request.get('params')
                action = params.get('action') if isinstance(params, dict) else None
                opcode = None if action is None else _lookup_opcode(method, action)
            else:
                opcode = _lookup_opcode(method)

            if opcode is None:
                log_info("No CEC key mapped for method: {!r}".format(method))
                await websocket.send('{"id":1,"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found."}}')
                continue

            # Simulate a human key press: a 0x44 frame carrying the key code,
            # followed immediately by a 0x8b frame.  The original author
            # noted that delaying the second frame turned every press into a
            # long press, so no sleep is inserted between the two.
            # NOTE(review): in the CEC opcode tables 0x8b appears to be
            # "Vendor Remote Button Up" while the standard key release is
            # 0x45 "User Control Released" - confirm which the SHIELD expects.
            # NOTE(review): transmit() is called synchronously from inside
            # the coroutine - confirm it returns quickly enough not to stall
            # other connections.
            shieldnumber.transmit(0x44, opcode)
            shieldnumber.transmit(0x8b, opcode)
            await websocket.send('{"id":1,"jsonrpc":"2.0","result":"OK"}')
    except (websockets.exceptions.ConnectionClosedOK,
            websockets.exceptions.ConnectionClosedError):
        # The client went away: stop serving.  Swallowing this inside the
        # loop would retry recv() on a dead socket forever.
        return
# Enter the IP address and port for your Kassi configuration.
start_server = websockets.serve(json_recieve, "192.168.1.1", 9090)

# Drive the server on the default event loop: first let the listening socket
# come up, then keep the loop alive so connections continue to be served.
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(start_server)
event_loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment