diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-02-19 15:05:49 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-02-19 15:05:49 +0000 |
commit | d395bd510fa4f4376dc5237ab2f8d190a920d35d (patch) | |
tree | 3e3b16b44064938be801aede14381562bae14f6a /deluge/common.py | |
parent | Adding upstream version 2.0.3. (diff) | |
download | deluge-d395bd510fa4f4376dc5237ab2f8d190a920d35d.tar.xz deluge-d395bd510fa4f4376dc5237ab2f8d190a920d35d.zip |
Adding upstream version 2.1.1.upstream/2.1.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'deluge/common.py')
-rw-r--r-- | deluge/common.py | 327 |
1 files changed, 169 insertions, 158 deletions
diff --git a/deluge/common.py b/deluge/common.py index 26e1605..ecf90a3 100644 --- a/deluge/common.py +++ b/deluge/common.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2007,2008 Andrew Resch <andrewresch@gmail.com> # @@ -8,25 +7,25 @@ # """Common functions for various parts of Deluge to use.""" -from __future__ import division, print_function, unicode_literals - import base64 import binascii import functools import glob -import locale import logging import numbers import os import platform import re +import socket import subprocess import sys import tarfile import time from contextlib import closing from datetime import datetime -from io import BytesIO, open +from io import BytesIO +from urllib.parse import unquote_plus, urljoin +from urllib.request import pathname2url import pkg_resources @@ -38,14 +37,6 @@ try: except ImportError: chardet = None -try: - from urllib.parse import unquote_plus, urljoin - from urllib.request import pathname2url -except ImportError: - # PY2 fallback - from urlparse import urljoin # pylint: disable=ungrouped-imports - from urllib import pathname2url, unquote_plus # pylint: disable=ungrouped-imports - # Windows workaround for HTTPS requests requiring certificate authority bundle. # see: https://twistedmatrix.com/trac/ticket/9209 if platform.system() in ('Windows', 'Microsoft'): @@ -53,6 +44,11 @@ if platform.system() in ('Windows', 'Microsoft'): os.environ['SSL_CERT_FILE'] = where() +try: + import ifaddr +except ImportError: + ifaddr = None + if platform.system() not in ('Windows', 'Microsoft', 'Darwin'): # gi makes dbus available on Window but don't import it as unused. @@ -81,7 +77,11 @@ TORRENT_STATE = [ # The output formatting for json.dump JSON_FORMAT = {'indent': 4, 'sort_keys': True, 'ensure_ascii': False} -PY2 = sys.version_info.major == 2 +DBUS_FM_ID = 'org.freedesktop.FileManager1' +DBUS_FM_PATH = '/org/freedesktop/FileManager1' + +# Retained for plugin backward compatibility +PY2 = False def get_version(): @@ -108,10 +108,8 @@ def get_default_config_dir(filename=None): def save_config_path(resource): app_data_path = os.environ.get('APPDATA') if not app_data_path: - try: - import winreg - except ImportError: - import _winreg as winreg # For Python 2. + import winreg + hkey = winreg.OpenKey( winreg.HKEY_CURRENT_USER, 'Software\\Microsoft\\Windows\\CurrentVersion\\Explorer\\Shell Folders', @@ -144,14 +142,14 @@ def get_default_download_dir(): try: user_dirs_path = os.path.join(xdg_config_home, 'user-dirs.dirs') - with open(user_dirs_path, 'r', encoding='utf8') as _file: + with open(user_dirs_path, encoding='utf8') as _file: for line in _file: if not line.startswith('#') and line.startswith('XDG_DOWNLOAD_DIR'): download_dir = os.path.expandvars( line.partition('=')[2].rstrip().strip('"') ) break - except IOError: + except OSError: pass if not download_dir: @@ -175,8 +173,8 @@ def archive_files(arc_name, filepaths, message=None, rotate=10): from deluge.configmanager import get_config_dir - # Set archive compression to lzma with bz2 fallback. - arc_comp = 'xz' if not PY2 else 'bz2' + # Set archive compression to lzma + arc_comp = 'xz' archive_dir = os.path.join(get_config_dir(), 'archive') timestamp = datetime.now().replace(microsecond=0).isoformat().replace(':', '-') @@ -272,7 +270,7 @@ def get_os_version(): os_version = list(platform.mac_ver()) os_version[1] = '' # versioninfo always empty. elif distro: - os_version = distro.linux_distribution() + os_version = (distro.name(), distro.version(), distro.codename()) else: os_version = (platform.release(),) @@ -355,27 +353,30 @@ def show_file(path, timestamp=None): timestamp, timestamp, ) + if dbus: bus = dbus.SessionBus() - filemanager1 = bus.get_object( - 'org.freedesktop.FileManager1', '/org/freedesktop/FileManager1' - ) - paths = [urljoin('file:', pathname2url(path))] - filemanager1.ShowItems( - paths, startup_id, dbus_interface='org.freedesktop.FileManager1' - ) - else: - env = os.environ.copy() - env['DESKTOP_STARTUP_ID'] = startup_id.replace('dbus', 'xdg-open') - # No option in xdg to highlight a file so just open parent folder. - subprocess.Popen(['xdg-open', os.path.dirname(path.rstrip('/'))], env=env) + try: + filemanager1 = bus.get_object(DBUS_FM_ID, DBUS_FM_PATH) + except dbus.exceptions.DBusException as ex: + log.debug('Unable to get dbus file manager: %s', ex) + # Fallback to xdg-open + else: + paths = [urljoin('file:', pathname2url(path))] + filemanager1.ShowItems(paths, startup_id, dbus_interface=DBUS_FM_ID) + return + + env = os.environ.copy() + env['DESKTOP_STARTUP_ID'] = startup_id.replace('dbus', 'xdg-open') + # No option in xdg to highlight a file so just open parent folder. + subprocess.Popen(['xdg-open', os.path.dirname(path.rstrip('/'))], env=env) def open_url_in_browser(url): """ - Opens a url in the desktop's default browser + Opens a URL in the desktop's default browser - :param url: the url to open + :param url: the URL to open :type url: string """ @@ -430,27 +431,27 @@ def fsize(fsize_b, precision=1, shortform=False): '110 KiB' Note: - This function has been refactored for perfomance with the + This function has been refactored for performance with the fsize units being translated outside the function. """ - if fsize_b >= 1024 ** 4: + if fsize_b >= 1024**4: return '%.*f %s' % ( precision, - fsize_b / 1024 ** 4, + fsize_b / 1024**4, tib_txt_short if shortform else tib_txt, ) - elif fsize_b >= 1024 ** 3: + elif fsize_b >= 1024**3: return '%.*f %s' % ( precision, - fsize_b / 1024 ** 3, + fsize_b / 1024**3, gib_txt_short if shortform else gib_txt, ) - elif fsize_b >= 1024 ** 2: + elif fsize_b >= 1024**2: return '%.*f %s' % ( precision, - fsize_b / 1024 ** 2, + fsize_b / 1024**2, mib_txt_short if shortform else mib_txt, ) elif fsize_b >= 1024: @@ -502,28 +503,28 @@ def fspeed(bps, precision=1, shortform=False): """ - if bps < 1024 ** 2: + if bps < 1024**2: return '%.*f %s' % ( precision, bps / 1024, _('K/s') if shortform else _('KiB/s'), ) - elif bps < 1024 ** 3: + elif bps < 1024**3: return '%.*f %s' % ( precision, - bps / 1024 ** 2, + bps / 1024**2, _('M/s') if shortform else _('MiB/s'), ) - elif bps < 1024 ** 4: + elif bps < 1024**4: return '%.*f %s' % ( precision, - bps / 1024 ** 3, + bps / 1024**3, _('G/s') if shortform else _('GiB/s'), ) else: return '%.*f %s' % ( precision, - bps / 1024 ** 4, + bps / 1024**4, _('T/s') if shortform else _('TiB/s'), ) @@ -546,9 +547,9 @@ def fpeer(num_peers, total_peers): """ if total_peers > -1: - return '{:d} ({:d})'.format(num_peers, total_peers) + return f'{num_peers:d} ({total_peers:d})' else: - return '{:d}'.format(num_peers) + return f'{num_peers:d}' def ftime(secs): @@ -565,7 +566,7 @@ def ftime(secs): '6h 23m' Note: - This function has been refactored for perfomance. + This function has been refactored for performance. """ @@ -574,17 +575,17 @@ def ftime(secs): if secs <= 0: time_str = '' elif secs < 60: - time_str = '{}s'.format(secs) + time_str = f'{secs}s' elif secs < 3600: - time_str = '{}m {}s'.format(secs // 60, secs % 60) + time_str = f'{secs // 60}m {secs % 60}s' elif secs < 86400: - time_str = '{}h {}m'.format(secs // 3600, secs // 60 % 60) + time_str = f'{secs // 3600}h {secs // 60 % 60}m' elif secs < 604800: - time_str = '{}d {}h'.format(secs // 86400, secs // 3600 % 24) + time_str = f'{secs // 86400}d {secs // 3600 % 24}h' elif secs < 31449600: - time_str = '{}w {}d'.format(secs // 604800, secs // 86400 % 7) + time_str = f'{secs // 604800}w {secs // 86400 % 7}d' else: - time_str = '{}y {}w'.format(secs // 31449600, secs // 604800 % 52) + time_str = f'{secs // 31449600}y {secs // 604800 % 52}w' return time_str @@ -638,17 +639,17 @@ def tokenize(text): size_units = [ {'prefix': 'b', 'divider': 1, 'singular': 'byte', 'plural': 'bytes'}, - {'prefix': 'KiB', 'divider': 1024 ** 1}, - {'prefix': 'MiB', 'divider': 1024 ** 2}, - {'prefix': 'GiB', 'divider': 1024 ** 3}, - {'prefix': 'TiB', 'divider': 1024 ** 4}, - {'prefix': 'PiB', 'divider': 1024 ** 5}, - {'prefix': 'KB', 'divider': 1000 ** 1}, - {'prefix': 'MB', 'divider': 1000 ** 2}, - {'prefix': 'GB', 'divider': 1000 ** 3}, - {'prefix': 'TB', 'divider': 1000 ** 4}, - {'prefix': 'PB', 'divider': 1000 ** 5}, - {'prefix': 'm', 'divider': 1000 ** 2}, + {'prefix': 'KiB', 'divider': 1024**1}, + {'prefix': 'MiB', 'divider': 1024**2}, + {'prefix': 'GiB', 'divider': 1024**3}, + {'prefix': 'TiB', 'divider': 1024**4}, + {'prefix': 'PiB', 'divider': 1024**5}, + {'prefix': 'KB', 'divider': 1000**1}, + {'prefix': 'MB', 'divider': 1000**2}, + {'prefix': 'GB', 'divider': 1000**3}, + {'prefix': 'TB', 'divider': 1000**4}, + {'prefix': 'PB', 'divider': 1000**5}, + {'prefix': 'm', 'divider': 1000**2}, ] @@ -695,7 +696,7 @@ def is_url(url): """ A simple test to check if the URL is valid - :param url: the url to test + :param url: the URL to test :type url: string :returns: True or False :rtype: bool @@ -706,6 +707,9 @@ def is_url(url): True """ + if not url: + return False + return url.partition('://')[0] in ('http', 'https', 'ftp', 'udp') @@ -720,6 +724,9 @@ def is_infohash(infohash): bool: True if valid infohash, False otherwise. """ + if not infohash: + return False + return len(infohash) == 40 and infohash.isalnum() @@ -727,13 +734,15 @@ MAGNET_SCHEME = 'magnet:?' XT_BTIH_PARAM = 'xt=urn:btih:' DN_PARAM = 'dn=' TR_PARAM = 'tr=' +TR_TIER_PARAM = 'tr.' +TR_TIER_REGEX = re.compile(r'^tr.(\d+)=(\S+)') def is_magnet(uri): """ - A check to determine if a uri is a valid bittorrent magnet uri + A check to determine if a URI is a valid bittorrent magnet URI - :param uri: the uri to check + :param uri: the URI to check :type uri: string :returns: True or False :rtype: bool @@ -769,8 +778,6 @@ def get_magnet_info(uri): """ - tr0_param = 'tr.' - tr0_param_regex = re.compile(r'^tr.(\d+)=(\S+)') if not uri.startswith(MAGNET_SCHEME): return {} @@ -798,12 +805,14 @@ def get_magnet_info(uri): tracker = unquote_plus(param[len(TR_PARAM) :]) trackers[tracker] = tier tier += 1 - elif param.startswith(tr0_param): - try: - tier, tracker = re.match(tr0_param_regex, param).groups() - trackers[tracker] = tier - except AttributeError: - pass + elif param.startswith(TR_TIER_PARAM): + tracker_match = re.match(TR_TIER_REGEX, param) + if not tracker_match: + continue + + tier, tracker = tracker_match.groups() + tracker = unquote_plus(tracker) + trackers[tracker] = int(tier) if info_hash: if not name: @@ -819,7 +828,7 @@ def get_magnet_info(uri): def create_magnet_uri(infohash, name=None, trackers=None): - """Creates a magnet uri + """Creates a magnet URI Args: infohash (str): The info-hash of the torrent. @@ -827,7 +836,7 @@ def create_magnet_uri(infohash, name=None, trackers=None): trackers (list or dict, optional): A list of trackers or dict or {tracker: tier} pairs. Returns: - str: A magnet uri string. + str: A magnet URI string. """ try: @@ -898,6 +907,29 @@ def free_space(path): return disk_data.f_bavail * block_size +def is_interface(interface): + """Check if interface is a valid IP or network adapter. + + Args: + interface (str): The IP or interface name to test. + + Returns: + bool: Whether interface is valid is not. + + Examples: + Windows: + >>> is_interface('{7A30AE62-23ZA-3744-Z844-A5B042524871}') + >>> is_interface('127.0.0.1') + True + Linux: + >>> is_interface('lo') + >>> is_interface('127.0.0.1') + True + + """ + return is_ip(interface) or is_interface_name(interface) + + def is_ip(ip): """A test to see if 'ip' is a valid IPv4 or IPv6 address. @@ -933,15 +965,12 @@ def is_ipv4(ip): """ - import socket - try: - if windows_check(): - return socket.inet_aton(ip) - else: - return socket.inet_pton(socket.AF_INET, ip) - except socket.error: + socket.inet_pton(socket.AF_INET, ip) + except OSError: return False + else: + return True def is_ipv6(ip): @@ -960,23 +989,51 @@ def is_ipv6(ip): """ try: - import ipaddress - except ImportError: - import socket - - try: - return socket.inet_pton(socket.AF_INET6, ip) - except (socket.error, AttributeError): - if windows_check(): - log.warning('Unable to verify IPv6 Address on Windows.') - return True + socket.inet_pton(socket.AF_INET6, ip) + except OSError: + return False else: + return True + + +def is_interface_name(name): + """Returns True if an interface name exists. + + Args: + name (str): The Interface to test. eg. eth0 linux. GUID on Windows. + + Returns: + bool: Whether name is valid or not. + + Examples: + >>> is_interface_name("eth0") + True + >>> is_interface_name("{7A30AE62-23ZA-3744-Z844-A5B042524871}") + True + + """ + + if not windows_check(): try: - return ipaddress.IPv6Address(decode_bytes(ip)) - except ipaddress.AddressValueError: + socket.if_nametoindex(name) + except OSError: pass + else: + return True + + if ifaddr: + try: + adapters = ifaddr.get_adapters() + except OSError: + return True + else: + return any([name == a.name for a in adapters]) + + if windows_check(): + regex = '^{[0-9A-Z]{8}-([0-9A-Z]{4}-){3}[0-9A-Z]{12}}$' + return bool(re.search(regex, str(name))) - return False + return True def decode_bytes(byte_str, encoding='utf8'): @@ -1007,9 +1064,9 @@ def decode_bytes(byte_str, encoding='utf8'): if encoding.lower() not in ['utf8', 'utf-8']: encodings.insert(0, lambda: (encoding, 'strict')) - for l in encodings: + for enc in encodings: try: - return byte_str.decode(*l()) + return byte_str.decode(*enc()) except UnicodeDecodeError: pass return '' @@ -1054,7 +1111,7 @@ def utf8_encode_structure(data): @functools.total_ordering -class VersionSplit(object): +class VersionSplit: """ Used for comparing version numbers. @@ -1138,6 +1195,7 @@ AUTH_LEVEL_DEFAULT = AUTH_LEVEL_NORMAL def create_auth_file(): import stat + import deluge.configmanager auth_file = deluge.configmanager.get_config_dir('auth') @@ -1153,6 +1211,7 @@ def create_auth_file(): def create_localclient_account(append=False): import random from hashlib import sha1 as sha + import deluge.configmanager auth_file = deluge.configmanager.get_config_dir('auth') @@ -1175,7 +1234,7 @@ def create_localclient_account(append=False): def get_localhost_auth(): - """Grabs the localclient auth line from the 'auth' file and creates a localhost uri. + """Grabs the localclient auth line from the 'auth' file and creates a localhost URI. Returns: tuple: With the username and password to login as. @@ -1231,15 +1290,10 @@ def set_env_variable(name, value): http://sourceforge.net/p/gramps/code/HEAD/tree/branches/maintenance/gramps32/src/TransUtils.py """ # Update Python's copy of the environment variables - try: - os.environ[name] = value - except UnicodeEncodeError: - # Python 2 - os.environ[name] = value.encode('utf8') + os.environ[name] = value if windows_check(): - from ctypes import windll - from ctypes import cdll + from ctypes import cdll, windll # Update the copy maintained by Windows (so SysInternals Process Explorer sees it) result = windll.kernel32.SetEnvironmentVariableW(name, value) @@ -1255,56 +1309,13 @@ def set_env_variable(name, value): ) # Update the copy maintained by msvcrt (used by gtk+ runtime) - result = cdll.msvcrt._wputenv('%s=%s' % (name, value)) + result = cdll.msvcrt._wputenv(f'{name}={value}') if result != 0: log.info("Failed to set Env Var '%s' (msvcrt._putenv)", name) else: log.debug("Set Env Var '%s' to '%s' (msvcrt._putenv)", name, value) -def unicode_argv(): - """ Gets sys.argv as list of unicode objects on any platform.""" - if windows_check(): - # Versions 2.x of Python don't support Unicode in sys.argv on - # Windows, with the underlying Windows API instead replacing multi-byte - # characters with '?'. - from ctypes import POINTER, byref, cdll, c_int, windll - from ctypes.wintypes import LPCWSTR, LPWSTR - - get_cmd_linew = cdll.kernel32.GetCommandLineW - get_cmd_linew.argtypes = [] - get_cmd_linew.restype = LPCWSTR - - cmdline_to_argvw = windll.shell32.CommandLineToArgvW - cmdline_to_argvw.argtypes = [LPCWSTR, POINTER(c_int)] - cmdline_to_argvw.restype = POINTER(LPWSTR) - - cmd = get_cmd_linew() - argc = c_int(0) - argv = cmdline_to_argvw(cmd, byref(argc)) - if argc.value > 0: - # Remove Python executable and commands if present - start = argc.value - len(sys.argv) - return [argv[i] for i in range(start, argc.value)] - else: - # On other platforms, we have to find the likely encoding of the args and decode - # First check if sys.stdout or stdin have encoding set - encoding = getattr(sys.stdout, 'encoding') or getattr(sys.stdin, 'encoding') - # If that fails, check what the locale is set to - encoding = encoding or locale.getpreferredencoding() - # As a last resort, just default to utf-8 - encoding = encoding or 'utf-8' - - arg_list = [] - for arg in sys.argv: - try: - arg_list.append(arg.decode(encoding)) - except AttributeError: - arg_list.append(arg) - - return arg_list - - def run_profiled(func, *args, **kwargs): """ Profile a function with cProfile |