1553.2.1
by Martin Packman
Add start of new assess_ssh_keys script and fakejuju support |
1 |
#!/usr/bin/env python
|
2 |
"""Validate ability of the user to import and remove ssh keys"""
|
|
3 |
||
4 |
from __future__ import print_function |
|
5 |
||
6 |
import argparse |
|
7 |
import logging |
|
8 |
import re |
|
9 |
import subprocess |
|
10 |
import sys |
|
11 |
||
12 |
from deploy_stack import ( |
|
13 |
BootstrapManager, |
|
14 |
)
|
|
15 |
from utility import ( |
|
16 |
add_basic_testing_arguments, |
|
17 |
configure_logging, |
|
18 |
)
|
|
19 |
||
20 |
||
21 |
# Python 2 idiom: classes declared in this module without explicit bases
# become new-style classes.
__metaclass__ = type


# Logger shared by every helper in this script; the name is a fixed string.
log = logging.getLogger("assess_ssh_keys")
|
25 |
||
26 |
||
27 |
class SSHKey:
    """One key entry from a fingerprint-style ssh key listing.

    Attributes:
        fingerprint: The text before the first space of the listing line.
        comment: The rest of the line, with one pair of enclosing
            parentheses removed when present.
    """

    def __init__(self, fingerprint, comment):
        self.fingerprint = fingerprint
        self.comment = comment

    @classmethod
    def from_fingerprint_line(cls, line):
        """Build an SSHKey from a "<fingerprint> (<comment>)" line.

        The line is divided at its first space. A comment wrapped in
        parentheses has them stripped; any other comment is kept verbatim.

        Raises:
            ValueError: If line contains no space, so it cannot be divided
                into a fingerprint and a comment.
        """
        fingerprint, separator, comment = line.partition(" ")
        # An empty separator means no space was found. Report the offending
        # line instead of failing on an opaque tuple-unpacking error.
        if not separator:
            raise ValueError("Not an ssh fingerprint: {!r}".format(line))
        if comment.startswith("(") and comment.endswith(")"):
            comment = comment[1:-1]
        return cls(fingerprint, comment)

    def __str__(self):
        """Render the key in the same "<fingerprint> (<comment>)" form."""
        return "{} ({})".format(self.fingerprint, self.comment)

    def __repr__(self):
        """Render as ClassName(fingerprint, comment) for debugging."""
        return "{}({}, {})".format(
            self.__class__.__name__, self.fingerprint, self.comment)
|
48 |
||
49 |
||
1581.1.1
by Martin Packman
Make assess_ssh_keys 1.25 compatible and update for landed fix |
50 |
# Accepted openings for the first line of `juju ssh-keys` output.
# Model form: the text following this lead-in is taken as the model name.
_KEYS_LEAD_MODEL = "Keys used in model: "
# Admin form: no model name is compared when the first line is exactly this.
# NOTE(review): presumably the juju 1.25 output format - confirm.
_KEYS_LEAD_ADMIN = "Keys for user admin:"
|
1553.2.1
by Martin Packman
Add start of new assess_ssh_keys script and fakejuju support |
52 |
|
53 |
||
54 |
def parse_ssh_keys_output(output, expected_model):
    """Convert the text printed by `juju ssh-keys` into SSHKey objects.

    The output must open with one of the known lead-ins. Unless the first
    line is exactly the admin lead-in, the text after the model lead-in has
    to equal expected_model. Every line after the first is parsed as a
    fingerprint line.

    Returns a list of SSHKey, one per line after the first.
    Raises AssertionError for an unrecognised opening or a model mismatch.
    """
    known_leads = (_KEYS_LEAD_MODEL, _KEYS_LEAD_ADMIN)
    if not output.startswith(known_leads):
        raise AssertionError("Invalid ssh-keys output: {!r}".format(output))
    rows = output.splitlines()
    header, key_rows = rows[0], rows[1:]
    # Text after the first model lead-in, or the whole header when the
    # lead-in does not occur in it (the admin form).
    _, found, remainder = header.partition(_KEYS_LEAD_MODEL)
    model = remainder if found else header
    mismatch = model != _KEYS_LEAD_ADMIN and expected_model != model
    if mismatch:
        raise AssertionError("Expected keys for model: {} got: {}".format(
            expected_model, model))
    return [SSHKey.from_fingerprint_line(row) for row in key_rows]
|
64 |
||
65 |
||
1553.2.3
by Martin Packman
Fix lint and tweak expected failure handling |
66 |
def expect_juju_failure(fail_pattern, method, *args, **kwargs):
    """Assert that calling method produces output matching fail_pattern.

    method is invoked as method(*args, **kwargs). fail_pattern is compiled
    with re.MULTILINE and searched in the output attached to a
    CalledProcessError when the call raises one, otherwise in the value the
    call returns. A match after a normal return is accepted and logged.

    Raises:
        AssertionError: If the pattern is not found in the output.
    """
    fail_re = re.compile(fail_pattern, re.MULTILINE)
    try:
        output = method(*args, **kwargs)
    except subprocess.CalledProcessError as e:
        # The errors go to stderr, but calls are expected to have merged
        # stderr into stdout, so the captured output is what gets checked.
        # The output attribute is None when nothing was captured; search an
        # empty string then so the mismatch is reported as an assertion
        # failure rather than a TypeError from the regex engine.
        if fail_re.search(e.output or "") is None:
            raise AssertionError(
                "Juju failed with output not matching: {!r} {!r}".format(
                    e.output, fail_pattern))
    else:
        # Same None guard for a call that returned without any output.
        if fail_re.search(output or "") is None:
            raise AssertionError(
                "Juju did not fail with output matching: {!r} {!r}".format(
                    output, fail_pattern))
        log.info("Error found in output but the juju process exited 0.")
84 |
||
85 |
||
86 |
# A well-formed public key line: key type, base64 key data, then a comment.
# Used as the key that is added successfully and then rejected as duplicate.
VALID_KEY = " ".join((
    "ssh-rsa",
    "AAAAB3NzaC1yc2EAAAADAQABAAABAQC7ibpRhiMie+Ytu5XqSPrvuXol1LMVztWWS"
    "Tuja0As95VvqoCBxyKMmtnROYGhwF2BUHdHD5HdrwJ5WpIxhh+APhBuI9fZ52YbFhcxU/NxQ1"
    "y8xw2sfm8HH0DGeg3ssRWzFVUTJ4QOAkJzy2zxiK3BfwQr5W5UIDnAtMBv56J7E4DFe6skabn"
    "dWxOP8JzLtNFr/w3p/yAh/Akv6eJus8fBCKNYYy1/A+sUAZc/+dZLxk5qtfXqwIMtxFtK39vf"
    "BlvVU0tpMAPhaEb/Vzq7Zyj3nscPGjNXE2g7TUvhlKCA5tdjWbug9U2YqwowwYfz/RE3qvXfZ"
    "GtNpuBvxaXWDgpp",
    "example-key",
))
|
|
94 |
||
95 |
||
96 |
def assert_has_full_key(client, key):
    """Check that key appears verbatim in the client's full key listing.

    Requests the keys with full=True, ignores the first line of the output
    and requires key to equal one of the remaining lines exactly.

    Raises AssertionError, quoting the wanted key and the output received,
    when no line matches.
    """
    listing = client.ssh_keys(full=True)
    listed_keys = listing.splitlines()[1:]
    if key in listed_keys:
        log.info("Found full key as expected")
        return
    raise AssertionError(
        "Expected key not found:\nwant: {}\ngot: {}".format(key, listing))
|
102 |
||
103 |
||
104 |
def assert_has_key_matching_comment(client, comment_pattern):
    """Check that at least one listed key has a comment matching the pattern.

    The client's key listing is parsed against client.env.environment and
    comment_pattern is applied with re.match, so it is anchored at the start
    of each comment. Every matching key is logged.

    Raises AssertionError listing all keys when none matches.
    """
    log.info("Expecting key with comment matching: %r", comment_pattern)
    comment_re = re.compile(comment_pattern)
    keys = parse_ssh_keys_output(client.ssh_keys(), client.env.environment)
    match_count = 0
    # Walk the whole list rather than stopping early so each match is logged.
    for candidate in keys:
        if comment_re.match(candidate.comment) is None:
            continue
        match_count += 1
        log.info("Matching key found: %s", candidate)
    if match_count == 0:
        all_keys = "\n".join(str(candidate) for candidate in keys)
        raise AssertionError(
            "No keys matching comment:\npattern: {!r}\nkeys: {}".format(
                comment_pattern, all_keys))
|
118 |
||
119 |
||
1581.1.1
by Martin Packman
Make assess_ssh_keys 1.25 compatible and update for landed fix |
120 |
def _assess_remove_internal_key(client, name):
    """Expect an attempt to remove the named internal key to be refused."""
    fmt = r'^cannot remove key id "{0}": may not delete internal key: {0}$'
    expected_error = fmt.format(name)
    expect_juju_failure(expected_error, client.remove_ssh_key, name)
|
123 |
||
124 |
||
1553.2.1
by Martin Packman
Add start of new assess_ssh_keys script and fakejuju support |
125 |
def assess_ssh_keys(client):
    """Run the ssh key scenarios against client, one after another.

    In order: list the initial keys, expect an invalid key to be rejected,
    add a valid key and find it in the full listing, expect the duplicate to
    be rejected, import keys from github and launchpad, expect removal of a
    missing key to fail and, except on juju 1.x, expect removal of the juju
    internal keys to be refused.
    """
    starting_output = client.ssh_keys()
    starting_keys = parse_ssh_keys_output(
        starting_output, client.env.environment)
    starting_listing = "\n".join(str(key) for key in starting_keys)
    log.info("Initial keys in default model:\n%s", starting_listing)

    log.info("Testing expected error when adding an invalid key")
    expect_juju_failure(
        r'cannot add key "badness": invalid ssh key: badness$',
        client.add_ssh_key, "badness")

    log.info("Testing success when adding a valid key")
    client.add_ssh_key(VALID_KEY)
    assert_has_full_key(client, VALID_KEY)

    log.info("Testing expected error when adding duplicate key")
    expect_juju_failure(
        r'^cannot add key ".*": duplicate ssh key: .*$',
        client.add_ssh_key, VALID_KEY)

    log.info("Testing success when importing keys from github")
    client.import_ssh_key("gh:sinzui")
    assert_has_key_matching_comment(client, r'.*gh:sinzui')

    log.info("Testing success when importing keys from launchpad")
    client.import_ssh_key("lp:gz")
    assert_has_key_matching_comment(client, r'.*lp:gz')

    log.info("Testing expected error when removing a non-existent key")
    # The verb in the error message depends on the juju generation.
    if client.is_juju1x():
        removal_verb = "delete"
    else:
        removal_verb = "remove"
    missing_key_error = (
        r'^cannot {0} key id "{1}": invalid ssh key: {1}$'.format(
            removal_verb, "no-such-key"))
    expect_juju_failure(
        missing_key_error, client.remove_ssh_key, "no-such-key")

    log.info("Testing expected error removing the juju internal keys")
    if not client.is_juju1x():
        for internal_name in ("juju-client-key", "juju-system-key"):
            _assess_remove_internal_key(client, internal_name)
    else:
        log.info("...skipped on juju version %s", client.version)

    log.info("TODO test behavior when multiple models are involved")
    log.info("TODO test removing keys by both comment and fingerprint")
|
167 |
||
168 |
||
169 |
def parse_args(argv):
    """Build the command line parser and return the parsed argv."""
    arg_parser = argparse.ArgumentParser(
        description="Test juju ssh key handling")
    # The shared helper registers its arguments on the parser passed to it.
    add_basic_testing_arguments(arg_parser)
    parsed = arg_parser.parse_args(argv)
    return parsed
|
174 |
||
175 |
||
176 |
def main(argv=None):
    """Parse arguments, boot an environment and run the ssh key checks.

    Returns 0 once the assessment finishes without raising.
    """
    options = parse_args(argv)
    configure_logging(options.verbose)
    manager = BootstrapManager.from_args(options)
    # The assessment runs only while the booted context is active.
    with manager.booted_context(options.upload_tools):
        assess_ssh_keys(manager.client)
    return 0
|
183 |
||
184 |
||
185 |
# Run the assessment only when executed as a script; the value returned by
# main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())