# Branch: ~maas-committers/maas/1.2
# Copyright 2012 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Tests for node actions."""

from __future__ import (
    absolute_import,
    print_function,
    unicode_literals,
    )

# Make every class defined in this module new-style (Python 2).
__metaclass__ = type
# This test module exports no public names.
__all__ = []

from urlparse import urlparse

from django.core.urlresolvers import reverse
from maasserver.enum import (
    NODE_PERMISSION,
    NODE_STATUS,
    NODE_STATUS_CHOICES_DICT,
    )
from maasserver.exceptions import Redirect
from maasserver.node_action import (
    AcceptAndCommission,
    compile_node_actions,
    Delete,
    NodeAction,
    RetryCommissioning,
    StartNode,
    StopNode,
    )
from maasserver.testing.factory import factory
from maasserver.testing.testcase import TestCase
from provisioningserver.enum import POWER_TYPE
from provisioningserver.power.poweraction import PowerAction


# Every node status listed in NODE_STATUS_CHOICES_DICT.  FakeNodeAction
# uses this so that, by default, it is actionable in any status.
ALL_STATUSES = NODE_STATUS_CHOICES_DICT.keys()


class FakeNodeAction(NodeAction):
    """A `NodeAction` stand-in for tests.

    By default it is actionable in every status, requires only VIEW
    permission, reports no inhibition, and does nothing when executed.
    Tests subclass it, or set attributes on an instance, to vary these.
    """

    display = "Action label"
    actionable_statuses = ALL_STATUSES
    permission = NODE_PERMISSION.VIEW

    # Canned result for inhibit(); tests override this as needed.
    fake_inhibition = None

    def inhibit(self):
        """Return whatever `fake_inhibition` currently holds."""
        return self.fake_inhibition

    def execute(self):
        """Do nothing."""


class TestNodeAction(TestCase):
    """Tests for `compile_node_actions` and the `NodeAction` base class."""

    def test_compile_node_actions_returns_available_actions(self):
        # An applicable action shows up under its display label.

        class MyAction(FakeNodeAction):
            display = factory.getRandomString()

        node = factory.make_node()
        admin = factory.make_admin()
        compiled = compile_node_actions(node, admin, classes=[MyAction])
        self.assertEqual([MyAction.display], compiled.keys())

    def test_compile_node_actions_checks_node_status(self):
        # An action is left out if the node's status is not one of the
        # action's actionable statuses.

        class MyAction(FakeNodeAction):
            actionable_statuses = (NODE_STATUS.READY, )

        declared_node = factory.make_node(status=NODE_STATUS.DECLARED)
        admin = factory.make_admin()
        compiled = compile_node_actions(
            declared_node, admin, classes=[MyAction])
        self.assertEqual({}, compiled)

    def test_compile_node_actions_checks_permission(self):
        # An action is left out if the user lacks its permission.

        class MyAction(FakeNodeAction):
            permission = NODE_PERMISSION.EDIT

        commissioning_node = factory.make_node(
            status=NODE_STATUS.COMMISSIONING)
        plain_user = factory.make_user()
        compiled = compile_node_actions(
            commissioning_node, plain_user, classes=[MyAction])
        self.assertEqual({}, compiled)

    def test_compile_node_actions_includes_inhibited_actions(self):
        # Having an inhibition does not remove an action from the result.

        class MyAction(FakeNodeAction):
            fake_inhibition = factory.getRandomString()

        node = factory.make_node()
        admin = factory.make_admin()
        compiled = compile_node_actions(node, admin, classes=[MyAction])
        self.assertEqual([MyAction.display], compiled.keys())

    def test_compile_node_actions_maps_display_names(self):
        # Each key in the result is the display label of its action.

        class Action1(FakeNodeAction):
            display = factory.getRandomString()

        class Action2(FakeNodeAction):
            display = factory.getRandomString()

        node = factory.make_node()
        admin = factory.make_admin()
        compiled = compile_node_actions(
            node, admin, classes=[Action1, Action2])
        for key, compiled_action in compiled.items():
            self.assertEqual(key, compiled_action.display)

    def test_compile_node_actions_maintains_order(self):
        # Actions come back in the same order as the classes passed in.
        labels = [factory.getRandomString() for counter in range(4)]
        classes = []
        for counter, label in enumerate(labels):
            action_class = type(
                b"Action%d" % counter, (FakeNodeAction,), {'display': label})
            classes.append(action_class)
        node = factory.make_node()
        admin = factory.make_admin()
        compiled = compile_node_actions(node, admin, classes=classes)
        self.assertSequenceEqual(labels, compiled.keys())
        displayed = [action.display for action in compiled.values()]
        self.assertSequenceEqual(labels, displayed)

    def test_is_permitted_allows_if_user_has_permission(self):
        # The node's owner passes the EDIT permission check.

        class MyAction(FakeNodeAction):
            permission = NODE_PERMISSION.EDIT

        owner = factory.make_user()
        node = factory.make_node(status=NODE_STATUS.ALLOCATED, owner=owner)
        action = MyAction(node, node.owner)
        self.assertTrue(action.is_permitted())

    def test_is_permitted_disallows_if_user_lacks_permission(self):
        # A user other than the node's owner fails the EDIT check.

        class MyAction(FakeNodeAction):
            permission = NODE_PERMISSION.EDIT

        owner = factory.make_user()
        node = factory.make_node(status=NODE_STATUS.ALLOCATED, owner=owner)
        other_user = factory.make_user()
        action = MyAction(node, other_user)
        self.assertFalse(action.is_permitted())

    def test_inhibition_wraps_inhibit(self):
        # The inhibition property exposes what inhibit() returns.
        expected = factory.getRandomString()
        node = factory.make_node()
        user = factory.make_user()
        action = FakeNodeAction(node, user)
        action.fake_inhibition = expected
        self.assertEqual(expected, action.inhibition)

    def test_inhibition_caches_inhibition(self):
        # inhibit() is consulted only once by the inhibition property.
        # Changing what inhibit() would return after the first access
        # must therefore leave the property's value untouched.
        original = factory.getRandomString()
        node = factory.make_node()
        user = factory.make_user()
        action = FakeNodeAction(node, user)
        action.fake_inhibition = original
        self.assertEqual(original, action.inhibition)
        action.fake_inhibition = factory.getRandomString()
        self.assertEqual(original, action.inhibition)

    def test_inhibition_caches_None(self):
        # A None inhibition is cached just like any other value; it is
        # not confused with a cache that has yet to be filled.
        node = factory.make_node()
        user = factory.make_user()
        action = FakeNodeAction(node, user)
        action.fake_inhibition = None
        self.assertIsNone(action.inhibition)
        action.fake_inhibition = factory.getRandomString()
        self.assertIsNone(action.inhibition)


class TestDeleteNodeAction(TestCase):
    """Tests for the `Delete` node action."""

    def test_Delete_inhibit_when_node_is_allocated(self):
        # Delete reports an inhibition for a node in ALLOCATED status.
        node = factory.make_node(status=NODE_STATUS.ALLOCATED)
        action = Delete(node, factory.make_admin())
        inhibition = action.inhibit()
        self.assertEqual(
            "You cannot delete this node because it's in use.", inhibition)

    def test_Delete_does_not_inhibit_otherwise(self):
        # A node in FAILED_TESTS status can be deleted without inhibition.
        node = factory.make_node(status=NODE_STATUS.FAILED_TESTS)
        action = Delete(node, factory.make_admin())
        inhibition = action.inhibit()
        self.assertIsNone(inhibition)

    def test_Delete_redirects_to_node_delete_view(self):
        # Executing Delete raises a Redirect whose URL path is that of
        # the node-delete view for this node.
        node = factory.make_node()
        action = Delete(node, factory.make_admin())
        try:
            action.execute()
        except Redirect as e:
            redirect = e
        else:
            # Fail explicitly: otherwise a missing Redirect would only
            # show up as a NameError in the assertion below.
            self.fail("Delete.execute() did not raise Redirect.")
        self.assertEqual(
            reverse('node-delete', args=[node.system_id]),
            urlparse(unicode(redirect)).path)


class TestAcceptAndCommissionNodeAction(TestCase):
    """Tests for the `AcceptAndCommission` node action."""

    def test_AcceptAndCommission_starts_commissioning(self):
        # Executing the action on a DECLARED node moves it to
        # COMMISSIONING and queues a power_on task.
        node = factory.make_node(
            mac=True, status=NODE_STATUS.DECLARED,
            power_type=POWER_TYPE.WAKE_ON_LAN)
        admin = factory.make_admin()
        AcceptAndCommission(node, admin).execute()
        self.assertEqual(NODE_STATUS.COMMISSIONING, node.status)
        first_task = self.celery.tasks[0]['task']
        self.assertEqual(
            'provisioningserver.tasks.power_on', first_task.name)


class TestRetryCommissioningNodeAction(TestCase):
    """Tests for the `RetryCommissioning` node action."""

    def test_RetryCommissioning_starts_commissioning(self):
        # Executing the action on a FAILED_TESTS node moves it to
        # COMMISSIONING and queues a power_on task.
        node = factory.make_node(
            mac=True, status=NODE_STATUS.FAILED_TESTS,
            power_type=POWER_TYPE.WAKE_ON_LAN)
        admin = factory.make_admin()
        RetryCommissioning(node, admin).execute()
        self.assertEqual(NODE_STATUS.COMMISSIONING, node.status)
        first_task = self.celery.tasks[0]['task']
        self.assertEqual(
            'provisioningserver.tasks.power_on', first_task.name)


class TestStartNodeNodeAction(TestCase):
    """Tests for the `StartNode` node action."""

    def test_StartNode_inhibit_allows_user_with_SSH_key(self):
        # A user who has an SSH key is not inhibited.
        user_with_key = factory.make_user()
        factory.make_sshkey(user_with_key)
        node = factory.make_node()
        action = StartNode(node, user_with_key)
        self.assertIsNone(action.inhibit())

    def test_StartNode_inhibit_disallows_user_without_SSH_key(self):
        # A user without an SSH key gets an inhibition that mentions it.
        user_without_key = factory.make_user()
        node = factory.make_node()
        reason = StartNode(node, user_without_key).inhibit()
        self.assertIsNotNone(reason)
        self.assertIn("SSH key", reason)

    def test_StartNode_acquires_and_starts_node(self):
        # Executing the action on a READY node allocates it to the user
        # and queues a power_on task.
        node = factory.make_node(
            mac=True, status=NODE_STATUS.READY,
            power_type=POWER_TYPE.WAKE_ON_LAN)
        user = factory.make_user()
        action = StartNode(node, user)
        action.execute()
        self.assertEqual(NODE_STATUS.ALLOCATED, node.status)
        self.assertEqual(user, node.owner)
        first_task = self.celery.tasks[0]['task']
        self.assertEqual(
            'provisioningserver.tasks.power_on', first_task.name)


class TestStopNodeNodeAction(TestCase):
    """Tests for the `StopNode` node action."""

    def test_StopNode_stops_and_releases_node(self):
        # Executing the action on an ALLOCATED node returns it to READY
        # with no owner, and queues a power_off task.

        def fake_run_shell(*args, **kwargs):
            # Stand-in for PowerAction.run_shell: returns a pair of
            # empty strings without running anything.
            return ('', '')

        self.patch(PowerAction, 'run_shell', fake_run_shell)
        user = factory.make_user()
        power_parameters = dict(
            power_address=factory.getRandomString(),
            power_user=factory.getRandomString(),
            power_pass=factory.getRandomString())
        node = factory.make_node(
            mac=True, status=NODE_STATUS.ALLOCATED,
            power_type=POWER_TYPE.IPMI,
            owner=user, power_parameters=power_parameters)
        action = StopNode(node, user)
        action.execute()

        self.assertEqual(NODE_STATUS.READY, node.status)
        self.assertIsNone(node.owner)
        first_task = self.celery.tasks[0]['task']
        self.assertEqual(
            'provisioningserver.tasks.power_off', first_task.name)