summaryrefslogtreecommitdiffstats
path: root/addons/metadata.tvshows.themoviedb.org.python/libs/tmdb.py
diff options
context:
space:
mode:
Diffstat (limited to 'addons/metadata.tvshows.themoviedb.org.python/libs/tmdb.py')
-rw-r--r--addons/metadata.tvshows.themoviedb.org.python/libs/tmdb.py466
1 files changed, 466 insertions, 0 deletions
diff --git a/addons/metadata.tvshows.themoviedb.org.python/libs/tmdb.py b/addons/metadata.tvshows.themoviedb.org.python/libs/tmdb.py
new file mode 100644
index 0000000..3b5019f
--- /dev/null
+++ b/addons/metadata.tvshows.themoviedb.org.python/libs/tmdb.py
@@ -0,0 +1,466 @@
+# -*- coding: UTF-8 -*-
+#
+# Copyright (C) 2020, Team Kodi
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+# pylint: disable=missing-docstring
+
+"""Functions to interact with TMDb API"""
+
+from __future__ import absolute_import, unicode_literals
+
+import unicodedata
+from math import floor
+from pprint import pformat
+from . import cache, data_utils, api_utils, settings, imdbratings, traktratings
+from .utils import logger
+try:
+ from typing import Text, Optional, Union, List, Dict, Any # pylint: disable=unused-import
+ InfoType = Dict[Text, Any] # pylint: disable=invalid-name
+except ImportError:
+ pass
+
+HEADERS = (
+ ('User-Agent', 'Kodi TV Show scraper by Team Kodi; contact pkscout@kodi.tv'),
+ ('Accept', 'application/json'),
+)
+api_utils.set_headers(dict(HEADERS))
+
+TMDB_PARAMS = {'api_key': settings.TMDB_CLOWNCAR, 'language': settings.LANG}
+BASE_URL = 'https://api.themoviedb.org/3/{}'
+EPISODE_GROUP_URL = BASE_URL.format('tv/episode_group/{}')
+SEARCH_URL = BASE_URL.format('search/tv')
+FIND_URL = BASE_URL.format('find/{}')
+SHOW_URL = BASE_URL.format('tv/{}')
+SEASON_URL = BASE_URL.format('tv/{}/season/{}')
+EPISODE_URL = BASE_URL.format('tv/{}/season/{}/episode/{}')
+FANARTTV_URL = 'https://webservice.fanart.tv/v3/tv/{}'
+FANARTTV_PARAMS = {'api_key': settings.FANARTTV_CLOWNCAR}
+if settings.FANARTTV_CLIENTKEY:
+ FANARTTV_PARAMS['client_key'] = settings.FANARTTV_CLIENTKEY
+
+
+def search_show(title, year=None):
+ # type: (Text, Text) -> List
+ """
+ Search for a single TV show
+
+ :param title: TV show title to search
+ : param year: the year to search (optional)
+ :return: a list with found TV shows
+ """
+ params = TMDB_PARAMS.copy()
+ results = []
+ ext_media_id = data_utils.parse_media_id(title)
+ if ext_media_id:
+ logger.debug('using %s of %s to find show' %
+ (ext_media_id['type'], ext_media_id['title']))
+ if ext_media_id['type'] == 'tmdb_id':
+ search_url = SHOW_URL.format(ext_media_id['title'])
+ else:
+ search_url = FIND_URL.format(ext_media_id['title'])
+ params['external_source'] = ext_media_id['type']
+ else:
+ logger.debug('using title of %s to find show' % title)
+ search_url = SEARCH_URL
+ params['query'] = unicodedata.normalize('NFKC', title)
+ if year:
+ params['first_air_date_year'] = str(year)
+ resp = api_utils.load_info(
+ search_url, params=params, verboselog=settings.VERBOSELOG)
+ if resp is not None:
+ if ext_media_id:
+ if ext_media_id['type'] == 'tmdb_id':
+ if resp.get('success') == 'false':
+ results = []
+ else:
+ results = [resp]
+ else:
+ results = resp.get('tv_results', [])
+ else:
+ results = resp.get('results', [])
+ return results
+
+
+def load_episode_list(show_info, season_map, ep_grouping):
+ # type: (InfoType, Dict, Text) -> Optional[InfoType]
+ """get the IMDB ratings details"""
+ """Load episode list from themoviedb.org API"""
+ episode_list = []
+ if ep_grouping is not None:
+ logger.debug(
+ 'Getting episodes with episode grouping of ' + ep_grouping)
+ episode_group_url = EPISODE_GROUP_URL.format(ep_grouping)
+ custom_order = api_utils.load_info(
+ episode_group_url, params=TMDB_PARAMS, verboselog=settings.VERBOSELOG)
+ if custom_order is not None:
+ show_info['seasons'] = []
+ for custom_season in custom_order.get('groups', []):
+ season_episodes = []
+ try:
+ current_season = season_map.get(
+ str(custom_season['episodes'][0]['season_number']), {}).copy()
+ except IndexError:
+ continue
+ current_season['name'] = custom_season['name']
+ current_season['season_number'] = custom_season['order']
+ for episode in custom_season['episodes']:
+ episode['org_seasonnum'] = episode['season_number']
+ episode['org_epnum'] = episode['episode_number']
+ episode['season_number'] = custom_season['order']
+ episode['episode_number'] = episode['order'] + 1
+ season_episodes.append(episode)
+ episode_list.append(episode)
+ current_season['episodes'] = season_episodes
+ show_info['seasons'].append(current_season)
+ else:
+ logger.debug('Getting episodes from standard season list')
+ show_info['seasons'] = []
+ for key, value in season_map.items():
+ show_info['seasons'].append(value)
+ for season in show_info.get('seasons', []):
+ for episode in season.get('episodes', []):
+ episode['org_seasonnum'] = episode['season_number']
+ episode['org_epnum'] = episode['episode_number']
+ episode_list.append(episode)
+ show_info['episodes'] = episode_list
+ return show_info
+
+
+def load_show_info(show_id, ep_grouping=None, named_seasons=None):
+ # type: (Text, Text, Dict) -> Optional[InfoType]
+ """
+ Get full info for a single show
+
+ :param show_id: themoviedb.org show ID
+ :param ep_grouping: the episode group from TMDb
+ :param named_seasons: the named seasons from the NFO file
+ :return: show info or None
+ """
+ if named_seasons == None:
+ named_seasons = []
+ show_info = cache.load_show_info_from_cache(show_id)
+ if show_info is None:
+ logger.debug('no cache file found, loading from scratch')
+ show_url = SHOW_URL.format(show_id)
+ params = TMDB_PARAMS.copy()
+ params['append_to_response'] = 'credits,content_ratings,external_ids,images,videos'
+ params['include_image_language'] = '%s,en,null' % settings.LANG[0:2]
+ params['include_video_language'] = '%s,en,null' % settings.LANG[0:2]
+ show_info = api_utils.load_info(
+ show_url, params=params, verboselog=settings.VERBOSELOG)
+ if show_info is None:
+ return None
+ if show_info['overview'] == '' and settings.LANG != 'en-US':
+ params['language'] = 'en-US'
+ del params['append_to_response']
+ show_info_backup = api_utils.load_info(
+ show_url, params=params, verboselog=settings.VERBOSELOG)
+ if show_info_backup is not None:
+ show_info['overview'] = show_info_backup.get('overview', '')
+ params['language'] = settings.LANG
+ season_map = {}
+ params['append_to_response'] = 'credits,images'
+ for season in show_info.get('seasons', []):
+ season_url = SEASON_URL.format(
+ show_id, season.get('season_number', 0))
+ season_info = api_utils.load_info(
+ season_url, params=params, default={}, verboselog=settings.VERBOSELOG)
+ if (season_info.get('overview', '') == '' or season_info.get('name', '').lower().startswith('season')) and settings.LANG != 'en-US':
+ params['language'] = 'en-US'
+ season_info_backup = api_utils.load_info(
+ season_url, params=params, default={}, verboselog=settings.VERBOSELOG)
+ params['language'] = settings.LANG
+ if season_info.get('overview', '') == '':
+ season_info['overview'] = season_info_backup.get(
+ 'overview', '')
+ if season_info.get('name', '').lower().startswith('season'):
+ season_info['name'] = season_info_backup.get('name', '')
+ # this is part of a work around for xbmcgui.ListItem.addSeasons() not respecting NFO file information
+ for named_season in named_seasons:
+ if str(named_season[0]) == str(season.get('season_number')):
+ logger.debug('adding season name of %s from named seasons in NFO for season %s' % (
+ named_season[1], season['season_number']))
+ season_info['name'] = named_season[1]
+ break
+ # end work around
+ season_info['images'] = _sort_image_types(
+ season_info.get('images', {}))
+ season_map[str(season.get('season_number', 0))] = season_info
+ show_info = load_episode_list(show_info, season_map, ep_grouping)
+ show_info['ratings'] = load_ratings(show_info)
+ show_info = load_fanarttv_art(show_info)
+ show_info['images'] = _sort_image_types(show_info.get('images', {}))
+ show_info = trim_artwork(show_info)
+ cast_check = []
+ cast = []
+ for season in reversed(show_info.get('seasons', [])):
+ for cast_member in season.get('credits', {}).get('cast', []):
+ if cast_member.get('name', '') not in cast_check:
+ cast.append(cast_member)
+ cast_check.append(cast_member.get('name', ''))
+ show_info['credits']['cast'] = cast
+ logger.debug('saving show info to the cache')
+ if settings.VERBOSELOG:
+ logger.debug(format(pformat(show_info)))
+ cache.cache_show_info(show_info)
+ else:
+ logger.debug('using cached show info')
+ return show_info
+
+
+def load_episode_info(show_id, episode_id):
+ # type: (Text, Text) -> Optional[InfoType]
+ """
+ Load episode info
+
+ :param show_id:
+ :param episode_id:
+ :return: episode info or None
+ """
+ show_info = load_show_info(show_id)
+ if show_info is not None:
+ try:
+ episode_info = show_info['episodes'][int(episode_id)]
+ except KeyError:
+ return None
+ # this ensures we are using the season/ep from the episode grouping if provided
+ ep_url = EPISODE_URL.format(
+ show_info['id'], episode_info['org_seasonnum'], episode_info['org_epnum'])
+ params = TMDB_PARAMS.copy()
+ params['append_to_response'] = 'credits,external_ids,images'
+ params['include_image_language'] = '%s,en,null' % settings.LANG[0:2]
+ ep_return = api_utils.load_info(
+ ep_url, params=params, verboselog=settings.VERBOSELOG)
+ if ep_return is None:
+ return None
+ bad_return_name = False
+ bad_return_overview = False
+ check_name = ep_return.get('name')
+ if check_name == None:
+ bad_return_name = True
+ ep_return['name'] = 'Episode ' + \
+ str(episode_info['episode_number'])
+ elif check_name.lower().startswith('episode') or check_name == '':
+ bad_return_name = True
+ if ep_return.get('overview', '') == '':
+ bad_return_overview = True
+ if (bad_return_overview or bad_return_name) and settings.LANG != 'en-US':
+ params['language'] = 'en-US'
+ del params['append_to_response']
+ ep_return_backup = api_utils.load_info(
+ ep_url, params=params, verboselog=settings.VERBOSELOG)
+ if ep_return_backup is not None:
+ if bad_return_overview:
+ ep_return['overview'] = ep_return_backup.get(
+ 'overview', '')
+ if bad_return_name:
+ ep_return['name'] = ep_return_backup.get(
+ 'name', 'Episode ' + str(episode_info['episode_number']))
+ ep_return['images'] = _sort_image_types(ep_return.get('images', {}))
+ ep_return['season_number'] = episode_info['season_number']
+ ep_return['episode_number'] = episode_info['episode_number']
+ ep_return['org_seasonnum'] = episode_info['org_seasonnum']
+ ep_return['org_epnum'] = episode_info['org_epnum']
+ ep_return['ratings'] = load_ratings(
+ ep_return, show_imdb_id=show_info.get('external_ids', {}).get('imdb_id'))
+ for season in show_info.get('seasons', []):
+ if season.get('season_number') == episode_info['season_number']:
+ ep_return['season_cast'] = season.get(
+ 'credits', {}).get('cast', [])
+ break
+ show_info['episodes'][int(episode_id)] = ep_return
+ cache.cache_show_info(show_info)
+ return ep_return
+ return None
+
+
+def load_ratings(the_info, show_imdb_id=''):
+ # type: (InfoType, Text) -> Dict
+ """
+ Load the ratings for the show/episode
+
+ :param the_info: show or episode info
+ :param show_imdb_id: show IMDB
+ :return: ratings or empty dict
+ """
+ ratings = {}
+ imdb_id = the_info.get('external_ids', {}).get('imdb_id')
+ for rating_type in settings.RATING_TYPES:
+ logger.debug('setting rating using %s' % rating_type)
+ if rating_type == 'tmdb':
+ ratings['tmdb'] = {'votes': the_info['vote_count'],
+ 'rating': the_info['vote_average']}
+ elif rating_type == 'imdb' and imdb_id:
+ imdb_rating = imdbratings.get_details(imdb_id).get('ratings')
+ if imdb_rating:
+ ratings.update(imdb_rating)
+ elif rating_type == 'trakt':
+ if show_imdb_id:
+ season = the_info['org_seasonnum']
+ episode = the_info['org_epnum']
+ resp = traktratings.get_details(
+ show_imdb_id, season=season, episode=episode)
+ else:
+ resp = traktratings.get_details(imdb_id)
+ trakt_rating = resp.get('ratings')
+ if trakt_rating:
+ ratings.update(trakt_rating)
+ logger.debug('returning ratings of\n{}'.format(pformat(ratings)))
+ return ratings
+
+
+def load_fanarttv_art(show_info):
+ # type: (InfoType) -> Optional[InfoType]
+ """
+ Add fanart.tv images for a show
+
+ :param show_info: the current show info
+ :return: show info
+ """
+ tvdb_id = show_info.get('external_ids', {}).get('tvdb_id')
+ if tvdb_id and settings.FANARTTV_ENABLE:
+ fanarttv_url = FANARTTV_URL.format(tvdb_id)
+ artwork = api_utils.load_info(
+ fanarttv_url, params=FANARTTV_PARAMS, verboselog=settings.VERBOSELOG)
+ if artwork is None:
+ return show_info
+ for fanarttv_type, tmdb_type in settings.FANARTTV_MAPPING.items():
+ if not show_info['images'].get(tmdb_type) and not tmdb_type.startswith('season'):
+ show_info['images'][tmdb_type] = []
+ for item in artwork.get(fanarttv_type, []):
+ lang = item.get('lang')
+ if lang == '' or lang == '00':
+ lang = None
+ filepath = ''
+ if lang is None or lang == settings.LANG[0:2] or lang == 'en':
+ filepath = item.get('url')
+ if filepath:
+ if tmdb_type.startswith('season'):
+ image_type = tmdb_type[6:]
+ for s in range(len(show_info.get('seasons', []))):
+ season_num = show_info['seasons'][s]['season_number']
+ artseason = item.get('season', '')
+ if not show_info['seasons'][s].get('images'):
+ show_info['seasons'][s]['images'] = {}
+ if not show_info['seasons'][s]['images'].get(image_type):
+ show_info['seasons'][s]['images'][image_type] = []
+ if artseason == '' or artseason == str(season_num):
+ show_info['seasons'][s]['images'][image_type].append(
+ {'file_path': filepath, 'type': 'fanarttv', 'iso_639_1': lang})
+ else:
+ show_info['images'][tmdb_type].append(
+ {'file_path': filepath, 'type': 'fanarttv', 'iso_639_1': lang})
+ return show_info
+
+
+def trim_artwork(show_info):
+ # type: (InfoType) -> Optional[InfoType]
+ """
+ Trim artwork to keep the text blob below 65K characters
+
+ :param show_info: the current show info
+ :return: show info
+ """
+ image_counts = {}
+ image_total = 0
+ backdrops_total = 0
+ for image_type, image_list in show_info.get('images', {}).items():
+ total = len(image_list)
+ if image_type == 'backdrops':
+ backdrops_total = backdrops_total + total
+ else:
+ image_counts[image_type] = {'total': total}
+ image_total = image_total + total
+ for season in show_info.get('seasons', []):
+ for image_type, image_list in season.get('images', {}).items():
+ total = len(image_list)
+ thetype = '%s_%s' % (str(season['season_number']), image_type)
+ image_counts[thetype] = {'total': total}
+ image_total = image_total + total
+ if image_total <= settings.MAXIMAGES and backdrops_total <= settings.MAXIMAGES:
+ return show_info
+ if backdrops_total > settings.MAXIMAGES:
+ logger.error('there are %s fanart images' % str(backdrops_total))
+ logger.error('that is more than the max of %s, image results will be trimmed to the max' % str(
+ settings.MAXIMAGES))
+ reduce = -1 * (backdrops_total - settings.MAXIMAGES)
+ del show_info['images']['backdrops'][reduce:]
+ if image_total > settings.MAXIMAGES:
+ reduction = (image_total - settings.MAXIMAGES)/image_total
+ logger.error('there are %s non-fanart images' % str(image_total))
+ logger.error('that is more than the max of %s, image results will be trimmed by %s' % (
+ str(settings.MAXIMAGES), str(reduction)))
+ for key, value in image_counts.items():
+ image_counts[key]['reduce'] = -1 * \
+ int(floor(value['total'] * reduction))
+ logger.debug('%s: %s' % (key, pformat(image_counts[key])))
+ for image_type, image_list in show_info.get('images', {}).items():
+ if image_type == 'backdrops':
+ continue # already handled backdrops above
+ reduce = image_counts[image_type]['reduce']
+ if reduce != 0:
+ del show_info['images'][image_type][reduce:]
+ for s in range(len(show_info.get('seasons', []))):
+ for image_type, image_list in show_info['seasons'][s].get('images', {}).items():
+ thetype = '%s_%s' % (
+ str(show_info['seasons'][s]['season_number']), image_type)
+ reduce = image_counts[thetype]['reduce']
+ if reduce != 0:
+ del show_info['seasons'][s]['images'][image_type][reduce:]
+ return show_info
+
+
+def _sort_image_types(imagelist):
+ # type: (Dict) -> Dict
+ """
+ sort the images by language
+
+ :param imagelist:
+ :return: imagelist
+ """
+ for image_type, images in imagelist.items():
+ imagelist[image_type] = _image_sort(images, image_type)
+ return imagelist
+
+
+def _image_sort(images, image_type):
+ # type: (List, Text) -> List
+ """
+ sort the images by language
+
+ :param images:
+ :param image_type:
+ :return: list of images
+ """
+ lang_pref = []
+ lang_null = []
+ lang_en = []
+ firstimage = True
+ for image in images:
+ image_lang = image.get('iso_639_1')
+ if image_lang == settings.LANG[0:2]:
+ lang_pref.append(image)
+ elif image_lang == 'en':
+ lang_en.append(image)
+ else:
+ if firstimage:
+ lang_pref.append(image)
+ else:
+ lang_null.append(image)
+ firstimage = False
+ if image_type == 'posters':
+ return lang_pref + lang_en + lang_null
+ else:
+ return lang_pref + lang_null + lang_en