diff options
Diffstat (limited to '')
-rw-r--r-- | paramiko/auth_strategy.py | 306 |
1 files changed, 306 insertions, 0 deletions
diff --git a/paramiko/auth_strategy.py b/paramiko/auth_strategy.py new file mode 100644 index 0000000..03c1d87 --- /dev/null +++ b/paramiko/auth_strategy.py @@ -0,0 +1,306 @@ +""" +Modern, adaptable authentication machinery. + +Replaces certain parts of `.SSHClient`. For a concrete implementation, see the +``OpenSSHAuthStrategy`` class in `Fabric <https://fabfile.org>`_. +""" + +from collections import namedtuple + +from .agent import AgentKey +from .util import get_logger +from .ssh_exception import AuthenticationException + + +class AuthSource: + """ + Some SSH authentication source, such as a password, private key, or agent. + + See subclasses in this module for concrete implementations. + + All implementations must accept at least a ``username`` (``str``) kwarg. + """ + + def __init__(self, username): + self.username = username + + def _repr(self, **kwargs): + # TODO: are there any good libs for this? maybe some helper from + # structlog? + pairs = [f"{k}={v!r}" for k, v in kwargs.items()] + joined = ", ".join(pairs) + return f"{self.__class__.__name__}({joined})" + + def __repr__(self): + return self._repr() + + def authenticate(self, transport): + """ + Perform authentication. + """ + raise NotImplementedError + + +class NoneAuth(AuthSource): + """ + Auth type "none", ie https://www.rfc-editor.org/rfc/rfc4252#section-5.2 . + """ + + def authenticate(self, transport): + return transport.auth_none(self.username) + + +class Password(AuthSource): + """ + Password authentication. + + :param callable password_getter: + A lazy callable that should return a `str` password value at + authentication time, such as a `functools.partial` wrapping + `getpass.getpass`, an API call to a secrets store, or similar. + + If you already know the password at instantiation time, you should + simply use something like ``lambda: "my literal"`` (for a literal, but + also, shame on you!) or ``lambda: variable_name`` (for something stored + in a variable). + """ + + def __init__(self, username, password_getter): + super().__init__(username=username) + self.password_getter = password_getter + + def __repr__(self): + # Password auth is marginally more 'username-caring' than pkeys, so may + # as well log that info here. + return super()._repr(user=self.username) + + def authenticate(self, transport): + # Lazily get the password, in case it's prompting a user + # TODO: be nice to log source _of_ the password? + password = self.password_getter() + return transport.auth_password(self.username, password) + + +# TODO 4.0: twiddle this, or PKey, or both, so they're more obviously distinct. +# TODO 4.0: the obvious is to make this more wordy (PrivateKeyAuth), the +# minimalist approach might be to rename PKey to just Key (esp given all the +# subclasses are WhateverKey and not WhateverPKey) +class PrivateKey(AuthSource): + """ + Essentially a mixin for private keys. + + Knows how to auth, but leaves key material discovery/loading/decryption to + subclasses. + + Subclasses **must** ensure that they've set ``self.pkey`` to a decrypted + `.PKey` instance before calling ``super().authenticate``; typically + either in their ``__init__``, or in an overridden ``authenticate`` prior to + its `super` call. + """ + + def authenticate(self, transport): + return transport.auth_publickey(self.username, self.pkey) + + +class InMemoryPrivateKey(PrivateKey): + """ + An in-memory, decrypted `.PKey` object. + """ + + def __init__(self, username, pkey): + super().__init__(username=username) + # No decryption (presumably) necessary! + self.pkey = pkey + + def __repr__(self): + # NOTE: most of interesting repr-bits for private keys is in PKey. + # TODO: tacking on agent-ness like this is a bit awkward, but, eh? + rep = super()._repr(pkey=self.pkey) + if isinstance(self.pkey, AgentKey): + rep += " [agent]" + return rep + + +class OnDiskPrivateKey(PrivateKey): + """ + Some on-disk private key that needs opening and possibly decrypting. + + :param str source: + String tracking where this key's path was specified; should be one of + ``"ssh-config"``, ``"python-config"``, or ``"implicit-home"``. + :param Path path: + The filesystem path this key was loaded from. + :param PKey pkey: + The `PKey` object this auth source uses/represents. + """ + + def __init__(self, username, source, path, pkey): + super().__init__(username=username) + self.source = source + allowed = ("ssh-config", "python-config", "implicit-home") + if source not in allowed: + raise ValueError(f"source argument must be one of: {allowed!r}") + self.path = path + # Superclass wants .pkey, other two are mostly for display/debugging. + self.pkey = pkey + + def __repr__(self): + return self._repr( + key=self.pkey, source=self.source, path=str(self.path) + ) + + +# TODO re sources: is there anything in an OpenSSH config file that doesn't fit +# into what Paramiko already had kwargs for? + + +SourceResult = namedtuple("SourceResult", ["source", "result"]) + +# TODO: tempting to make this an OrderedDict, except the keys essentially want +# to be rich objects (AuthSources) which do not make for useful user indexing? +# TODO: members being vanilla tuples is pretty old-school/expedient; they +# "really" want to be something that's type friendlier (unless the tuple's 2nd +# member being a Union of two types is "fine"?), which I assume means yet more +# classes, eg an abstract SourceResult with concrete AuthSuccess and +# AuthFailure children? +# TODO: arguably we want __init__ typechecking of the members (or to leverage +# mypy by classifying this literally as list-of-AuthSource?) +class AuthResult(list): + """ + Represents a partial or complete SSH authentication attempt. + + This class conceptually extends `AuthStrategy` by pairing the former's + authentication **sources** with the **results** of trying to authenticate + with them. + + `AuthResult` is a (subclass of) `list` of `namedtuple`, which are of the + form ``namedtuple('SourceResult', 'source', 'result')`` (where the + ``source`` member is an `AuthSource` and the ``result`` member is either a + return value from the relevant `.Transport` method, or an exception + object). + + .. note:: + Transport auth method results are always themselves a ``list`` of "next + allowable authentication methods". + + In the simple case of "you just authenticated successfully", it's an + empty list; if your auth was rejected but you're allowed to try again, + it will be a list of string method names like ``pubkey`` or + ``password``. + + The ``__str__`` of this class represents the empty-list scenario as the + word ``success``, which should make reading the result of an + authentication session more obvious to humans. + + Instances also have a `strategy` attribute referencing the `AuthStrategy` + which was attempted. + """ + + def __init__(self, strategy, *args, **kwargs): + self.strategy = strategy + super().__init__(*args, **kwargs) + + def __str__(self): + # NOTE: meaningfully distinct from __repr__, which still wants to use + # superclass' implementation. + # TODO: go hog wild, use rich.Table? how is that on degraded term's? + # TODO: test this lol + return "\n".join( + f"{x.source} -> {x.result or 'success'}" for x in self + ) + + +# TODO 4.0: descend from SSHException or even just Exception +class AuthFailure(AuthenticationException): + """ + Basic exception wrapping an `AuthResult` indicating overall auth failure. + + Note that `AuthFailure` descends from `AuthenticationException` but is + generally "higher level"; the latter is now only raised by individual + `AuthSource` attempts and should typically only be seen by users when + encapsulated in this class. It subclasses `AuthenticationException` + primarily for backwards compatibility reasons. + """ + + def __init__(self, result): + self.result = result + + def __str__(self): + return "\n" + str(self.result) + + +class AuthStrategy: + """ + This class represents one or more attempts to auth with an SSH server. + + By default, subclasses must at least accept an ``ssh_config`` + (`.SSHConfig`) keyword argument, but may opt to accept more as needed for + their particular strategy. + """ + + def __init__( + self, + ssh_config, + ): + self.ssh_config = ssh_config + self.log = get_logger(__name__) + + def get_sources(self): + """ + Generator yielding `AuthSource` instances, in the order to try. + + This is the primary override point for subclasses: you figure out what + sources you need, and ``yield`` them. + + Subclasses _of_ subclasses may find themselves wanting to do things + like filtering or discarding around a call to `super`. + """ + raise NotImplementedError + + def authenticate(self, transport): + """ + Handles attempting `AuthSource` instances yielded from `get_sources`. + + You *normally* won't need to override this, but it's an option for + advanced users. + """ + succeeded = False + overall_result = AuthResult(strategy=self) + # TODO: arguably we could fit in a "send none auth, record allowed auth + # types sent back" thing here as OpenSSH-client does, but that likely + # wants to live in fabric.OpenSSHAuthStrategy as not all target servers + # will implement it! + # TODO: needs better "server told us too many attempts" checking! + for source in self.get_sources(): + self.log.debug(f"Trying {source}") + try: # NOTE: this really wants to _only_ wrap the authenticate()! + result = source.authenticate(transport) + succeeded = True + # TODO: 'except PartialAuthentication' is needed for 2FA and + # similar, as per old SSHClient.connect - it is the only way + # AuthHandler supplies access to the 'name-list' field from + # MSG_USERAUTH_FAILURE, at present. + except Exception as e: + result = e + # TODO: look at what this could possibly raise, we don't really + # want Exception here, right? just SSHException subclasses? or + # do we truly want to capture anything at all with assumption + # it's easy enough for users to look afterwards? + # NOTE: showing type, not message, for tersity & also most of + # the time it's basically just "Authentication failed." + source_class = e.__class__.__name__ + self.log.info( + f"Authentication via {source} failed with {source_class}" + ) + overall_result.append(SourceResult(source, result)) + if succeeded: + break + # Gotta die here if nothing worked, otherwise Transport's main loop + # just kinda hangs out until something times out! + if not succeeded: + raise AuthFailure(result=overall_result) + # Success: give back what was done, in case they care. + return overall_result + + # TODO: is there anything OpenSSH client does which _can't_ cleanly map to + # iterating a generator? |