Skip to content

Instantly share code, notes, and snippets.

@robbiet480
Last active February 4, 2017 09:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robbiet480/e0291fce8a9e58e683a19b5dee5c1be5 to your computer and use it in GitHub Desktop.
Save robbiet480/e0291fce8a9e58e683a19b5dee5c1be5 to your computer and use it in GitHub Desktop.
A Python 3/asyncio implementation of DACP discovery and pairing for use with iTunes or Apple TV (tested on aTV Generation 4)
import asyncio
import aiohttp
from aiohttp import web
from zeroconf import ServiceInfo, ServiceBrowser, Zeroconf
import socket
import struct
import random
import hashlib
def generate_hex_string(bits):
return hex(random.getrandbits(bits))[2:-1]
pairing = generate_hex_string(64).upper()
print('Pairing code', pairing)
guid = generate_hex_string(64).upper()
print('GUID', guid)
type_ = '_touch-remote._tcp.local.'
registration_name = 'pyATV-{}._touch-remote._tcp.local.'.format(pairing)
local_ip = socket.inet_aton(socket.gethostbyname(socket.gethostname()))
device_name = 'pyATV - {}'.format(pairing)
print('device_name', device_name)
info = ServiceInfo('_touch-remote._tcp.local.', registration_name,
local_ip, 3689, 0, 0, {
'DvNm': device_name,
'RemV': '10000',
'DvTy': 'computer',
'RemN': 'Remote',
'Pair': pairing,
'txtvers': '1'
})
zeroconf = Zeroconf()
def encode_msg(msg):
encoded = ''.join([('%s%s%s' % (key,
struct.pack('>i', len(value)).decode(),
value)) for key, value in msg.items()])
return 'cmpa%s%s' % (struct.pack('>i', len(encoded)).decode(), encoded)
def calculate_paring_code(pair, code):
payload = str(pair + ''.join([c + '\x00' for c in code])).encode('utf-8')
return hashlib.md5(payload).hexdigest().upper()
def serve():
loop = asyncio.get_event_loop()
loop.run_in_executor(None, zeroconf.register_service, info)
app = aiohttp.web.Application(loop=loop)
srv = None
pin_code = ''.join(['%s' % random.randint(0, 9) for num in range(0, 4)])
print('Please enter', pin_code)
@asyncio.coroutine
def delayed_shutdown():
yield from asyncio.sleep(0.2)
srv.close()
zeroconf.unregister_service(info)
zeroconf.close()
@asyncio.coroutine
def accept_pairing(request):
received_pair_code = request.rel_url.query.get('pairingcode')
expected_code = calculate_paring_code(pairing, pin_code)
print('RECEIVED PAIRING CODE', received_pair_code)
print('EXPECTED CODE', expected_code)
if (received_pair_code == expected_code):
print('Pairing code matched!!!')
else:
print('WRONG PAIRING CODE :(')
cmpg = ''.join([chr(int(guid[i:i + 2], 16)) for i in range(0, 16, 2)])
# Option 1
encoded = encode_msg({'cmpg': cmpg, 'cmnm': device_name,
'cmty': 'computer'})
# Option 2
# values = {'cmpg': cmpg, 'cmnm': device_name, 'cmty': 'computer'}
# encoded = ''
# for key, value in values.items():
# packed = struct.pack('>i', len(value)).decode()
# encoded += '%s%s%s' % (key, packed, value)
# header = 'cmpa%s' % (struct.pack('>i', len(encoded)).decode())
# encoded = '%s%s' % (header, encoded)
# loop.create_task(delayed_shutdown())
return aiohttp.web.Response(content_type='text/plain', text=encoded)
app.router.add_get('/pair', accept_pairing)
handler = app.make_handler()
f = loop.create_server(handler, '0.0.0.0', 3689)
srv = loop.run_until_complete(f)
print('serving on', srv.sockets[0].getsockname())
try:
loop.run_forever()
finally:
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(app.shutdown())
loop.run_until_complete(handler.shutdown(60.0))
loop.run_until_complete(app.cleanup())
loop.close()
serve()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment