Skip to content

Instantly share code, notes, and snippets.

@mlapida
Last active December 7, 2017 08:00
Show Gist options
  • Save mlapida/22162294400b18a63c6c to your computer and use it in GitHub Desktop.
Save mlapida/22162294400b18a63c6c to your computer and use it in GitHub Desktop.
This is my attempt to get Alexa to return Plex's On Deck and Recently Downloaded lists. It's not the prettiest, but Plex's API isn't the best at the moment. Step-by-step blog post may be found here: http://mlapida.com/thoughts/plex-alexa-interacting-with-your-media-server-through-voice
from __future__ import print_function
import urllib
import urllib2
import xml.etree.ElementTree
import logging
# Enable basic logging to CloudWatch Logs.
# getLogger() with no name returns the root logger; INFO and above are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
    """Entry point invoked by AWS Lambda for every Alexa request.

    Rejects requests that do not come from the configured application ID,
    announces new sessions, then routes the request by its type.

    Args:
        event: Alexa request envelope; its 'session' and 'request' entries
            are read.
        context: Lambda context object (unused).

    Returns:
        Whatever the matching handler returns, or None when the request type
        is not LaunchRequest, IntentRequest or SessionEndedRequest.

    Raises:
        ValueError: if the application ID does not match the expected one.
    """
    session = event['session']
    app_id = session['application']['applicationId']
    print("event.session.application.applicationId=" + app_id)

    # Only run when requested by a specific app ID (paste your app ID below).
    if app_id != "amzn1.echo-sdk-ams.app.[AppIDHere]":
        raise ValueError("Invalid Application ID")

    if session['new']:
        on_session_started({'requestId': event['request']['requestId']},
                           session)

    request = event['request']
    request_type = request['type']
    if request_type == "LaunchRequest":
        return on_launch(request, session)
    if request_type == "IntentRequest":
        return on_intent(request, session)
    if request_type == "SessionEndedRequest":
        return on_session_ended(request, session)
    return None
def on_session_started(session_started_request, session):
    """Log that a new session has begun; there is nothing else to set up.

    Args:
        session_started_request: mapping with a 'requestId' entry.
        session: mapping with a 'sessionId' entry.
    """
    pieces = [
        "on_session_started requestId=",
        session_started_request['requestId'],
        ", sessionId=",
        session['sessionId'],
    ]
    print("".join(pieces))
def on_launch(launch_request, session):
    """Handle a launch with no stated intent by returning the welcome response.

    Args:
        launch_request: mapping with a 'requestId' entry.
        session: mapping with a 'sessionId' entry.

    Returns:
        The response produced by get_welcome_response().
    """
    pieces = [
        "on_launch requestId=",
        launch_request['requestId'],
        ", sessionId=",
        session['sessionId'],
    ]
    print("".join(pieces))
    # Dispatch to the skill's launch behaviour.
    return get_welcome_response()
def on_intent(intent_request, session):
    """Route an IntentRequest to the handler for the named intent.

    The Plex login is performed only after a Plex intent has been matched, so
    the help intent and unknown intents never trigger a network round trip to
    plex.tv.

    Args:
        intent_request: mapping with 'requestId' and 'intent' entries; the
            intent's 'name' selects the handler.
        session: mapping with a 'sessionId' entry.

    Returns:
        The response built by the selected handler.

    Raises:
        ValueError: if the intent name is not one this skill handles.
    """
    print("on_intent requestId=" + intent_request['requestId'] +
          ", sessionId=" + session['sessionId'])

    intent = intent_request['intent']
    intent_name = intent['name']

    # Help needs no Plex access at all.
    if intent_name == "AMAZON.HelpIntent":
        return get_welcome_response()

    # Pick the handler first; unknown intents fail before any login attempt.
    if intent_name in ("Plex", "OnDeck"):
        handler = plex_list_on_desk
    elif intent_name == "Download":
        handler = plex_list_download
    else:
        raise ValueError("Invalid intent")

    # Plex Credentials - Please Fill In
    username = "[username here]"
    password = "[password]"
    authKey = plex_login(username, password)

    return handler(intent, session, authKey)
def on_session_ended(session_ended_request, session):
    """Log that the user ended the session.

    Args:
        session_ended_request: mapping with a 'requestId' entry.
        session: mapping with a 'sessionId' entry.
    """
    pieces = [
        "on_session_ended requestId=",
        session_ended_request['requestId'],
        ", sessionId=",
        session['sessionId'],
    ]
    print("".join(pieces))
    # add cleanup logic here
# --------------- Functions that control the skill's behavior ------------------
def get_welcome_response():
    """Build the standard welcome response and keep the session open.

    Returns:
        The response mapping produced by build_response(), carrying the
        welcome text as both the speech output and the reprompt.
    """
    session_attributes = {}
    card_title = "Welcome"
    speech_output = ("Welcome to the Plex Skill. "
                     "You can request items on deck "
                     "or list downloads")
    # If the user either does not reply to the welcome message or says
    # something that is not understood, they are prompted again with the
    # same text.
    reprompt_text = speech_output
    should_end_session = False
    return build_response(session_attributes, build_speechlet_response(
        card_title, speech_output, reprompt_text, should_end_session))
def plex_list_on_desk(intent, session, authKey):
    """Build the response listing the TV shows that are on deck.

    If the login token is missing, no server is found, or the on-deck list
    cannot be fetched, a spoken error message is returned instead of letting
    a None value reach the response builder.

    Args:
        intent: mapping whose 'name' entry is used as the card title.
        session: the Alexa session mapping (unused).
        authKey: Plex auth token, or None when login failed.

    Returns:
        The response mapping produced by build_response(); the session ends.
    """
    card_title = intent['name']
    session_attributes = {}
    should_end_session = True

    speech_output = None
    if authKey:
        server_url = find_plex_server(authKey)
        # Only query the server when one was actually discovered.
        if server_url:
            speech_output = ret_on_deck(str(server_url), authKey)
    if not speech_output:
        speech_output = "Sorry, I could not reach your Plex server."

    reprompt_text = speech_output
    return build_response(session_attributes, build_speechlet_response(
        card_title, speech_output, reprompt_text, should_end_session))
def plex_list_download(intent, session, authKey):
    """Build the response listing recently added TV shows and movies.

    If the login token is missing, no server is found, or the list cannot be
    fetched, a spoken error message is returned instead of letting a None
    value reach the response builder.

    Args:
        intent: mapping whose 'name' entry is used as the card title.
        session: the Alexa session mapping (unused).
        authKey: Plex auth token, or None when login failed.

    Returns:
        The response mapping produced by build_response(); the session ends.
    """
    card_title = intent['name']
    session_attributes = {}
    should_end_session = True

    speech_output = None
    if authKey:
        server_url = find_plex_server(authKey)
        # Only query the server when one was actually discovered.
        if server_url:
            speech_output = ret_download(str(server_url), authKey)
    if not speech_output:
        speech_output = "Sorry, I could not reach your Plex server."

    reprompt_text = speech_output
    return build_response(session_attributes, build_speechlet_response(
        card_title, speech_output, reprompt_text, should_end_session))
# --------------- Helpers that build all of the responses ----------------------
def build_speechlet_response(title, output, reprompt_text, should_end_session):
    """Assemble the speech, card and reprompt portion of an Alexa response.

    Args:
        title: card title; prefixed with 'SessionSpeechlet - '.
        output: text to speak; also used, prefixed, as the card content.
        reprompt_text: text for the reprompt speech.
        should_end_session: value stored under 'shouldEndSession'.

    Returns:
        A dict with 'outputSpeech', 'card', 'reprompt' and
        'shouldEndSession' entries.
    """
    card_prefix = 'SessionSpeechlet - '

    def plain_text(text):
        # Both the main speech and the reprompt use the PlainText form.
        return {'type': 'PlainText', 'text': text}

    card = {
        'type': 'Simple',
        'title': card_prefix + title,
        'content': card_prefix + output,
    }

    speechlet = {}
    speechlet['outputSpeech'] = plain_text(output)
    speechlet['card'] = card
    speechlet['reprompt'] = {'outputSpeech': plain_text(reprompt_text)}
    speechlet['shouldEndSession'] = should_end_session
    return speechlet
def build_response(session_attributes, speechlet_response):
    """Wrap a speechlet response in the top-level response envelope.

    Args:
        session_attributes: value stored under 'sessionAttributes'.
        speechlet_response: value stored under 'response'.

    Returns:
        A dict with 'version' set to '1.0' plus the two supplied values.
    """
    envelope = dict(version='1.0')
    envelope['sessionAttributes'] = session_attributes
    envelope['response'] = speechlet_response
    return envelope
# --------------- Functions specific to Plex ----------------------
def find_plex_server(authKey):
    """Return the URI of the first connection of the first Plex server found.

    Fetches the account's device list from plex.tv and scans it for devices
    whose 'provides' attribute is exactly "server".

    Args:
        authKey: Plex auth token appended to the request URL.

    Returns:
        The 'uri' attribute of the first Connection element under a server
        device, or None when the request fails or no such connection exists.
    """
    url = "https://plex.tv/devices.xml?X-Plex-Token=" + authKey
    try:
        response = urllib2.urlopen(urllib2.Request(url))
        try:
            payload = response.read()
        finally:
            # Always release the HTTP connection, even if the read fails.
            response.close()
    except urllib2.URLError as err:
        print(err)
        return None

    devices = xml.etree.ElementTree.fromstring(payload)
    # [TODO]: There should be a better way to do this. Check the API docs
    for device in devices.findall('Device'):
        if device.get('provides') != "server":
            continue
        # The first listed connection wins; a server device without any
        # Connection element falls through to the next device.
        for connection in device.findall('Connection'):
            return connection.get('uri')
    return None
def ret_on_deck(url, authKey):
    """Return spoken text listing the TV shows that are "on deck".

    Args:
        url: base URL of the Plex server (no trailing path).
        authKey: Plex auth token appended to the request URL.

    Returns:
        A string starting with "TV Shows On Deck: " followed by one line per
        matching show, or None when the request fails.
    """
    MainServerURL = url + "/library/onDeck?X-Plex-Token=" + authKey
    try:
        response = urllib2.urlopen(urllib2.Request(MainServerURL))
        try:
            payload = response.read()
        finally:
            # Always release the HTTP connection, even if the read fails.
            response.close()
    except urllib2.URLError as err:
        print(err)
        return None

    container = xml.etree.ElementTree.fromstring(payload)
    lines = ["TV Shows On Deck: \n"]
    # Look for the TV shows only.
    for video in container.findall('Video'):
        if video.get('librarySectionTitle') != "TV Shows":
            continue
        show = video.get('grandparentTitle')
        if show is None:
            # Entry carries no show title; nothing sensible to read out.
            continue
        # Drop any parenthesised suffix from the title before speaking it.
        lines.append(show.split("(")[0].strip() + ". \n")
    return "".join(lines)
def ret_download(url, authKey, limit=5):
    """Return spoken text listing recently added TV shows and movies.

    Each section header ("In TV: ", "In Movies: ") is emitted exactly once,
    and only when that section has at least one entry.

    Args:
        url: base URL of the Plex server (no trailing path).
        authKey: Plex auth token appended to the request URL.
        limit: maximum number of entries to list per section.

    Returns:
        A string starting with "Recently Added: ", or None when the request
        fails.
    """
    MainServerURL = url + "/library/recentlyAdded?X-Plex-Token=" + authKey
    try:
        response = urllib2.urlopen(urllib2.Request(MainServerURL))
        try:
            payload = response.read()
        finally:
            # Always release the HTTP connection, even if the read fails.
            response.close()
    except urllib2.URLError as err:
        print(err)
        return None

    container = xml.etree.ElementTree.fromstring(payload)

    def collect(tag, kind, title_attr):
        # Gather up to `limit` spoken lines for elements of the given type.
        found = []
        for node in container.findall(tag):
            if len(found) >= limit:
                break
            if node.get('type') != kind:
                continue
            title = node.get(title_attr)
            if title is None:
                continue
            # Drop any parenthesised suffix from the title before speaking it.
            found.append(title.split("(")[0].strip() + ". \n")
        return found

    parts = ["Recently Added: \n"]

    # TV seasons are Directory elements; the show name is the parent title.
    shows = collect('Directory', "season", 'parentTitle')
    if shows:
        parts.append("In TV: \n")
        parts.extend(shows)

    # Movies are Video elements.
    movies = collect('Video', "movie", 'title')
    if movies:
        parts.append("In Movies: \n")
        parts.extend(movies)

    return "".join(parts)
def plex_login(username, password):
    """Sign in to plex.tv and return the account's auth token.

    Args:
        username: Plex account login, sent as the 'user[login]' form field.
        password: Plex account password, sent as 'user[password]'.

    Returns:
        The 'authenticationToken' attribute of the sign-in response's root
        element, or None when the request fails.
    """
    url = "https://plex.tv/users/sign_in.xml"
    headers = {
        'x-plex-device-name': "AWS Lambda",
        'x-plex-device': "AWSv01",
        'x-plex-client-identifier': "049ouolknf9u42oihen",
    }
    values = {
        'user[login]': username,
        'user[password]': password,
    }
    data = urllib.urlencode(values)
    try:
        # Supplying a body makes urllib2 issue a POST.
        response = urllib2.urlopen(urllib2.Request(url, data, headers))
        try:
            payload = response.read()
        finally:
            # Always release the HTTP connection, even if the read fails.
            response.close()
    except urllib2.URLError as err:
        print(err)
        return None

    user = xml.etree.ElementTree.fromstring(payload)
    return user.get('authenticationToken')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment