summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2020-05-13 07:16:32 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2020-05-13 07:16:46 +0000
commita04832d167d32615b9f946ac82c39514e88de1c6 (patch)
tree9f92589a01bb46743014b169146b9a217a5f0da8 /tests
parentReleasing progress-linux version 2.6.0-2~progress5+u1. (diff)
downloadparamiko-a04832d167d32615b9f946ac82c39514e88de1c6.tar.xz
paramiko-a04832d167d32615b9f946ac82c39514e88de1c6.zip
Merging upstream version 2.7.1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests')
-rw-r--r--tests/test_client.py10
-rw-r--r--tests/test_config.py996
-rw-r--r--tests/test_dss_openssh.key22
-rw-r--r--tests/test_ecdsa_384_openssh.key11
-rw-r--r--tests/test_gssapi.py4
-rw-r--r--tests/test_pkey.py43
-rw-r--r--tests/test_proxy.py144
-rw-r--r--tests/test_rsa_openssh.key28
-rw-r--r--tests/test_rsa_openssh_nopad.key27
-rw-r--r--tests/test_util.py514
-rw-r--r--tests/util.py9
11 files changed, 1300 insertions, 508 deletions
diff --git a/tests/test_client.py b/tests/test_client.py
index ad5c36a..60ad310 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -407,15 +407,15 @@ class SSHClientTest(ClientTest):
self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- self.assertEqual(0, len(self.tc.get_host_keys()))
+ assert len(self.tc.get_host_keys()) == 0
self.tc.connect(**dict(self.connect_kwargs, password="pygmalion"))
self.event.wait(1.0)
- self.assertTrue(self.event.is_set())
- self.assertTrue(self.ts.is_active())
+ assert self.event.is_set()
+ assert self.ts.is_active()
p = weakref.ref(self.tc._transport.packetizer)
- self.assertTrue(p() is not None)
+ assert p() is not None
self.tc.close()
del self.tc
@@ -425,7 +425,7 @@ class SSHClientTest(ClientTest):
gc.collect()
gc.collect()
- self.assertTrue(p() is None)
+ assert p() is None
def test_client_can_be_used_as_context_manager(self):
"""
diff --git a/tests/test_config.py b/tests/test_config.py
index cbd3f62..5e9aa05 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -1,74 +1,992 @@
# This file is part of Paramiko and subject to the license in /LICENSE in this
# repository
-import pytest
+from os.path import expanduser
+from socket import gaierror
-from paramiko import config
-from paramiko.util import parse_ssh_config
-from paramiko.py3compat import StringIO
+from paramiko.py3compat import string_types
+from invoke import Result
+from mock import patch
+from pytest import raises, mark, fixture
-def test_SSHConfigDict_construct_empty():
- assert not config.SSHConfigDict()
+from paramiko import (
+ SSHConfig,
+ SSHConfigDict,
+ CouldNotCanonicalize,
+ ConfigParseError,
+)
+from .util import _config
-def test_SSHConfigDict_construct_from_list():
- assert config.SSHConfigDict([(1, 2)])[1] == 2
+@fixture
+def socket():
+ """
+ Patch all of socket.* in our config module to prevent eg real DNS lookups.
+
+ Also forces getaddrinfo (used in our addressfamily lookup stuff) to always
+ fail by default to mimic usual lack of AddressFamily related crap.
-def test_SSHConfigDict_construct_from_dict():
- assert config.SSHConfigDict({1: 2})[1] == 2
+ Callers who want to mock DNS lookups can then safely assume gethostbyname()
+ will be in use.
+ """
+ with patch("paramiko.config.socket") as mocket:
+ # Reinstate gaierror as an actual exception and not a sub-mock.
+ # (Presumably this would work with any exception, but why not use the
+ # real one?)
+ mocket.gaierror = gaierror
+ # Patch out getaddrinfo, used to detect family-specific IP lookup -
+ # only useful for a few specific tests.
+ mocket.getaddrinfo.side_effect = mocket.gaierror
+ # Patch out getfqdn to return some real string for when it gets called;
+ # some code (eg tokenization) gets mad w/ MagicMocks
+ mocket.getfqdn.return_value = "some.fake.fqdn"
+ yield mocket
-@pytest.mark.parametrize("true_ish", ("yes", "YES", "Yes", True))
-def test_SSHConfigDict_as_bool_true_ish(true_ish):
- assert config.SSHConfigDict({"key": true_ish}).as_bool("key") is True
+def load_config(name):
+ return SSHConfig.from_path(_config(name))
-@pytest.mark.parametrize("false_ish", ("no", "NO", "No", False))
-def test_SSHConfigDict_as_bool(false_ish):
- assert config.SSHConfigDict({"key": false_ish}).as_bool("key") is False
+class TestSSHConfig(object):
+ def setup(self):
+ self.config = load_config("robey")
+ def test_init(self):
+ # No args!
+ with raises(TypeError):
+ SSHConfig("uh oh!")
+ # No args.
+ assert not SSHConfig()._config
-@pytest.mark.parametrize("int_val", ("42", 42))
-def test_SSHConfigDict_as_int(int_val):
- assert config.SSHConfigDict({"key": int_val}).as_int("key") == 42
+ def test_from_text(self):
+ config = SSHConfig.from_text("User foo")
+ assert config.lookup("foo.example.com")["user"] == "foo"
+ def test_from_file(self):
+ with open(_config("robey")) as flo:
+ config = SSHConfig.from_file(flo)
+ assert config.lookup("whatever")["user"] == "robey"
-@pytest.mark.parametrize("non_int", ("not an int", None, object()))
-def test_SSHConfigDict_as_int_failures(non_int):
- conf = config.SSHConfigDict({"key": non_int})
+ def test_from_path(self):
+ # NOTE: DO NOT replace with use of load_config() :D
+ config = SSHConfig.from_path(_config("robey"))
+ assert config.lookup("meh.example.com")["port"] == "3333"
- try:
- int(non_int)
- except Exception as e:
- exception_type = type(e)
+ def test_parse_config(self):
+ expected = [
+ {"host": ["*"], "config": {}},
+ {
+ "host": ["*"],
+ "config": {"identityfile": ["~/.ssh/id_rsa"], "user": "robey"},
+ },
+ {
+ "host": ["*.example.com"],
+ "config": {"user": "bjork", "port": "3333"},
+ },
+ {"host": ["*"], "config": {"crazy": "something dumb"}},
+ {
+ "host": ["spoo.example.com"],
+ "config": {"crazy": "something else"},
+ },
+ ]
+ assert self.config._config == expected
- with pytest.raises(exception_type):
- conf.as_int("key")
+ @mark.parametrize(
+ "host,values",
+ (
+ (
+ "irc.danger.com",
+ {
+ "crazy": "something dumb",
+ "hostname": "irc.danger.com",
+ "user": "robey",
+ },
+ ),
+ (
+ "irc.example.com",
+ {
+ "crazy": "something dumb",
+ "hostname": "irc.example.com",
+ "user": "robey",
+ "port": "3333",
+ },
+ ),
+ (
+ "spoo.example.com",
+ {
+ "crazy": "something dumb",
+ "hostname": "spoo.example.com",
+ "user": "robey",
+ "port": "3333",
+ },
+ ),
+ ),
+ )
+ def test_host_config(self, host, values):
+ expected = dict(
+ values, hostname=host, identityfile=[expanduser("~/.ssh/id_rsa")]
+ )
+ assert self.config.lookup(host) == expected
+ def test_fabric_issue_33(self):
+ config = SSHConfig.from_text(
+ """
+Host www13.*
+ Port 22
-def test_SSHConfig_host_dicts_are_SSHConfigDict_instances():
- test_config_file = """
Host *.example.com
Port 2222
Host *
Port 3333
- """
- f = StringIO(test_config_file)
- config = parse_ssh_config(f)
- assert config.lookup("foo.example.com").as_int("port") == 2222
+"""
+ )
+ host = "www13.example.com"
+ expected = {"hostname": host, "port": "22"}
+ assert config.lookup(host) == expected
+
+ def test_proxycommand_config_equals_parsing(self):
+ """
+ ProxyCommand should not split on equals signs within the value.
+ """
+ config = SSHConfig.from_text(
+ """
+Host space-delimited
+ ProxyCommand foo bar=biz baz
+
+Host equals-delimited
+ ProxyCommand=foo bar=biz baz
+"""
+ )
+ for host in ("space-delimited", "equals-delimited"):
+ value = config.lookup(host)["proxycommand"]
+ assert value == "foo bar=biz baz"
+
+ def test_proxycommand_interpolation(self):
+ """
+ ProxyCommand should perform interpolation on the value
+ """
+ config = SSHConfig.from_text(
+ """
+Host specific
+ Port 37
+ ProxyCommand host %h port %p lol
+
+Host portonly
+ Port 155
+
+Host *
+ Port 25
+ ProxyCommand host %h port %p
+"""
+ )
+ for host, val in (
+ ("foo.com", "host foo.com port 25"),
+ ("specific", "host specific port 37 lol"),
+ ("portonly", "host portonly port 155"),
+ ):
+ assert config.lookup(host)["proxycommand"] == val
+
+ def test_proxycommand_tilde_expansion(self):
+ """
+ Tilde (~) should be expanded inside ProxyCommand
+ """
+ config = SSHConfig.from_text(
+ """
+Host test
+ ProxyCommand ssh -F ~/.ssh/test_config bastion nc %h %p
+"""
+ )
+ expected = "ssh -F {}/.ssh/test_config bastion nc test 22".format(
+ expanduser("~")
+ )
+ got = config.lookup("test")["proxycommand"]
+ assert got == expected
+
+ @patch("paramiko.config.getpass")
+ def test_controlpath_token_expansion(self, getpass):
+ getpass.getuser.return_value = "gandalf"
+ config = SSHConfig.from_text(
+ """
+Host explicit_user
+ User root
+ ControlPath user %u remoteuser %r
+
+Host explicit_host
+ HostName ohai
+ ControlPath remoteuser %r host %h orighost %n
+ """
+ )
+ result = config.lookup("explicit_user")["controlpath"]
+ # Remote user is User val, local user is User val
+ assert result == "user gandalf remoteuser root"
+ result = config.lookup("explicit_host")["controlpath"]
+ # Remote user falls back to local user; host and orighost may differ
+ assert result == "remoteuser gandalf host ohai orighost explicit_host"
+
+ def test_negation(self):
+ config = SSHConfig.from_text(
+ """
+Host www13.* !*.example.com
+ Port 22
+
+Host *.example.com !www13.*
+ Port 2222
+
+Host www13.*
+ Port 8080
+
+Host *
+ Port 3333
+"""
+ )
+ host = "www13.example.com"
+ expected = {"hostname": host, "port": "8080"}
+ assert config.lookup(host) == expected
+
+ def test_proxycommand(self):
+ config = SSHConfig.from_text(
+ """
+Host proxy-with-equal-divisor-and-space
+ProxyCommand = foo=bar
+
+Host proxy-with-equal-divisor-and-no-space
+ProxyCommand=foo=bar
+
+Host proxy-without-equal-divisor
+ProxyCommand foo=bar:%h-%p
+"""
+ )
+ for host, values in {
+ "proxy-with-equal-divisor-and-space": {
+ "hostname": "proxy-with-equal-divisor-and-space",
+ "proxycommand": "foo=bar",
+ },
+ "proxy-with-equal-divisor-and-no-space": {
+ "hostname": "proxy-with-equal-divisor-and-no-space",
+ "proxycommand": "foo=bar",
+ },
+ "proxy-without-equal-divisor": {
+ "hostname": "proxy-without-equal-divisor",
+ "proxycommand": "foo=bar:proxy-without-equal-divisor-22",
+ },
+ }.items():
+
+ assert config.lookup(host) == values
+
+ def test_identityfile(self):
+ config = SSHConfig.from_text(
+ """
+
+IdentityFile id_dsa0
+
+Host *
+IdentityFile id_dsa1
+
+Host dsa2
+IdentityFile id_dsa2
+
+Host dsa2*
+IdentityFile id_dsa22
+"""
+ )
+ for host, values in {
+ "foo": {"hostname": "foo", "identityfile": ["id_dsa0", "id_dsa1"]},
+ "dsa2": {
+ "hostname": "dsa2",
+ "identityfile": ["id_dsa0", "id_dsa1", "id_dsa2", "id_dsa22"],
+ },
+ "dsa22": {
+ "hostname": "dsa22",
+ "identityfile": ["id_dsa0", "id_dsa1", "id_dsa22"],
+ },
+ }.items():
+
+ assert config.lookup(host) == values
+
+ def test_config_addressfamily_and_lazy_fqdn(self):
+ """
+ Ensure the code path honoring non-'all' AddressFamily doesn't asplode
+ """
+ config = SSHConfig.from_text(
+ """
+AddressFamily inet
+IdentityFile something_%l_using_fqdn
+"""
+ )
+ assert config.lookup(
+ "meh"
+ ) # will die during lookup() if bug regresses
+
+ def test_config_dos_crlf_succeeds(self):
+ config = SSHConfig.from_text(
+ """
+Host abcqwerty\r\nHostName 127.0.0.1\r\n
+"""
+ )
+ assert config.lookup("abcqwerty")["hostname"] == "127.0.0.1"
+
+ def test_get_hostnames(self):
+ expected = {"*", "*.example.com", "spoo.example.com"}
+ assert self.config.get_hostnames() == expected
+
+ def test_quoted_host_names(self):
+ config = SSHConfig.from_text(
+ """
+Host "param pam" param "pam"
+ Port 1111
+
+Host "param2"
+ Port 2222
+
+Host param3 parara
+ Port 3333
+Host param4 "p a r" "p" "par" para
+ Port 4444
+"""
+ )
+ res = {
+ "param pam": {"hostname": "param pam", "port": "1111"},
+ "param": {"hostname": "param", "port": "1111"},
+ "pam": {"hostname": "pam", "port": "1111"},
+ "param2": {"hostname": "param2", "port": "2222"},
+ "param3": {"hostname": "param3", "port": "3333"},
+ "parara": {"hostname": "parara", "port": "3333"},
+ "param4": {"hostname": "param4", "port": "4444"},
+ "p a r": {"hostname": "p a r", "port": "4444"},
+ "p": {"hostname": "p", "port": "4444"},
+ "par": {"hostname": "par", "port": "4444"},
+ "para": {"hostname": "para", "port": "4444"},
+ }
+ for host, values in res.items():
+ assert config.lookup(host) == values
-def test_SSHConfig_wildcard_host_dicts_are_SSHConfigDict_instances():
- test_config_file = """\
+ def test_quoted_params_in_config(self):
+ config = SSHConfig.from_text(
+ """
+Host "param pam" param "pam"
+ IdentityFile id_rsa
+
+Host "param2"
+ IdentityFile "test rsa key"
+
+Host param3 parara
+ IdentityFile id_rsa
+ IdentityFile "test rsa key"
+"""
+ )
+ res = {
+ "param pam": {"hostname": "param pam", "identityfile": ["id_rsa"]},
+ "param": {"hostname": "param", "identityfile": ["id_rsa"]},
+ "pam": {"hostname": "pam", "identityfile": ["id_rsa"]},
+ "param2": {"hostname": "param2", "identityfile": ["test rsa key"]},
+ "param3": {
+ "hostname": "param3",
+ "identityfile": ["id_rsa", "test rsa key"],
+ },
+ "parara": {
+ "hostname": "parara",
+ "identityfile": ["id_rsa", "test rsa key"],
+ },
+ }
+ for host, values in res.items():
+ assert config.lookup(host) == values
+
+ def test_quoted_host_in_config(self):
+ conf = SSHConfig()
+ correct_data = {
+ "param": ["param"],
+ '"param"': ["param"],
+ "param pam": ["param", "pam"],
+ '"param" "pam"': ["param", "pam"],
+ '"param" pam': ["param", "pam"],
+ 'param "pam"': ["param", "pam"],
+ 'param "pam" p': ["param", "pam", "p"],
+ '"param" pam "p"': ["param", "pam", "p"],
+ '"pa ram"': ["pa ram"],
+ '"pa ram" pam': ["pa ram", "pam"],
+ 'param "p a m"': ["param", "p a m"],
+ }
+ incorrect_data = ['param"', '"param', 'param "pam', 'param "pam" "p a']
+ for host, values in correct_data.items():
+ assert conf._get_hosts(host) == values
+ for host in incorrect_data:
+ with raises(ConfigParseError):
+ conf._get_hosts(host)
+
+ def test_invalid_line_format_excepts(self):
+ with raises(ConfigParseError):
+ load_config("invalid")
+
+ def test_proxycommand_none_issue_418(self):
+ config = SSHConfig.from_text(
+ """
+Host proxycommand-standard-none
+ ProxyCommand None
+
+Host proxycommand-with-equals-none
+ ProxyCommand=None
+"""
+ )
+ for host, values in {
+ "proxycommand-standard-none": {
+ "hostname": "proxycommand-standard-none"
+ },
+ "proxycommand-with-equals-none": {
+ "hostname": "proxycommand-with-equals-none"
+ },
+ }.items():
+
+ assert config.lookup(host) == values
+
+ def test_proxycommand_none_masking(self):
+ # Re: https://github.com/paramiko/paramiko/issues/670
+ config = SSHConfig.from_text(
+ """
+Host specific-host
+ ProxyCommand none
+
+Host other-host
+ ProxyCommand other-proxy
+
+Host *
+ ProxyCommand default-proxy
+"""
+ )
+ # When bug is present, the full stripping-out of specific-host's
+ # ProxyCommand means it actually appears to pick up the default
+ # ProxyCommand value instead, due to cascading. It should (for
+ # backwards compatibility reasons in 1.x/2.x) appear completely blank,
+ # as if the host had no ProxyCommand whatsoever.
+ # Threw another unrelated host in there just for sanity reasons.
+ assert "proxycommand" not in config.lookup("specific-host")
+ assert config.lookup("other-host")["proxycommand"] == "other-proxy"
+ cmd = config.lookup("some-random-host")["proxycommand"]
+ assert cmd == "default-proxy"
+
+ def test_hostname_tokenization(self):
+ result = load_config("hostname-tokenized").lookup("whatever")
+ assert result["hostname"] == "prefix.whatever"
+
+
+class TestSSHConfigDict(object):
+ def test_SSHConfigDict_construct_empty(self):
+ assert not SSHConfigDict()
+
+ def test_SSHConfigDict_construct_from_list(self):
+ assert SSHConfigDict([(1, 2)])[1] == 2
+
+ def test_SSHConfigDict_construct_from_dict(self):
+ assert SSHConfigDict({1: 2})[1] == 2
+
+ @mark.parametrize("true_ish", ("yes", "YES", "Yes", True))
+ def test_SSHConfigDict_as_bool_true_ish(self, true_ish):
+ assert SSHConfigDict({"key": true_ish}).as_bool("key") is True
+
+ @mark.parametrize("false_ish", ("no", "NO", "No", False))
+ def test_SSHConfigDict_as_bool(self, false_ish):
+ assert SSHConfigDict({"key": false_ish}).as_bool("key") is False
+
+ @mark.parametrize("int_val", ("42", 42))
+ def test_SSHConfigDict_as_int(self, int_val):
+ assert SSHConfigDict({"key": int_val}).as_int("key") == 42
+
+ @mark.parametrize("non_int", ("not an int", None, object()))
+ def test_SSHConfigDict_as_int_failures(self, non_int):
+ conf = SSHConfigDict({"key": non_int})
+
+ try:
+ int(non_int)
+ except Exception as e:
+ exception_type = type(e)
+
+ with raises(exception_type):
+ conf.as_int("key")
+
+ def test_SSHConfig_host_dicts_are_SSHConfigDict_instances(self):
+ config = SSHConfig.from_text(
+ """
+Host *.example.com
+ Port 2222
+
+Host *
+ Port 3333
+"""
+ )
+ assert config.lookup("foo.example.com").as_int("port") == 2222
+
+ def test_SSHConfig_wildcard_host_dicts_are_SSHConfigDict_instances(self):
+ config = SSHConfig.from_text(
+ """
Host *.example.com
Port 2222
Host *
Port 3333
+"""
+ )
+ assert config.lookup("anything-else").as_int("port") == 3333
+
+
+class TestHostnameCanonicalization(object):
+ # NOTE: this class uses on-disk configs, and ones with real (at time of
+ # writing) DNS names, so that one can easily test OpenSSH's behavior using
+ # "ssh -F path/to/file.config -G <target>".
+
+ def test_off_by_default(self, socket):
+ result = load_config("basic").lookup("www")
+ assert result["hostname"] == "www"
+ assert "user" not in result
+ assert not socket.gethostbyname.called
+
+ def test_explicit_no_same_as_default(self, socket):
+ result = load_config("no-canon").lookup("www")
+ assert result["hostname"] == "www"
+ assert "user" not in result
+ assert not socket.gethostbyname.called
+
+ @mark.parametrize(
+ "config_name",
+ ("canon", "canon-always", "canon-local", "canon-local-always"),
+ )
+ def test_canonicalization_base_cases(self, socket, config_name):
+ result = load_config(config_name).lookup("www")
+ assert result["hostname"] == "www.paramiko.org"
+ assert result["user"] == "rando"
+ socket.gethostbyname.assert_called_once_with("www.paramiko.org")
+
+ def test_uses_getaddrinfo_when_AddressFamily_given(self, socket):
+ # Undo default 'always fails' mock
+ socket.getaddrinfo.side_effect = None
+ socket.getaddrinfo.return_value = [True] # just need 1st value truthy
+ result = load_config("canon-ipv4").lookup("www")
+ assert result["hostname"] == "www.paramiko.org"
+ assert result["user"] == "rando"
+ assert not socket.gethostbyname.called
+ gai_args = socket.getaddrinfo.call_args[0]
+ assert gai_args[0] == "www.paramiko.org"
+ assert gai_args[2] is socket.AF_INET # Mocked, but, still useful
+
+ @mark.skip
+ def test_empty_CanonicalDomains_canonicalizes_despite_noop(self, socket):
+ # Confirmed this is how OpenSSH behaves as well. Bit silly, but.
+ # TODO: this requires modifying SETTINGS_REGEX, which is a mite scary
+ # (honestly I'd prefer to move to a real parser lib anyhow) and since
+ # this is a very dumb corner case, it's marked skip for now.
+ result = load_config("empty-canon").lookup("www")
+ assert result["hostname"] == "www" # no paramiko.org
+ assert "user" not in result # did not discover canonicalized block
+
+ def test_CanonicalDomains_may_be_set_to_space_separated_list(self, socket):
+ # Test config has a bogus domain, followed by paramiko.org
+ socket.gethostbyname.side_effect = [socket.gaierror, True]
+ result = load_config("multi-canon-domains").lookup("www")
+ assert result["hostname"] == "www.paramiko.org"
+ assert result["user"] == "rando"
+ assert [x[0][0] for x in socket.gethostbyname.call_args_list] == [
+ "www.not-a-real-tld",
+ "www.paramiko.org",
+ ]
+
+ def test_canonicalization_applies_to_single_dot_by_default(self, socket):
+ result = load_config("deep-canon").lookup("sub.www")
+ assert result["hostname"] == "sub.www.paramiko.org"
+ assert result["user"] == "deep"
+
+ def test_canonicalization_not_applied_to_two_dots_by_default(self, socket):
+ result = load_config("deep-canon").lookup("subber.sub.www")
+ assert result["hostname"] == "subber.sub.www"
+ assert "user" not in result
+
+ def test_hostname_depth_controllable_with_max_dots_directive(self, socket):
+ # This config sets MaxDots of 2, so now canonicalization occurs
+ result = load_config("deep-canon-maxdots").lookup("subber.sub.www")
+ assert result["hostname"] == "subber.sub.www.paramiko.org"
+ assert result["user"] == "deeper"
+
+ def test_max_dots_may_be_zero(self, socket):
+ result = load_config("zero-maxdots").lookup("sub.www")
+ assert result["hostname"] == "sub.www"
+ assert "user" not in result
+
+ def test_fallback_yes_does_not_canonicalize_or_error(self, socket):
+ socket.gethostbyname.side_effect = socket.gaierror
+ result = load_config("fallback-yes").lookup("www")
+ assert result["hostname"] == "www"
+ assert "user" not in result
+
+ def test_fallback_no_causes_errors_for_unresolvable_names(self, socket):
+ socket.gethostbyname.side_effect = socket.gaierror
+ with raises(CouldNotCanonicalize) as info:
+ load_config("fallback-no").lookup("doesnotexist")
+ assert str(info.value) == "doesnotexist"
+
+ def test_identityfile_continues_being_appended_to(self, socket):
+ result = load_config("canon").lookup("www")
+ assert result["identityfile"] == ["base.key", "canonicalized.key"]
+
+
+@mark.skip
+class TestCanonicalizationOfCNAMEs(object):
+ def test_permitted_cnames_may_be_one_to_one_mapping(self):
+ # CanonicalizePermittedCNAMEs *.foo.com:*.bar.com
+ pass
+
+ def test_permitted_cnames_may_be_one_to_many_mapping(self):
+ # CanonicalizePermittedCNAMEs *.foo.com:*.bar.com,*.biz.com
+ pass
+
+ def test_permitted_cnames_may_be_many_to_one_mapping(self):
+ # CanonicalizePermittedCNAMEs *.foo.com,*.bar.com:*.biz.com
+ pass
+
+ def test_permitted_cnames_may_be_many_to_many_mapping(self):
+ # CanonicalizePermittedCNAMEs *.foo.com,*.bar.com:*.biz.com,*.baz.com
+ pass
+
+ def test_permitted_cnames_may_be_multiple_mappings(self):
+ # CanonicalizePermittedCNAMEs *.foo.com,*.bar.com *.biz.com:*.baz.com
+ pass
+
+ def test_permitted_cnames_may_be_multiple_complex_mappings(self):
+ # Same as prev but with multiple patterns on both ends in both args
+ pass
+
+
+class TestMatchAll(object):
+ def test_always_matches(self):
+ result = load_config("match-all").lookup("general")
+ assert result["user"] == "awesome"
+
+ def test_may_not_mix_with_non_canonical_keywords(self):
+ for config in ("match-all-and-more", "match-all-and-more-before"):
+ with raises(ConfigParseError):
+ load_config(config).lookup("whatever")
+
+ def test_may_come_after_canonical(self, socket):
+ result = load_config("match-all-after-canonical").lookup("www")
+ assert result["user"] == "awesome"
+
+ def test_may_not_come_before_canonical(self, socket):
+ with raises(ConfigParseError):
+ load_config("match-all-before-canonical")
+
+ def test_after_canonical_not_loaded_when_non_canonicalized(self, socket):
+ result = load_config("match-canonical-no").lookup("a-host")
+ assert "user" not in result
+
+
+def _expect(success_on):
+ """
+ Returns a side_effect-friendly Invoke success result for given command(s).
+
+ Ensures that any other commands fail; this is useful for testing 'Match
+ exec' because it means all other such clauses under test act like no-ops.
+
+ :param success_on:
+ Single string or list of strings, noting commands that should appear to
+ succeed.
"""
- f = StringIO(test_config_file)
- config = parse_ssh_config(f)
- assert config.lookup("anything-else").as_int("port") == 3333
+ if isinstance(success_on, string_types):
+ success_on = [success_on]
+
+ def inner(command, *args, **kwargs):
+ # Sanity checking - we always expect that invoke.run is called with
+ # these.
+ assert kwargs.get("hide", None) == "stdout"
+ assert kwargs.get("warn", None) is True
+ # Fake exit
+ exit = 0 if command in success_on else 1
+ return Result(exited=exit)
+
+ return inner
+
+
+class TestMatchExec(object):
+ @patch("paramiko.config.invoke", new=None)
+ @patch("paramiko.config.invoke_import_error", new=ImportError("meh"))
+ def test_raises_invoke_ImportErrors_at_runtime(self):
+ # Not an ideal test, but I don't know of a non-bad way to fake out
+ # module-time ImportErrors. So we mock the symptoms. Meh!
+ with raises(ImportError) as info:
+ load_config("match-exec").lookup("oh-noes")
+ assert str(info.value) == "meh"
+
+ @patch("paramiko.config.invoke.run")
+ @mark.parametrize(
+ "cmd,user",
+ [
+ ("unquoted", "rando"),
+ ("quoted", "benjamin"),
+ ("quoted spaced", "neil"),
+ ],
+ )
+ def test_accepts_single_possibly_quoted_argument(self, run, cmd, user):
+ run.side_effect = _expect(cmd)
+ result = load_config("match-exec").lookup("whatever")
+ assert result["user"] == user
+
+ @patch("paramiko.config.invoke.run")
+ def test_does_not_match_nonzero_exit_codes(self, run):
+ # Nothing will succeed -> no User ever gets loaded
+ run.return_value = Result(exited=1)
+ result = load_config("match-exec").lookup("whatever")
+ assert "user" not in result
+
+ @patch("paramiko.config.getpass")
+ @patch("paramiko.config.invoke.run")
+ def test_tokenizes_argument(self, run, getpass, socket):
+ socket.gethostname.return_value = "local.fqdn"
+ getpass.getuser.return_value = "gandalf"
+ # Actual exec value is "%d %h %L %l %n %p %r %u"
+ parts = (
+ expanduser("~"),
+ "configured",
+ "local",
+ "some.fake.fqdn",
+ "target",
+ "22",
+ "intermediate",
+ "gandalf",
+ )
+ run.side_effect = _expect(" ".join(parts))
+ result = load_config("match-exec").lookup("target")
+ assert result["port"] == "1337"
+
+ @patch("paramiko.config.invoke.run")
+ def test_works_with_canonical(self, run, socket):
+ # Ensure both stanzas' exec components appear to match
+ run.side_effect = _expect(["uncanonicalized", "canonicalized"])
+ result = load_config("match-exec-canonical").lookup("who-cares")
+ # Prove both config values got loaded up, across the two passes
+ assert result["user"] == "defenseless"
+ assert result["port"] == "8007"
+
+ @patch("paramiko.config.invoke.run")
+ def test_may_be_negated(self, run):
+ run.side_effect = _expect("this succeeds")
+ result = load_config("match-exec-negation").lookup("so-confusing")
+ # If negation did not work, the first of the two Match exec directives
+ # would have set User to 'nope' (and/or the second would have NOT set
+ # User to 'yup')
+ assert result["user"] == "yup"
+
+ def test_requires_an_argument(self):
+ with raises(ConfigParseError):
+ load_config("match-exec-no-arg")
+
+ @patch("paramiko.config.invoke.run")
+ def test_works_with_tokenized_hostname(self, run):
+ run.side_effect = _expect("ping target")
+ result = load_config("hostname-exec-tokenized").lookup("target")
+ assert result["hostname"] == "pingable.target"
+
+
+class TestMatchHost(object):
+ def test_matches_target_name_when_no_hostname(self):
+ result = load_config("match-host").lookup("target")
+ assert result["user"] == "rand"
+
+ def test_matches_hostname_from_global_setting(self):
+ # Also works for ones set in regular Host stanzas
+ result = load_config("match-host-name").lookup("anything")
+ assert result["user"] == "silly"
+
+ def test_matches_hostname_from_earlier_match(self):
+ # Corner case: one Match matches original host, sets HostName,
+ # subsequent Match matches the latter.
+ result = load_config("match-host-from-match").lookup("original-host")
+ assert result["user"] == "inner"
+
+ def test_may_be_globbed(self):
+ result = load_config("match-host-glob-list").lookup("whatever")
+ assert result["user"] == "matrim"
+
+ def test_may_be_comma_separated_list(self):
+ for target in ("somehost", "someotherhost"):
+ result = load_config("match-host-glob-list").lookup(target)
+ assert result["user"] == "thom"
+
+ def test_comma_separated_list_may_have_internal_negation(self):
+ conf = load_config("match-host-glob-list")
+ assert conf.lookup("good")["user"] == "perrin"
+ assert "user" not in conf.lookup("goof")
+
+ def test_matches_canonicalized_name(self, socket):
+ # Without 'canonical' explicitly declared, mind.
+ result = load_config("match-host-canonicalized").lookup("www")
+ assert result["user"] == "rand"
+
+ def test_works_with_canonical_keyword(self, socket):
+ # NOTE: distinct from 'happens to be canonicalized' above
+ result = load_config("match-host-canonicalized").lookup("docs")
+ assert result["user"] == "eric"
+
+ def test_may_be_negated(self):
+ conf = load_config("match-host-negated")
+ assert conf.lookup("docs")["user"] == "jeff"
+ assert "user" not in conf.lookup("www")
+
+ def test_requires_an_argument(self):
+ with raises(ConfigParseError):
+ load_config("match-host-no-arg")
+
+
+class TestMatchOriginalHost(object):
+ def test_matches_target_host_not_hostname(self):
+ result = load_config("match-orighost").lookup("target")
+ assert result["hostname"] == "bogus"
+ assert result["user"] == "tuon"
+
+ def test_matches_target_host_not_canonicalized_name(self, socket):
+ result = load_config("match-orighost-canonical").lookup("www")
+ assert result["hostname"] == "www.paramiko.org"
+ assert result["user"] == "tuon"
+
+ def test_may_be_globbed(self):
+ result = load_config("match-orighost").lookup("whatever")
+ assert result["user"] == "matrim"
+
+ def test_may_be_comma_separated_list(self):
+ for target in ("comma", "separated"):
+ result = load_config("match-orighost").lookup(target)
+ assert result["user"] == "chameleon"
+
+ def test_comma_separated_list_may_have_internal_negation(self):
+ result = load_config("match-orighost").lookup("nope")
+ assert "user" not in result
+
+ def test_may_be_negated(self):
+ result = load_config("match-orighost").lookup("docs")
+ assert result["user"] == "thom"
+
+ def test_requires_an_argument(self):
+ with raises(ConfigParseError):
+ load_config("match-orighost-no-arg")
+
+
+class TestMatchUser(object):
+ def test_matches_configured_username(self):
+ result = load_config("match-user-explicit").lookup("anything")
+ assert result["hostname"] == "dumb"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_matches_local_username_by_default(self, getuser):
+ getuser.return_value = "gandalf"
+ result = load_config("match-user").lookup("anything")
+ assert result["hostname"] == "gondor"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_may_be_globbed(self, getuser):
+ for user in ("bilbo", "bombadil"):
+ getuser.return_value = user
+ result = load_config("match-user").lookup("anything")
+ assert result["hostname"] == "shire"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_may_be_comma_separated_list(self, getuser):
+ for user in ("aragorn", "frodo"):
+ getuser.return_value = user
+ result = load_config("match-user").lookup("anything")
+ assert result["hostname"] == "moria"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_comma_separated_list_may_have_internal_negation(self, getuser):
+ getuser.return_value = "legolas"
+ result = load_config("match-user").lookup("anything")
+ assert "port" not in result
+ getuser.return_value = "gimli"
+ result = load_config("match-user").lookup("anything")
+ assert result["port"] == "7373"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_may_be_negated(self, getuser):
+ getuser.return_value = "saruman"
+ result = load_config("match-user").lookup("anything")
+ assert result["hostname"] == "mordor"
+
+ def test_requires_an_argument(self):
+ with raises(ConfigParseError):
+ load_config("match-user-no-arg")
+
+
+# NOTE: highly derivative of previous suite due to the former's use of
+# localuser fallback. Doesn't seem worth conflating/refactoring right now.
+class TestMatchLocalUser(object):
+ @patch("paramiko.config.getpass.getuser")
+ def test_matches_local_username(self, getuser):
+ getuser.return_value = "gandalf"
+ result = load_config("match-localuser").lookup("anything")
+ assert result["hostname"] == "gondor"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_may_be_globbed(self, getuser):
+ for user in ("bilbo", "bombadil"):
+ getuser.return_value = user
+ result = load_config("match-localuser").lookup("anything")
+ assert result["hostname"] == "shire"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_may_be_comma_separated_list(self, getuser):
+ for user in ("aragorn", "frodo"):
+ getuser.return_value = user
+ result = load_config("match-localuser").lookup("anything")
+ assert result["hostname"] == "moria"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_comma_separated_list_may_have_internal_negation(self, getuser):
+ getuser.return_value = "legolas"
+ result = load_config("match-localuser").lookup("anything")
+ assert "port" not in result
+ getuser.return_value = "gimli"
+ result = load_config("match-localuser").lookup("anything")
+ assert result["port"] == "7373"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_may_be_negated(self, getuser):
+ getuser.return_value = "saruman"
+ result = load_config("match-localuser").lookup("anything")
+ assert result["hostname"] == "mordor"
+
+ def test_requires_an_argument(self):
+ with raises(ConfigParseError):
+ load_config("match-localuser-no-arg")
+
+
+class TestComplexMatching(object):
+ # NOTE: this is still a cherry-pick of a few levels of complexity, there's
+ # no point testing literally all possible combinations.
+
+ def test_originalhost_host(self):
+ result = load_config("match-complex").lookup("target")
+ assert result["hostname"] == "bogus"
+ assert result["user"] == "rand"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_originalhost_localuser(self, getuser):
+ getuser.return_value = "rando"
+ result = load_config("match-complex").lookup("remote")
+ assert result["user"] == "calrissian"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_everything_but_all(self, getuser):
+ getuser.return_value = "rando"
+ result = load_config("match-complex").lookup("www")
+ assert result["port"] == "7777"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_everything_but_all_with_some_negated(self, getuser):
+ getuser.return_value = "rando"
+ result = load_config("match-complex").lookup("docs")
+ assert result["port"] == "1234"
+
+ def test_negated_canonical(self, socket):
+ # !canonical in a config that is not canonicalized - does match
+ result = load_config("match-canonical-no").lookup("specific")
+ assert result["user"] == "overload"
+ # !canonical in a config that is canonicalized - does NOT match
+ result = load_config("match-canonical-yes").lookup("www")
+ assert result["user"] == "hidden"
diff --git a/tests/test_dss_openssh.key b/tests/test_dss_openssh.key
new file mode 100644
index 0000000..2a9f892
--- /dev/null
+++ b/tests/test_dss_openssh.key
@@ -0,0 +1,22 @@
+-----BEGIN OPENSSH PRIVATE KEY-----
+b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jdHIAAAAGYmNyeXB0AAAAGAAAABAsyq4pxL
+R5sOprPDHGpvzxAAAAEAAAAAEAAAGxAAAAB3NzaC1kc3MAAACBAL8XEx7F9xuwBNles+vW
+pNF+YcofrBhjX1r5QhpBe0eoYWLHRcroN6lxwCdGYRfgOoRjTncBiixQX/uUxAY96zDh3i
+r492s2BcJt4ihvNn/AY0I0OTuX/2IwGk9CGzafjaeZNVYxMa8lcVt0hSOTjkPQ7gVuk6bJ
+zMInvie+VWKLAAAAFQDUgYdY+rhR0SkKbC09BS/SIHcB+wAAAIB44+4zpCNcd0CGvZlowH
+99zyPX8uxQtmTLQFuR2O8O0FgVVuCdDgD0D9W8CLOp32oatpM0jyyN89EdvSWzjHzZJ+L6
+H1FtZps7uhpDFWHdva1R25vyGecLMUuXjo5t/D7oCDih+HwHoSAxoi0QvsPd8/qqHQVznN
+JKtR6thUpXEwAAAIAG4DCBjbgTTgpBw0egRkJwBSz0oTt+1IcapNU2jA6N8urMSk9YXHEQ
+HKN68BAF3YJ59q2Ujv3LOXmBqGd1T+kzwUszfMlgzq8MMu19Yfzse6AIK1Agn1Vj6F7YXL
+sXDN+T4KszX5+FJa7t/Zsp3nALWy6l0f4WKivEF5Y2QpEFcQAAAgCH6XUl1hYWB6kgCSHV
+a4C+vQHrgFNgNwEQnE074LXHXlAhxC+Dm8XTGqVPX1KRPWzadq9/+v6pqLFqiRueB86uRb
+J5WtAbUs3WwxAaC5Mi+mn42MBfL9PIwWPWCvstrAq9Nyj3EBMeX3XFLxN3RuGXIQnY/5rF
+f5hriUVxhWDQGIVbBKhkpn7Geqg6nLpn7iqQhzFmFGjPmAdrllgdVGJRLyIN6BRsaltDdy
+vxufkvGzKudvQ85QvsaoFJQ6K1d0S7907pexvxmWpcO7zchXb6i09BITWOAKIcHpVkbNQw
++8pzSdpggsAwCRbfk/Jkezz8sXVUCfmmJ23NFUw04/0ZbilCADRsUaPfafgVPeDznBnuCm
+tfXa4JSrVUvPdwoex3SKZmYsFXwsuOEQnFkhUGHfWwTbmOmxzy6dtC24KYhnWG5OGFVJXh
+3B8jQJGGs2ANfusI/Z0o15tAnQy5fqsLf9TT3RX7RG2ujIiDBsU+A1g//IXmSxxkUOQMZs
+v+cMI8KfODAXmQtB30+yAgoV03Zb/bdptv+HqPT4eeecstJUxzEGYADt1mDq3uV7fQbNmo
+80bppU52JjztrJb7hBmXsXHPRRK6spQ1FCatqvu1ggZeXZpEifNsHeqCljt87ueXsQsORY
+pvhLzjTbTKZmjLDPuB+GxUNLEKh1ZNyAqKng==
+-----END OPENSSH PRIVATE KEY-----
diff --git a/tests/test_ecdsa_384_openssh.key b/tests/test_ecdsa_384_openssh.key
new file mode 100644
index 0000000..8a160ce
--- /dev/null
+++ b/tests/test_ecdsa_384_openssh.key
@@ -0,0 +1,11 @@
+-----BEGIN OPENSSH PRIVATE KEY-----
+b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jdHIAAAAGYmNyeXB0AAAAGAAAABDwIHkBEZ
+75XuqQS6/7daAIAAAAEAAAAAEAAACIAAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlz
+dHAzODQAAABhBIch5LXTq/L/TWsTGG6dIktxD8DIMh7EfvoRmWsks6CuNDTvFvbQNtY4QO
+1mn5OXegHbS0M5DPIS++wpKGFP3suDEH08O35vZQasLNrL0tO2jyyEnzB2ZEx3PPYci811
+ygAAAOBKGxFl+JcMHjldOdTA9iwv88gxoelCwln/NATglUuyzHMLJwx53n8NLqrnHALvbz
+RHjyTmjU4dbSM9o9Vjhcvq+1aipjAQg2qx825f7T4BMoKyhLBS/qTg7RfyW/h0Sbequ1wl
+PhBfwhv0LUphRFsGdnOgrXWfZqWqxOP1WhJWIh1p+ja5va/Ii/+hD6RORQjvzbHTPJA53c
+OguISImkx0vdqPuFTLyclaC3eO4Px68Ki0b8cdyivExbAWLkNOtBdIAgeO7Egbruu4O5Sn
+I6bn1Kc+kZlWtO02IkwSA5DaKw==
+-----END OPENSSH PRIVATE KEY-----
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index 30ffb56..308caa9 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -204,13 +204,13 @@ class GSSAPITest(KerberosTestCase):
c_token = token[0].Buffer
self.assertNotEquals(0, error)
- def test_2_gssapi_sspi_client(self):
+ def test_gssapi_sspi_client(self):
"""
Test the used methods of python-gssapi or sspi, sspicon from pywin32.
"""
self._gssapi_sspi_test()
- def test_3_gssapi_sspi_server(self):
+ def test_gssapi_sspi_server(self):
"""
Test the used methods of python-gssapi or sspi, sspicon from pywin32.
"""
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 0e60126..8d88545 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -38,6 +38,9 @@ PUB_DSS = "ssh-dss AAAAB3NzaC1kc3MAAACBAOeBpgNnfRzr/twmAQRu2XwWAp3CFtrVnug6s6fgw
PUB_ECDSA_256 = "ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJSPZm3ZWkvk/Zx8WP+fZRZ5/NBBHnGQwR6uIC6XHGPDIHuWUzIjAwA0bzqkOUffEsbLe+uQgKl5kbc/L8KA/eo=" # noqa
PUB_ECDSA_384 = "ecdsa-sha2-nistp384 AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBBbGibQLW9AAZiGN2hEQxWYYoFaWKwN3PKSaDJSMqmIn1Z9sgRUuw8Y/w502OGvXL/wFk0i2z50l3pWZjD7gfMH7gX5TUiCzwrQkS+Hn1U2S9aF5WJp0NcIzYxXw2r4M2A==" # noqa
PUB_ECDSA_521 = "ecdsa-sha2-nistp521 AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBACaOaFLZGuxa5AW16qj6VLypFbLrEWrt9AZUloCMefxO8bNLjK/O5g0rAVasar1TnyHE9qj4NwzANZASWjQNbc4MAG8vzqezFwLIn/kNyNTsXNfqEko9OgHZknlj2Z79dwTJcRAL4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA==" # noqa
+PUB_RSA_2K_OPENSSH = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDF+Dpr54DX0WdeTDpNAMdkCWEkl3OXtNgf58qlN1gX572OLBqLf0zT4bHstUEpU3piazph/rSWcUMuBoD46tZ6jiH7H9b9Pem2eYQWaELDDkM+v9BMbEy5rMbFRLol5OtEvPFqneyEAanPOgvd8t3yyhSev9QVusakzJ8j8LGgrA8huYZ+Srnw0shEWLG70KUKCh3rG0QIvA8nfhtUOisr2Gp+F0YxMGb5gwBlQYAYE5l6u1SjZ7hNjyNosjK+wRBFgFFBYVpkZKJgWoK9w4ijFyzMZTucnZMqKOKAjIJvHfKBf2/cEfYxSq1EndqTqjYsd9T7/s2vcn1OH5a0wkER" # noqa
+PUB_DSS_1K_OPENSSH = "ssh-dss AAAAB3NzaC1kc3MAAACBAL8XEx7F9xuwBNles+vWpNF+YcofrBhjX1r5QhpBe0eoYWLHRcroN6lxwCdGYRfgOoRjTncBiixQX/uUxAY96zDh3ir492s2BcJt4ihvNn/AY0I0OTuX/2IwGk9CGzafjaeZNVYxMa8lcVt0hSOTjkPQ7gVuk6bJzMInvie+VWKLAAAAFQDUgYdY+rhR0SkKbC09BS/SIHcB+wAAAIB44+4zpCNcd0CGvZlowH99zyPX8uxQtmTLQFuR2O8O0FgVVuCdDgD0D9W8CLOp32oatpM0jyyN89EdvSWzjHzZJ+L6H1FtZps7uhpDFWHdva1R25vyGecLMUuXjo5t/D7oCDih+HwHoSAxoi0QvsPd8/qqHQVznNJKtR6thUpXEwAAAIAG4DCBjbgTTgpBw0egRkJwBSz0oTt+1IcapNU2jA6N8urMSk9YXHEQHKN68BAF3YJ59q2Ujv3LOXmBqGd1T+kzwUszfMlgzq8MMu19Yfzse6AIK1Agn1Vj6F7YXLsXDN+T4KszX5+FJa7t/Zsp3nALWy6l0f4WKivEF5Y2QpEFcQ==" # noqa
+PUB_EC_384_OPENSSH = "ecdsa-sha2-nistp384 AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBIch5LXTq/L/TWsTGG6dIktxD8DIMh7EfvoRmWsks6CuNDTvFvbQNtY4QO1mn5OXegHbS0M5DPIS++wpKGFP3suDEH08O35vZQasLNrL0tO2jyyEnzB2ZEx3PPYci811yg==" # noqa
FINGER_RSA = "1024 60:73:38:44:cb:51:86:65:7f:de:da:a2:2b:5a:57:d5"
FINGER_DSS = "1024 44:78:f0:b9:a2:3c:c5:18:20:09:ff:75:5b:c1:d2:6c"
@@ -45,6 +48,9 @@ FINGER_ECDSA_256 = "256 25:19:eb:55:e6:a1:47:ff:4f:38:d2:75:6f:a5:d5:60"
FINGER_ECDSA_384 = "384 c1:8d:a0:59:09:47:41:8e:a8:a6:07:01:29:23:b4:65"
FINGER_ECDSA_521 = "521 44:58:22:52:12:33:16:0e:ce:0e:be:2c:7c:7e:cc:1e"
SIGNED_RSA = "20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8" # noqa
+FINGER_RSA_2K_OPENSSH = "2048 68:d1:72:01:bf:c0:0c:66:97:78:df:ce:75:74:46:d6"
+FINGER_DSS_1K_OPENSSH = "1024 cf:1d:eb:d7:61:d3:12:94:c6:c0:c6:54:35:35:b0:82"
+FINGER_EC_384_OPENSSH = "384 72:14:df:c1:9a:c3:e6:0e:11:29:d6:32:18:7b:ea:9b"
RSA_PRIVATE_OUT = """\
-----BEGIN RSA PRIVATE KEY-----
@@ -437,6 +443,39 @@ class KeyTest(unittest.TestCase):
pub = ECDSAKey(data=key.asbytes())
self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg))
+ def test_load_openssh_format_RSA_key(self):
+ key = RSAKey.from_private_key_file(
+ _support("test_rsa_openssh.key"), b"television"
+ )
+ self.assertEqual("ssh-rsa", key.get_name())
+ self.assertEqual(PUB_RSA_2K_OPENSSH.split()[1], key.get_base64())
+ self.assertEqual(2048, key.get_bits())
+ exp_rsa = b(FINGER_RSA_2K_OPENSSH.split()[1].replace(":", ""))
+ my_rsa = hexlify(key.get_fingerprint())
+ self.assertEqual(exp_rsa, my_rsa)
+
+ def test_load_openssh_format_DSS_key(self):
+ key = DSSKey.from_private_key_file(
+ _support("test_dss_openssh.key"), b"television"
+ )
+ self.assertEqual("ssh-dss", key.get_name())
+ self.assertEqual(PUB_DSS_1K_OPENSSH.split()[1], key.get_base64())
+ self.assertEqual(1024, key.get_bits())
+ exp_rsa = b(FINGER_DSS_1K_OPENSSH.split()[1].replace(":", ""))
+ my_rsa = hexlify(key.get_fingerprint())
+ self.assertEqual(exp_rsa, my_rsa)
+
+ def test_load_openssh_format_EC_key(self):
+ key = ECDSAKey.from_private_key_file(
+ _support("test_ecdsa_384_openssh.key"), b"television"
+ )
+ self.assertEqual("ecdsa-sha2-nistp384", key.get_name())
+ self.assertEqual(PUB_EC_384_OPENSSH.split()[1], key.get_base64())
+ self.assertEqual(384, key.get_bits())
+ exp_fp = b(FINGER_EC_384_OPENSSH.split()[1].replace(":", ""))
+ my_fp = hexlify(key.get_fingerprint())
+ self.assertEqual(exp_fp, my_fp)
+
def test_salt_size(self):
# Read an existing encrypted private key
file_ = _support("test_rsa_password.key")
@@ -455,6 +494,10 @@ class KeyTest(unittest.TestCase):
finally:
os.remove(newfile)
+ def test_load_openssh_format_RSA_nopad(self):
+ # check just not exploding with 'Invalid key'
+ RSAKey.from_private_key_file(_support("test_rsa_openssh_nopad.key"))
+
def test_stringification(self):
key = RSAKey.from_private_key_file(_support("test_rsa.key"))
comparable = TEST_KEY_BYTESTR_2 if PY2 else TEST_KEY_BYTESTR_3
diff --git a/tests/test_proxy.py b/tests/test_proxy.py
new file mode 100644
index 0000000..2e0b0e5
--- /dev/null
+++ b/tests/test_proxy.py
@@ -0,0 +1,144 @@
+import signal
+import socket
+
+from mock import patch
+from pytest import raises
+
+from paramiko import ProxyCommand, ProxyCommandFailure
+
+
+class TestProxyCommand(object):
+ @patch("paramiko.proxy.subprocess")
+ def test_init_takes_command_string(self, subprocess):
+ ProxyCommand(command_line="do a thing")
+ subprocess.Popen.assert_called_once_with(
+ ["do", "a", "thing"],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ bufsize=0,
+ )
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ def test_send_writes_to_process_stdin_returning_length(self, Popen):
+ proxy = ProxyCommand("hi")
+ written = proxy.send(b"data")
+ Popen.return_value.stdin.write.assert_called_once_with(b"data")
+ assert written == len(b"data")
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ def test_send_raises_ProxyCommandFailure_on_error(self, Popen):
+ Popen.return_value.stdin.write.side_effect = IOError(0, "whoops")
+ with raises(ProxyCommandFailure) as info:
+ ProxyCommand("hi").send("data")
+ assert info.value.command == "hi"
+ assert info.value.error == "whoops"
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ @patch("paramiko.proxy.os.read")
+ @patch("paramiko.proxy.select")
+ def test_recv_reads_from_process_stdout_returning_bytes(
+ self, select, os_read, Popen
+ ):
+ stdout = Popen.return_value.stdout
+ select.return_value = [stdout], None, None
+ fileno = stdout.fileno.return_value
+ # Intentionally returning <5 at a time sometimes
+ os_read.side_effect = [b"was", b"te", b"of ti", b"me"]
+ proxy = ProxyCommand("hi")
+ data = proxy.recv(5)
+ assert data == b"waste"
+ assert [x[0] for x in os_read.call_args_list] == [
+ (fileno, 5),
+ (fileno, 2),
+ ]
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ @patch("paramiko.proxy.os.read")
+ @patch("paramiko.proxy.select")
+ def test_recv_returns_buffer_on_timeout_if_any_read(
+ self, select, os_read, Popen
+ ):
+ stdout = Popen.return_value.stdout
+ select.return_value = [stdout], None, None
+ fileno = stdout.fileno.return_value
+ os_read.side_effect = [b"was", socket.timeout]
+ proxy = ProxyCommand("hi")
+ data = proxy.recv(5)
+ assert data == b"was" # not b"waste"
+ assert os_read.call_args[0] == (fileno, 2)
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ @patch("paramiko.proxy.os.read")
+ @patch("paramiko.proxy.select")
+ def test_recv_raises_timeout_if_nothing_read(self, select, os_read, Popen):
+ stdout = Popen.return_value.stdout
+ select.return_value = [stdout], None, None
+ fileno = stdout.fileno.return_value
+ os_read.side_effect = socket.timeout
+ proxy = ProxyCommand("hi")
+ with raises(socket.timeout):
+ proxy.recv(5)
+ assert os_read.call_args[0] == (fileno, 5)
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ @patch("paramiko.proxy.os.read")
+ @patch("paramiko.proxy.select")
+ def test_recv_raises_ProxyCommandFailure_on_non_timeout_error(
+ self, select, os_read, Popen
+ ):
+ select.return_value = [Popen.return_value.stdout], None, None
+ os_read.side_effect = IOError(0, "whoops")
+ with raises(ProxyCommandFailure) as info:
+ ProxyCommand("hi").recv(5)
+ assert info.value.command == "hi"
+ assert info.value.error == "whoops"
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ @patch("paramiko.proxy.os.kill")
+ def test_close_kills_subprocess(self, os_kill, Popen):
+ proxy = ProxyCommand("hi")
+ proxy.close()
+ os_kill.assert_called_once_with(Popen.return_value.pid, signal.SIGTERM)
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ def test_closed_exposes_whether_subprocess_has_exited(self, Popen):
+ proxy = ProxyCommand("hi")
+ Popen.return_value.returncode = None
+ assert proxy.closed is False
+ assert proxy._closed is False
+ Popen.return_value.returncode = 0
+ assert proxy.closed is True
+ assert proxy._closed is True
+
+ @patch("paramiko.proxy.time.time")
+ @patch("paramiko.proxy.subprocess.Popen")
+ @patch("paramiko.proxy.os.read")
+ @patch("paramiko.proxy.select")
+ def test_timeout_affects_whether_timeout_is_raised(
+ self, select, os_read, Popen, time
+ ):
+ stdout = Popen.return_value.stdout
+ select.return_value = [stdout], None, None
+ # Base case: None timeout means no timing out
+ os_read.return_value = b"meh"
+ proxy = ProxyCommand("yello")
+ assert proxy.timeout is None
+ # Implicit 'no raise' check
+ assert proxy.recv(3) == b"meh"
+ # Use settimeout to set timeout, and it is honored
+ time.side_effect = [0, 10] # elapsed > 7
+ proxy = ProxyCommand("ohnoz")
+ proxy.settimeout(7)
+ assert proxy.timeout == 7
+ with raises(socket.timeout):
+ proxy.recv(3)
+
+ @patch("paramiko.proxy.subprocess", new=None)
+ @patch("paramiko.proxy.subprocess_import_error", new=ImportError("meh"))
+ def test_raises_subprocess_ImportErrors_at_runtime(self):
+ # Not an ideal test, but I don't know of a non-bad way to fake out
+ # module-time ImportErrors. So we mock the symptoms. Meh!
+ with raises(ImportError) as info:
+ ProxyCommand("hi!!!")
+ assert str(info.value) == "meh"
diff --git a/tests/test_rsa_openssh.key b/tests/test_rsa_openssh.key
new file mode 100644
index 0000000..afc15f1
--- /dev/null
+++ b/tests/test_rsa_openssh.key
@@ -0,0 +1,28 @@
+-----BEGIN OPENSSH PRIVATE KEY-----
+ b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jYmMAAAAGYmNyeXB0AAAAGAAAABD0R3hOFS
+ FMb2SJeo5h8QPNAAAAEAAAAAEAAAEXAAAAB3NzaC1yc2EAAAADAQABAAABAQDF+Dpr54DX
+ 0WdeTDpNAMdkCWEkl3OXtNgf58qlN1gX572OLBqLf0zT4bHstUEpU3piazph/rSWcUMuBo
+ D46tZ6jiH7H9b9Pem2eYQWaELDDkM+v9BMbEy5rMbFRLol5OtEvPFqneyEAanPOgvd8t3y
+ yhSev9QVusakzJ8j8LGgrA8huYZ+Srnw0shEWLG70KUKCh3rG0QIvA8nfhtUOisr2Gp+F0
+ YxMGb5gwBlQYAYE5l6u1SjZ7hNjyNosjK+wRBFgFFBYVpkZKJgWoK9w4ijFyzMZTucnZMq
+ KOKAjIJvHfKBf2/cEfYxSq1EndqTqjYsd9T7/s2vcn1OH5a0wkERAAAD0JnzCJYfDeiUQ6
+ 9LOAb6/NnhKvFjCdBYal60MfLcLBHvzHLJvTneQ4f1Vknq8xEVmRba7SDSfwaEybP/1FsP
+ SGH6FNKA5gKllemgmcaUVr3wtNPtjX4WgsyHcwCRgHmOiyNrUj0OZR5wbZabHIIyirl4wa
+ LBz8Jb3GalKEagtyWsBKDCKHCFNzh8xmsT1SWhnC7baRyC8e3krQm9hGbNhpj6Q5AtN3ql
+ wBVamUp0eKxkt70mKBKI4v3DR8KqrEndeK6d0cegVEkE67fqa99a5J3uSDC8mglKrHiKEs
+ dU1dh/bOF/H3aFpINlRwvlZ95Opby7rG0BHgbZONq0+VUnABVzNTM5Xd5UKjjCF28CrQBf
+ XS6WeHeUx2zHtOmL1xdePk+Bii+SSUl3pLa4SDwX4nV95cSPx8vMm8dJEruxad6+MPoSuy
+ Oyho89jqUTSgC/RPejuTgrnB3WbzE5SJb+V3zMata0J1wxbNfYKG9U+VucUZhP4+jzfNqH
+ B/v8JqtuxnqR8NjPsK2+8wJxebL2KVNjKOm//6P3KSDsavpscGpVWOM06zUlwWCB26W3pP
+ X/+xO9aR5wiBteFKoJG1waziIjqhOJSmvq+I/texUKEUd/eEFNt10Ubc0zy0sRYVN8rIRJ
+ masQzCYuUylDzCa4ar1s4qngBZzWL2PRkPuXuhoHuT0J5no174GR6+6EAYZZhnq0tkYrhZ
+ Ar0tQ4CDlI235a3MPHzvABuwYuWys1tBuLAb+6Gc6CmCiQ+mhojfQUBYG5T65iRFA5UQsH
+ O1RLEC3yasxGcBI6d0J/fwOP/YLktNu3AeUumr0N9Xgf02DlBNwd+4GOI0LcQvl/3J8ppo
+ bamTppKPEZ2d32VNEO+Z6Zx5DlIVm5gDeMvIvdwap445VnhL3ZZH2NCkAcXM9+0WH+Quas
+ JCAMgPYiP9FzF+8Onmj2OmhgIVj/9eanhS3/GLrRC4xCvER2V7PwgB0I5qY110BPEttDyo
+ IvYE51kvtdW447SK7HZywJnkyw2RNm+29dvWJJwSQckUHuZkXEtmEPk0ePL3yf2NH5XYJc
+ pXX6Zac0KemCPIHr8l7GogE4Rb2BBTqddkegb9piz6QTAPcQnn+GuMFG06IBhUrgcMEQ8x
+ UOXYUUrT5HvSxWUcgaYH1nfC3bTWmDaodw8/HQKyF6c44rujO2s2NLFOCAyQMUNdhh3lfD
+ yHYLO7xYkP6xzzkpk+2lwBoeYdQdAwlKN/XqC8ZhBfwTdem/1hh1BpQJFbbFftWxU8gxxi
+ iuI+vmlsuIsxKoGCq8YXuophx62lo=
+ -----END OPENSSH PRIVATE KEY-----
diff --git a/tests/test_rsa_openssh_nopad.key b/tests/test_rsa_openssh_nopad.key
new file mode 100644
index 0000000..61ac1b1
--- /dev/null
+++ b/tests/test_rsa_openssh_nopad.key
@@ -0,0 +1,27 @@
+-----BEGIN OPENSSH PRIVATE KEY-----
+b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABFwAAAAdzc2gtcn
+NhAAAAAwEAAQAAAQEAnyMwWSwrbJxxQZWMJO5xR6eAA9De4t3GViqDRaQt/BgsvzZ14SUz
+aOL/A370fKxhx/JLIOOGA0o5B0/ct+CL7XFqMi5r5+iA9VcIeYKKtoAkrEvRnagNW0WVWx
+thTnE01g8Pb7fDqzI2cBuBNZ2vGNm2m4UTGC8/kl/0ES1V3KqA7lPlTrkTYg9L/ornvVHc
+c8gEbMwx9XXVRzbWiuDE176ojrudY9CZduVSOgW+HK3rKkqLBs/91jv0zUK0oqTQBLR7E2
+V2GWPDU4BjlHTtYr0jpKOGDr1DLu4+NiD/mX+tGMdH6ehbDii0kXmOUaZjs4OxuK3XA/gi
+KZLdj1jQQwAAA7iNnvAVjZ7wFQAAAAdzc2gtcnNhAAABAQCfIzBZLCtsnHFBlYwk7nFHp4
+AD0N7i3cZWKoNFpC38GCy/NnXhJTNo4v8DfvR8rGHH8ksg44YDSjkHT9y34IvtcWoyLmvn
+6ID1Vwh5goq2gCSsS9GdqA1bRZVbG2FOcTTWDw9vt8OrMjZwG4E1na8Y2babhRMYLz+SX/
+QRLVXcqoDuU+VOuRNiD0v+iue9UdxzyARszDH1ddVHNtaK4MTXvqiOu51j0Jl25VI6Bb4c
+resqSosGz/3WO/TNQrSipNAEtHsTZXYZY8NTgGOUdO1ivSOko4YOvUMu7j42IP+Zf60Yx0
+fp6FsOKLSReY5RpmOzg7G4rdcD+CIpkt2PWNBDAAAAAwEAAQAAAQEAnmMbn+VCYxth7fC2
+R5u6y6J+201sSUiKOwCdHxdFXX+CKd4+fRPVkzM6tXQKSnwX5jXVaKqLm4KoOArYl3q6Sl
+1zYParF2plz8oL+URgYzwvQ/1CaDP29zzOZptdwgESoWrj5kF0UlPrsrDtbTvAJm+qPCe6
+1XtRPpKaDO6eYr0PM2QTElZy3mDBUBvu816LdG/ZtnB9g5UsocT5mmhpHTHdjrpwNu5TBe
+ACVodDn5Fu66OlrrnQi4IPCAWKJ1YuzEkZqLhs1L3oMHACsmzrLjzW74SjY4kWTTvGiC6i
+tDoycycThk9EGLGNso99Q1fe84/OZUff7aI3yK9KvLL7oQAAAIEAh2+XrJXSBx/v9E3aJH
+ncgQH1snXr7LcSRqcWicHdbm8JsOTT3TkyXHGlSZ2rr/Y0u5V1ZSO6roJLrAHsDJzx0x0U
+xE/5mpzhD+yIKQwnWkZFLzYEnYDFdXDMzmghUIik9AW7n9dtS8UtVFGaL6Vs2YCOuLqeT9
+nZUkm3UUZ+7QIAAACBAM23DFjQ0/Op2ri7fJA2qFBdXqoJdNHuyYEIrKbB6XaaSUz52+IB
+MbccxEz3vPsHh69tZoJ+xZNbFJe9wdmbF+DQpoukHkJnzpk/pUq8LjQMzZfwv41X8zqaq4
+AOA7g27Rk8aKewhCXjhkr0hHEaSiuqIIindFaFti5sQMi2mtkXAAAAgQDGCXkpuKZK61p9
+L6G5yZSQBCgVtm0iQEbyDXWHjy/GqLtxJjqdyaRK57hXGjbzgJJraSy+sNP9uv2QOvyZvB
+3XaPWwUYVQ34WyibCqqUaPiHxX7T1lZV+asbwgbmSqYtH5dUEJ8zT572mCwxnRjX63PwDo
+5vBbR/qAW5lvRYsltQAAAAFh
+-----END OPENSSH PRIVATE KEY-----
diff --git a/tests/test_util.py b/tests/test_util.py
index 465e75b..8ce260d 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -28,32 +28,10 @@ import unittest
import paramiko
import paramiko.util
-from paramiko import SSHConfig
-from paramiko.util import lookup_ssh_host_config as host_config, safe_string
-from paramiko.py3compat import StringIO, byte_ord
+from paramiko.util import safe_string
+from paramiko.py3compat import byte_ord
-# Note some lines in this configuration have trailing spaces on purpose
-test_config_file = """\
-Host *
- User robey
- IdentityFile =~/.ssh/id_rsa
-
-# comment
-Host *.example.com
- \tUser bjork
-Port=3333
-Host *
-"""
-
-dont_strip_whitespace_please = "\t \t Crazy something dumb "
-
-test_config_file += dont_strip_whitespace_please
-test_config_file += """
-Host spoo.example.com
-Crazy something else
-"""
-
test_hosts_file = """\
secure.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA1PD6U2/TVxET6lkpKhOk5r\
9q/kAYG6sP9f5zuUYP8i7FOFp/6ncCEbbtg/lB+A3iidyxoSWl+9jtoyyDOOVX4UIDV9G11Ml8om3\
@@ -65,150 +43,74 @@ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
class UtilTest(unittest.TestCase):
- def test_import(self):
+ def test_imports(self):
"""
verify that all the classes can be imported from paramiko.
"""
- symbols = paramiko.__all__
- self.assertTrue("Transport" in symbols)
- self.assertTrue("SSHClient" in symbols)
- self.assertTrue("MissingHostKeyPolicy" in symbols)
- self.assertTrue("AutoAddPolicy" in symbols)
- self.assertTrue("RejectPolicy" in symbols)
- self.assertTrue("WarningPolicy" in symbols)
- self.assertTrue("SecurityOptions" in symbols)
- self.assertTrue("SubsystemHandler" in symbols)
- self.assertTrue("Channel" in symbols)
- self.assertTrue("RSAKey" in symbols)
- self.assertTrue("DSSKey" in symbols)
- self.assertTrue("Message" in symbols)
- self.assertTrue("SSHException" in symbols)
- self.assertTrue("AuthenticationException" in symbols)
- self.assertTrue("PasswordRequiredException" in symbols)
- self.assertTrue("BadAuthenticationType" in symbols)
- self.assertTrue("ChannelException" in symbols)
- self.assertTrue("SFTP" in symbols)
- self.assertTrue("SFTPFile" in symbols)
- self.assertTrue("SFTPHandle" in symbols)
- self.assertTrue("SFTPClient" in symbols)
- self.assertTrue("SFTPServer" in symbols)
- self.assertTrue("SFTPError" in symbols)
- self.assertTrue("SFTPAttributes" in symbols)
- self.assertTrue("SFTPServerInterface" in symbols)
- self.assertTrue("ServerInterface" in symbols)
- self.assertTrue("BufferedFile" in symbols)
- self.assertTrue("Agent" in symbols)
- self.assertTrue("AgentKey" in symbols)
- self.assertTrue("HostKeys" in symbols)
- self.assertTrue("SSHConfig" in symbols)
- self.assertTrue("util" in symbols)
-
- def test_parse_config(self):
- global test_config_file
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- self.assertEqual(
- config._config,
- [
- {"host": ["*"], "config": {}},
- {
- "host": ["*"],
- "config": {
- "identityfile": ["~/.ssh/id_rsa"],
- "user": "robey",
- },
- },
- {
- "host": ["*.example.com"],
- "config": {"user": "bjork", "port": "3333"},
- },
- {"host": ["*"], "config": {"crazy": "something dumb"}},
- {
- "host": ["spoo.example.com"],
- "config": {"crazy": "something else"},
- },
- ],
- )
-
- def test_host_config(self):
- global test_config_file
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
-
- for host, values in {
- "irc.danger.com": {
- "crazy": "something dumb",
- "hostname": "irc.danger.com",
- "user": "robey",
- },
- "irc.example.com": {
- "crazy": "something dumb",
- "hostname": "irc.example.com",
- "user": "robey",
- "port": "3333",
- },
- "spoo.example.com": {
- "crazy": "something dumb",
- "hostname": "spoo.example.com",
- "user": "robey",
- "port": "3333",
- },
- }.items():
- values = dict(
- values,
- hostname=host,
- identityfile=[os.path.expanduser("~/.ssh/id_rsa")],
- )
- self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config), values
- )
+ for name in (
+ "Agent",
+ "AgentKey",
+ "AuthenticationException",
+ "AutoAddPolicy",
+ "BadAuthenticationType",
+ "BufferedFile",
+ "Channel",
+ "ChannelException",
+ "ConfigParseError",
+ "CouldNotCanonicalize",
+ "DSSKey",
+ "HostKeys",
+ "Message",
+ "MissingHostKeyPolicy",
+ "PasswordRequiredException",
+ "RSAKey",
+ "RejectPolicy",
+ "SFTP",
+ "SFTPAttributes",
+ "SFTPClient",
+ "SFTPError",
+ "SFTPFile",
+ "SFTPHandle",
+ "SFTPServer",
+ "SFTPServerInterface",
+ "SSHClient",
+ "SSHConfig",
+ "SSHConfigDict",
+ "SSHException",
+ "SecurityOptions",
+ "ServerInterface",
+ "SubsystemHandler",
+ "Transport",
+ "WarningPolicy",
+ "util",
+ ):
+ assert name in paramiko.__all__
def test_generate_key_bytes(self):
x = paramiko.util.generate_key_bytes(
sha1, b"ABCDEFGH", "This is my secret passphrase.", 64
)
hex = "".join(["%02x" % byte_ord(c) for c in x])
- self.assertEqual(
- hex,
- "9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b", # noqa
- )
+ hexpected = "9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b" # noqa
+ assert hex == hexpected
def test_host_keys(self):
with open("hostfile.temp", "w") as f:
f.write(test_hosts_file)
try:
hostdict = paramiko.util.load_host_keys("hostfile.temp")
- self.assertEqual(2, len(hostdict))
- self.assertEqual(1, len(list(hostdict.values())[0]))
- self.assertEqual(1, len(list(hostdict.values())[1]))
+ assert 2 == len(hostdict)
+ assert 1 == len(list(hostdict.values())[0])
+ assert 1 == len(list(hostdict.values())[1])
fp = hexlify(
hostdict["secure.example.com"]["ssh-rsa"].get_fingerprint()
).upper()
- self.assertEqual(b"E6684DB30E109B67B70FF1DC5C7F1363", fp)
+ assert b"E6684DB30E109B67B70FF1DC5C7F1363" == fp
finally:
os.unlink("hostfile.temp")
- def test_host_config_expose_issue_33(self):
- test_config_file = """
-Host www13.*
- Port 22
-
-Host *.example.com
- Port 2222
-
-Host *
- Port 3333
- """
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- host = "www13.example.com"
- self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config),
- {"hostname": host, "port": "22"},
- )
-
def test_eintr_retry(self):
- self.assertEqual("foo", paramiko.util.retry_on_signal(lambda: "foo"))
+ assert "foo" == paramiko.util.retry_on_signal(lambda: "foo")
# Variables that are set by raises_intr
intr_errors_remaining = [3]
@@ -221,8 +123,8 @@ Host *
raise IOError(errno.EINTR, "file", "interrupted system call")
self.assertTrue(paramiko.util.retry_on_signal(raises_intr) is None)
- self.assertEqual(0, intr_errors_remaining[0])
- self.assertEqual(4, call_count[0])
+ assert 0 == intr_errors_remaining[0]
+ assert 4 == call_count[0]
def raises_ioerror_not_eintr():
raise IOError(errno.ENOENT, "file", "file not found")
@@ -240,269 +142,10 @@ Host *
lambda: paramiko.util.retry_on_signal(raises_other_exception),
)
- def test_proxycommand_config_equals_parsing(self):
- """
- ProxyCommand should not split on equals signs within the value.
- """
- conf = """
-Host space-delimited
- ProxyCommand foo bar=biz baz
-
-Host equals-delimited
- ProxyCommand=foo bar=biz baz
-"""
- f = StringIO(conf)
- config = paramiko.util.parse_ssh_config(f)
- for host in ("space-delimited", "equals-delimited"):
- self.assertEqual(
- host_config(host, config)["proxycommand"], "foo bar=biz baz"
- )
-
- def test_proxycommand_interpolation(self):
- """
- ProxyCommand should perform interpolation on the value
- """
- config = paramiko.util.parse_ssh_config(
- StringIO(
- """
-Host specific
- Port 37
- ProxyCommand host %h port %p lol
-
-Host portonly
- Port 155
-
-Host *
- Port 25
- ProxyCommand host %h port %p
-"""
- )
- )
- for host, val in (
- ("foo.com", "host foo.com port 25"),
- ("specific", "host specific port 37 lol"),
- ("portonly", "host portonly port 155"),
- ):
- self.assertEqual(host_config(host, config)["proxycommand"], val)
-
- def test_proxycommand_tilde_expansion(self):
- """
- Tilde (~) should be expanded inside ProxyCommand
- """
- config = paramiko.util.parse_ssh_config(
- StringIO(
- """
-Host test
- ProxyCommand ssh -F ~/.ssh/test_config bastion nc %h %p
-"""
- )
- )
- self.assertEqual(
- "ssh -F %s/.ssh/test_config bastion nc test 22"
- % os.path.expanduser("~"),
- host_config("test", config)["proxycommand"],
- )
-
- def test_host_config_test_negation(self):
- test_config_file = """
-Host www13.* !*.example.com
- Port 22
-
-Host *.example.com !www13.*
- Port 2222
-
-Host www13.*
- Port 8080
-
-Host *
- Port 3333
- """
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- host = "www13.example.com"
- self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config),
- {"hostname": host, "port": "8080"},
- )
-
- def test_host_config_test_proxycommand(self):
- test_config_file = """
-Host proxy-with-equal-divisor-and-space
-ProxyCommand = foo=bar
-
-Host proxy-with-equal-divisor-and-no-space
-ProxyCommand=foo=bar
-
-Host proxy-without-equal-divisor
-ProxyCommand foo=bar:%h-%p
- """
- for host, values in {
- "proxy-with-equal-divisor-and-space": {
- "hostname": "proxy-with-equal-divisor-and-space",
- "proxycommand": "foo=bar",
- },
- "proxy-with-equal-divisor-and-no-space": {
- "hostname": "proxy-with-equal-divisor-and-no-space",
- "proxycommand": "foo=bar",
- },
- "proxy-without-equal-divisor": {
- "hostname": "proxy-without-equal-divisor",
- "proxycommand": "foo=bar:proxy-without-equal-divisor-22",
- },
- }.items():
-
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config), values
- )
-
- def test_host_config_test_identityfile(self):
- test_config_file = """
-
-IdentityFile id_dsa0
-
-Host *
-IdentityFile id_dsa1
-
-Host dsa2
-IdentityFile id_dsa2
-
-Host dsa2*
-IdentityFile id_dsa22
- """
- for host, values in {
- "foo": {"hostname": "foo", "identityfile": ["id_dsa0", "id_dsa1"]},
- "dsa2": {
- "hostname": "dsa2",
- "identityfile": ["id_dsa0", "id_dsa1", "id_dsa2", "id_dsa22"],
- },
- "dsa22": {
- "hostname": "dsa22",
- "identityfile": ["id_dsa0", "id_dsa1", "id_dsa22"],
- },
- }.items():
-
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config), values
- )
-
- def test_config_addressfamily_and_lazy_fqdn(self):
- """
- Ensure the code path honoring non-'all' AddressFamily doesn't asplode
- """
- test_config = """
-AddressFamily inet
-IdentityFile something_%l_using_fqdn
-"""
- config = paramiko.util.parse_ssh_config(StringIO(test_config))
- assert config.lookup(
- "meh"
- ) # will die during lookup() if bug regresses
-
def test_clamp_value(self):
- self.assertEqual(32768, paramiko.util.clamp_value(32767, 32768, 32769))
- self.assertEqual(32767, paramiko.util.clamp_value(32767, 32765, 32769))
- self.assertEqual(32769, paramiko.util.clamp_value(32767, 32770, 32769))
-
- def test_config_dos_crlf_succeeds(self):
- config_file = StringIO("host abcqwerty\r\nHostName 127.0.0.1\r\n")
- config = paramiko.SSHConfig()
- config.parse(config_file)
- self.assertEqual(config.lookup("abcqwerty")["hostname"], "127.0.0.1")
-
- def test_get_hostnames(self):
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- self.assertEqual(
- config.get_hostnames(), {"*", "*.example.com", "spoo.example.com"}
- )
-
- def test_quoted_host_names(self):
- test_config_file = """\
-Host "param pam" param "pam"
- Port 1111
-
-Host "param2"
- Port 2222
-
-Host param3 parara
- Port 3333
-
-Host param4 "p a r" "p" "par" para
- Port 4444
-"""
- res = {
- "param pam": {"hostname": "param pam", "port": "1111"},
- "param": {"hostname": "param", "port": "1111"},
- "pam": {"hostname": "pam", "port": "1111"},
- "param2": {"hostname": "param2", "port": "2222"},
- "param3": {"hostname": "param3", "port": "3333"},
- "parara": {"hostname": "parara", "port": "3333"},
- "param4": {"hostname": "param4", "port": "4444"},
- "p a r": {"hostname": "p a r", "port": "4444"},
- "p": {"hostname": "p", "port": "4444"},
- "par": {"hostname": "par", "port": "4444"},
- "para": {"hostname": "para", "port": "4444"},
- }
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- for host, values in res.items():
- assert paramiko.util.lookup_ssh_host_config(host, config) == values
-
- def test_quoted_params_in_config(self):
- test_config_file = """\
-Host "param pam" param "pam"
- IdentityFile id_rsa
-
-Host "param2"
- IdentityFile "test rsa key"
-
-Host param3 parara
- IdentityFile id_rsa
- IdentityFile "test rsa key"
-"""
- res = {
- "param pam": {"hostname": "param pam", "identityfile": ["id_rsa"]},
- "param": {"hostname": "param", "identityfile": ["id_rsa"]},
- "pam": {"hostname": "pam", "identityfile": ["id_rsa"]},
- "param2": {"hostname": "param2", "identityfile": ["test rsa key"]},
- "param3": {
- "hostname": "param3",
- "identityfile": ["id_rsa", "test rsa key"],
- },
- "parara": {
- "hostname": "parara",
- "identityfile": ["id_rsa", "test rsa key"],
- },
- }
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- for host, values in res.items():
- assert paramiko.util.lookup_ssh_host_config(host, config) == values
-
- def test_quoted_host_in_config(self):
- conf = SSHConfig()
- correct_data = {
- "param": ["param"],
- '"param"': ["param"],
- "param pam": ["param", "pam"],
- '"param" "pam"': ["param", "pam"],
- '"param" pam': ["param", "pam"],
- 'param "pam"': ["param", "pam"],
- 'param "pam" p': ["param", "pam", "p"],
- '"param" pam "p"': ["param", "pam", "p"],
- '"pa ram"': ["pa ram"],
- '"pa ram" pam': ["pa ram", "pam"],
- 'param "p a m"': ["param", "p a m"],
- }
- incorrect_data = ['param"', '"param', 'param "pam', 'param "pam" "p a']
- for host, values in correct_data.items():
- assert conf._get_hosts(host) == values
- for host in incorrect_data:
- self.assertRaises(Exception, conf._get_hosts, host)
+ assert 32768 == paramiko.util.clamp_value(32767, 32768, 32769)
+ assert 32767 == paramiko.util.clamp_value(32767, 32765, 32769)
+ assert 32769 == paramiko.util.clamp_value(32767, 32770, 32769)
def test_safe_string(self):
vanilla = b"vanilla"
@@ -515,54 +158,3 @@ Host param3 parara
assert safe_vanilla == vanilla, msg
msg = err.format(safe_has_bytes, expected_bytes)
assert safe_has_bytes == expected_bytes, msg
-
- def test_proxycommand_none_issue_418(self):
- test_config_file = """
-Host proxycommand-standard-none
- ProxyCommand None
-
-Host proxycommand-with-equals-none
- ProxyCommand=None
- """
- for host, values in {
- "proxycommand-standard-none": {
- "hostname": "proxycommand-standard-none"
- },
- "proxycommand-with-equals-none": {
- "hostname": "proxycommand-with-equals-none"
- },
- }.items():
-
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config), values
- )
-
- def test_proxycommand_none_masking(self):
- # Re: https://github.com/paramiko/paramiko/issues/670
- source_config = """
-Host specific-host
- ProxyCommand none
-
-Host other-host
- ProxyCommand other-proxy
-
-Host *
- ProxyCommand default-proxy
-"""
- config = paramiko.SSHConfig()
- config.parse(StringIO(source_config))
- # When bug is present, the full stripping-out of specific-host's
- # ProxyCommand means it actually appears to pick up the default
- # ProxyCommand value instead, due to cascading. It should (for
- # backwards compatibility reasons in 1.x/2.x) appear completely blank,
- # as if the host had no ProxyCommand whatsoever.
- # Threw another unrelated host in there just for sanity reasons.
- self.assertFalse("proxycommand" in config.lookup("specific-host"))
- self.assertEqual(
- config.lookup("other-host")["proxycommand"], "other-proxy"
- )
- self.assertEqual(
- config.lookup("some-random-host")["proxycommand"], "default-proxy"
- )
diff --git a/tests/util.py b/tests/util.py
index cdc835c..9057f51 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -9,8 +9,15 @@ from paramiko.py3compat import builtins
from paramiko.ssh_gss import GSS_AUTH_AVAILABLE
+tests_dir = dirname(realpath(__file__))
+
+
def _support(filename):
- return join(dirname(realpath(__file__)), filename)
+ return join(tests_dir, filename)
+
+
+def _config(name):
+ return join(tests_dir, "configs", name)
needs_gssapi = pytest.mark.skipif(