Created
May 21, 2016 15:02
-
-
Save Elettronik/cb7bed634d7425b2571a6c547228e6a9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import copy | |
import io | |
import logging | |
from babelfish import Language | |
from guessit import guessit | |
try: | |
from lxml import etree | |
except ImportError: | |
try: | |
import xml.etree.cElementTree as etree | |
except ImportError: | |
import xml.etree.ElementTree as etree | |
from requests import Session | |
from zipfile import ZipFile, is_zipfile | |
from subliminal.providers import Provider | |
from subliminal import __version__ | |
from subliminal.cache import SHOW_EXPIRATION_TIME, region | |
from subliminal.exceptions import AuthenticationError, ConfigurationError, TooManyRequests | |
from subliminal.subtitle import (Subtitle, fix_line_ending, guess_matches, sanitize) | |
from subliminal.video import Episode | |
logger = logging.getLogger(__name__) | |
class ItaSASubtitle(Subtitle):
    """Italian subtitle returned by the ``itasa`` provider.

    :param sub_id: identifier of the subtitle, exposed through :attr:`id`.
    :param series: series name, compared (after ``sanitize``) with ``video.series``.
    :param season: season number, compared with ``video.season``.
    :param episode: episode number, compared with ``video.episode``.
    :param video_format: format string stored as ``self.format``; may be ``None``.
    :param year: year compared with ``video.year``.
    :param tvdb_id: id compared with ``video.series_tvdb_id``.
    :param full_data: free-form text handed to ``guessit`` to derive extra matches.

    """
    provider_name = 'itasa'

    def __init__(self, sub_id, series, season, episode, video_format, year, tvdb_id, full_data):
        # Every subtitle of this provider is Italian.
        super(ItaSASubtitle, self).__init__(Language('ita'))
        self.sub_id = sub_id
        self.series = series
        self.season = season
        self.episode = episode
        self.format = video_format
        self.year = year
        self.tvdb_id = tvdb_id
        self.full_data = full_data

    @property
    def id(self):
        """Identifier of the subtitle (the ``sub_id`` given at construction)."""
        return self.sub_id

    def get_matches(self, video, hearing_impaired=False):
        """Compute the set of properties this subtitle shares with `video`.

        :param video: the video to compare against.
        :param bool hearing_impaired: accepted for interface compatibility; not used here.
        :return: names of the matching properties.
        :rtype: set

        """
        matches = set()

        # series
        if video.series and sanitize(self.series) == sanitize(video.series):
            matches.add('series')
        # season
        if video.season and self.season == video.season:
            matches.add('season')
        # episode
        if video.episode and self.episode == video.episode:
            matches.add('episode')
        # format: self.format may be None, so it must be checked before calling
        # .lower() on it; the video format only has to be contained in ours.
        if video.format and self.format and video.format.lower() in self.format.lower():
            matches.add('format')
        # Both sides lacking a format also counts as a format match.
        if not video.format and not self.format:
            matches.add('format')
        # year
        if video.year and self.year == video.year:
            matches.add('year')
        # tvdb id
        if video.series_tvdb_id and self.tvdb_id == video.series_tvdb_id:
            matches.add('tvdb_id')

        # other properties
        matches |= guess_matches(video, guessit(self.full_data), partial=True)

        return matches
class ItaSAProvider(Provider):
    """Provider that queries the italiansubs.net REST API for Italian episode subtitles.

    :param str username: account user name, or ``None``.
    :param str password: account password, or ``None``.
    :raise ConfigurationError: if only one of `username` / `password` is given.

    """
    languages = {Language('ita')}
    video_types = (Episode,)
    server_url = 'https://api.italiansubs.net/api/rest/'
    # API key sent with every REST request.
    apikey = 'd86ad6ec041b334fac1e512174ee04d5'

    def __init__(self, username=None, password=None):
        # Credentials come as a pair: either both are given or neither is.
        if (username is None) != (password is None):
            raise ConfigurationError('Username and password must be specified')

        self.username = username
        self.password = password
        self.logged_in = False
        self.login_itasa = False

    def initialize(self):
        """Open the HTTP session and, when credentials are configured, log in.

        :raise AuthenticationError: if the API answers the login with status ``fail``.

        """
        self.session = Session()
        self.session.headers['User-Agent'] = 'Subliminal/%s' % __version__

        # login
        if self.username is not None and self.password is not None:
            logger.info('Logging in')
            params = {
                'username': self.username,
                'password': self.password,
                'apikey': self.apikey
            }

            r = self.session.get(self.server_url + 'users/login', params=params, allow_redirects=False, timeout=10)
            root = etree.fromstring(r.content)

            if root.find('status').text == 'fail':
                raise AuthenticationError(root.find('error/message').text)

            # The auth code is required later by the download endpoint.
            self.auth_code = root.find('data/user/authcode').text

            # Second login, against the web site itself, performed on the same session.
            data = {
                'username': self.username,
                'passwd': self.password,
                'remember': 'yes',
                'option': 'com_user',
                'task': 'login',
                'silent': 'true'
            }
            r = self.session.post('http://www.italiansubs.net/index.php', data=data, allow_redirects=False, timeout=30)
            r.raise_for_status()

            self.logged_in = True

    def terminate(self):
        """Close the HTTP session and mark the provider as logged out."""
        self.session.close()
        self.logged_in = False

    @staticmethod
    def _next_page_url(root):
        """Return the URL held by the ``data/next`` element of a result page, or ``None``.

        ``None`` is returned both when the element is missing and when it is empty.

        """
        next_page = root.find('data/next')
        if next_page is None:
            return None
        return next_page.text or None

    @staticmethod
    def _find_show_id(root, series):
        """Return the id of the show of a result page whose name matches `series`, or ``None``."""
        wanted = sanitize(series.lower())
        for show in root.findall('data/shows/show'):
            name = show.find('name').text
            # Shows without a name cannot match (same rule as in _get_show_ids).
            if name is None:
                continue
            if sanitize(name).lower() == wanted:
                return int(show.find('id').text)
        return None

    @staticmethod
    def _subtitles_from_page(root, season, episode, video_format, year, tvdb_id):
        """Build an :class:`ItaSASubtitle` for each entry of a result page that names the episode.

        An entry is kept when ``<season>x<episode, two digits>`` occurs in its lower-cased name.

        """
        wanted = '%dx%02d' % (season, episode)
        found = []
        for subtitle in root.findall('data/subtitles/subtitle'):
            name = subtitle.find('name').text
            if wanted not in name.lower():
                continue

            sub_id = int(subtitle.find('id').text)
            logger.debug('Found subtitle id %d - %r - %r', sub_id, name, subtitle.find('version').text)
            found.append(ItaSASubtitle(sub_id, subtitle.find('show_name').text, season, episode,
                                       video_format, year, tvdb_id, name))
        return found

    @region.cache_on_arguments(expiration_time=SHOW_EXPIRATION_TIME)
    def _get_show_ids(self):
        """Get the ``dict`` of show ids per series by querying the `shows` page.

        :return: show id per series, sanitized and lower case.
        :rtype: dict

        """
        # get the show page
        logger.info('Getting show ids')
        params = {'apikey': self.apikey}
        r = self.session.get(self.server_url + 'shows', timeout=10, params=params)
        r.raise_for_status()

        root = etree.fromstring(r.content)

        # populate the show ids, skipping shows that have no name
        show_ids = {}
        for show in root.findall('data/shows/show'):
            name = show.find('name').text
            if name is None:
                continue
            show_ids[sanitize(name).lower()] = int(show.find('id').text)
        logger.debug('Found %d show ids', len(show_ids))

        return show_ids

    @region.cache_on_arguments(expiration_time=SHOW_EXPIRATION_TIME)
    def _search_show_id(self, series):
        """Search the show id from the `series`, walking through every page of suggestions.

        :param str series: series of the episode.
        :return: the show id, if found.
        :rtype: int or None

        """
        # build the param
        params = {'apikey': self.apikey, 'q': series}

        # make the search
        logger.info('Searching show ids with %r', params)
        r = self.session.get(self.server_url + 'shows/search', params=params, timeout=10)
        r.raise_for_status()

        root = etree.fromstring(r.content)

        if int(root.find('data/count').text) == 0:
            logger.warning('Show id not found: no suggestion')
            return None

        while True:
            # Looking for the show in the current page
            show_id = self._find_show_id(root, series)
            if show_id is not None:
                logger.debug('Found show id %d', show_id)
                return show_id

            # Not in this page of results: try the next one (if any)
            next_url = self._next_page_url(root)
            if next_url is None:
                break

            r = self.session.get(next_url, timeout=10)
            r.raise_for_status()
            root = etree.fromstring(r.content)
            logger.info('Loading suggestion page %r', root.findtext('data/page'))

        # No matches found
        logger.warning('Show id not found: suggestions does not match')
        return None

    def get_show_id(self, series, country_code=None):
        """Get the best matching show id for `series`.

        First search in the result of :meth:`_get_show_ids` and fallback on a search with :meth:`_search_show_id`

        :param str series: series of the episode.
        :param str country_code: optional country code appended to the series for a first lookup.
        :return: the show id, if found.
        :rtype: int or None

        """
        series_sanitized = sanitize(series).lower()
        show_ids = self._get_show_ids()
        show_id = None

        # attempt with country
        if country_code:
            logger.debug('Getting show id with country')
            show_id = show_ids.get('%s %s' % (series_sanitized, country_code.lower()))

        # attempt clean
        if not show_id:
            logger.debug('Getting show id')
            show_id = show_ids.get(series_sanitized)

        # search as last resort
        if not show_id:
            logger.warning('Series not found in show ids')
            show_id = self._search_show_id(series)

        return show_id

    def _download_zip(self, sub_id):
        """Download the archive of subtitle `sub_id` and return the raw response body."""
        # download the subtitle
        logger.info('Downloading subtitle %r', sub_id)

        params = {
            'authcode': self.auth_code,
            'apikey': self.apikey,
            'subtitle_id': sub_id
        }

        r = self.session.get(self.server_url + 'subtitles/download', params=params, timeout=30)
        r.raise_for_status()

        return r.content

    def query(self, series, season, episode, format, resolution, country=None):
        """Search and download the subtitles of one episode.

        The subtitle content is downloaded here; an archive holding several
        files yields one subtitle per file.

        :param str series: series of the episode.
        :param int season: season number.
        :param int episode: episode number.
        :param str format: format of the video, or ``None``.
        :param str resolution: resolution of the video, or ``None``.
        :param str country: optional country code forwarded to :meth:`get_show_id`.
        :return: the subtitles found, with their content set.
        :rtype: list of :class:`ItaSASubtitle`
        :raise ConfigurationError: if not logged in, or if a download is not a zip file.
        :raise TooManyRequests: if the download limit message is returned instead of a zip.

        """
        # To make queries you need to be logged in
        if not self.logged_in:
            raise ConfigurationError('Cannot query if not logged in')

        # get the show id
        show_id = self.get_show_id(series, country)
        if show_id is None:
            logger.error('No show id found for %r ', series)
            return []

        # get the page of the season of the show
        logger.info('Getting the subtitle of show id %d, season %d episode %d, format %r', show_id,
                    season, episode, format)

        # Value of the `version` search parameter: for a missing or HDTV format
        # it is the resolution when that is 1080i/1080p/720p, else 'normale'.
        if format is None or format.lower() == 'hdtv':
            sub_format = resolution if resolution in ('1080i', '1080p', '720p') else 'normale'
        else:
            sub_format = format.lower()

        # Look for year
        params = {
            'apikey': self.apikey
        }
        r = self.session.get(self.server_url + 'shows/' + str(show_id), params=params, timeout=30)
        r.raise_for_status()
        root = etree.fromstring(r.content)

        year = root.find('data/show/started').text
        if year:
            year = int(year.split('-', 1)[0])

        # The id arrives as text; convert it so the equality test against
        # video.series_tvdb_id in ItaSASubtitle.get_matches can succeed.
        # NOTE(review): assumes subliminal stores that id as an int - confirm.
        tvdb_id = root.find('data/show/id_tvdb').text
        if tvdb_id:
            tvdb_id = int(tvdb_id)

        params = {
            'apikey': self.apikey,
            'show_id': show_id,
            'q': '%dx%02d' % (season, episode),
            'version': sub_format
        }
        r = self.session.get(self.server_url + 'subtitles/search', params=params, timeout=30)
        r.raise_for_status()
        root = etree.fromstring(r.content)

        if int(root.find('data/count').text) == 0:
            logger.warning('Subtitles not found')
            return []

        # Looking for subtitles in first page
        subtitles = self._subtitles_from_page(root, season, episode, format, year, tvdb_id)

        # Then in the following pages of results (if any)
        next_url = self._next_page_url(root)
        while next_url is not None:
            r = self.session.get(next_url, timeout=30)
            r.raise_for_status()
            root = etree.fromstring(r.content)
            logger.info('Loading subtitles page %r', root.findtext('data/page'))

            subtitles.extend(self._subtitles_from_page(root, season, episode, format, year, tvdb_id))
            next_url = self._next_page_url(root)

        # Download the subs found, can be more than one in zip
        additional_subs = []
        for sub in subtitles:
            content = self._download_zip(sub.sub_id)

            # The response body is bytes, hence the bytes literal below.
            if not is_zipfile(io.BytesIO(content)):
                if b'limite di download' in content:
                    raise TooManyRequests()
                raise ConfigurationError('Not a zip file: %r' % content)

            # open the zip
            with ZipFile(io.BytesIO(content)) as zf:
                names = zf.namelist()
                if not names:
                    logger.warning('Empty archive for subtitle %r', sub.sub_id)
                    continue

                # The first file fills the subtitle found by the search...
                sub.content = fix_line_ending(zf.read(names[0]))
                sub.full_data = names[0]

                # ...and every other file becomes an additional subtitle.
                for name in names[1:]:
                    add_sub = copy.deepcopy(sub)
                    add_sub.content = fix_line_ending(zf.read(name))
                    add_sub.full_data = name
                    additional_subs.append(add_sub)

        return subtitles + additional_subs

    def list_subtitles(self, video, languages):
        """List the subtitles of `video`; `languages` is not used by this provider."""
        return self.query(video.series, video.season, video.episode, video.format, video.resolution)

    def download_subtitle(self, subtitle):
        """Do nothing: the content is already set on the subtitles returned by :meth:`query`."""
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment