Skip to content

Instantly share code, notes, and snippets.

@matejc
Last active August 18, 2018 19:36
Show Gist options
  • Save matejc/0d8e5f14d8b0611f2834f3e5613ae9db to your computer and use it in GitHub Desktop.
Save matejc/0d8e5f14d8b0611f2834f3e5613ae9db to your computer and use it in GitHub Desktop.
import sys
import logging
import tty
import termios
import argparse
import pychromecast
import threading
from pychromecast.controllers.youtube import YouTubeController
from urllib.parse import urlparse
from urllib.parse import parse_qs
class ThreadJob(threading.Thread):
    """Background thread that invokes a callback at a fixed interval.

    After ``start()``, the thread waits ``interval`` seconds, calls
    ``callback()``, and repeats until the callback returns a falsy value
    or :meth:`stop` is called.
    """

    def __init__(self, callback, interval):
        """Create the job; call ``start()`` to begin polling.

        Args:
            callback: Zero-argument callable. A falsy return value ends
                the polling loop.
            interval: Seconds to wait before each callback invocation.
        """
        super(ThreadJob, self).__init__()
        self.callback = callback
        self.event = threading.Event()
        self.interval = interval
        self._stopped = False

    def run(self):
        """Poll the callback until it returns falsy or the job is stopped."""
        # Event.wait() returns True as soon as stop() sets the event, which
        # ends the loop without invoking the callback again.
        while not self.event.wait(self.interval):
            keep_going = self.callback()
            if not keep_going or self._stopped:
                break

    def stop(self):
        """Request termination and wake the thread if it is waiting."""
        self._stopped = True
        # Without setting the event the thread would sleep out the rest of
        # the interval before noticing the stop request.
        self.event.set()
def cast_run(videoId, host=None, name=None):
    """Start playing a YouTube video on a chromecast.

    The device is selected in this order: an explicit ``host`` tuple, a
    device whose friendly name equals ``name``, or otherwise the first
    device returned by discovery.

    Args:
        videoId: YouTube video id handed to the YouTube controller.
        host: Optional host description passed unchanged to
            ``pychromecast._get_chromecast_from_host``.
        name: Optional friendly name of the chromecast to use.

    Returns:
        A ``(cast, yt, job)`` tuple: the chromecast, the registered
        ``YouTubeController`` and the started ``ThreadJob`` that polls
        the media status once per second.

    Raises:
        LookupError: If no chromecast with the requested name exists, or
            discovery returned no devices at all.
    """
    if host is not None:
        cast = pychromecast._get_chromecast_from_host(host)
    else:
        chromecasts = pychromecast.get_chromecasts()
        if name is not None:
            # Give next() a default so a missing device yields a clear
            # error instead of a bare StopIteration.
            cast = next(
                (cc for cc in chromecasts
                 if cc.device.friendly_name == name),
                None)
            if cast is None:
                raise LookupError(
                    'no chromecast named {!r} was found'.format(name))
        else:
            if not chromecasts:
                raise LookupError('no chromecast was found')
            cast = chromecasts[0]

    cast.wait()
    yt = YouTubeController()
    cast.register_handler(yt)
    yt.play_video(videoId)

    def cleanup():
        """Quit the app once the media status reports an idle reason.

        Returns False (ending the polling job) after quitting, True
        otherwise.
        """
        # presumably idle_reason is set when playback ends -- confirm
        # against the pychromecast media status documentation.
        if cast.media_controller.status.idle_reason is not None:
            cast.quit_app()
            return False
        return True

    job = ThreadJob(cleanup, 1.0)
    job.start()
    return cast, yt, job
class ReadChar():
    """Context manager that reads a single raw keypress from stdin.

    Entering the context saves the terminal settings, switches stdin to
    raw mode and returns one character read from it; leaving the context
    restores the saved settings.
    """

    def __enter__(self):
        """Switch stdin to raw mode and return one character read from it."""
        self.fd = sys.stdin.fileno()
        self.old_settings = termios.tcgetattr(self.fd)
        try:
            tty.setraw(self.fd)
            return sys.stdin.read(1)
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the
            # terminal has to be restored here or it stays in raw mode.
            termios.tcsetattr(
                self.fd, termios.TCSADRAIN, self.old_settings)
            raise

    def __exit__(self, type, value, traceback):
        """Restore the terminal settings saved in ``__enter__``."""
        termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old_settings)
def set_seek(cast, by):
    """Seek the current media relative to its reported position.

    Does nothing when the media status reports ``supports_seek`` as
    False. A backward seek is clamped at 0. A forward seek that would
    pass the reported duration lands ``by`` seconds before the end, but
    never earlier than the current position, so pressing "forward" can
    neither rewind playback nor produce a negative position.

    Args:
        cast: Chromecast object exposing ``media_controller``.
        by: Offset in seconds; negative values seek backwards.
    """
    media = cast.media_controller
    if media.status.supports_seek is False:
        return
    media.update_status()
    # Re-read the status after requesting the update.
    status = media.status
    current = status.current_time
    duration = status.duration
    position = current + by
    if by >= 0:
        # duration may be unknown (None); only clamp when it is known.
        if duration is not None and position > duration:
            position = max(current, duration - by)
    elif position < 0:
        position = 0
    media.seek(position)
def set_volume(cast, by):
    """Change the chromecast volume by a relative amount.

    Raising the volume caps the result at 1.0; lowering it floors the
    result at 0.0.

    Args:
        cast: Chromecast object exposing ``status.volume_level`` and
            ``set_volume``.
        by: Amount added to the current volume level (may be negative).
    """
    target = cast.status.volume_level + by
    # Only the bound in the direction of travel is applied.
    target = min(target, 1.0) if by >= 0 else max(target, 0.0)
    cast.set_volume(target)
def toggle_play(cast):
    """Pause the media if it is playing, otherwise resume playback.

    Does nothing when the media status reports ``supports_pause`` as
    False.

    Args:
        cast: Chromecast object exposing ``media_controller``.
    """
    controller = cast.media_controller
    if controller.status.supports_pause is False:
        return
    currently_playing = controller.status.player_state == "PLAYING"
    action = controller.pause if currently_playing else controller.play
    action()
def parse_video_id(value):
    """Extract the YouTube video id from a URL.

    Based on https://stackoverflow.com/a/7936523

    Examples:
    - http://youtu.be/SA2iWivDJiE
    - http://www.youtube.com/watch?v=_oPAwA_Udwc&feature=feedu
    - http://www.youtube.com/embed/SA2iWivDJiE
    - http://www.youtube.com/v/SA2iWivDJiE?version=3&amp;hl=en_US
    - https://www.youtube.com/shorts/SA2iWivDJiE

    Args:
        value: URL string to inspect.

    Returns:
        The video id, or None when the URL is not a recognised YouTube
        URL or carries no id.
    """
    parsed = urlparse(value)
    host = parsed.hostname
    if host == 'youtu.be':
        # The id is the first path segment; an empty path means no id.
        return parsed.path[1:].split('/')[0] or None
    if host in ('www.youtube.com', 'youtube.com',
                'm.youtube.com', 'music.youtube.com'):
        if parsed.path == '/watch':
            # A /watch URL without a "v" parameter is a failure (None),
            # not a KeyError.
            ids = parse_qs(parsed.query).get('v')
            return ids[0] if ids else None
        if parsed.path.startswith(('/embed/', '/v/', '/shorts/')):
            # "/embed/<id>" splits into ['', 'embed', '<id>', ...].
            return parsed.path.split('/')[2] or None
    return None
def main(args):
    """Cast the requested video and optionally run the keyboard control loop.

    Keys in control mode: ``q`` or Ctrl-C quits, ``0``/``9`` raise/lower
    the volume, space toggles play/pause, right/left arrows seek by 5
    seconds and up/down arrows seek by 60 seconds.

    Args:
        args: Parsed command-line namespace providing ``url``,
            ``address``, ``port``, ``name`` and ``control``.
    """
    video_id = parse_video_id(args.url)
    if not video_id:
        # Fail early instead of asking the chromecast to play nothing.
        sys.exit(
            'could not find a youtube video id in: {}'.format(args.url))

    if args.address is not None:
        # NOTE(review): the last three tuple entries look like placeholder
        # device metadata (uuid, model, friendly name) -- confirm against
        # what pychromecast expects for a host tuple.
        host = (args.address, args.port, '1-2', 'dev12', "Some TV")
        cast, yt, job = cast_run(video_id, host=host)
    elif args.name is not None:
        cast, yt, job = cast_run(video_id, name=args.name)
    else:
        cast, yt, job = cast_run(video_id)
    print('casting ...')
    if args.control is False:
        return
    print('use keys to control')

    volume_steps = {"0": 0.1, "9": -0.1}
    # Arrow keys arrive as three characters: ESC (27), '[' (91), letter.
    seek_steps = {
        (27, 91, 67): 5.0,    # right
        (27, 91, 68): -5.0,   # left
        (27, 91, 65): 60.0,   # up
        (27, 91, 66): -60.0,  # down
    }
    recent_codes = []
    while True:
        with ReadChar() as pressed:
            char = pressed
        # An empty read means stdin hit EOF; treat it like a quit request
        # rather than letting ord('') raise TypeError.
        if not char or ord(char) == 3 or char == "q":
            job.stop()
            cast.quit_app()
            sys.exit()
        # Keep only the three most recent key codes for escape matching.
        recent_codes = (recent_codes + [ord(char)])[-3:]
        if char in volume_steps:
            set_volume(cast, volume_steps[char])
        elif char == " ":
            toggle_play(cast)
        else:
            seek_by = seek_steps.get(tuple(recent_codes))
            if seek_by is not None:
                set_seek(cast, seek_by)
if __name__ == "__main__":
    # Command-line entry point: parse the arguments and hand them to main().
    parser = argparse.ArgumentParser()
    parser.add_argument('url', help='youtube url')
    parser.add_argument('--control', help='control by keyboard',
                        action='store_true')
    parser.add_argument('--name', help='friendly chromecast name')
    parser.add_argument('--address', help='ip address of chromecast')
    # type=int so a user-supplied port has the same type as the default
    # instead of arriving as a string.
    parser.add_argument('--port', help='port of chromecast', type=int,
                        default=8009)
    args = parser.parse_args()
    main(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment