~ubuntu-branches/ubuntu/raring/maas/raring-updates

« back to all changes in this revision

Viewing changes to src/maasserver/tests/test_models.py

  • Committer: Package Import Robot
  • Author(s): Andres Rodriguez
  • Date: 2012-07-03 17:42:37 UTC
  • mfrom: (1.1.13)
  • Revision ID: package-import@ubuntu.com-20120703174237-p8l0keuuznfg721k
Tags: 0.1+bzr709+dfsg-0ubuntu1
* New Upstream release
* debian/control:
  - Depends on python-celery, python-tempita, libjs-yui3-{full,min},
    libjs-raphael
* debian/maas.install:
  - Install apiclient, celeryconfig.py, maas-import-pxe-files, preseeds_v2.
  - Update to install various files from chroot, rather tha manually copy
    them from the source.
* debian/maas.links: symlink celeryconfig.py
* debian/maas.maas-celery.upstart: Add job.
* debian/rules:
  - Install celery upstart job.
  - Do not install jslibs as packages are now used.
  - Drop copying of maas_local_settings_sample.py as source now ships
    a maas_local_settings.py
* debian/patches:
  - 04-maas-http-fix.patch: Drop. Merged upstream.
  - 01-fix-database-settings.patch: Refreshed.
  - 99_enums_js.patch: Added until creation of enum.js / build process
    is fixed.
* debian/maas.postinst: Update bzr version to correctly handle upgrades.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2012 Canonical Ltd.  This software is licensed under the
2
 
# GNU Affero General Public License version 3 (see the file LICENSE).
3
 
 
4
 
"""Test maasserver models."""
5
 
 
6
 
from __future__ import (
7
 
    absolute_import,
8
 
    print_function,
9
 
    unicode_literals,
10
 
    )
11
 
 
12
 
__metaclass__ = type
13
 
__all__ = []
14
 
 
15
 
import codecs
16
 
from io import BytesIO
17
 
import os
18
 
import random
19
 
import shutil
20
 
from socket import gethostname
21
 
 
22
 
from django.conf import settings
23
 
from django.contrib.auth.models import User
24
 
from django.core.exceptions import (
25
 
    PermissionDenied,
26
 
    ValidationError,
27
 
    )
28
 
from django.db import IntegrityError
29
 
from django.utils.safestring import SafeUnicode
30
 
from fixtures import TestWithFixtures
31
 
from maasserver.exceptions import (
32
 
    CannotDeleteUserException,
33
 
    NodeStateViolation,
34
 
    )
35
 
from maasserver.models import (
36
 
    Config,
37
 
    create_auth_token,
38
 
    DEFAULT_CONFIG,
39
 
    FileStorage,
40
 
    GENERIC_CONSUMER,
41
 
    get_auth_tokens,
42
 
    get_db_state,
43
 
    get_default_config,
44
 
    get_html_display_for_key,
45
 
    HELLIPSIS,
46
 
    MACAddress,
47
 
    Node,
48
 
    NODE_PERMISSION,
49
 
    NODE_STATUS,
50
 
    NODE_STATUS_CHOICES,
51
 
    NODE_STATUS_CHOICES_DICT,
52
 
    NODE_TRANSITIONS,
53
 
    SSHKey,
54
 
    SYSTEM_USERS,
55
 
    UserProfile,
56
 
    validate_ssh_public_key,
57
 
    )
58
 
from maasserver.provisioning import get_provisioning_api_proxy
59
 
from maasserver.testing import get_data
60
 
from maasserver.testing.enum import map_enum
61
 
from maasserver.testing.factory import factory
62
 
from maasserver.testing.testcase import TestCase
63
 
from metadataserver.models import (
64
 
    NodeCommissionResult,
65
 
    NodeUserData,
66
 
    )
67
 
from piston.models import (
68
 
    Consumer,
69
 
    KEY_SIZE,
70
 
    SECRET_SIZE,
71
 
    Token,
72
 
    )
73
 
from provisioningserver.enum import POWER_TYPE
74
 
from testtools.matchers import (
75
 
    EndsWith,
76
 
    GreaterThan,
77
 
    LessThan,
78
 
    )
79
 
 
80
 
 
81
 
class NodeTest(TestCase):
82
 
 
83
 
    def test_system_id(self):
84
 
        """
85
 
        The generated system_id looks good.
86
 
 
87
 
        """
88
 
        node = factory.make_node()
89
 
        self.assertEqual(len(node.system_id), 41)
90
 
        self.assertTrue(node.system_id.startswith('node-'))
91
 
 
92
 
    def test_display_status_shows_default_status(self):
93
 
        node = factory.make_node()
94
 
        self.assertEqual(
95
 
            NODE_STATUS_CHOICES_DICT[node.status],
96
 
            node.display_status())
97
 
 
98
 
    def test_display_status_for_allocated_node_shows_owner(self):
99
 
        node = factory.make_node(
100
 
            owner=factory.make_user(), status=NODE_STATUS.ALLOCATED)
101
 
        self.assertEqual(
102
 
            "Allocated to %s" % node.owner.username,
103
 
            node.display_status())
104
 
 
105
 
    def test_add_node_with_token(self):
106
 
        user = factory.make_user()
107
 
        token = create_auth_token(user)
108
 
        node = factory.make_node(token=token)
109
 
        self.assertEqual(token, node.token)
110
 
 
111
 
    def test_add_mac_address(self):
112
 
        node = factory.make_node()
113
 
        node.add_mac_address('AA:BB:CC:DD:EE:FF')
114
 
        macs = MACAddress.objects.filter(
115
 
            node=node, mac_address='AA:BB:CC:DD:EE:FF').count()
116
 
        self.assertEqual(1, macs)
117
 
 
118
 
    def test_remove_mac_address(self):
119
 
        node = factory.make_node()
120
 
        node.add_mac_address('AA:BB:CC:DD:EE:FF')
121
 
        node.remove_mac_address('AA:BB:CC:DD:EE:FF')
122
 
        macs = MACAddress.objects.filter(
123
 
            node=node, mac_address='AA:BB:CC:DD:EE:FF').count()
124
 
        self.assertEqual(0, macs)
125
 
 
126
 
    def test_delete_node_deletes_related_mac(self):
127
 
        node = factory.make_node()
128
 
        mac = node.add_mac_address('AA:BB:CC:DD:EE:FF')
129
 
        node.delete()
130
 
        self.assertRaises(
131
 
            MACAddress.DoesNotExist, MACAddress.objects.get, id=mac.id)
132
 
 
133
 
    def test_cannot_delete_allocated_node(self):
134
 
        node = factory.make_node(status=NODE_STATUS.ALLOCATED)
135
 
        self.assertRaises(NodeStateViolation, node.delete)
136
 
 
137
 
    def test_set_mac_based_hostname_default_enlistment_domain(self):
138
 
        # The enlistment domain defaults to `local`.
139
 
        node = factory.make_node()
140
 
        node.set_mac_based_hostname('AA:BB:CC:DD:EE:FF')
141
 
        hostname = 'node-aabbccddeeff.local'
142
 
        self.assertEqual(hostname, node.hostname)
143
 
 
144
 
    def test_set_mac_based_hostname_alt_enlistment_domain(self):
145
 
        # A non-default enlistment domain can be specified.
146
 
        Config.objects.set_config("enlistment_domain", "example.com")
147
 
        node = factory.make_node()
148
 
        node.set_mac_based_hostname('AA:BB:CC:DD:EE:FF')
149
 
        hostname = 'node-aabbccddeeff.example.com'
150
 
        self.assertEqual(hostname, node.hostname)
151
 
 
152
 
    def test_set_mac_based_hostname_cleaning_enlistment_domain(self):
153
 
        # Leading and trailing dots and whitespace are cleaned from the
154
 
        # configured enlistment domain before it's joined to the hostname.
155
 
        Config.objects.set_config("enlistment_domain", " .example.com. ")
156
 
        node = factory.make_node()
157
 
        node.set_mac_based_hostname('AA:BB:CC:DD:EE:FF')
158
 
        hostname = 'node-aabbccddeeff.example.com'
159
 
        self.assertEqual(hostname, node.hostname)
160
 
 
161
 
    def test_set_mac_based_hostname_no_enlistment_domain(self):
162
 
        # The enlistment domain can be set to the empty string and
163
 
        # set_mac_based_hostname sets a hostname with no domain.
164
 
        Config.objects.set_config("enlistment_domain", "")
165
 
        node = factory.make_node()
166
 
        node.set_mac_based_hostname('AA:BB:CC:DD:EE:FF')
167
 
        hostname = 'node-aabbccddeeff'
168
 
        self.assertEqual(hostname, node.hostname)
169
 
 
170
 
    def test_get_effective_power_type_defaults_to_config(self):
171
 
        power_types = list(map_enum(POWER_TYPE).values())
172
 
        power_types.remove(POWER_TYPE.DEFAULT)
173
 
        node = factory.make_node(power_type=POWER_TYPE.DEFAULT)
174
 
        effective_types = []
175
 
        for power_type in power_types:
176
 
            Config.objects.set_config('node_power_type', power_type)
177
 
            effective_types.append(node.get_effective_power_type())
178
 
        self.assertEqual(power_types, effective_types)
179
 
 
180
 
    def test_get_effective_power_type_reads_node_field(self):
181
 
        power_types = list(map_enum(POWER_TYPE).values())
182
 
        power_types.remove(POWER_TYPE.DEFAULT)
183
 
        nodes = [
184
 
            factory.make_node(power_type=power_type)
185
 
            for power_type in power_types]
186
 
        self.assertEqual(
187
 
            power_types, [node.get_effective_power_type() for node in nodes])
188
 
 
189
 
    def test_get_effective_power_type_rejects_default_as_config_value(self):
190
 
        node = factory.make_node(power_type=POWER_TYPE.DEFAULT)
191
 
        Config.objects.set_config('node_power_type', POWER_TYPE.DEFAULT)
192
 
        self.assertRaises(ValueError, node.get_effective_power_type)
193
 
 
194
 
    def test_acquire(self):
195
 
        node = factory.make_node(status=NODE_STATUS.READY)
196
 
        user = factory.make_user()
197
 
        token = create_auth_token(user)
198
 
        node.acquire(token)
199
 
        self.assertEqual(user, node.owner)
200
 
        self.assertEqual(NODE_STATUS.ALLOCATED, node.status)
201
 
 
202
 
    def test_release(self):
203
 
        node = factory.make_node(
204
 
            status=NODE_STATUS.ALLOCATED, owner=factory.make_user())
205
 
        node.release()
206
 
        self.assertEqual((NODE_STATUS.READY, None), (node.status, node.owner))
207
 
 
208
 
    def test_accept_enlistment_gets_node_out_of_declared_state(self):
209
 
        # If called on a node in Declared state, accept_enlistment()
210
 
        # changes the node's status, and returns the node.
211
 
        target_state = NODE_STATUS.COMMISSIONING
212
 
 
213
 
        node = factory.make_node(status=NODE_STATUS.DECLARED)
214
 
        return_value = node.accept_enlistment(factory.make_user())
215
 
        self.assertEqual((node, target_state), (return_value, node.status))
216
 
 
217
 
    def test_accept_enlistment_does_nothing_if_already_accepted(self):
218
 
        # If a node has already been accepted, but not assigned a role
219
 
        # yet, calling accept_enlistment on it is meaningless but not an
220
 
        # error.  The method returns None in this case.
221
 
        accepted_states = [
222
 
            NODE_STATUS.COMMISSIONING,
223
 
            NODE_STATUS.READY,
224
 
            ]
225
 
        nodes = {
226
 
            status: factory.make_node(status=status)
227
 
            for status in accepted_states}
228
 
 
229
 
        return_values = {
230
 
            status: node.accept_enlistment(factory.make_user())
231
 
            for status, node in nodes.items()}
232
 
 
233
 
        self.assertEqual(
234
 
            {status: None for status in accepted_states}, return_values)
235
 
        self.assertEqual(
236
 
            {status: status for status in accepted_states},
237
 
            {status: node.status for status, node in nodes.items()})
238
 
 
239
 
    def test_accept_enlistment_rejects_bad_state_change(self):
240
 
        # If a node is neither Declared nor in one of the "accepted"
241
 
        # states where acceptance is a safe no-op, accept_enlistment
242
 
        # raises a node state violation and leaves the node's state
243
 
        # unchanged.
244
 
        all_states = map_enum(NODE_STATUS).values()
245
 
        acceptable_states = [
246
 
            NODE_STATUS.DECLARED,
247
 
            NODE_STATUS.COMMISSIONING,
248
 
            NODE_STATUS.READY,
249
 
            ]
250
 
        unacceptable_states = set(all_states) - set(acceptable_states)
251
 
        nodes = {
252
 
            status: factory.make_node(status=status)
253
 
            for status in unacceptable_states}
254
 
 
255
 
        exceptions = {status: False for status in unacceptable_states}
256
 
        for status, node in nodes.items():
257
 
            try:
258
 
                node.accept_enlistment(factory.make_user())
259
 
            except NodeStateViolation:
260
 
                exceptions[status] = True
261
 
 
262
 
        self.assertEqual(
263
 
            {status: True for status in unacceptable_states}, exceptions)
264
 
        self.assertEqual(
265
 
            {status: status for status in unacceptable_states},
266
 
            {status: node.status for status, node in nodes.items()})
267
 
 
268
 
    def test_start_commissioning_changes_status_and_starts_node(self):
269
 
        user = factory.make_user()
270
 
        node = factory.make_node(status=NODE_STATUS.DECLARED)
271
 
        node.start_commissioning(user)
272
 
 
273
 
        expected_attrs = {
274
 
            'status': NODE_STATUS.COMMISSIONING,
275
 
            'owner': user,
276
 
        }
277
 
        self.assertAttributes(node, expected_attrs)
278
 
        power_status = get_provisioning_api_proxy().power_status
279
 
        self.assertEqual('start', power_status[node.system_id])
280
 
 
281
 
    def test_start_commissioning_sets_user_data(self):
282
 
        node = factory.make_node(status=NODE_STATUS.DECLARED)
283
 
        node.start_commissioning(factory.make_admin())
284
 
        path = settings.COMMISSIONING_SCRIPT
285
 
        with open(path, 'r') as f:
286
 
            commissioning_user_data = f.read()
287
 
        self.assertEqual(
288
 
            commissioning_user_data,
289
 
            NodeUserData.objects.get_user_data(node))
290
 
 
291
 
    def test_missing_commissioning_script(self):
292
 
        self.patch(
293
 
            settings, 'COMMISSIONING_SCRIPT',
294
 
            '/etc/' + factory.getRandomString(10))
295
 
        node = factory.make_node(status=NODE_STATUS.DECLARED)
296
 
        self.assertRaises(
297
 
            ValidationError,
298
 
            node.start_commissioning, factory.make_admin())
299
 
 
300
 
    def test_start_commissioning_clears_node_commissioning_results(self):
301
 
        node = factory.make_node(status=NODE_STATUS.DECLARED)
302
 
        NodeCommissionResult.objects.store_data(
303
 
            node, factory.getRandomString(), factory.getRandomString())
304
 
        node.start_commissioning(factory.make_admin())
305
 
        self.assertItemsEqual([], node.nodecommissionresult_set.all())
306
 
 
307
 
    def test_start_commissioning_ignores_other_commissioning_results(self):
308
 
        node = factory.make_node()
309
 
        filename = factory.getRandomString()
310
 
        text = factory.getRandomString()
311
 
        NodeCommissionResult.objects.store_data(node, filename, text)
312
 
        other_node = factory.make_node(status=NODE_STATUS.DECLARED)
313
 
        other_node.start_commissioning(factory.make_admin())
314
 
        self.assertEqual(
315
 
            text, NodeCommissionResult.objects.get_data(node, filename))
316
 
 
317
 
    def test_full_clean_checks_status_transition_and_raises_if_invalid(self):
318
 
        # RETIRED -> ALLOCATED is an invalid transition.
319
 
        node = factory.make_node(
320
 
            status=NODE_STATUS.RETIRED, owner=factory.make_user())
321
 
        node.status = NODE_STATUS.ALLOCATED
322
 
        self.assertRaisesRegexp(
323
 
            NodeStateViolation,
324
 
            "Invalid transition: Retired -> Allocated.",
325
 
            node.full_clean)
326
 
 
327
 
    def test_full_clean_passes_if_status_unchanged(self):
328
 
        status = factory.getRandomChoice(NODE_STATUS_CHOICES)
329
 
        node = factory.make_node(status=status)
330
 
        node.status = status
331
 
        node.full_clean()
332
 
        # The test is that this does not raise an error.
333
 
        pass
334
 
 
335
 
    def test_full_clean_passes_if_status_valid_transition(self):
336
 
        # NODE_STATUS.READY -> NODE_STATUS.ALLOCATED is a valid
337
 
        # transition.
338
 
        status = NODE_STATUS.READY
339
 
        node = factory.make_node(status=status)
340
 
        node.status = NODE_STATUS.ALLOCATED
341
 
        node.full_clean()
342
 
        # The test is that this does not raise an error.
343
 
        pass
344
 
 
345
 
    def test_save_raises_node_state_violation_on_bad_transition(self):
346
 
        # RETIRED -> ALLOCATED is an invalid transition.
347
 
        node = factory.make_node(
348
 
            status=NODE_STATUS.RETIRED, owner=factory.make_user())
349
 
        node.status = NODE_STATUS.ALLOCATED
350
 
        self.assertRaisesRegexp(
351
 
            NodeStateViolation,
352
 
            "Invalid transition: Retired -> Allocated.",
353
 
            node.save)
354
 
 
355
 
    def test_save_does_not_check_status_transition_if_skip_check(self):
356
 
        # RETIRED -> ALLOCATED is an invalid transition.
357
 
        node = factory.make_node(
358
 
            status=NODE_STATUS.RETIRED, owner=factory.make_user())
359
 
        node.status = NODE_STATUS.ALLOCATED
360
 
        node.save(skip_check=True)
361
 
        # The test is that this does not raise an error.
362
 
        pass
363
 
 
364
 
 
365
 
class NodeTransitionsTests(TestCase):
366
 
    """Test the structure of NODE_TRANSITIONS."""
367
 
 
368
 
    def test_NODE_TRANSITIONS_initial_states(self):
369
 
        allowed_states = set(NODE_STATUS_CHOICES_DICT.keys() + [None])
370
 
 
371
 
        self.assertTrue(set(NODE_TRANSITIONS.keys()) <= allowed_states)
372
 
 
373
 
    def test_NODE_TRANSITIONS_destination_state(self):
374
 
        all_destination_states = []
375
 
        for destination_states in NODE_TRANSITIONS.values():
376
 
            all_destination_states.extend(destination_states)
377
 
        allowed_states = set(NODE_STATUS_CHOICES_DICT.keys())
378
 
 
379
 
        self.assertTrue(set(all_destination_states) <= allowed_states)
380
 
 
381
 
 
382
 
class GetDbStateTest(TestCase):
383
 
    """Testing for the method `get_db_state`."""
384
 
 
385
 
    def test_get_db_state_returns_db_state(self):
386
 
        status = factory.getRandomChoice(NODE_STATUS_CHOICES)
387
 
        node = factory.make_node(status=status)
388
 
        another_status = factory.getRandomChoice(
389
 
            NODE_STATUS_CHOICES, but_not=[status])
390
 
        node.status = another_status
391
 
        self.assertEqual(status, get_db_state(node, 'status'))
392
 
 
393
 
 
394
 
class NodeManagerTest(TestCase):
395
 
 
396
 
    def make_node(self, user=None):
397
 
        """Create a node, allocated to `user` if given."""
398
 
        if user is None:
399
 
            status = NODE_STATUS.READY
400
 
        else:
401
 
            status = NODE_STATUS.ALLOCATED
402
 
        return factory.make_node(set_hostname=True, status=status, owner=user)
403
 
 
404
 
    def make_user_data(self):
405
 
        """Create a blob of arbitrary user-data."""
406
 
        return factory.getRandomString().encode('ascii')
407
 
 
408
 
    def test_filter_by_ids_filters_nodes_by_ids(self):
409
 
        nodes = [factory.make_node() for counter in range(5)]
410
 
        ids = [node.system_id for node in nodes]
411
 
        selection = slice(1, 3)
412
 
        self.assertItemsEqual(
413
 
            nodes[selection],
414
 
            Node.objects.filter_by_ids(Node.objects.all(), ids[selection]))
415
 
 
416
 
    def test_filter_by_ids_with_empty_list_returns_empty(self):
417
 
        factory.make_node()
418
 
        self.assertItemsEqual(
419
 
            [], Node.objects.filter_by_ids(Node.objects.all(), []))
420
 
 
421
 
    def test_filter_by_ids_without_ids_returns_full(self):
422
 
        node = factory.make_node()
423
 
        self.assertItemsEqual(
424
 
            [node], Node.objects.filter_by_ids(Node.objects.all(), None))
425
 
 
426
 
    def test_get_nodes_for_user_lists_visible_nodes(self):
427
 
        """get_nodes with perm=NODE_PERMISSION.VIEW lists the nodes a user
428
 
        has access to.
429
 
 
430
 
        When run for a regular user it returns unowned nodes, and nodes
431
 
        owned by that user.
432
 
        """
433
 
        user = factory.make_user()
434
 
        visible_nodes = [self.make_node(owner) for owner in [None, user]]
435
 
        self.make_node(factory.make_user())
436
 
        self.assertItemsEqual(
437
 
            visible_nodes, Node.objects.get_nodes(user, NODE_PERMISSION.VIEW))
438
 
 
439
 
    def test_get_nodes_admin_lists_all_nodes(self):
440
 
        admin = factory.make_admin()
441
 
        owners = [
442
 
            None,
443
 
            factory.make_user(),
444
 
            factory.make_admin(),
445
 
            admin,
446
 
            ]
447
 
        nodes = [self.make_node(owner) for owner in owners]
448
 
        self.assertItemsEqual(
449
 
            nodes, Node.objects.get_nodes(admin, NODE_PERMISSION.VIEW))
450
 
 
451
 
    def test_get_nodes_filters_by_id(self):
452
 
        user = factory.make_user()
453
 
        nodes = [self.make_node(user) for counter in range(5)]
454
 
        ids = [node.system_id for node in nodes]
455
 
        wanted_slice = slice(0, 3)
456
 
        self.assertItemsEqual(
457
 
            nodes[wanted_slice],
458
 
            Node.objects.get_nodes(
459
 
                user, NODE_PERMISSION.VIEW, ids=ids[wanted_slice]))
460
 
 
461
 
    def test_get_nodes_with_edit_perm_for_user_lists_owned_nodes(self):
462
 
        user = factory.make_user()
463
 
        visible_node = self.make_node(user)
464
 
        self.make_node(None)
465
 
        self.make_node(factory.make_user())
466
 
        self.assertItemsEqual(
467
 
            [visible_node],
468
 
            Node.objects.get_nodes(user, NODE_PERMISSION.EDIT))
469
 
 
470
 
    def test_get_nodes_with_edit_perm_admin_lists_all_nodes(self):
471
 
        admin = factory.make_admin()
472
 
        owners = [
473
 
            None,
474
 
            factory.make_user(),
475
 
            factory.make_admin(),
476
 
            admin,
477
 
            ]
478
 
        nodes = [self.make_node(owner) for owner in owners]
479
 
        self.assertItemsEqual(
480
 
            nodes, Node.objects.get_nodes(admin, NODE_PERMISSION.EDIT))
481
 
 
482
 
    def test_get_nodes_with_admin_perm_returns_empty_list_for_user(self):
483
 
        user = factory.make_user()
484
 
        [self.make_node(user) for counter in range(5)]
485
 
        self.assertItemsEqual(
486
 
            [],
487
 
            Node.objects.get_nodes(user, NODE_PERMISSION.ADMIN))
488
 
 
489
 
    def test_get_nodes_with_admin_perm_returns_all_nodes_for_admin(self):
490
 
        user = factory.make_user()
491
 
        nodes = [self.make_node(user) for counter in range(5)]
492
 
        self.assertItemsEqual(
493
 
            nodes,
494
 
            Node.objects.get_nodes(
495
 
                factory.make_admin(), NODE_PERMISSION.ADMIN))
496
 
 
497
 
    def test_get_visible_node_or_404_ok(self):
498
 
        """get_node_or_404 fetches nodes by system_id."""
499
 
        user = factory.make_user()
500
 
        node = self.make_node(user)
501
 
        self.assertEqual(
502
 
            node,
503
 
            Node.objects.get_node_or_404(
504
 
                node.system_id, user, NODE_PERMISSION.VIEW))
505
 
 
506
 
    def test_get_visible_node_or_404_raises_PermissionDenied(self):
507
 
        """get_node_or_404 raises PermissionDenied if the provided
508
 
        user has not the right permission on the returned node."""
509
 
        user_node = self.make_node(factory.make_user())
510
 
        self.assertRaises(
511
 
            PermissionDenied,
512
 
            Node.objects.get_node_or_404,
513
 
            user_node.system_id, factory.make_user(), NODE_PERMISSION.VIEW)
514
 
 
515
 
    def test_get_available_node_for_acquisition_finds_available_node(self):
516
 
        user = factory.make_user()
517
 
        node = self.make_node(None)
518
 
        self.assertEqual(
519
 
            node, Node.objects.get_available_node_for_acquisition(user))
520
 
 
521
 
    def test_get_available_node_for_acquisition_returns_none_if_empty(self):
522
 
        user = factory.make_user()
523
 
        self.assertEqual(
524
 
            None, Node.objects.get_available_node_for_acquisition(user))
525
 
 
526
 
    def test_get_available_node_for_acquisition_ignores_taken_nodes(self):
527
 
        user = factory.make_user()
528
 
        available_status = NODE_STATUS.READY
529
 
        unavailable_statuses = (
530
 
            set(NODE_STATUS_CHOICES_DICT) - set([available_status]))
531
 
        for status in unavailable_statuses:
532
 
            factory.make_node(status=status)
533
 
        self.assertEqual(
534
 
            None, Node.objects.get_available_node_for_acquisition(user))
535
 
 
536
 
    def test_get_available_node_for_acquisition_ignores_invisible_nodes(self):
537
 
        user = factory.make_user()
538
 
        node = self.make_node()
539
 
        node.owner = factory.make_user()
540
 
        node.save()
541
 
        self.assertEqual(
542
 
            None, Node.objects.get_available_node_for_acquisition(user))
543
 
 
544
 
    def test_get_available_node_combines_constraint_with_availability(self):
545
 
        user = factory.make_user()
546
 
        node = self.make_node(factory.make_user())
547
 
        self.assertEqual(
548
 
            None,
549
 
            Node.objects.get_available_node_for_acquisition(
550
 
                user, {'name': node.system_id}))
551
 
 
552
 
    def test_get_available_node_constrains_by_name(self):
553
 
        user = factory.make_user()
554
 
        nodes = [self.make_node() for counter in range(3)]
555
 
        self.assertEqual(
556
 
            nodes[1],
557
 
            Node.objects.get_available_node_for_acquisition(
558
 
                user, {'name': nodes[1].hostname}))
559
 
 
560
 
    def test_get_available_node_returns_None_if_name_is_unknown(self):
561
 
        user = factory.make_user()
562
 
        self.assertEqual(
563
 
            None,
564
 
            Node.objects.get_available_node_for_acquisition(
565
 
                user, {'name': factory.getRandomString()}))
566
 
 
567
 
    def test_stop_nodes_stops_nodes(self):
568
 
        user = factory.make_user()
569
 
        node = self.make_node(user)
570
 
        output = Node.objects.stop_nodes([node.system_id], user)
571
 
 
572
 
        self.assertItemsEqual([node], output)
573
 
        power_status = get_provisioning_api_proxy().power_status
574
 
        self.assertEqual('stop', power_status[node.system_id])
575
 
 
576
 
    def test_stop_nodes_ignores_uneditable_nodes(self):
577
 
        nodes = [self.make_node(factory.make_user()) for counter in range(3)]
578
 
        ids = [node.system_id for node in nodes]
579
 
        stoppable_node = nodes[0]
580
 
        self.assertItemsEqual(
581
 
            [stoppable_node],
582
 
            Node.objects.stop_nodes(ids, stoppable_node.owner))
583
 
 
584
 
    def test_start_nodes_starts_nodes(self):
585
 
        user = factory.make_user()
586
 
        node = self.make_node(user)
587
 
        output = Node.objects.start_nodes([node.system_id], user)
588
 
 
589
 
        self.assertItemsEqual([node], output)
590
 
        power_status = get_provisioning_api_proxy().power_status
591
 
        self.assertEqual('start', power_status[node.system_id])
592
 
 
593
 
    def test_start_nodes_sets_commissioning_profile(self):
594
 
        # Starting up a node should always set a profile. Here we test
595
 
        # that a commissioning profile was set for nodes in the
596
 
        # commissioning status.
597
 
        user = factory.make_user()
598
 
        node = factory.make_node(
599
 
            set_hostname=True, status=NODE_STATUS.COMMISSIONING, owner=user)
600
 
        output = Node.objects.start_nodes([node.system_id], user)
601
 
 
602
 
        self.assertItemsEqual([node], output)
603
 
        profile = get_provisioning_api_proxy().nodes[node.system_id]['profile']
604
 
        self.assertEqual('maas-precise-i386-commissioning', profile)
605
 
 
606
 
    def test_start_nodes_doesnt_set_commissioning_profile(self):
607
 
        # Starting up a node should always set a profile. Complement the
608
 
        # above test to show that a different profile can be set.
609
 
        user = factory.make_user()
610
 
        node = self.make_node(user)
611
 
        output = Node.objects.start_nodes([node.system_id], user)
612
 
 
613
 
        self.assertItemsEqual([node], output)
614
 
        profile = get_provisioning_api_proxy().nodes[node.system_id]['profile']
615
 
        self.assertEqual('maas-precise-i386', profile)
616
 
 
617
 
    def test_start_nodes_ignores_uneditable_nodes(self):
618
 
        nodes = [self.make_node(factory.make_user()) for counter in range(3)]
619
 
        ids = [node.system_id for node in nodes]
620
 
        startable_node = nodes[0]
621
 
        self.assertItemsEqual(
622
 
            [startable_node],
623
 
            Node.objects.start_nodes(ids, startable_node.owner))
624
 
 
625
 
    def test_start_nodes_stores_user_data(self):
626
 
        node = factory.make_node(owner=factory.make_user())
627
 
        user_data = self.make_user_data()
628
 
        Node.objects.start_nodes(
629
 
            [node.system_id], node.owner, user_data=user_data)
630
 
        self.assertEqual(user_data, NodeUserData.objects.get_user_data(node))
631
 
 
632
 
    def test_start_nodes_does_not_store_user_data_for_uneditable_nodes(self):
633
 
        node = factory.make_node(owner=factory.make_user())
634
 
        original_user_data = self.make_user_data()
635
 
        NodeUserData.objects.set_user_data(node, original_user_data)
636
 
        Node.objects.start_nodes(
637
 
            [node.system_id], factory.make_user(),
638
 
            user_data=self.make_user_data())
639
 
        self.assertEqual(
640
 
            original_user_data, NodeUserData.objects.get_user_data(node))
641
 
 
642
 
    def test_start_nodes_without_user_data_clears_existing_data(self):
643
 
        node = factory.make_node(owner=factory.make_user())
644
 
        user_data = self.make_user_data()
645
 
        NodeUserData.objects.set_user_data(node, user_data)
646
 
        Node.objects.start_nodes([node.system_id], node.owner, user_data=None)
647
 
        self.assertRaises(
648
 
            NodeUserData.DoesNotExist,
649
 
            NodeUserData.objects.get_user_data, node)
650
 
 
651
 
    def test_start_nodes_with_user_data_overwrites_existing_data(self):
652
 
        node = factory.make_node(owner=factory.make_user())
653
 
        NodeUserData.objects.set_user_data(node, self.make_user_data())
654
 
        user_data = self.make_user_data()
655
 
        Node.objects.start_nodes(
656
 
            [node.system_id], node.owner, user_data=user_data)
657
 
        self.assertEqual(user_data, NodeUserData.objects.get_user_data(node))
658
 
 
659
 
 
660
 
class MACAddressTest(TestCase):
661
 
 
662
 
    def make_MAC(self, address):
663
 
        """Create a MAC address."""
664
 
        node = factory.make_node()
665
 
        return MACAddress(mac_address=address, node=node)
666
 
 
667
 
    def test_stores_to_database(self):
668
 
        mac = self.make_MAC('00:11:22:33:44:55')
669
 
        mac.save()
670
 
        self.assertEqual([mac], list(MACAddress.objects.all()))
671
 
 
672
 
    def test_invalid_address_raises_validation_error(self):
673
 
        mac = self.make_MAC('aa:bb:ccxdd:ee:ff')
674
 
        self.assertRaises(ValidationError, mac.full_clean)
675
 
 
676
 
 
677
 
class AuthTokensTest(TestCase):
678
 
    """Test creation and retrieval of auth tokens."""
679
 
 
680
 
    def assertTokenValid(self, token):
681
 
        self.assertIsInstance(token.key, basestring)
682
 
        self.assertEqual(KEY_SIZE, len(token.key))
683
 
        self.assertIsInstance(token.secret, basestring)
684
 
        self.assertEqual(SECRET_SIZE, len(token.secret))
685
 
 
686
 
    def assertConsumerValid(self, consumer):
687
 
        self.assertIsInstance(consumer.key, basestring)
688
 
        self.assertEqual(KEY_SIZE, len(consumer.key))
689
 
        self.assertEqual('', consumer.secret)
690
 
 
691
 
    def test_create_auth_token(self):
692
 
        user = factory.make_user()
693
 
        token = create_auth_token(user)
694
 
        self.assertEqual(user, token.user)
695
 
        self.assertEqual(user, token.consumer.user)
696
 
        self.assertTrue(token.is_approved)
697
 
        self.assertConsumerValid(token.consumer)
698
 
        self.assertTokenValid(token)
699
 
 
700
 
    def test_get_auth_tokens_finds_tokens_for_user(self):
701
 
        user = factory.make_user()
702
 
        token = create_auth_token(user)
703
 
        self.assertIn(token, get_auth_tokens(user))
704
 
 
705
 
    def test_get_auth_tokens_ignores_other_users(self):
706
 
        user, other_user = factory.make_user(), factory.make_user()
707
 
        unrelated_token = create_auth_token(other_user)
708
 
        self.assertNotIn(unrelated_token, get_auth_tokens(user))
709
 
 
710
 
    def test_get_auth_tokens_ignores_unapproved_tokens(self):
711
 
        user = factory.make_user()
712
 
        token = create_auth_token(user)
713
 
        token.is_approved = False
714
 
        token.save()
715
 
        self.assertNotIn(token, get_auth_tokens(user))
716
 
 
717
 
 
718
 
class UserProfileTest(TestCase):
719
 
 
720
 
    def test_profile_creation(self):
721
 
        # A profile is created each time a user is created.
722
 
        user = factory.make_user()
723
 
        self.assertIsInstance(user.get_profile(), UserProfile)
724
 
        self.assertEqual(user, user.get_profile().user)
725
 
 
726
 
    def test_consumer_creation(self):
727
 
        # A generic consumer is created each time a user is created.
728
 
        user = factory.make_user()
729
 
        consumers = Consumer.objects.filter(user=user, name=GENERIC_CONSUMER)
730
 
        self.assertEqual([user], [consumer.user for consumer in consumers])
731
 
 
732
 
    def test_token_creation(self):
733
 
        # A token is created each time a user is created.
734
 
        user = factory.make_user()
735
 
        [token] = get_auth_tokens(user)
736
 
        self.assertEqual(user, token.user)
737
 
 
738
 
    def test_create_authorisation_token(self):
739
 
        # UserProfile.create_authorisation_token calls create_auth_token.
740
 
        user = factory.make_user()
741
 
        profile = user.get_profile()
742
 
        consumer, token = profile.create_authorisation_token()
743
 
        self.assertEqual(user, token.user)
744
 
        self.assertEqual(user, consumer.user)
745
 
 
746
 
    def test_get_authorisation_tokens(self):
747
 
        # UserProfile.get_authorisation_tokens calls get_auth_tokens.
748
 
        user = factory.make_user()
749
 
        consumer, token = user.get_profile().create_authorisation_token()
750
 
        self.assertIn(token, user.get_profile().get_authorisation_tokens())
751
 
 
752
 
    def test_delete(self):
753
 
        # Deleting a profile also deletes the related user.
754
 
        profile = factory.make_user().get_profile()
755
 
        profile_id = profile.id
756
 
        user_id = profile.user.id
757
 
        self.assertTrue(User.objects.filter(id=user_id).exists())
758
 
        self.assertTrue(
759
 
            UserProfile.objects.filter(id=profile_id).exists())
760
 
        profile.delete()
761
 
        self.assertFalse(User.objects.filter(id=user_id).exists())
762
 
        self.assertFalse(
763
 
            UserProfile.objects.filter(id=profile_id).exists())
764
 
 
765
 
    def test_delete_consumers_tokens(self):
766
 
        # Deleting a profile deletes the related tokens and consumers.
767
 
        profile = factory.make_user().get_profile()
768
 
        token_ids = []
769
 
        consumer_ids = []
770
 
        for i in range(3):
771
 
            token, consumer = profile.create_authorisation_token()
772
 
            token_ids.append(token.id)
773
 
            consumer_ids.append(consumer.id)
774
 
        profile.delete()
775
 
        self.assertFalse(Consumer.objects.filter(id__in=consumer_ids).exists())
776
 
        self.assertFalse(Token.objects.filter(id__in=token_ids).exists())
777
 
 
778
 
    def test_delete_attached_nodes(self):
779
 
        # Cannot delete a user with nodes attached to it.
780
 
        profile = factory.make_user().get_profile()
781
 
        factory.make_node(owner=profile.user)
782
 
        self.assertRaises(CannotDeleteUserException, profile.delete)
783
 
 
784
 
    def test_manager_all_users(self):
785
 
        users = set(factory.make_user() for i in range(3))
786
 
        all_users = set(UserProfile.objects.all_users())
787
 
        self.assertEqual(users, all_users)
788
 
 
789
 
    def test_manager_all_users_no_system_user(self):
790
 
        for i in range(3):
791
 
            factory.make_user()
792
 
        usernames = set(
793
 
            user.username for user in UserProfile.objects.all_users())
794
 
        self.assertTrue(set(SYSTEM_USERS).isdisjoint(usernames))
795
 
 
796
 
 
797
 
class SSHKeyValidatorTest(TestCase):
798
 
 
799
 
    def test_validates_rsa_public_key(self):
800
 
        key_string = get_data('data/test_rsa0.pub')
801
 
        validate_ssh_public_key(key_string)
802
 
        # No ValidationError.
803
 
 
804
 
    def test_validates_dsa_public_key(self):
805
 
        key_string = get_data('data/test_dsa.pub')
806
 
        validate_ssh_public_key(key_string)
807
 
        # No ValidationError.
808
 
 
809
 
    def test_does_not_validate_random_data(self):
810
 
        key_string = factory.getRandomString()
811
 
        self.assertRaises(
812
 
            ValidationError, validate_ssh_public_key, key_string)
813
 
 
814
 
    def test_does_not_validate_wrongly_padded_data(self):
815
 
        key_string = 'ssh-dss %s %s@%s' % (
816
 
            factory.getRandomString(), factory.getRandomString(),
817
 
            factory.getRandomString())
818
 
        self.assertRaises(
819
 
            ValidationError, validate_ssh_public_key, key_string)
820
 
 
821
 
    def test_does_not_validate_rsa_private_key(self):
822
 
        key_string = get_data('data/test_rsa')
823
 
        self.assertRaises(
824
 
            ValidationError, validate_ssh_public_key, key_string)
825
 
 
826
 
    def test_does_not_validate_dsa_private_key(self):
827
 
        key_string = get_data('data/test_dsa')
828
 
        self.assertRaises(
829
 
            ValidationError, validate_ssh_public_key, key_string)
830
 
 
831
 
 
832
 
class GetHTMLDisplayForKeyTest(TestCase):
833
 
    """Testing for the method `get_html_display_for_key`."""
834
 
 
835
 
    def make_comment(self, length):
836
 
        """Create a comment of the desired length.
837
 
 
838
 
        The comment may contain spaces, but not begin or end in them.  It
839
 
        will be of the desired length both before and after stripping.
840
 
        """
841
 
        return ''.join([
842
 
            factory.getRandomString(1),
843
 
            factory.getRandomString(max([length - 2, 0]), spaces=True),
844
 
            factory.getRandomString(1),
845
 
            ])[:length]
846
 
 
847
 
    def make_key(self, type_len=7, key_len=360, comment_len=None):
848
 
        """Produce a fake ssh public key containing arbitrary data.
849
 
 
850
 
        :param type_len: The length of the "key type" field.  (Default is
851
 
            sized for the real-life "ssh-rsa").
852
 
        :param key_len: Length of the key text.  (With a roughly realistic
853
 
            default).
854
 
        :param comment_len: Length of the comment field.  The comment may
855
 
            contain spaces.  Leave it None to omit the comment.
856
 
        :return: A string representing the combined key-file contents.
857
 
        """
858
 
        fields = [
859
 
            factory.getRandomString(type_len),
860
 
            factory.getRandomString(key_len),
861
 
            ]
862
 
        if comment_len is not None:
863
 
            fields.append(self.make_comment(comment_len))
864
 
        return " ".join(fields)
865
 
 
866
 
    def test_display_returns_unchanged_if_unknown_and_small(self):
867
 
        # If the key does not look like a normal key (with three parts
868
 
        # separated by spaces, it's returned unchanged if its size is <=
869
 
        # size.
870
 
        size = random.randint(101, 200)
871
 
        key = factory.getRandomString(size - 100)
872
 
        display = get_html_display_for_key(key, size)
873
 
        self.assertTrue(len(display) < size)
874
 
        self.assertEqual(key, display)
875
 
 
876
 
    def test_display_returns_cropped_if_unknown_and_large(self):
877
 
        # If the key does not look like a normal key (with three parts
878
 
        # separated by spaces, it's returned cropped if its size is >
879
 
        # size.
880
 
        size = random.randint(20, 100)  # size cannot be < len(HELLIPSIS).
881
 
        key = factory.getRandomString(size + 1)
882
 
        display = get_html_display_for_key(key, size)
883
 
        self.assertEqual(size, len(display))
884
 
        self.assertEqual(
885
 
            '%.*s%s' % (size - len(HELLIPSIS), key, HELLIPSIS), display)
886
 
 
887
 
    def test_display_escapes_commentless_key_for_html(self):
888
 
        # The key's comment may contain characters that are not safe for
889
 
        # including in HTML, and so get_html_display_for_key escapes the
890
 
        # text.
891
 
        # There are several code paths in get_html_display_for_key; this
892
 
        # test is for the case where the key has no comment, and is
893
 
        # brief enough to fit into the allotted space.
894
 
        self.assertEqual(
895
 
            "&lt;type&gt; &lt;text&gt;",
896
 
            get_html_display_for_key("<type> <text>", 100))
897
 
 
898
 
    def test_display_escapes_short_key_for_html(self):
899
 
        # The key's comment may contain characters that are not safe for
900
 
        # including in HTML, and so get_html_display_for_key escapes the
901
 
        # text.
902
 
        # There are several code paths in get_html_display_for_key; this
903
 
        # test is for the case where the whole key is short enough to
904
 
        # fit completely into the output.
905
 
        key = "<type> <text> <comment>"
906
 
        display = get_html_display_for_key(key, 100)
907
 
        # This also verifies that the entire key fits into the string.
908
 
        # Otherwise we might accidentally get one of the other cases.
909
 
        self.assertThat(display, EndsWith("&lt;comment&gt;"))
910
 
        # And of course the check also implies that the text is
911
 
        # HTML-escaped:
912
 
        self.assertNotIn("<", display)
913
 
        self.assertNotIn(">", display)
914
 
 
915
 
    def test_display_escapes_long_key_for_html(self):
916
 
        # The key's comment may contain characters that are not safe for
917
 
        # including in HTML, and so get_html_display_for_key escapes the
918
 
        # text.
919
 
        # There are several code paths in get_html_display_for_key; this
920
 
        # test is for the case where the comment is short enough to fit
921
 
        # completely into the output.
922
 
        key = "<type> %s <comment>" % ("<&>" * 50)
923
 
        display = get_html_display_for_key(key, 50)
924
 
        # This verifies that we are indeed getting an abbreviated
925
 
        # display.  Otherwise we might accidentally get one of the other
926
 
        # cases.
927
 
        self.assertIn("&hellip;", display)
928
 
        self.assertIn("comment", display)
929
 
        # And now, on to checking that the text is HTML-safe.
930
 
        self.assertNotIn("<", display)
931
 
        self.assertNotIn(">", display)
932
 
        self.assertThat(display, EndsWith("&lt;comment&gt;"))
933
 
 
934
 
    def test_display_limits_size_with_large_comment(self):
935
 
        # If the key has a large 'comment' part, the key is simply
936
 
        # cropped and HELLIPSIS appended to it.
937
 
        key = self.make_key(10, 10, 100)
938
 
        display = get_html_display_for_key(key, 50)
939
 
        self.assertEqual(50, len(display))
940
 
        self.assertEqual(
941
 
            '%.*s%s' % (50 - len(HELLIPSIS), key, HELLIPSIS), display)
942
 
 
943
 
    def test_display_limits_size_with_large_key_type(self):
944
 
        # If the key has a large 'key_type' part, the key is simply
945
 
        # cropped and HELLIPSIS appended to it.
946
 
        key = self.make_key(100, 10, 10)
947
 
        display = get_html_display_for_key(key, 50)
948
 
        self.assertEqual(50, len(display))
949
 
        self.assertEqual(
950
 
            '%.*s%s' % (50 - len(HELLIPSIS), key, HELLIPSIS), display)
951
 
 
952
 
    def test_display_cropped_key(self):
953
 
        # If the key has a small key_type, a small comment and a large
954
 
        # key_string (which is the 'normal' case), the key_string part
955
 
        # gets cropped.
956
 
        type_len = 10
957
 
        comment_len = 10
958
 
        key = self.make_key(type_len, 100, comment_len)
959
 
        key_type, key_string, comment = key.split(' ', 2)
960
 
        display = get_html_display_for_key(key, 50)
961
 
        self.assertEqual(50, len(display))
962
 
        self.assertEqual(
963
 
            '%s %.*s%s %s' % (
964
 
                key_type,
965
 
                50 - (type_len + len(HELLIPSIS) + comment_len + 2),
966
 
                key_string, HELLIPSIS, comment),
967
 
            display)
968
 
 
969
 
 
970
 
class SSHKeyTest(TestCase):
971
 
    """Testing for the :class:`SSHKey`."""
972
 
 
973
 
    def test_sshkey_validation_with_valid_key(self):
974
 
        key_string = get_data('data/test_rsa0.pub')
975
 
        user = factory.make_user()
976
 
        key = SSHKey(key=key_string, user=user)
977
 
        key.full_clean()
978
 
        # No ValidationError.
979
 
 
980
 
    def test_sshkey_validation_fails_if_key_is_invalid(self):
981
 
        key_string = factory.getRandomString()
982
 
        user = factory.make_user()
983
 
        key = SSHKey(key=key_string, user=user)
984
 
        self.assertRaises(
985
 
            ValidationError, key.full_clean)
986
 
 
987
 
    def test_sshkey_display_with_real_life_key(self):
988
 
        # With a real-life ssh-rsa key, the key_string part is cropped.
989
 
        key_string = get_data('data/test_rsa0.pub')
990
 
        user = factory.make_user()
991
 
        key = SSHKey(key=key_string, user=user)
992
 
        display = key.display_html()
993
 
        self.assertEqual(
994
 
            'ssh-rsa AAAAB3NzaC1yc2E&hellip; ubuntu@server-7476', display)
995
 
 
996
 
    def test_sshkey_display_is_marked_as_HTML_safe(self):
997
 
        key_string = get_data('data/test_rsa0.pub')
998
 
        user = factory.make_user()
999
 
        key = SSHKey(key=key_string, user=user)
1000
 
        display = key.display_html()
1001
 
        self.assertIsInstance(display, SafeUnicode)
1002
 
 
1003
 
    def test_sshkey_user_and_key_unique_together(self):
1004
 
        key_string = get_data('data/test_rsa0.pub')
1005
 
        user = factory.make_user()
1006
 
        key = SSHKey(key=key_string, user=user)
1007
 
        key.save()
1008
 
        key2 = SSHKey(key=key_string, user=user)
1009
 
        self.assertRaises(
1010
 
            ValidationError, key2.full_clean)
1011
 
 
1012
 
    def test_sshkey_user_and_key_unique_together_db_level(self):
1013
 
        key_string = get_data('data/test_rsa0.pub')
1014
 
        user = factory.make_user()
1015
 
        key = SSHKey(key=key_string, user=user)
1016
 
        key.save()
1017
 
        key2 = SSHKey(key=key_string, user=user)
1018
 
        self.assertRaises(
1019
 
            IntegrityError, key2.save, skip_check=True)
1020
 
 
1021
 
    def test_sshkey_same_key_can_be_used_by_different_users(self):
1022
 
        key_string = get_data('data/test_rsa0.pub')
1023
 
        user = factory.make_user()
1024
 
        key = SSHKey(key=key_string, user=user)
1025
 
        key.save()
1026
 
        user2 = factory.make_user()
1027
 
        key2 = SSHKey(key=key_string, user=user2)
1028
 
        key2.full_clean()
1029
 
        # No ValidationError.
1030
 
 
1031
 
 
1032
 
class SSHKeyManagerTest(TestCase):
1033
 
    """Testing for the :class:`SSHKeyManager` model manager."""
1034
 
 
1035
 
    def test_get_keys_for_user_no_keys(self):
1036
 
        user = factory.make_user()
1037
 
        keys = SSHKey.objects.get_keys_for_user(user)
1038
 
        self.assertItemsEqual([], keys)
1039
 
 
1040
 
    def test_get_keys_for_user_with_keys(self):
1041
 
        user1, created_keys = factory.make_user_with_keys(
1042
 
            n_keys=3, username='user1')
1043
 
        # user2
1044
 
        factory.make_user_with_keys(n_keys=2)
1045
 
        keys = SSHKey.objects.get_keys_for_user(user1)
1046
 
        self.assertItemsEqual([key.key for key in created_keys], keys)
1047
 
 
1048
 
 
1049
 
class FileStorageTest(TestCase):
1050
 
    """Testing of the :class:`FileStorage` model."""
1051
 
 
1052
 
    def make_upload_dir(self):
1053
 
        """Create the upload directory, and arrange for eventual deletion.
1054
 
 
1055
 
        The directory must not already exist.  If it does, this method will
1056
 
        fail rather than arrange for deletion of a directory that may
1057
 
        contain meaningful data.
1058
 
 
1059
 
        :return: Absolute path to the `FileStorage` upload directory.  This
1060
 
            is the directory where the actual files are stored.
1061
 
        """
1062
 
        media_root = settings.MEDIA_ROOT
1063
 
        self.assertFalse(os.path.exists(media_root), "See media/README")
1064
 
        self.addCleanup(shutil.rmtree, media_root, ignore_errors=True)
1065
 
        os.mkdir(media_root)
1066
 
        upload_dir = os.path.join(media_root, FileStorage.upload_dir)
1067
 
        os.mkdir(upload_dir)
1068
 
        return upload_dir
1069
 
 
1070
 
    def get_media_path(self, filename):
1071
 
        """Get the path to a given stored file, relative to MEDIA_ROOT."""
1072
 
        return os.path.join(FileStorage.upload_dir, filename)
1073
 
 
1074
 
    def make_data(self, including_text='data'):
1075
 
        """Return arbitrary data.
1076
 
 
1077
 
        :param including_text: Text to include in the data.  Leave something
1078
 
            here to make failure messages more recognizable.
1079
 
        :type including_text: basestring
1080
 
        :return: A string of bytes, including `including_text`.
1081
 
        :rtype: bytes
1082
 
        """
1083
 
        # Note that this won't automatically insert any non-ASCII bytes.
1084
 
        # Proper handling of real binary data is tested separately.
1085
 
        text = "%s %s" % (including_text, factory.getRandomString())
1086
 
        return text.encode('ascii')
1087
 
 
1088
 
    def age_file(self, path, seconds=None):
1089
 
        """Make the file at `path` look like it hasn't been touched recently.
1090
 
 
1091
 
        Decrements the file's mtime by a bit over a day.
1092
 
        """
1093
 
        if seconds is None:
1094
 
            seconds = FileStorage.objects.grace_time + 1
1095
 
        stat_result = os.stat(path)
1096
 
        atime = stat_result.st_atime
1097
 
        mtime = stat_result.st_mtime
1098
 
        os.utime(path, (atime, mtime - seconds))
1099
 
 
1100
 
    def test_get_existing_storage_returns_None_if_none_found(self):
1101
 
        nonexistent_file = factory.getRandomString()
1102
 
        self.assertIsNone(
1103
 
            FileStorage.objects.get_existing_storage(nonexistent_file))
1104
 
 
1105
 
    def test_get_existing_storage_finds_FileStorage(self):
1106
 
        self.make_upload_dir()
1107
 
        storage = factory.make_file_storage()
1108
 
        self.assertEqual(
1109
 
            storage,
1110
 
            FileStorage.objects.get_existing_storage(storage.filename))
1111
 
 
1112
 
    def test_save_file_creates_storage(self):
1113
 
        self.make_upload_dir()
1114
 
        filename = factory.getRandomString()
1115
 
        data = self.make_data()
1116
 
        storage = FileStorage.objects.save_file(filename, BytesIO(data))
1117
 
        self.assertEqual(
1118
 
            (filename, data),
1119
 
            (storage.filename, storage.data.read()))
1120
 
 
1121
 
    def test_storage_can_be_retrieved(self):
1122
 
        self.make_upload_dir()
1123
 
        filename = factory.getRandomString()
1124
 
        data = self.make_data()
1125
 
        factory.make_file_storage(filename=filename, data=data)
1126
 
        storage = FileStorage.objects.get(filename=filename)
1127
 
        self.assertEqual(
1128
 
            (filename, data),
1129
 
            (storage.filename, storage.data.read()))
1130
 
 
1131
 
    def test_stores_binary_data(self):
1132
 
        self.make_upload_dir()
1133
 
 
1134
 
        # This horrible binary data could never, ever, under any
1135
 
        # encoding known to man be interpreted as text(1).  Switch the
1136
 
        # bytes of the byte-order mark around and by design you get an
1137
 
        # invalid codepoint; put a byte with the high bit set between bytes
1138
 
        # that have it cleared, and you have a guaranteed non-UTF-8
1139
 
        # sequence.
1140
 
        #
1141
 
        # (1) Provided, of course, that man know only about ASCII and
1142
 
        # UTF.
1143
 
        binary_data = codecs.BOM64_LE + codecs.BOM64_BE + b'\x00\xff\x00'
1144
 
 
1145
 
        # And yet, because FileStorage supports binary data, it comes
1146
 
        # out intact.
1147
 
        storage = factory.make_file_storage(filename="x", data=binary_data)
1148
 
        self.assertEqual(binary_data, storage.data.read())
1149
 
 
1150
 
    def test_overwrites_file(self):
1151
 
        # If a file of the same name has already been stored, the
1152
 
        # reference to the old data gets overwritten with one to the new
1153
 
        # data.  They are actually different files on the filesystem.
1154
 
        self.make_upload_dir()
1155
 
        filename = 'filename-%s' % factory.getRandomString()
1156
 
        old_storage = factory.make_file_storage(
1157
 
            filename=filename, data=self.make_data('old data'))
1158
 
        new_data = self.make_data('new-data')
1159
 
        new_storage = factory.make_file_storage(
1160
 
            filename=filename, data=new_data)
1161
 
        self.assertNotEqual(old_storage.data.name, new_storage.data.name)
1162
 
        self.assertEqual(
1163
 
            new_data, FileStorage.objects.get(filename=filename).data.read())
1164
 
 
1165
 
    def test_list_stored_files_lists_files(self):
1166
 
        upload_dir = self.make_upload_dir()
1167
 
        filename = factory.getRandomString()
1168
 
        with open(os.path.join(upload_dir, filename), 'w') as f:
1169
 
            f.write(self.make_data())
1170
 
        self.assertIn(
1171
 
            self.get_media_path(filename),
1172
 
            FileStorage.objects.list_stored_files())
1173
 
 
1174
 
    def test_list_stored_files_includes_referenced_files(self):
1175
 
        self.make_upload_dir()
1176
 
        storage = factory.make_file_storage()
1177
 
        self.assertIn(
1178
 
            storage.data.name, FileStorage.objects.list_stored_files())
1179
 
 
1180
 
    def test_list_referenced_files_lists_FileStorage_files(self):
1181
 
        self.make_upload_dir()
1182
 
        storage = factory.make_file_storage()
1183
 
        self.assertIn(
1184
 
            storage.data.name, FileStorage.objects.list_referenced_files())
1185
 
 
1186
 
    def test_list_referenced_files_excludes_unreferenced_files(self):
1187
 
        upload_dir = self.make_upload_dir()
1188
 
        filename = factory.getRandomString()
1189
 
        with open(os.path.join(upload_dir, filename), 'w') as f:
1190
 
            f.write(self.make_data())
1191
 
        self.assertNotIn(
1192
 
            self.get_media_path(filename),
1193
 
            FileStorage.objects.list_referenced_files())
1194
 
 
1195
 
    def test_list_referenced_files_uses_file_name_not_FileStorage_name(self):
1196
 
        self.make_upload_dir()
1197
 
        filename = factory.getRandomString()
1198
 
        # The filename we're going to use is already taken.  The file
1199
 
        # we'll be looking at will have to have a different name.
1200
 
        factory.make_file_storage(filename=filename)
1201
 
        storage = factory.make_file_storage(filename=filename)
1202
 
        # It's the name of the file, not the FileStorage.filename, that
1203
 
        # is in list_referenced_files.
1204
 
        self.assertIn(
1205
 
            storage.data.name, FileStorage.objects.list_referenced_files())
1206
 
 
1207
 
    def test_is_old_returns_False_for_recent_file(self):
1208
 
        upload_dir = self.make_upload_dir()
1209
 
        filename = factory.getRandomString()
1210
 
        path = os.path.join(upload_dir, filename)
1211
 
        with open(path, 'w') as f:
1212
 
            f.write(self.make_data())
1213
 
        self.age_file(path, FileStorage.objects.grace_time - 60)
1214
 
        self.assertFalse(
1215
 
            FileStorage.objects.is_old(self.get_media_path(filename)))
1216
 
 
1217
 
    def test_is_old_returns_True_for_old_file(self):
1218
 
        upload_dir = self.make_upload_dir()
1219
 
        filename = factory.getRandomString()
1220
 
        path = os.path.join(upload_dir, filename)
1221
 
        with open(path, 'w') as f:
1222
 
            f.write(self.make_data())
1223
 
        self.age_file(path, FileStorage.objects.grace_time + 1)
1224
 
        self.assertTrue(
1225
 
            FileStorage.objects.is_old(self.get_media_path(filename)))
1226
 
 
1227
 
    def test_collect_garbage_deletes_garbage(self):
1228
 
        upload_dir = self.make_upload_dir()
1229
 
        filename = factory.getRandomString()
1230
 
        path = os.path.join(upload_dir, filename)
1231
 
        with open(path, 'w') as f:
1232
 
            f.write(self.make_data())
1233
 
        self.age_file(path)
1234
 
        FileStorage.objects.collect_garbage()
1235
 
        self.assertFalse(
1236
 
            FileStorage.storage.exists(self.get_media_path(filename)))
1237
 
 
1238
 
    def test_grace_time_is_generous_but_not_unlimited(self):
1239
 
        # Grace time for garbage collection is long enough that it won't
1240
 
        # expire while the request that wrote it is still being handled.
1241
 
        # But it won't keep a file around for ages.  For instance, it'll
1242
 
        # be more than 20 seconds, but less than a day.
1243
 
        self.assertThat(FileStorage.objects.grace_time, GreaterThan(20))
1244
 
        self.assertThat(FileStorage.objects.grace_time, LessThan(24 * 60 * 60))
1245
 
 
1246
 
    def test_collect_garbage_leaves_recent_files_alone(self):
1247
 
        upload_dir = self.make_upload_dir()
1248
 
        filename = factory.getRandomString()
1249
 
        with open(os.path.join(upload_dir, filename), 'w') as f:
1250
 
            f.write(self.make_data())
1251
 
        FileStorage.objects.collect_garbage()
1252
 
        self.assertTrue(
1253
 
            FileStorage.storage.exists(self.get_media_path(filename)))
1254
 
 
1255
 
    def test_collect_garbage_leaves_referenced_files_alone(self):
1256
 
        self.make_upload_dir()
1257
 
        storage = factory.make_file_storage()
1258
 
        self.age_file(storage.data.path)
1259
 
        FileStorage.objects.collect_garbage()
1260
 
        self.assertTrue(FileStorage.storage.exists(storage.data.name))
1261
 
 
1262
 
    def test_collect_garbage_tolerates_missing_upload_dir(self):
1263
 
        # When MAAS is freshly installed, the upload directory is still
1264
 
        # missing.  But...
1265
 
        FileStorage.objects.collect_garbage()
1266
 
        # ...we get through garbage collection without breakage.
1267
 
        pass
1268
 
 
1269
 
 
1270
 
class ConfigDefaultTest(TestCase, TestWithFixtures):
1271
 
    """Test config default values."""
1272
 
 
1273
 
    def test_default_config_maas_name(self):
1274
 
        default_config = get_default_config()
1275
 
        self.assertEqual(gethostname(), default_config['maas_name'])
1276
 
 
1277
 
 
1278
 
class Listener:
1279
 
    """A utility class which tracks the calls to its 'call' method and
1280
 
    stores the arguments given to 'call' in 'self.calls'.
1281
 
    """
1282
 
 
1283
 
    def __init__(self):
1284
 
        self.calls = []
1285
 
 
1286
 
    def call(self, *args, **kwargs):
1287
 
        self.calls.append([args, kwargs])
1288
 
 
1289
 
 
1290
 
class ConfigTest(TestCase):
1291
 
    """Testing of the :class:`Config` model and its related manager class."""
1292
 
 
1293
 
    def test_manager_get_config_found(self):
1294
 
        Config.objects.create(name='name', value='config')
1295
 
        config = Config.objects.get_config('name')
1296
 
        self.assertEqual('config', config)
1297
 
 
1298
 
    def test_manager_get_config_not_found(self):
1299
 
        config = Config.objects.get_config('name', 'default value')
1300
 
        self.assertEqual('default value', config)
1301
 
 
1302
 
    def test_manager_get_config_not_found_none(self):
1303
 
        config = Config.objects.get_config('name')
1304
 
        self.assertIsNone(config)
1305
 
 
1306
 
    def test_manager_get_config_not_found_in_default_config(self):
1307
 
        name = factory.getRandomString()
1308
 
        value = factory.getRandomString()
1309
 
        DEFAULT_CONFIG[name] = value
1310
 
        config = Config.objects.get_config(name, None)
1311
 
        self.assertEqual(value, config)
1312
 
 
1313
 
    def test_default_config_cannot_be_changed(self):
1314
 
        name = factory.getRandomString()
1315
 
        DEFAULT_CONFIG[name] = {'key': 'value'}
1316
 
        config = Config.objects.get_config(name)
1317
 
        config.update({'key2': 'value2'})
1318
 
 
1319
 
        self.assertEqual({'key': 'value'}, Config.objects.get_config(name))
1320
 
 
1321
 
    def test_manager_get_config_list_returns_config_list(self):
1322
 
        Config.objects.create(name='name', value='config1')
1323
 
        Config.objects.create(name='name', value='config2')
1324
 
        config_list = Config.objects.get_config_list('name')
1325
 
        self.assertItemsEqual(['config1', 'config2'], config_list)
1326
 
 
1327
 
    def test_manager_set_config_creates_config(self):
1328
 
        Config.objects.set_config('name', 'config1')
1329
 
        Config.objects.set_config('name', 'config2')
1330
 
        self.assertSequenceEqual(
1331
 
            ['config2'],
1332
 
            [config.value for config in Config.objects.filter(name='name')])
1333
 
 
1334
 
    def test_manager_config_changed_connect_connects(self):
1335
 
        listener = Listener()
1336
 
        name = factory.getRandomString()
1337
 
        value = factory.getRandomString()
1338
 
        Config.objects.config_changed_connect(name, listener.call)
1339
 
        Config.objects.set_config(name, value)
1340
 
        config = Config.objects.get(name=name)
1341
 
 
1342
 
        self.assertEqual(1, len(listener.calls))
1343
 
        self.assertEqual((Config, config, True), listener.calls[0][0])
1344
 
 
1345
 
    def test_manager_config_changed_connect_connects_multiple(self):
1346
 
        listener = Listener()
1347
 
        listener2 = Listener()
1348
 
        name = factory.getRandomString()
1349
 
        value = factory.getRandomString()
1350
 
        Config.objects.config_changed_connect(name, listener.call)
1351
 
        Config.objects.config_changed_connect(name, listener2.call)
1352
 
        Config.objects.set_config(name, value)
1353
 
 
1354
 
        self.assertEqual(1, len(listener.calls))
1355
 
        self.assertEqual(1, len(listener2.calls))
1356
 
 
1357
 
    def test_manager_config_changed_connect_connects_multiple_same(self):
1358
 
        # If the same method is connected twice, it will only get called
1359
 
        # once.
1360
 
        listener = Listener()
1361
 
        name = factory.getRandomString()
1362
 
        value = factory.getRandomString()
1363
 
        Config.objects.config_changed_connect(name, listener.call)
1364
 
        Config.objects.config_changed_connect(name, listener.call)
1365
 
        Config.objects.set_config(name, value)
1366
 
 
1367
 
        self.assertEqual(1, len(listener.calls))
1368
 
 
1369
 
    def test_manager_config_changed_connect_connects_by_config_name(self):
1370
 
        listener = Listener()
1371
 
        name = factory.getRandomString()
1372
 
        value = factory.getRandomString()
1373
 
        Config.objects.config_changed_connect(name, listener.call)
1374
 
        another_name = factory.getRandomString()
1375
 
        Config.objects.set_config(another_name, value)
1376
 
 
1377
 
        self.assertEqual(0, len(listener.calls))