diff options
Diffstat (limited to 'deluge/core/rpcserver.py')
-rw-r--r-- | deluge/core/rpcserver.py | 80 |
1 files changed, 16 insertions, 64 deletions
diff --git a/deluge/core/rpcserver.py b/deluge/core/rpcserver.py index adb5219..d4ca5d1 100644 --- a/deluge/core/rpcserver.py +++ b/deluge/core/rpcserver.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2008,2009 Andrew Resch <andrewresch@gmail.com> # @@ -8,17 +7,14 @@ # """RPCServer Module""" -from __future__ import unicode_literals - import logging import os -import stat import sys import traceback from collections import namedtuple from types import FunctionType +from typing import Callable, TypeVar, overload -from OpenSSL import crypto from twisted.internet import defer, reactor from twisted.internet.protocol import Factory, connectionDone @@ -29,7 +25,7 @@ from deluge.core.authmanager import ( AUTH_LEVEL_DEFAULT, AUTH_LEVEL_NONE, ) -from deluge.crypto_utils import get_context_factory +from deluge.crypto_utils import check_ssl_keys, get_context_factory from deluge.error import ( DelugeError, IncompatibleClient, @@ -46,6 +42,18 @@ RPC_EVENT = 3 log = logging.getLogger(__name__) +TCallable = TypeVar('TCallable', bound=Callable) + + +@overload +def export(func: TCallable) -> TCallable: + ... + + +@overload +def export(auth_level: int) -> Callable[[TCallable], TCallable]: + ... + def export(auth_level=AUTH_LEVEL_DEFAULT): """ @@ -69,7 +77,7 @@ def export(auth_level=AUTH_LEVEL_DEFAULT): if func.__doc__: if func.__doc__.endswith(' '): indent = func.__doc__.split('\n')[-1] - func.__doc__ += '\n{}'.format(indent) + func.__doc__ += f'\n{indent}' else: func.__doc__ += '\n\n' func.__doc__ += rpc_text @@ -114,7 +122,7 @@ def format_request(call): class DelugeRPCProtocol(DelugeTransferProtocol): def __init__(self): - super(DelugeRPCProtocol, self).__init__() + super().__init__() # namedtuple subclass with auth_level, username for the connected session. self.AuthLevel = namedtuple('SessionAuthlevel', 'auth_level, username') @@ -588,59 +596,3 @@ class RPCServer(component.Component): def stop(self): self.factory.state = 'stopping' - - -def check_ssl_keys(): - """ - Check for SSL cert/key and create them if necessary - """ - ssl_dir = deluge.configmanager.get_config_dir('ssl') - if not os.path.exists(ssl_dir): - # The ssl folder doesn't exist so we need to create it - os.makedirs(ssl_dir) - generate_ssl_keys() - else: - for f in ('daemon.pkey', 'daemon.cert'): - if not os.path.exists(os.path.join(ssl_dir, f)): - generate_ssl_keys() - break - - -def generate_ssl_keys(): - """ - This method generates a new SSL key/cert. - """ - from deluge.common import PY2 - - digest = 'sha256' if not PY2 else b'sha256' - - # Generate key pair - pkey = crypto.PKey() - pkey.generate_key(crypto.TYPE_RSA, 2048) - - # Generate cert request - req = crypto.X509Req() - subj = req.get_subject() - setattr(subj, 'CN', 'Deluge Daemon') - req.set_pubkey(pkey) - req.sign(pkey, digest) - - # Generate certificate - cert = crypto.X509() - cert.set_serial_number(0) - cert.gmtime_adj_notBefore(0) - cert.gmtime_adj_notAfter(60 * 60 * 24 * 365 * 3) # Three Years - cert.set_issuer(req.get_subject()) - cert.set_subject(req.get_subject()) - cert.set_pubkey(req.get_pubkey()) - cert.sign(pkey, digest) - - # Write out files - ssl_dir = deluge.configmanager.get_config_dir('ssl') - with open(os.path.join(ssl_dir, 'daemon.pkey'), 'wb') as _file: - _file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)) - with open(os.path.join(ssl_dir, 'daemon.cert'), 'wb') as _file: - _file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert)) - # Make the files only readable by this user - for f in ('daemon.pkey', 'daemon.cert'): - os.chmod(os.path.join(ssl_dir, f), stat.S_IREAD | stat.S_IWRITE) |