1
# Copyright 2012-2014 Canonical Ltd. This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
4
"""Tests for the omshell.py file."""
6
from __future__ import (
17
from itertools import product
21
from textwrap import dedent
23
from maastesting.factory import factory
24
from maastesting.fakemethod import FakeMethod
25
from maastesting.fixtures import TempDirectory
26
from maastesting.testcase import MAASTestCase
31
from provisioningserver import omshell
32
import provisioningserver.omshell
33
from provisioningserver.omshell import (
38
from provisioningserver.utils.shell import ExternalProcessError
39
from testtools.matchers import (
45
class TestOmshell(MAASTestCase):
47
def test_initialisation(self):
# An Omshell built from a server address and a shared key is compared
# against a structure holding those same two values under the
# attribute names `server_address` and `shared_key`.
48
server_address = factory.make_string()
49
shared_key = factory.make_string()
50
shell = Omshell(server_address, shared_key)
52
# NOTE(review): the opening of the call that receives the two
# arguments below is not visible in this extract -- confirm against
# the original file.
shell, MatchesStructure.byEquality(
53
server_address=server_address,
54
shared_key=shared_key))
56
def test_create_calls_omshell_correctly(self):
# Calls Omshell.create() for a random IP/MAC pair and compares the
# recorder's call count and first recorded positional arguments with
# `[1, (expected_script,)]`, i.e. exactly one call whose only
# argument is the fully formatted script.
# NOTE(review): the line wiring `recorder` into `shell` and most of
# the expected script are not visible in this extract -- confirm
# against the original file.
57
server_address = factory.make_string()
58
shared_key = factory.make_string()
59
ip_address = factory.getRandomIPAddress()
60
mac_address = factory.getRandomMACAddress()
61
shell = Omshell(server_address, shared_key)
63
# Instead of calling a real omshell, we'll just record the
64
# parameters passed to Popen.
65
recorder = FakeMethod(result=(0, "hardware-type"))
68
shell.create(ip_address, mac_address)
70
expected_script = dedent("""\
76
set hardware-address = {mac}
81
expected_script = expected_script.format(
82
server=server_address, key=shared_key, ip=ip_address,
85
# Check that the 'stdin' arg contains the correct set of
88
[1, (expected_script,)],
89
[recorder.call_count, recorder.extract_args()[0]])
91
def test_create_raises_when_omshell_fails(self):
    # Omshell.create() must raise ExternalProcessError when the output
    # it gets back does not contain the magic string 'hardware-type',
    # and the exception must carry that output.
    address = factory.make_string()
    key = factory.make_string()
    ip = factory.getRandomIPAddress()
    mac = factory.getRandomMACAddress()
    omshell_client = Omshell(address, key)

    # Swap the real omshell invocation for a fake that hands back a
    # (0, <random text>) result.
    bogus_output = factory.make_string()
    omshell_client._run = FakeMethod(result=(0, bogus_output))

    error = self.assertRaises(
        ExternalProcessError, omshell_client.create, ip, mac)
    self.assertEqual(bogus_output, error.output)
111
def test_create_succeeds_when_host_map_already_exists(self):
# `shell._run` is replaced by a Mock returning output that includes
# "can't open object: I/O error"; the test passes if the subsequent
# Omshell.create() call returns without raising.
# NOTE(review): the opening of the `params` mapping and parts of the
# sample output are not visible in this extract -- confirm against
# the original file.
112
# To omshell, creating the same host map twice is an error. But
113
# Omshell.create swallows the error and makes it look like
116
'ip': factory.getRandomIPAddress(),
117
'mac': factory.getRandomMACAddress(),
118
'hostname': factory.make_name('hostname')
120
shell = Omshell(factory.make_name('server'), factory.make_name('key'))
121
# This is the kind of error output we get if a host map has
122
# already been created.
123
error_output = dedent("""\
126
hardware-address = %(mac)s
127
name = "%(hostname)s"
129
can't open object: I/O error
132
hardware-address = %(mac)s
133
name = "%(hostname)s"
135
shell._run = Mock(return_value=(0, error_output))
136
shell.create(params['ip'], params['mac'])
137
# The test is that we get here without error.
140
def test_remove_calls_omshell_correctly(self):
# Calls Omshell.remove() for a random IP address and checks that the
# recorder installed as `shell._run` received exactly one call whose
# only positional argument is the formatted expected script.
# NOTE(review): the body of the expected script is not visible in
# this extract -- confirm against the original file.
141
server_address = factory.make_string()
142
shared_key = factory.make_string()
143
ip_address = factory.getRandomIPAddress()
144
shell = Omshell(server_address, shared_key)
146
# Instead of calling a real omshell, we'll just record the
147
# arguments passed to shell._run.
148
recorder = FakeMethod(result=(0, "thing1\nthing2\nobj: <null>"))
149
shell._run = recorder
151
shell.remove(ip_address)
153
expected_script = dedent("""\
162
expected_script = expected_script.format(
163
server=server_address, key=shared_key, ip=ip_address)
165
# Check that the 'stdin' arg contains the correct set of
167
self.assertEqual([(expected_script,)], recorder.extract_args())
169
def test_remove_raises_when_omshell_fails(self):
    # Omshell.remove() must raise subprocess.CalledProcessError when
    # the output it gets back does not end in the text 'obj: <null>',
    # and the exception must carry that output.
    address = factory.make_string()
    key = factory.make_string()
    ip = factory.getRandomIPAddress()
    omshell_client = Omshell(address, key)

    # Swap the real omshell invocation for a fake that hands back a
    # (0, <random text>) result.
    bogus_output = factory.make_string()
    omshell_client._run = FakeMethod(result=(0, bogus_output))

    error = self.assertRaises(
        subprocess.CalledProcessError, omshell_client.remove, ip)
    self.assertEqual(bogus_output, error.output)
187
def test_remove_works_when_extraneous_blank_last_lines(self):
    # omshell sometimes emits blank lines after 'obj: <null>';
    # Omshell.remove() must still treat that output as success.
    address = factory.make_string()
    key = factory.make_string()
    ip = factory.getRandomIPAddress()
    omshell_client = Omshell(address, key)

    # Patch the runner so it yields the marker followed by blank lines.
    fake_run = self.patch(omshell_client, '_run')
    fake_run.return_value = (0, "\n> obj: <null>\n\n")

    result = omshell_client.remove(ip)
    self.assertIsNone(result)
200
def test_remove_works_when_extraneous_gt_char_present(self):
    # omshell sometimes prefixes its responses with a '>' character;
    # Omshell.remove() must still treat that output as success.
    address = factory.make_string()
    key = factory.make_string()
    ip = factory.getRandomIPAddress()
    omshell_client = Omshell(address, key)

    # Patch the runner so it yields output carrying the stray '>'.
    fake_run = self.patch(omshell_client, '_run')
    fake_run.return_value = (0, "\n>obj: <null>\n>\n")

    result = omshell_client.remove(ip)
    self.assertIsNone(result)
213
def test_remove_works_when_object_already_removed(self):
    # Output reporting "can't open object: not found" must not make
    # Omshell.remove() fail; it should still return None.
    address = factory.make_string()
    key = factory.make_string()
    ip = factory.getRandomIPAddress()
    omshell_client = Omshell(address, key)

    fake_run = self.patch(omshell_client, '_run')
    fake_run.return_value = (
        0, "obj: <null>\nobj: host\ncan't open object: not found\n")

    result = omshell_client.remove(ip)
    self.assertIsNone(result)
224
class Test_generate_omapi_key(MAASTestCase):
225
"""Tests for omshell.generate_omapi_key"""
227
def test_generate_omapi_key_returns_a_key(self):
    # Keys of 512-bit length end in "==", so matching on that suffix
    # is a stronger check than comparing against None: it shows the
    # script was actually run and produced output.
    self.assertThat(generate_omapi_key(), EndsWith("=="))
234
def test_generate_omapi_key_leaves_no_temp_files(self):
# Redirects tempfile's default directory into a throwaway fixture
# directory and asserts that the directory is empty at the end.
# NOTE(review): the statement that runs between the patch and the
# final assertion is not visible in this extract -- presumably a
# generate_omapi_key() call; confirm against the original file.
235
tmpdir = self.useFixture(TempDirectory()).path
236
# Make mkdtemp() in omshell nest all directories within tmpdir.
237
self.patch(tempfile, 'tempdir', tmpdir)
239
self.assertEqual([], os.listdir(tmpdir))
241
def test_generate_omapi_key_raises_assertionerror_on_no_output(self):
    # With call_dnssec_keygen swapped for a FakeMethod that has no
    # configured result, generate_omapi_key() must raise AssertionError.
    silent_keygen = FakeMethod()
    self.patch(omshell, 'call_dnssec_keygen', silent_keygen)
    self.assertRaises(AssertionError, generate_omapi_key)
245
def test_generate_omapi_key_raises_assertionerror_on_bad_output(self):
# call_dnssec_keygen is replaced by a stub that only calls
# factory.make_file() with a randomly named "<name>.private" entry in
# the target directory; generate_omapi_key() must then raise
# AssertionError.
# NOTE(review): part of the stub body is not visible in this extract
# -- confirm against the original file.
246
def returns_junk(tmpdir):
247
key_name = factory.make_string()
248
factory.make_file(tmpdir, "%s.private" % key_name)
251
self.patch(omshell, 'call_dnssec_keygen', returns_junk)
252
self.assertRaises(AssertionError, generate_omapi_key)
254
def test_run_repeated_keygen(self):
# parse_key_value_file is patched so that it first hands out each
# fabricated known-bad key in turn and, once those are exhausted,
# falls back to the real implementation; the key finally returned by
# generate_omapi_key() must not be one of the bad keys.
# NOTE(review): the openers of the pattern/key collections and the
# `try:` inside side_effect are not visible in this extract --
# confirm against the original file.
256
"+no", "/no", "no+", "no/",
257
"+NO", "/NO", "NO+", "NO/",
259
bad_patterns_templates = {
260
"foo%sbar", "one\ntwo\n%s\nthree\n", "%s",
262
# Test that a known bad key is ignored and we generate a new one
265
# This key is known to fail with omshell.
266
"YXY5pr+No/8NZeodSd27wWbI8N6kIjMF/nrnFIlPwVLuByJKkQcBRtfDrD"
267
"LLG2U9/ND7/bIlJxEGTUnyipffHQ==",
269
# Fabricate a range of keys containing the known-bad pattern.
271
template % pattern for template, pattern in product(
272
bad_patterns_templates, bad_patterns))
273
# An iterator that we can exhaust without mutating bad_keys.
274
iter_bad_keys = iter(bad_keys)
275
# Reference to the original parse_key_value_file, before we patch.
276
parse_key_value_file = provisioningserver.omshell.parse_key_value_file
278
# Patch parse_key_value_file to return each of the known-bad keys
279
# we've created, followed by reverting to its usual behaviour.
280
def side_effect(*args, **kwargs):
282
return {'Key': next(iter_bad_keys)}
283
except StopIteration:
284
return parse_key_value_file(*args, **kwargs)
286
mock = self.patch(provisioningserver.omshell, 'parse_key_value_file')
287
mock.side_effect = side_effect
289
# generate_omapi_key() does not return a key known to be bad.
290
self.assertNotIn(generate_omapi_key(), bad_keys)
293
class TestCallDnsSecKeygen(MAASTestCase):
294
"""Tests for omshell.call_dnssec_keygen."""
296
def test_runs_external_script(self):
297
call_and_check = self.patch(
298
provisioningserver.omshell, 'call_and_check')
299
target_dir = self.make_dir()
300
path = os.environ.get("PATH", "").split(os.pathsep)
301
path.append("/usr/sbin")
302
call_dnssec_keygen(target_dir)
303
call_and_check.assert_called_once_with(
304
['dnssec-keygen', '-r', '/dev/urandom', '-a', 'HMAC-MD5',
305
'-b', '512', '-n', 'HOST', '-K', target_dir, '-q', 'omapi_key'],