2
"""Validate ability of the user to import and remove ssh keys"""
4
from __future__ import print_function
12
from deploy_stack import (
16
add_basic_testing_arguments,
24
log = logging.getLogger("assess_ssh_keys")
29
def __init__(self, fingerprint, comment):
30
self.fingerprint = fingerprint
31
self.comment = comment
34
def from_fingerprint_line(cls, line):
35
fingerprint, comment = line.split(" ", 1)
37
raise ValueError("Not an ssh fingerprint: {!r}".format(line))
38
if comment.startswith("(") and comment.endswith(")"):
39
comment = comment[1:-1]
40
return cls(fingerprint, comment)
43
return "{} ({})".format(self.fingerprint, self.comment)
46
return "{}({}, {})".format(
47
self.__class__.__name__, self.fingerprint, self.comment)
50
_SSH_KEYS_LEAD = "Keys used in model: "
53
def parse_ssh_keys_output(output, expected_model):
54
"""Parse and validate output from `juju ssh-keys` command."""
55
if not output.startswith(_SSH_KEYS_LEAD):
56
raise AssertionError("Invalid ssh-keys output: {!r}".format(output))
57
lines = output.splitlines()
58
model = lines[0][len(_SSH_KEYS_LEAD):]
59
if expected_model != model:
60
log.warning("Expected keys for model: {} got: {}".format(
61
expected_model, model))
62
return [SSHKey.from_fingerprint_line(line) for line in lines[1:]]
65
def expect_juju_failure(fail_pattern, method, *args, **kwargs):
66
"""Assert method fails with expected output included."""
67
fail_re = re.compile(fail_pattern, re.MULTILINE)
69
output = method(*args, **kwargs)
70
except subprocess.CalledProcessError as e:
71
# The errors go to stderr, but as the current behaviour is to not
72
# exit calls will have merged stderr into stdout, so check output.
73
if fail_re.search(e.output) is None:
75
"Juju failed with output not matching: {!r} {!r}".format(
76
e.output, fail_pattern))
78
if fail_re.search(output) is None:
80
"Juju did not fail with output matching: {!r} {!r}".format(
81
output, fail_pattern))
82
log.info("Error found in output but the juju process exited 0.")
86
"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC7ibpRhiMie+Ytu5XqSPrvuXol1LMVztWWS"
87
"Tuja0As95VvqoCBxyKMmtnROYGhwF2BUHdHD5HdrwJ5WpIxhh+APhBuI9fZ52YbFhcxU/NxQ1"
88
"y8xw2sfm8HH0DGeg3ssRWzFVUTJ4QOAkJzy2zxiK3BfwQr5W5UIDnAtMBv56J7E4DFe6skabn"
89
"dWxOP8JzLtNFr/w3p/yAh/Akv6eJus8fBCKNYYy1/A+sUAZc/+dZLxk5qtfXqwIMtxFtK39vf"
90
"BlvVU0tpMAPhaEb/Vzq7Zyj3nscPGjNXE2g7TUvhlKCA5tdjWbug9U2YqwowwYfz/RE3qvXfZ"
91
"GtNpuBvxaXWDgpp example-key"
95
def assert_has_full_key(client, key):
96
output = client.ssh_keys(full=True)
97
if key not in output.splitlines()[1:]:
99
"Expected key not found:\nwant: {}\ngot: {}".format(key, output))
100
log.info("Found full key as expected")
103
def assert_has_key_matching_comment(client, comment_pattern):
104
log.info("Expecting key with comment matching: %r", comment_pattern)
105
comment_re = re.compile(comment_pattern)
107
keys = parse_ssh_keys_output(client.ssh_keys(), client.env.environment)
109
if comment_re.match(key.comment) is not None:
111
log.info("Matching key found: %s", key)
112
# No break so all matches are logged
114
raise AssertionError(
115
"No keys matching comment:\npattern: {!r}\nkeys: {}".format(
116
comment_pattern, "\n".join(map(str, keys))))
119
def assess_ssh_keys(client):
120
initial_keys_output = client.ssh_keys()
121
initial_keys = parse_ssh_keys_output(
122
initial_keys_output, client.env.environment)
124
"Initial keys in default model:\n%s",
125
"\n".join(map(str, initial_keys)))
127
log.info("Testing expected error when adding an invalid key")
128
pattern = r'cannot add key "badness": invalid ssh key: badness$'
129
expect_juju_failure(pattern, client.add_ssh_key, "badness")
131
log.info("Testing success when adding a valid key")
132
client.add_ssh_key(VALID_KEY)
133
assert_has_full_key(client, VALID_KEY)
135
log.info("Testing expected error when adding duplicate key")
136
pattern = r'^cannot add key ".*": duplicate ssh key: .*$'
137
expect_juju_failure(pattern, client.add_ssh_key, VALID_KEY)
139
log.info("Testing success when importing keys from github")
140
client.import_ssh_key("gh:jujubot")
141
assert_has_key_matching_comment(client, r'.*gh:jujubot')
143
log.info("Testing success when importing keys from launchpad")
144
client.import_ssh_key("lp:go-bot")
145
assert_has_key_matching_comment(client, r'.*lp:go-bot')
147
log.info("Testing expected error when removing a non-existent key")
149
r'^cannot remove key id "no-such-key": invalid ssh key: no-such-key$'
151
expect_juju_failure(pattern, client.remove_ssh_key, "no-such-key")
153
log.info("Testing CURRENTLY KNOWN ERROR with removing the juju-client-key")
154
pattern = r'^cannot remove key id "juju-client-key": .*: juju-client-key$'
155
expect_juju_failure(pattern, client.remove_ssh_key, "juju-client-key")
157
log.info("TODO test expected error removing the juju-system-key")
158
log.info("TODO test behavior when multiple models are involved")
159
log.info("TODO test removing keys by both comment and fingerprint")
162
def parse_args(argv):
163
"""Parse all arguments."""
164
parser = argparse.ArgumentParser(description="Test juju ssh key handling")
165
add_basic_testing_arguments(parser)
166
return parser.parse_args(argv)
170
args = parse_args(argv)
171
configure_logging(args.verbose)
172
bs_manager = BootstrapManager.from_args(args)
173
with bs_manager.booted_context(args.upload_tools):
174
assess_ssh_keys(bs_manager.client)
178
if __name__ == '__main__':