summaryrefslogtreecommitdiffstats
path: root/addons/metadata.tvshows.themoviedb.org.python/libs/data_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'addons/metadata.tvshows.themoviedb.org.python/libs/data_utils.py')
-rw-r--r--addons/metadata.tvshows.themoviedb.org.python/libs/data_utils.py451
1 files changed, 451 insertions, 0 deletions
diff --git a/addons/metadata.tvshows.themoviedb.org.python/libs/data_utils.py b/addons/metadata.tvshows.themoviedb.org.python/libs/data_utils.py
new file mode 100644
index 0000000..779789a
--- /dev/null
+++ b/addons/metadata.tvshows.themoviedb.org.python/libs/data_utils.py
@@ -0,0 +1,451 @@
+# -*- coding: UTF-8 -*-
+#
+# Copyright (C) 2020, Team Kodi
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+# pylint: disable=missing-docstring
+#
+# This is based on the metadata.tvmaze scrapper by Roman Miroshnychenko aka Roman V.M.
+
+"""Functions to process data"""
+
+from __future__ import absolute_import, unicode_literals
+
+import re
+import json
+from xbmc import Actor, VideoStreamDetail
+from collections import namedtuple
+from .utils import safe_get, logger
+from . import settings, api_utils
+
+try:
+ from typing import Optional, Tuple, Text, Dict, List, Any # pylint: disable=unused-import
+ from xbmcgui import ListItem # pylint: disable=unused-import
+ InfoType = Dict[Text, Any] # pylint: disable=invalid-name
+except ImportError:
+ pass
+
+TMDB_PARAMS = {'api_key': settings.TMDB_CLOWNCAR, 'language': settings.LANG}
+BASE_URL = 'https://api.themoviedb.org/3/{}'
+FIND_URL = BASE_URL.format('find/{}')
+TAG_RE = re.compile(r'<[^>]+>')
+
+# Regular expressions are listed in order of priority.
+# "TMDB" provider is preferred than other providers (IMDB and TheTVDB),
+# because external providers IDs need to be converted to TMDB_ID.
+SHOW_ID_REGEXPS = (
+ r'(themoviedb)\.org/tv/(\d+).*/episode_group/(.*)', # TMDB_http_link
+ r'(themoviedb)\.org/tv/(\d+)', # TMDB_http_link
+ r'(themoviedb)\.org/./tv/(\d+)', # TMDB_http_link
+ r'(tmdb)\.org/./tv/(\d+)', # TMDB_http_link
+ r'(imdb)\.com/.+/(tt\d+)', # IMDB_http_link
+ r'(thetvdb)\.com.+&id=(\d+)', # TheTVDB_http_link
+ r'(thetvdb)\.com/series/(\d+)', # TheTVDB_http_link
+ r'(thetvdb)\.com/api/.*series/(\d+)', # TheTVDB_http_link
+ r'(thetvdb)\.com/.*?"id":(\d+)', # TheTVDB_http_link
+ r'<uniqueid.+?type="(tvdb|imdb)".*?>([t\d]+?)</uniqueid>'
+)
+
+
+SUPPORTED_ARTWORK_TYPES = {'poster', 'banner'}
+IMAGE_SIZES = ('large', 'original', 'medium')
+CLEAN_PLOT_REPLACEMENTS = (
+ ('<b>', '[B]'),
+ ('</b>', '[/B]'),
+ ('<i>', '[I]'),
+ ('</i>', '[/I]'),
+ ('</p><p>', '[CR]'),
+)
+VALIDEXTIDS = ['tmdb_id', 'imdb_id', 'tvdb_id']
+
+UrlParseResult = namedtuple(
+ 'UrlParseResult', ['provider', 'show_id', 'ep_grouping'])
+
+
+def _clean_plot(plot):
+ # type: (Text) -> Text
+ """Replace HTML tags with Kodi skin tags"""
+ for repl in CLEAN_PLOT_REPLACEMENTS:
+ plot = plot.replace(repl[0], repl[1])
+ plot = TAG_RE.sub('', plot)
+ return plot
+
+
+def _set_cast(cast_info, vtag):
+ # type: (InfoType, ListItem) -> ListItem
+ """Save cast info to list item"""
+ cast = []
+ for item in cast_info:
+ actor = {
+ 'name': item['name'],
+ 'role': item.get('character', item.get('character_name', '')),
+ 'order': item['order'],
+ }
+ thumb = None
+ if safe_get(item, 'profile_path') is not None:
+ thumb = settings.IMAGEROOTURL + item['profile_path']
+ cast.append(Actor(actor['name'], actor['role'], actor['order'], thumb))
+ vtag.setCast(cast)
+
+
+def _get_credits(show_info):
+ # type: (InfoType) -> List[Text]
+ """Extract show creator(s) and writer(s) from show info"""
+ credits = []
+ for item in show_info.get('created_by', []):
+ credits.append(item['name'])
+ for item in show_info.get('credits', {}).get('crew', []):
+ isWriter = item.get('job', '').lower() == 'writer' or item.get(
+ 'department', '').lower() == 'writing'
+ if isWriter and item.get('name') not in credits:
+ credits.append(item['name'])
+ return credits
+
+
+def _get_directors(episode_info):
+ # type: (InfoType) -> List[Text]
+ """Extract episode writer(s) from episode info"""
+ directors_ = []
+ for item in episode_info.get('credits', {}).get('crew', []):
+ if item.get('job') == 'Director':
+ directors_.append(item['name'])
+ return directors_
+
+
+def _set_unique_ids(ext_ids, vtag):
+ # type: (Dict, ListItem) -> ListItem
+ """Extract unique ID in various online databases"""
+ return_ids = {}
+ for key, value in ext_ids.items():
+ if key in VALIDEXTIDS and value:
+ if key == 'tmdb_id':
+ isTMDB = True
+ else:
+ isTMDB = False
+ shortkey = key[:-3]
+ str_value = str(value)
+ vtag.setUniqueID(str_value, type=shortkey, isdefault=isTMDB)
+ return_ids[shortkey] = str_value
+ return return_ids
+
+
+def _set_rating(the_info, vtag):
+ # type: (InfoType, ListItem) -> None
+ """Set show/episode rating"""
+ first = True
+ for rating_type in settings.RATING_TYPES:
+ logger.debug('adding rating type of %s' % rating_type)
+ rating = float(the_info.get('ratings', {}).get(
+ rating_type, {}).get('rating', '0'))
+ votes = int(the_info.get('ratings', {}).get(
+ rating_type, {}).get('votes', '0'))
+ logger.debug("adding rating of %s and votes of %s" %
+ (str(rating), str(votes)))
+ if rating > 0:
+ vtag.setRating(rating, votes=votes,
+ type=rating_type, isdefault=first)
+ first = False
+
+
+def _add_season_info(show_info, vtag):
+ # type: (InfoType, ListItem) -> None
+ """Add info for show seasons"""
+ for season in show_info['seasons']:
+ logger.debug('adding information for season %s to list item' %
+ season['season_number'])
+ vtag.addSeason(season['season_number'],
+ safe_get(season, 'name', ''))
+ for image_type, image_list in season.get('images', {}).items():
+ if image_type == 'posters':
+ destination = 'poster'
+ else:
+ destination = image_type
+ for image in image_list:
+ theurl, previewurl = get_image_urls(image)
+ if theurl:
+ vtag.addAvailableArtwork(
+ theurl, arttype=destination, preview=previewurl, season=season['season_number'])
+
+
+def get_image_urls(image):
+ # type: (Dict) -> Tuple[Text, Text]
+ """Get image URLs from image information"""
+ if image.get('file_path', '').endswith('.svg'):
+ return None, None
+ if image.get('type') == 'fanarttv':
+ theurl = image['file_path']
+ previewurl = theurl.replace(
+ '.fanart.tv/fanart/', '.fanart.tv/preview/')
+ else:
+ theurl = settings.IMAGEROOTURL + image['file_path']
+ previewurl = settings.PREVIEWROOTURL + image['file_path']
+ return theurl, previewurl
+
+
+def set_show_artwork(show_info, list_item):
+ # type: (InfoType, ListItem) -> ListItem
+ """Set available images for a show"""
+ vtag = list_item.getVideoInfoTag()
+ for image_type, image_list in show_info.get('images', {}).items():
+ if image_type == 'backdrops':
+ fanart_list = []
+ for image in image_list:
+ theurl, previewurl = get_image_urls(image)
+ if image.get('iso_639_1') != None and settings.CATLANDSCAPE and theurl:
+ vtag.addAvailableArtwork(
+ theurl, arttype="landscape", preview=previewurl)
+ elif theurl:
+ fanart_list.append({'image': theurl})
+ if fanart_list:
+ list_item.setAvailableFanart(fanart_list)
+ else:
+ if image_type == 'posters':
+ destination = 'poster'
+ elif image_type == 'logos':
+ destination = 'clearlogo'
+ else:
+ destination = image_type
+ for image in image_list:
+ theurl, previewurl = get_image_urls(image)
+ if theurl:
+ vtag.addAvailableArtwork(
+ theurl, arttype=destination, preview=previewurl)
+ return list_item
+
+
+def add_main_show_info(list_item, show_info, full_info=True):
+ # type: (ListItem, InfoType, bool) -> ListItem
+ """Add main show info to a list item"""
+ vtag = list_item.getVideoInfoTag()
+ original_name = show_info.get('original_name')
+ if settings.KEEPTITLE and original_name:
+ showname = original_name
+ else:
+ showname = show_info['name']
+ plot = _clean_plot(safe_get(show_info, 'overview', ''))
+ vtag.setTitle(showname)
+ vtag.setOriginalTitle(original_name)
+ vtag.setTvShowTitle(showname)
+ vtag.setPlot(plot)
+ vtag.setPlotOutline(plot)
+ vtag.setMediaType('tvshow')
+ ext_ids = {'tmdb_id': show_info['id']}
+ ext_ids.update(show_info.get('external_ids', {}))
+ epguide_ids = _set_unique_ids(ext_ids, vtag)
+ vtag.setEpisodeGuide(json.dumps(epguide_ids))
+ if show_info.get('first_air_date'):
+ vtag.setYear(int(show_info['first_air_date'][:4]))
+ vtag.setPremiered(show_info['first_air_date'])
+ if full_info:
+ vtag.setTvShowStatus(safe_get(show_info, 'status', ''))
+ genre_list = safe_get(show_info, 'genres', {})
+ genres = []
+ for genre in genre_list:
+ genres.append(genre['name'])
+ vtag.setGenres(genres)
+ networks = show_info.get('networks', [])
+ if networks:
+ network = networks[0]
+ country = network.get('origin_country', '')
+ else:
+ network = None
+ country = None
+ if network and country and settings.STUDIOCOUNTRY:
+ vtag.setStudios(['{0} ({1})'.format(network['name'], country)])
+ elif network:
+ vtag.setStudios([network['name']])
+ if country:
+ vtag.setCountries([country])
+ content_ratings = show_info.get(
+ 'content_ratings', {}).get('results', {})
+ if content_ratings:
+ mpaa = ''
+ mpaa_backup = ''
+ for content_rating in content_ratings:
+ iso = content_rating.get('iso_3166_1', '').lower()
+ if iso == 'us':
+ mpaa_backup = content_rating.get('rating')
+ if iso == settings.CERT_COUNTRY.lower():
+ mpaa = content_rating.get('rating', '')
+ if not mpaa:
+ mpaa = mpaa_backup
+ if mpaa:
+ vtag.setMpaa(settings.CERT_PREFIX + mpaa)
+ vtag.setWriters(_get_credits(show_info))
+ if settings.ENABTRAILER:
+ trailer = _parse_trailer(show_info.get(
+ 'videos', {}).get('results', {}))
+ if trailer:
+ vtag.setTrailer(trailer)
+ list_item = set_show_artwork(show_info, list_item)
+ _add_season_info(show_info, vtag)
+ _set_cast(show_info['credits']['cast'], vtag)
+ _set_rating(show_info, vtag)
+ else:
+ image = show_info.get('poster_path', '')
+ if image and not image.endswith('.svg'):
+ theurl = settings.IMAGEROOTURL + image
+ previewurl = settings.PREVIEWROOTURL + image
+ vtag.addAvailableArtwork(
+ theurl, arttype='poster', preview=previewurl)
+ logger.debug('adding tv show information for %s to list item' % showname)
+ return list_item
+
+
+def add_episode_info(list_item, episode_info, full_info=True):
+ # type: (ListItem, InfoType, bool) -> ListItem
+ """Add episode info to a list item"""
+ title = episode_info.get('name', 'Episode ' +
+ str(episode_info['episode_number']))
+ vtag = list_item.getVideoInfoTag()
+ vtag.setTitle(title)
+ vtag.setSeason(episode_info['season_number'])
+ vtag.setEpisode(episode_info['episode_number'])
+ vtag.setMediaType('episode')
+ if safe_get(episode_info, 'air_date') is not None:
+ vtag.setFirstAired(episode_info['air_date'])
+ if full_info:
+ summary = safe_get(episode_info, 'overview')
+ if summary is not None:
+ plot = _clean_plot(summary)
+ vtag.setPlot(plot)
+ vtag.setPlotOutline(plot)
+ if safe_get(episode_info, 'air_date') is not None:
+ vtag.setPremiered(episode_info['air_date'])
+ duration = episode_info.get('runtime')
+ if duration:
+ videostream = VideoStreamDetail(duration=int(duration)*60)
+ vtag.addVideoStream(videostream)
+ _set_cast(
+ episode_info['season_cast'] + episode_info['credits']['guest_stars'], vtag)
+ ext_ids = {'tmdb_id': episode_info['id']}
+ ext_ids.update(episode_info.get('external_ids', {}))
+ _set_unique_ids(ext_ids, vtag)
+ _set_rating(episode_info, vtag)
+ for image in episode_info.get('images', {}).get('stills', []):
+ theurl, previewurl = get_image_urls(image)
+ if theurl:
+ vtag.addAvailableArtwork(
+ theurl, arttype='thumb', preview=previewurl)
+ vtag.setWriters(_get_credits(episode_info))
+ vtag.setDirectors(_get_directors(episode_info))
+ logger.debug('adding episode information for S%sE%s - %s to list item' %
+ (episode_info['season_number'], episode_info['episode_number'], title))
+ return list_item
+
+
+def parse_nfo_url(nfo):
+ # type: (Text) -> Optional[UrlParseResult]
+ """Extract show ID and named seasons from NFO file contents"""
+ # work around for xbmcgui.ListItem.addSeason overwriting named seasons from NFO files
+ ns_regex = r'<namedseason number="(.*)">(.*)</namedseason>'
+ ns_match = re.findall(ns_regex, nfo, re.I)
+ sid_match = None
+ ep_grouping = None
+ for regexp in SHOW_ID_REGEXPS:
+ logger.debug('trying regex to match service from parsing nfo:')
+ logger.debug(regexp)
+ show_id_match = re.search(regexp, nfo, re.I)
+ if show_id_match:
+ logger.debug('match group 1: ' + show_id_match.group(1))
+ logger.debug('match group 2: ' + show_id_match.group(2))
+ if show_id_match.group(1) == "themoviedb" or show_id_match.group(1) == "tmdb":
+ try:
+ ep_grouping = show_id_match.group(3)
+ except IndexError:
+ pass
+ tmdb_id = show_id_match.group(2)
+ else:
+ tmdb_id = _convert_ext_id(
+ show_id_match.group(1), show_id_match.group(2))
+ if tmdb_id:
+ logger.debug('match group 3: ' + str(ep_grouping))
+ sid_match = UrlParseResult('tmdb', tmdb_id, ep_grouping)
+ break
+ return sid_match, ns_match
+
+
+def _convert_ext_id(ext_provider, ext_id):
+ # type: (Text, Text) -> Text
+ """get a TMDb ID from an external ID"""
+ providers_dict = {'imdb': 'imdb_id',
+ 'thetvdb': 'tvdb_id',
+ 'tvdb': 'tvdb_id'}
+ show_url = FIND_URL.format(ext_id)
+ params = TMDB_PARAMS.copy()
+ provider = providers_dict.get(ext_provider)
+ if provider:
+ params['external_source'] = provider
+ show_info = api_utils.load_info(show_url, params=params)
+ else:
+ show_info = None
+ if show_info:
+ tv_results = show_info.get('tv_results')
+ if tv_results:
+ return tv_results[0].get('id')
+ return None
+
+
+def parse_media_id(title):
+ # type: (Text) -> Dict
+ """get the ID from a title and return with the type"""
+ title = title.lower()
+ if title.startswith('tt') and title[2:].isdigit():
+ # IMDB ID works alone because it is clear
+ return {'type': 'imdb_id', 'title': title}
+ # IMDB ID with prefix to match
+ elif title.startswith('imdb/tt') and title[7:].isdigit():
+ # IMDB ID works alone because it is clear
+ return {'type': 'imdb_id', 'title': title[5:]}
+ elif title.startswith('tmdb/') and title[5:].isdigit(): # TMDB ID
+ return {'type': 'tmdb_id', 'title': title[5:]}
+ elif title.startswith('tvdb/') and title[5:].isdigit(): # TVDB ID
+ return {'type': 'tvdb_id', 'title': title[5:]}
+ return None
+
+
+def _parse_trailer(results):
+ # type: (Text) -> Text
+ """create a valid Tubed or YouTube plugin trailer URL"""
+ if results:
+ if settings.PLAYERSOPT == 'tubed':
+ addon_player = 'plugin://plugin.video.tubed/?mode=play&video_id='
+ elif settings.PLAYERSOPT == 'youtube':
+ addon_player = 'plugin://plugin.video.youtube/?action=play_video&videoid='
+ backup_keys = []
+ for video_lang in [settings.LANG[0:2], 'en']:
+ for result in results:
+ if result.get('site') == 'YouTube' and result.get('iso_639_1') == video_lang:
+ key = result.get('key')
+ if result.get('type') == 'Trailer':
+ if _check_youtube(key):
+ # video is available and is defined as "Trailer" by TMDB. Perfect link!
+ return addon_player+key
+ else:
+ # video is available, but NOT defined as "Trailer" by TMDB. Saving it as backup in case it doesn't find any perfect link.
+ backup_keys.append(key)
+ for keybackup in backup_keys:
+ if _check_youtube(keybackup):
+ return addon_player+keybackup
+ return None
+
+
+def _check_youtube(key):
+ # type: (Text) -> bool
+ """check to see if the YouTube key returns a valid link"""
+ chk_link = "https://www.youtube.com/watch?v="+key
+ check = api_utils.load_info(chk_link, resp_type='not_json')
+ if not check or "Video unavailable" in check: # video not available
+ return False
+ return True