~ubuntu-branches/ubuntu/utopic/maas/utopic-security

« back to all changes in this revision

Viewing changes to src/provisioningserver/tests/test_omshell.py

  • Committer: Package Import Robot
  • Author(s): Andres Rodriguez, Jeroen Vermeulen, Andres Rodriguez, Jason Hobbs, Raphaël Badin, Louis Bouchard, Gavin Panella
  • Date: 2014-08-21 19:36:30 UTC
  • mfrom: (1.3.1)
  • Revision ID: package-import@ubuntu.com-20140821193630-kertpu5hd8yyss8h
Tags: 1.7.0~beta7+bzr3266-0ubuntu1
* New Upstream Snapshot, Beta 7 bzr3266

[ Jeroen Vermeulen ]
* debian/extras/99-maas-sudoers
  debian/maas-dhcp.postinst
  debian/rules
  - Add second DHCP server instance for IPv6.
* debian/maas-region-controller-min.install
  debian/maas-region-controller-min.lintian-overrides
  - Install deployment user-data: maas_configure_interfaces.py script.
* debian/maas-cluster-controller.links
  debian/maas-cluster-controller.install
  debian/maas-cluster-controller.postinst
  - Reflect Celery removal changes made in trunk r3067.
  - Don't install celeryconfig_cluster.py any longer. 
  - Don't install maas_local_celeryconfig_cluster.py any longer.
  - Don't symlink maas_local_celeryconfig_cluster.py from /etc to /usr.
  - Don't insert UUID into maas_local_celeryconfig_cluster.py.

[ Andres Rodriguez ]
* debian/maas-region-controller-min.postrm: Cleanup lefover files.
* debian/maas-dhcp.postrm: Clean leftover configs.
* Provide new maas-proxy package that replaces the usage of
  squid-deb-proxy:
  - debian/control: New maas-proxy package that replaces the usage
    of squid-deb-proxy; Drop depends on squid-deb-proxy.
  - Add upstrart job.
  - Ensure squid3 is stopped as maas-proxy uses a caching proxy.
* Remove Celery references to cluster controller:
  - Rename upstart job from maas-pserv to maas-cluster; rename
    maas-cluster-celery to maas-cluster-register. Ensure services
    are stopped on upgrade.
  - debian/maintscript: Cleanup config files.
  - Remove all references to the MAAS celery daemon and config
    files as we don't use it like that anymore
* Move some entries in debian/maintscript to
  debian/maas-cluster-controller.maintscript
* Remove usage of txlongpoll and rabbitmq-server. Handle upgrades
  to ensure these are removed correctly.

[ Jason Hobbs ]
* debian/maas-region-controller-min.install: Install
  maas-generate-winrm-cert script.

[ Raphaël Badin ]
* debian/extras/maas-region-admin: Bypass django-admin as it prints
  spurious messages to stdout (LP: #1365130).

[Louis Bouchard]
* debian/maas-cluster-controller.postinst:
  - Exclude /var/log/maas/rsyslog when changing ownership
    (LP: #1346703)

[Gavin Panella]
* debian/maas-cluster-controller.maas-clusterd.upstart:
  - Don't start-up the cluster controller unless a shared-secret has
    been installed.
* debian/maas-cluster-controller.maas-cluster-register.upstart: Drop.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2012-2014 Canonical Ltd.  This software is licensed under the
2
 
# GNU Affero General Public License version 3 (see the file LICENSE).
3
 
 
4
 
"""Tests for the omshell.py file."""
5
 
 
6
 
from __future__ import (
7
 
    absolute_import,
8
 
    print_function,
9
 
    unicode_literals,
10
 
    )
11
 
 
12
 
str = None
13
 
 
14
 
__metaclass__ = type
15
 
__all__ = []
16
 
 
17
 
from itertools import product
18
 
import os
19
 
import subprocess
20
 
import tempfile
21
 
from textwrap import dedent
22
 
 
23
 
from maastesting.factory import factory
24
 
from maastesting.fakemethod import FakeMethod
25
 
from maastesting.fixtures import TempDirectory
26
 
from maastesting.testcase import MAASTestCase
27
 
from mock import (
28
 
    ANY,
29
 
    Mock,
30
 
    )
31
 
from provisioningserver import omshell
32
 
import provisioningserver.omshell
33
 
from provisioningserver.omshell import (
34
 
    call_dnssec_keygen,
35
 
    generate_omapi_key,
36
 
    Omshell,
37
 
    )
38
 
from provisioningserver.utils.shell import ExternalProcessError
39
 
from testtools.matchers import (
40
 
    EndsWith,
41
 
    MatchesStructure,
42
 
    )
43
 
 
44
 
 
45
 
class TestOmshell(MAASTestCase):
46
 
 
47
 
    def test_initialisation(self):
48
 
        server_address = factory.make_string()
49
 
        shared_key = factory.make_string()
50
 
        shell = Omshell(server_address, shared_key)
51
 
        self.assertThat(
52
 
            shell, MatchesStructure.byEquality(
53
 
                server_address=server_address,
54
 
                shared_key=shared_key))
55
 
 
56
 
    def test_create_calls_omshell_correctly(self):
57
 
        server_address = factory.make_string()
58
 
        shared_key = factory.make_string()
59
 
        ip_address = factory.getRandomIPAddress()
60
 
        mac_address = factory.getRandomMACAddress()
61
 
        shell = Omshell(server_address, shared_key)
62
 
 
63
 
        # Instead of calling a real omshell, we'll just record the
64
 
        # parameters passed to Popen.
65
 
        recorder = FakeMethod(result=(0, "hardware-type"))
66
 
        shell._run = recorder
67
 
 
68
 
        shell.create(ip_address, mac_address)
69
 
 
70
 
        expected_script = dedent("""\
71
 
            server {server}
72
 
            key omapi_key {key}
73
 
            connect
74
 
            new host
75
 
            set ip-address = {ip}
76
 
            set hardware-address = {mac}
77
 
            set hardware-type = 1
78
 
            set name = "{ip}"
79
 
            create
80
 
            """)
81
 
        expected_script = expected_script.format(
82
 
            server=server_address, key=shared_key, ip=ip_address,
83
 
            mac=mac_address)
84
 
 
85
 
        # Check that the 'stdin' arg contains the correct set of
86
 
        # commands.
87
 
        self.assertEqual(
88
 
            [1, (expected_script,)],
89
 
            [recorder.call_count, recorder.extract_args()[0]])
90
 
 
91
 
    def test_create_raises_when_omshell_fails(self):
92
 
        # If the call to omshell doesn't result in output containing the
93
 
        # magic string 'hardware-type' it means the set of commands
94
 
        # failed.
95
 
 
96
 
        server_address = factory.make_string()
97
 
        shared_key = factory.make_string()
98
 
        ip_address = factory.getRandomIPAddress()
99
 
        mac_address = factory.getRandomMACAddress()
100
 
        shell = Omshell(server_address, shared_key)
101
 
 
102
 
        # Fake a call that results in a failure with random output.
103
 
        random_output = factory.make_string()
104
 
        recorder = FakeMethod(result=(0, random_output))
105
 
        shell._run = recorder
106
 
 
107
 
        exc = self.assertRaises(
108
 
            ExternalProcessError, shell.create, ip_address, mac_address)
109
 
        self.assertEqual(random_output, exc.output)
110
 
 
111
 
    def test_create_succeeds_when_host_map_already_exists(self):
112
 
        # To omshell, creating the same host map twice is an error.  But
113
 
        # Omshell.create swallows the error and makes it look like
114
 
        # success.
115
 
        params = {
116
 
            'ip': factory.getRandomIPAddress(),
117
 
            'mac': factory.getRandomMACAddress(),
118
 
            'hostname': factory.make_name('hostname')
119
 
        }
120
 
        shell = Omshell(factory.make_name('server'), factory.make_name('key'))
121
 
        # This is the kind of error output we get if a host map has
122
 
        # already been created.
123
 
        error_output = dedent("""\
124
 
            obj: host
125
 
            ip-address = %(ip)s
126
 
            hardware-address = %(mac)s
127
 
            name = "%(hostname)s"
128
 
            >
129
 
            can't open object: I/O error
130
 
            obj: host
131
 
            ip-address = %(ip)s
132
 
            hardware-address = %(mac)s
133
 
            name = "%(hostname)s"
134
 
            """) % params
135
 
        shell._run = Mock(return_value=(0, error_output))
136
 
        shell.create(params['ip'], params['mac'])
137
 
        # The test is that we get here without error.
138
 
        pass
139
 
 
140
 
    def test_remove_calls_omshell_correctly(self):
141
 
        server_address = factory.make_string()
142
 
        shared_key = factory.make_string()
143
 
        ip_address = factory.getRandomIPAddress()
144
 
        shell = Omshell(server_address, shared_key)
145
 
 
146
 
        # Instead of calling a real omshell, we'll just record the
147
 
        # parameters passed to Popen.
148
 
        recorder = FakeMethod(result=(0, "thing1\nthing2\nobj: <null>"))
149
 
        shell._run = recorder
150
 
 
151
 
        shell.remove(ip_address)
152
 
 
153
 
        expected_script = dedent("""\
154
 
            server {server}
155
 
            key omapi_key {key}
156
 
            connect
157
 
            new host
158
 
            set name = "{ip}"
159
 
            open
160
 
            remove
161
 
            """)
162
 
        expected_script = expected_script.format(
163
 
            server=server_address, key=shared_key, ip=ip_address)
164
 
 
165
 
        # Check that the 'stdin' arg contains the correct set of
166
 
        # commands.
167
 
        self.assertEqual([(expected_script,)], recorder.extract_args())
168
 
 
169
 
    def test_remove_raises_when_omshell_fails(self):
170
 
        # If the call to omshell doesn't result in output ending in the
171
 
        # text 'obj: <null>' we can be fairly sure this operation
172
 
        # failed.
173
 
        server_address = factory.make_string()
174
 
        shared_key = factory.make_string()
175
 
        ip_address = factory.getRandomIPAddress()
176
 
        shell = Omshell(server_address, shared_key)
177
 
 
178
 
        # Fake a call that results in a failure with random output.
179
 
        random_output = factory.make_string()
180
 
        recorder = FakeMethod(result=(0, random_output))
181
 
        shell._run = recorder
182
 
 
183
 
        exc = self.assertRaises(
184
 
            subprocess.CalledProcessError, shell.remove, ip_address)
185
 
        self.assertEqual(random_output, exc.output)
186
 
 
187
 
    def test_remove_works_when_extraneous_blank_last_lines(self):
188
 
        # Sometimes omshell puts blank lines after the 'obj: <null>' so
189
 
        # we need to test that the code still works if that's the case.
190
 
        server_address = factory.make_string()
191
 
        shared_key = factory.make_string()
192
 
        ip_address = factory.getRandomIPAddress()
193
 
        shell = Omshell(server_address, shared_key)
194
 
 
195
 
        # Fake a call that results in a something with our special output.
196
 
        output = "\n> obj: <null>\n\n"
197
 
        self.patch(shell, '_run').return_value = (0, output)
198
 
        self.assertIsNone(shell.remove(ip_address))
199
 
 
200
 
    def test_remove_works_when_extraneous_gt_char_present(self):
201
 
        # Sometimes omshell puts a leading '>' character in responses.
202
 
        # We need to test that the code still works if that's the case.
203
 
        server_address = factory.make_string()
204
 
        shared_key = factory.make_string()
205
 
        ip_address = factory.getRandomIPAddress()
206
 
        shell = Omshell(server_address, shared_key)
207
 
 
208
 
        # Fake a call that results in a something with our special output.
209
 
        output = "\n>obj: <null>\n>\n"
210
 
        self.patch(shell, '_run').return_value = (0, output)
211
 
        self.assertIsNone(shell.remove(ip_address))
212
 
 
213
 
    def test_remove_works_when_object_already_removed(self):
214
 
        server_address = factory.make_string()
215
 
        shared_key = factory.make_string()
216
 
        ip_address = factory.getRandomIPAddress()
217
 
        shell = Omshell(server_address, shared_key)
218
 
 
219
 
        output = "obj: <null>\nobj: host\ncan't open object: not found\n"
220
 
        self.patch(shell, '_run').return_value = (0, output)
221
 
        self.assertIsNone(shell.remove(ip_address))
222
 
 
223
 
 
224
 
class Test_generate_omapi_key(MAASTestCase):
225
 
    """Tests for omshell.generate_omapi_key"""
226
 
 
227
 
    def test_generate_omapi_key_returns_a_key(self):
228
 
        key = generate_omapi_key()
229
 
        # Could test for != None here, but the keys end in == for a 512
230
 
        # bit length key, so that's a better check that the script was
231
 
        # actually run and produced output.
232
 
        self.assertThat(key, EndsWith("=="))
233
 
 
234
 
    def test_generate_omapi_key_leaves_no_temp_files(self):
235
 
        tmpdir = self.useFixture(TempDirectory()).path
236
 
        # Make mkdtemp() in omshell nest all directories within tmpdir.
237
 
        self.patch(tempfile, 'tempdir', tmpdir)
238
 
        generate_omapi_key()
239
 
        self.assertEqual([], os.listdir(tmpdir))
240
 
 
241
 
    def test_generate_omapi_key_raises_assertionerror_on_no_output(self):
242
 
        self.patch(omshell, 'call_dnssec_keygen', FakeMethod())
243
 
        self.assertRaises(AssertionError, generate_omapi_key)
244
 
 
245
 
    def test_generate_omapi_key_raises_assertionerror_on_bad_output(self):
246
 
        def returns_junk(tmpdir):
247
 
            key_name = factory.make_string()
248
 
            factory.make_file(tmpdir, "%s.private" % key_name)
249
 
            return key_name
250
 
 
251
 
        self.patch(omshell, 'call_dnssec_keygen', returns_junk)
252
 
        self.assertRaises(AssertionError, generate_omapi_key)
253
 
 
254
 
    def test_run_repeated_keygen(self):
255
 
        bad_patterns = {
256
 
            "+no", "/no", "no+", "no/",
257
 
            "+NO", "/NO", "NO+", "NO/",
258
 
            }
259
 
        bad_patterns_templates = {
260
 
            "foo%sbar", "one\ntwo\n%s\nthree\n", "%s",
261
 
            }
262
 
        # Test that a known bad key is ignored and we generate a new one
263
 
        # to replace it.
264
 
        bad_keys = {
265
 
            # This key is known to fail with omshell.
266
 
            "YXY5pr+No/8NZeodSd27wWbI8N6kIjMF/nrnFIlPwVLuByJKkQcBRtfDrD"
267
 
            "LLG2U9/ND7/bIlJxEGTUnyipffHQ==",
268
 
            }
269
 
        # Fabricate a range of keys containing the known-bad pattern.
270
 
        bad_keys.update(
271
 
            template % pattern for template, pattern in product(
272
 
                bad_patterns_templates, bad_patterns))
273
 
        # An iterator that we can exhaust without mutating bad_keys.
274
 
        iter_bad_keys = iter(bad_keys)
275
 
        # Reference to the original parse_key_value_file, before we patch.
276
 
        parse_key_value_file = provisioningserver.omshell.parse_key_value_file
277
 
 
278
 
        # Patch parse_key_value_file to return each of the known-bad keys
279
 
        # we've created, followed by reverting to its usual behaviour.
280
 
        def side_effect(*args, **kwargs):
281
 
            try:
282
 
                return {'Key': next(iter_bad_keys)}
283
 
            except StopIteration:
284
 
                return parse_key_value_file(*args, **kwargs)
285
 
 
286
 
        mock = self.patch(provisioningserver.omshell, 'parse_key_value_file')
287
 
        mock.side_effect = side_effect
288
 
 
289
 
        # generate_omapi_key() does not return a key known to be bad.
290
 
        self.assertNotIn(generate_omapi_key(), bad_keys)
291
 
 
292
 
 
293
 
class TestCallDnsSecKeygen(MAASTestCase):
294
 
    """Tests for omshell.call_dnssec_keygen."""
295
 
 
296
 
    def test_runs_external_script(self):
297
 
        call_and_check = self.patch(
298
 
            provisioningserver.omshell, 'call_and_check')
299
 
        target_dir = self.make_dir()
300
 
        path = os.environ.get("PATH", "").split(os.pathsep)
301
 
        path.append("/usr/sbin")
302
 
        call_dnssec_keygen(target_dir)
303
 
        call_and_check.assert_called_once_with(
304
 
            ['dnssec-keygen', '-r', '/dev/urandom', '-a', 'HMAC-MD5',
305
 
             '-b', '512', '-n', 'HOST', '-K', target_dir, '-q', 'omapi_key'],
306
 
            env=ANY)