1
# Copyright 2012 Canonical Ltd. This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
4
"""Test maasserver models."""
6
from __future__ import (
16
from io import BytesIO
20
from socket import gethostname
22
from django.conf import settings
23
from django.contrib.auth.models import User
24
from django.core.exceptions import (
28
from django.db import IntegrityError
29
from django.utils.safestring import SafeUnicode
30
from fixtures import TestWithFixtures
31
from maasserver.exceptions import (
32
CannotDeleteUserException,
35
from maasserver.models import (
44
get_html_display_for_key,
51
NODE_STATUS_CHOICES_DICT,
56
validate_ssh_public_key,
58
from maasserver.provisioning import get_provisioning_api_proxy
59
from maasserver.testing import get_data
60
from maasserver.testing.enum import map_enum
61
from maasserver.testing.factory import factory
62
from maasserver.testing.testcase import TestCase
63
from metadataserver.models import (
67
from piston.models import (
73
from provisioningserver.enum import POWER_TYPE
74
from testtools.matchers import (
81
class NodeTest(TestCase):
83
def test_system_id(self):
    """A generated system_id is 41 characters long and starts 'node-'."""
    system_id = factory.make_node().system_id
    self.assertEqual(len(system_id), 41)
    self.assertTrue(system_id.startswith('node-'))
92
def test_display_status_shows_default_status(self):
93
node = factory.make_node()
95
NODE_STATUS_CHOICES_DICT[node.status],
96
node.display_status())
98
def test_display_status_for_allocated_node_shows_owner(self):
99
node = factory.make_node(
100
owner=factory.make_user(), status=NODE_STATUS.ALLOCATED)
102
"Allocated to %s" % node.owner.username,
103
node.display_status())
105
def test_add_node_with_token(self):
    # A token handed to the node factory is the one found on the node.
    owner = factory.make_user()
    auth_token = create_auth_token(owner)
    created = factory.make_node(token=auth_token)
    self.assertEqual(auth_token, created.token)
111
def test_add_mac_address(self):
    # Adding a MAC address yields exactly one matching MACAddress row
    # for the node.
    address = 'AA:BB:CC:DD:EE:FF'
    target = factory.make_node()
    target.add_mac_address(address)
    matching = MACAddress.objects.filter(node=target, mac_address=address)
    self.assertEqual(1, matching.count())
118
def test_remove_mac_address(self):
    # A MAC address that is added and then removed leaves no matching
    # MACAddress row for the node.
    address = 'AA:BB:CC:DD:EE:FF'
    target = factory.make_node()
    target.add_mac_address(address)
    target.remove_mac_address(address)
    remaining = MACAddress.objects.filter(node=target, mac_address=address)
    self.assertEqual(0, remaining.count())
126
def test_delete_node_deletes_related_mac(self):
127
node = factory.make_node()
128
mac = node.add_mac_address('AA:BB:CC:DD:EE:FF')
131
MACAddress.DoesNotExist, MACAddress.objects.get, id=mac.id)
133
def test_cannot_delete_allocated_node(self):
    # delete() on a node in ALLOCATED status raises NodeStateViolation.
    allocated = factory.make_node(status=NODE_STATUS.ALLOCATED)
    self.assertRaises(NodeStateViolation, allocated.delete)
137
def test_set_mac_based_hostname_default_enlistment_domain(self):
    # With no enlistment domain configured, the hostname ends in the
    # default domain, `local`.
    enlisted = factory.make_node()
    enlisted.set_mac_based_hostname('AA:BB:CC:DD:EE:FF')
    self.assertEqual('node-aabbccddeeff.local', enlisted.hostname)
144
def test_set_mac_based_hostname_alt_enlistment_domain(self):
    # A configured, non-default enlistment domain is used as the
    # hostname's domain part.
    Config.objects.set_config("enlistment_domain", "example.com")
    enlisted = factory.make_node()
    enlisted.set_mac_based_hostname('AA:BB:CC:DD:EE:FF')
    self.assertEqual('node-aabbccddeeff.example.com', enlisted.hostname)
152
def test_set_mac_based_hostname_cleaning_enlistment_domain(self):
    # Surrounding dots and whitespace in the configured enlistment
    # domain do not end up in the resulting hostname.
    Config.objects.set_config("enlistment_domain", " .example.com. ")
    enlisted = factory.make_node()
    enlisted.set_mac_based_hostname('AA:BB:CC:DD:EE:FF')
    self.assertEqual('node-aabbccddeeff.example.com', enlisted.hostname)
161
def test_set_mac_based_hostname_no_enlistment_domain(self):
    # An empty enlistment domain gives a bare hostname, with no domain
    # part and no trailing dot.
    Config.objects.set_config("enlistment_domain", "")
    enlisted = factory.make_node()
    enlisted.set_mac_based_hostname('AA:BB:CC:DD:EE:FF')
    self.assertEqual('node-aabbccddeeff', enlisted.hostname)
170
def test_get_effective_power_type_defaults_to_config(self):
171
power_types = list(map_enum(POWER_TYPE).values())
172
power_types.remove(POWER_TYPE.DEFAULT)
173
node = factory.make_node(power_type=POWER_TYPE.DEFAULT)
175
for power_type in power_types:
176
Config.objects.set_config('node_power_type', power_type)
177
effective_types.append(node.get_effective_power_type())
178
self.assertEqual(power_types, effective_types)
180
def test_get_effective_power_type_reads_node_field(self):
181
power_types = list(map_enum(POWER_TYPE).values())
182
power_types.remove(POWER_TYPE.DEFAULT)
184
factory.make_node(power_type=power_type)
185
for power_type in power_types]
187
power_types, [node.get_effective_power_type() for node in nodes])
189
def test_get_effective_power_type_rejects_default_as_config_value(self):
    # When both the node and the 'node_power_type' config hold
    # POWER_TYPE.DEFAULT, get_effective_power_type raises ValueError.
    defaulted = factory.make_node(power_type=POWER_TYPE.DEFAULT)
    Config.objects.set_config('node_power_type', POWER_TYPE.DEFAULT)
    self.assertRaises(ValueError, defaulted.get_effective_power_type)
194
def test_acquire(self):
195
node = factory.make_node(status=NODE_STATUS.READY)
196
user = factory.make_user()
197
token = create_auth_token(user)
199
self.assertEqual(user, node.owner)
200
self.assertEqual(NODE_STATUS.ALLOCATED, node.status)
202
def test_release(self):
203
node = factory.make_node(
204
status=NODE_STATUS.ALLOCATED, owner=factory.make_user())
206
self.assertEqual((NODE_STATUS.READY, None), (node.status, node.owner))
208
def test_accept_enlistment_gets_node_out_of_declared_state(self):
    # accept_enlistment() on a Declared node moves it to Commissioning
    # and hands back that same node.
    declared = factory.make_node(status=NODE_STATUS.DECLARED)
    returned = declared.accept_enlistment(factory.make_user())
    self.assertEqual(
        (declared, NODE_STATUS.COMMISSIONING),
        (returned, declared.status))
217
def test_accept_enlistment_does_nothing_if_already_accepted(self):
218
# If a node has already been accepted, but not assigned a role
219
# yet, calling accept_enlistment on it is meaningless but not an
220
# error. The method returns None in this case.
222
NODE_STATUS.COMMISSIONING,
226
status: factory.make_node(status=status)
227
for status in accepted_states}
230
status: node.accept_enlistment(factory.make_user())
231
for status, node in nodes.items()}
234
{status: None for status in accepted_states}, return_values)
236
{status: status for status in accepted_states},
237
{status: node.status for status, node in nodes.items()})
239
def test_accept_enlistment_rejects_bad_state_change(self):
240
# If a node is neither Declared nor in one of the "accepted"
241
# states where acceptance is a safe no-op, accept_enlistment
242
# raises a node state violation and leaves the node's state
244
all_states = map_enum(NODE_STATUS).values()
245
acceptable_states = [
246
NODE_STATUS.DECLARED,
247
NODE_STATUS.COMMISSIONING,
250
unacceptable_states = set(all_states) - set(acceptable_states)
252
status: factory.make_node(status=status)
253
for status in unacceptable_states}
255
exceptions = {status: False for status in unacceptable_states}
256
for status, node in nodes.items():
258
node.accept_enlistment(factory.make_user())
259
except NodeStateViolation:
260
exceptions[status] = True
263
{status: True for status in unacceptable_states}, exceptions)
265
{status: status for status in unacceptable_states},
266
{status: node.status for status, node in nodes.items()})
268
def test_start_commissioning_changes_status_and_starts_node(self):
269
user = factory.make_user()
270
node = factory.make_node(status=NODE_STATUS.DECLARED)
271
node.start_commissioning(user)
274
'status': NODE_STATUS.COMMISSIONING,
277
self.assertAttributes(node, expected_attrs)
278
power_status = get_provisioning_api_proxy().power_status
279
self.assertEqual('start', power_status[node.system_id])
281
def test_start_commissioning_sets_user_data(self):
282
node = factory.make_node(status=NODE_STATUS.DECLARED)
283
node.start_commissioning(factory.make_admin())
284
path = settings.COMMISSIONING_SCRIPT
285
with open(path, 'r') as f:
286
commissioning_user_data = f.read()
288
commissioning_user_data,
289
NodeUserData.objects.get_user_data(node))
291
def test_missing_commissioning_script(self):
293
settings, 'COMMISSIONING_SCRIPT',
294
'/etc/' + factory.getRandomString(10))
295
node = factory.make_node(status=NODE_STATUS.DECLARED)
298
node.start_commissioning, factory.make_admin())
300
def test_start_commissioning_clears_node_commissioning_results(self):
    # A commissioning result stored before start_commissioning() is
    # gone from the node afterwards.
    declared = factory.make_node(status=NODE_STATUS.DECLARED)
    stale_name = factory.getRandomString()
    stale_data = factory.getRandomString()
    NodeCommissionResult.objects.store_data(
        declared, stale_name, stale_data)
    declared.start_commissioning(factory.make_admin())
    leftover = declared.nodecommissionresult_set.all()
    self.assertItemsEqual([], leftover)
307
def test_start_commissioning_ignores_other_commissioning_results(self):
308
node = factory.make_node()
309
filename = factory.getRandomString()
310
text = factory.getRandomString()
311
NodeCommissionResult.objects.store_data(node, filename, text)
312
other_node = factory.make_node(status=NODE_STATUS.DECLARED)
313
other_node.start_commissioning(factory.make_admin())
315
text, NodeCommissionResult.objects.get_data(node, filename))
317
def test_full_clean_checks_status_transition_and_raises_if_invalid(self):
318
# RETIRED -> ALLOCATED is an invalid transition.
319
node = factory.make_node(
320
status=NODE_STATUS.RETIRED, owner=factory.make_user())
321
node.status = NODE_STATUS.ALLOCATED
322
self.assertRaisesRegexp(
324
"Invalid transition: Retired -> Allocated.",
327
def test_full_clean_passes_if_status_unchanged(self):
328
status = factory.getRandomChoice(NODE_STATUS_CHOICES)
329
node = factory.make_node(status=status)
332
# The test is that this does not raise an error.
335
def test_full_clean_passes_if_status_valid_transition(self):
336
# NODE_STATUS.READY -> NODE_STATUS.ALLOCATED is a valid
338
status = NODE_STATUS.READY
339
node = factory.make_node(status=status)
340
node.status = NODE_STATUS.ALLOCATED
342
# The test is that this does not raise an error.
345
def test_save_raises_node_state_violation_on_bad_transition(self):
346
# RETIRED -> ALLOCATED is an invalid transition.
347
node = factory.make_node(
348
status=NODE_STATUS.RETIRED, owner=factory.make_user())
349
node.status = NODE_STATUS.ALLOCATED
350
self.assertRaisesRegexp(
352
"Invalid transition: Retired -> Allocated.",
355
def test_save_does_not_check_status_transition_if_skip_check(self):
    # RETIRED -> ALLOCATED is an invalid transition, yet saving with
    # skip_check=True goes through.
    owner = factory.make_user()
    retired = factory.make_node(status=NODE_STATUS.RETIRED, owner=owner)
    retired.status = NODE_STATUS.ALLOCATED
    # Passing means this call returned without raising.
    retired.save(skip_check=True)
365
class NodeTransitionsTests(TestCase):
    """Test the structure of NODE_TRANSITIONS."""

    def test_NODE_TRANSITIONS_initial_states(self):
        # Every key of NODE_TRANSITIONS is a known node status or None.
        initial_states = set(NODE_TRANSITIONS.keys())
        permitted = set(NODE_STATUS_CHOICES_DICT.keys() + [None])
        self.assertTrue(initial_states <= permitted)

    def test_NODE_TRANSITIONS_destination_state(self):
        # Every destination listed in NODE_TRANSITIONS is a known node
        # status.
        reachable = set()
        for targets in NODE_TRANSITIONS.values():
            reachable.update(targets)
        permitted = set(NODE_STATUS_CHOICES_DICT.keys())
        self.assertTrue(reachable <= permitted)
382
class GetDbStateTest(TestCase):
    """Testing for the method `get_db_state`."""

    def test_get_db_state_returns_db_state(self):
        # Changing node.status in memory, without saving, does not
        # change what get_db_state reports for 'status'.
        saved_status = factory.getRandomChoice(NODE_STATUS_CHOICES)
        node = factory.make_node(status=saved_status)
        node.status = factory.getRandomChoice(
            NODE_STATUS_CHOICES, but_not=[saved_status])
        self.assertEqual(saved_status, get_db_state(node, 'status'))
394
class NodeManagerTest(TestCase):
396
def make_node(self, user=None):
397
"""Create a node, allocated to `user` if given."""
399
status = NODE_STATUS.READY
401
status = NODE_STATUS.ALLOCATED
402
return factory.make_node(set_hostname=True, status=status, owner=user)
404
def make_user_data(self):
    """Return a random string from the factory, encoded as ASCII."""
    text = factory.getRandomString()
    return text.encode('ascii')
408
def test_filter_by_ids_filters_nodes_by_ids(self):
409
nodes = [factory.make_node() for counter in range(5)]
410
ids = [node.system_id for node in nodes]
411
selection = slice(1, 3)
412
self.assertItemsEqual(
414
Node.objects.filter_by_ids(Node.objects.all(), ids[selection]))
416
def test_filter_by_ids_with_empty_list_returns_empty(self):
418
self.assertItemsEqual(
419
[], Node.objects.filter_by_ids(Node.objects.all(), []))
421
def test_filter_by_ids_without_ids_returns_full(self):
    # Passing None as the ids leaves the query unfiltered.
    only_node = factory.make_node()
    unfiltered = Node.objects.filter_by_ids(Node.objects.all(), None)
    self.assertItemsEqual([only_node], unfiltered)
426
def test_get_nodes_for_user_lists_visible_nodes(self):
    """get_nodes with VIEW permission lists the nodes a user can see.

    For a regular user that is the unowned nodes plus the nodes owned
    by that user; a node owned by somebody else is left out.
    """
    user = factory.make_user()
    unowned = self.make_node(None)
    own = self.make_node(user)
    self.make_node(factory.make_user())
    listed = Node.objects.get_nodes(user, NODE_PERMISSION.VIEW)
    self.assertItemsEqual([unowned, own], listed)
439
def test_get_nodes_admin_lists_all_nodes(self):
440
admin = factory.make_admin()
444
factory.make_admin(),
447
nodes = [self.make_node(owner) for owner in owners]
448
self.assertItemsEqual(
449
nodes, Node.objects.get_nodes(admin, NODE_PERMISSION.VIEW))
451
def test_get_nodes_filters_by_id(self):
452
user = factory.make_user()
453
nodes = [self.make_node(user) for counter in range(5)]
454
ids = [node.system_id for node in nodes]
455
wanted_slice = slice(0, 3)
456
self.assertItemsEqual(
458
Node.objects.get_nodes(
459
user, NODE_PERMISSION.VIEW, ids=ids[wanted_slice]))
461
def test_get_nodes_with_edit_perm_for_user_lists_owned_nodes(self):
462
user = factory.make_user()
463
visible_node = self.make_node(user)
465
self.make_node(factory.make_user())
466
self.assertItemsEqual(
468
Node.objects.get_nodes(user, NODE_PERMISSION.EDIT))
470
def test_get_nodes_with_edit_perm_admin_lists_all_nodes(self):
471
admin = factory.make_admin()
475
factory.make_admin(),
478
nodes = [self.make_node(owner) for owner in owners]
479
self.assertItemsEqual(
480
nodes, Node.objects.get_nodes(admin, NODE_PERMISSION.EDIT))
482
def test_get_nodes_with_admin_perm_returns_empty_list_for_user(self):
483
user = factory.make_user()
484
[self.make_node(user) for counter in range(5)]
485
self.assertItemsEqual(
487
Node.objects.get_nodes(user, NODE_PERMISSION.ADMIN))
489
def test_get_nodes_with_admin_perm_returns_all_nodes_for_admin(self):
490
user = factory.make_user()
491
nodes = [self.make_node(user) for counter in range(5)]
492
self.assertItemsEqual(
494
Node.objects.get_nodes(
495
factory.make_admin(), NODE_PERMISSION.ADMIN))
497
def test_get_visible_node_or_404_ok(self):
498
"""get_node_or_404 fetches nodes by system_id."""
499
user = factory.make_user()
500
node = self.make_node(user)
503
Node.objects.get_node_or_404(
504
node.system_id, user, NODE_PERMISSION.VIEW))
506
def test_get_visible_node_or_404_raises_PermissionDenied(self):
507
"""get_node_or_404 raises PermissionDenied if the provided
508
user has not the right permission on the returned node."""
509
user_node = self.make_node(factory.make_user())
512
Node.objects.get_node_or_404,
513
user_node.system_id, factory.make_user(), NODE_PERMISSION.VIEW)
515
def test_get_available_node_for_acquisition_finds_available_node(self):
516
user = factory.make_user()
517
node = self.make_node(None)
519
node, Node.objects.get_available_node_for_acquisition(user))
521
def test_get_available_node_for_acquisition_returns_none_if_empty(self):
522
user = factory.make_user()
524
None, Node.objects.get_available_node_for_acquisition(user))
526
def test_get_available_node_for_acquisition_ignores_taken_nodes(self):
527
user = factory.make_user()
528
available_status = NODE_STATUS.READY
529
unavailable_statuses = (
530
set(NODE_STATUS_CHOICES_DICT) - set([available_status]))
531
for status in unavailable_statuses:
532
factory.make_node(status=status)
534
None, Node.objects.get_available_node_for_acquisition(user))
536
def test_get_available_node_for_acquisition_ignores_invisible_nodes(self):
537
user = factory.make_user()
538
node = self.make_node()
539
node.owner = factory.make_user()
542
None, Node.objects.get_available_node_for_acquisition(user))
544
def test_get_available_node_combines_constraint_with_availability(self):
545
user = factory.make_user()
546
node = self.make_node(factory.make_user())
549
Node.objects.get_available_node_for_acquisition(
550
user, {'name': node.system_id}))
552
def test_get_available_node_constrains_by_name(self):
553
user = factory.make_user()
554
nodes = [self.make_node() for counter in range(3)]
557
Node.objects.get_available_node_for_acquisition(
558
user, {'name': nodes[1].hostname}))
560
def test_get_available_node_returns_None_if_name_is_unknown(self):
561
user = factory.make_user()
564
Node.objects.get_available_node_for_acquisition(
565
user, {'name': factory.getRandomString()}))
567
def test_stop_nodes_stops_nodes(self):
    # stop_nodes returns the nodes it acted on, and the provisioning
    # proxy records a 'stop' for each of them.
    owner = factory.make_user()
    owned = self.make_node(owner)
    stopped = Node.objects.stop_nodes([owned.system_id], owner)

    self.assertItemsEqual([owned], stopped)
    recorded = get_provisioning_api_proxy().power_status
    self.assertEqual('stop', recorded[owned.system_id])
576
def test_stop_nodes_ignores_uneditable_nodes(self):
577
nodes = [self.make_node(factory.make_user()) for counter in range(3)]
578
ids = [node.system_id for node in nodes]
579
stoppable_node = nodes[0]
580
self.assertItemsEqual(
582
Node.objects.stop_nodes(ids, stoppable_node.owner))
584
def test_start_nodes_starts_nodes(self):
    # start_nodes returns the nodes it acted on, and the provisioning
    # proxy records a 'start' for each of them.
    owner = factory.make_user()
    owned = self.make_node(owner)
    started = Node.objects.start_nodes([owned.system_id], owner)

    self.assertItemsEqual([owned], started)
    recorded = get_provisioning_api_proxy().power_status
    self.assertEqual('start', recorded[owned.system_id])
593
def test_start_nodes_sets_commissioning_profile(self):
    # Starting up a node should always set a profile.  A node in the
    # commissioning status gets the commissioning profile.
    owner = factory.make_user()
    commissioning = factory.make_node(
        set_hostname=True, status=NODE_STATUS.COMMISSIONING, owner=owner)
    started = Node.objects.start_nodes([commissioning.system_id], owner)

    self.assertItemsEqual([commissioning], started)
    papi_nodes = get_provisioning_api_proxy().nodes
    self.assertEqual(
        'maas-precise-i386-commissioning',
        papi_nodes[commissioning.system_id]['profile'])
606
def test_start_nodes_doesnt_set_commissioning_profile(self):
    # Starting up a node should always set a profile.  A node made by
    # self.make_node(user) gets the regular, non-commissioning profile.
    owner = factory.make_user()
    owned = self.make_node(owner)
    started = Node.objects.start_nodes([owned.system_id], owner)

    self.assertItemsEqual([owned], started)
    papi_nodes = get_provisioning_api_proxy().nodes
    self.assertEqual(
        'maas-precise-i386', papi_nodes[owned.system_id]['profile'])
617
def test_start_nodes_ignores_uneditable_nodes(self):
618
nodes = [self.make_node(factory.make_user()) for counter in range(3)]
619
ids = [node.system_id for node in nodes]
620
startable_node = nodes[0]
621
self.assertItemsEqual(
623
Node.objects.start_nodes(ids, startable_node.owner))
625
def test_start_nodes_stores_user_data(self):
    # user_data passed to start_nodes can be read back for the node.
    owned = factory.make_node(owner=factory.make_user())
    payload = self.make_user_data()
    Node.objects.start_nodes(
        [owned.system_id], owned.owner, user_data=payload)
    stored = NodeUserData.objects.get_user_data(owned)
    self.assertEqual(payload, stored)
632
def test_start_nodes_does_not_store_user_data_for_uneditable_nodes(self):
633
node = factory.make_node(owner=factory.make_user())
634
original_user_data = self.make_user_data()
635
NodeUserData.objects.set_user_data(node, original_user_data)
636
Node.objects.start_nodes(
637
[node.system_id], factory.make_user(),
638
user_data=self.make_user_data())
640
original_user_data, NodeUserData.objects.get_user_data(node))
642
def test_start_nodes_without_user_data_clears_existing_data(self):
643
node = factory.make_node(owner=factory.make_user())
644
user_data = self.make_user_data()
645
NodeUserData.objects.set_user_data(node, user_data)
646
Node.objects.start_nodes([node.system_id], node.owner, user_data=None)
648
NodeUserData.DoesNotExist,
649
NodeUserData.objects.get_user_data, node)
651
def test_start_nodes_with_user_data_overwrites_existing_data(self):
    # User data stored earlier is replaced by what start_nodes is given.
    owned = factory.make_node(owner=factory.make_user())
    previous = self.make_user_data()
    NodeUserData.objects.set_user_data(owned, previous)
    replacement = self.make_user_data()
    Node.objects.start_nodes(
        [owned.system_id], owned.owner, user_data=replacement)
    stored = NodeUserData.objects.get_user_data(owned)
    self.assertEqual(replacement, stored)
660
class MACAddressTest(TestCase):
662
def make_MAC(self, address):
    """Instantiate a MACAddress for `address` on a newly made node.

    save() is not called here.
    """
    owner_node = factory.make_node()
    mac = MACAddress(mac_address=address, node=owner_node)
    return mac
667
def test_stores_to_database(self):
668
mac = self.make_MAC('00:11:22:33:44:55')
670
self.assertEqual([mac], list(MACAddress.objects.all()))
672
def test_invalid_address_raises_validation_error(self):
    # An 'x' where a ':' separator belongs makes full_clean() reject
    # the address.
    malformed = self.make_MAC('aa:bb:ccxdd:ee:ff')
    self.assertRaises(ValidationError, malformed.full_clean)
677
class AuthTokensTest(TestCase):
678
"""Test creation and retrieval of auth tokens."""
680
def assertTokenValid(self, token):
    """Assert `token` has a string key and secret of the expected sizes."""
    key = token.key
    self.assertIsInstance(key, basestring)
    self.assertEqual(KEY_SIZE, len(key))
    secret = token.secret
    self.assertIsInstance(secret, basestring)
    self.assertEqual(SECRET_SIZE, len(secret))
686
def assertConsumerValid(self, consumer):
    """Assert `consumer` has a KEY_SIZE string key and an empty secret."""
    key = consumer.key
    self.assertIsInstance(key, basestring)
    self.assertEqual(KEY_SIZE, len(key))
    self.assertEqual('', consumer.secret)
691
def test_create_auth_token(self):
    # A new token belongs to the user, as does its consumer; it is
    # approved, and both token and consumer are well-formed.
    owner = factory.make_user()
    created = create_auth_token(owner)
    self.assertEqual(owner, created.user)
    self.assertEqual(owner, created.consumer.user)
    self.assertTrue(created.is_approved)
    self.assertConsumerValid(created.consumer)
    self.assertTokenValid(created)
700
def test_get_auth_tokens_finds_tokens_for_user(self):
    # A token created for a user is among that user's auth tokens.
    owner = factory.make_user()
    created = create_auth_token(owner)
    self.assertIn(created, get_auth_tokens(owner))
705
def test_get_auth_tokens_ignores_other_users(self):
    # A token created for one user is not listed for another user.
    user = factory.make_user()
    stranger = factory.make_user()
    foreign_token = create_auth_token(stranger)
    self.assertNotIn(foreign_token, get_auth_tokens(user))
710
def test_get_auth_tokens_ignores_unapproved_tokens(self):
711
user = factory.make_user()
712
token = create_auth_token(user)
713
token.is_approved = False
715
self.assertNotIn(token, get_auth_tokens(user))
718
class UserProfileTest(TestCase):
720
def test_profile_creation(self):
    # Creating a user also creates a UserProfile that points back at
    # that user.
    created = factory.make_user()
    self.assertIsInstance(created.get_profile(), UserProfile)
    self.assertEqual(created, created.get_profile().user)
726
def test_consumer_creation(self):
    # Creating a user also creates exactly one generic consumer for
    # that user.
    owner = factory.make_user()
    generic = Consumer.objects.filter(user=owner, name=GENERIC_CONSUMER)
    self.assertEqual([owner], [entry.user for entry in generic])
732
def test_token_creation(self):
    # Creating a user also creates exactly one token, owned by that
    # user.  The unpacking fails unless there is exactly one.
    owner = factory.make_user()
    (only_token,) = get_auth_tokens(owner)
    self.assertEqual(owner, only_token.user)
738
def test_create_authorisation_token(self):
    # UserProfile.create_authorisation_token returns a consumer and a
    # token that both belong to the profile's user.
    owner = factory.make_user()
    owner_profile = owner.get_profile()
    new_consumer, new_token = owner_profile.create_authorisation_token()
    self.assertEqual(owner, new_token.user)
    self.assertEqual(owner, new_consumer.user)
746
def test_get_authorisation_tokens(self):
    # A token made through the profile is among the tokens that
    # UserProfile.get_authorisation_tokens returns.
    owner = factory.make_user()
    _, created = owner.get_profile().create_authorisation_token()
    listed = owner.get_profile().get_authorisation_tokens()
    self.assertIn(created, listed)
752
def test_delete(self):
753
# Deleting a profile also deletes the related user.
754
profile = factory.make_user().get_profile()
755
profile_id = profile.id
756
user_id = profile.user.id
757
self.assertTrue(User.objects.filter(id=user_id).exists())
759
UserProfile.objects.filter(id=profile_id).exists())
761
self.assertFalse(User.objects.filter(id=user_id).exists())
763
UserProfile.objects.filter(id=profile_id).exists())
765
def test_delete_consumers_tokens(self):
    # Deleting a profile deletes the related tokens and consumers.
    profile = factory.make_user().get_profile()
    token_ids = []
    consumer_ids = []
    for _ in range(3):
        # create_authorisation_token() returns (consumer, token), in
        # that order.  Unpacking the other way round files token ids
        # under consumers and vice versa, so the checks below would
        # query the wrong tables.
        consumer, token = profile.create_authorisation_token()
        token_ids.append(token.id)
        consumer_ids.append(consumer.id)
    profile.delete()
    self.assertFalse(Consumer.objects.filter(id__in=consumer_ids).exists())
    self.assertFalse(Token.objects.filter(id__in=token_ids).exists())
778
def test_delete_attached_nodes(self):
    # Deleting the profile of a user who owns a node raises
    # CannotDeleteUserException.
    owner_profile = factory.make_user().get_profile()
    factory.make_node(owner=owner_profile.user)
    self.assertRaises(CannotDeleteUserException, owner_profile.delete)
784
def test_manager_all_users(self):
    # all_users() returns exactly the users created here.
    created = set()
    for _ in range(3):
        created.add(factory.make_user())
    listed = set(UserProfile.objects.all_users())
    self.assertEqual(created, listed)
789
def test_manager_all_users_no_system_user(self):
793
user.username for user in UserProfile.objects.all_users())
794
self.assertTrue(set(SYSTEM_USERS).isdisjoint(usernames))
797
class SSHKeyValidatorTest(TestCase):
799
def test_validates_rsa_public_key(self):
    # Passing means the validator returned without raising
    # ValidationError.
    rsa_public = get_data('data/test_rsa0.pub')
    validate_ssh_public_key(rsa_public)
804
def test_validates_dsa_public_key(self):
    # Passing means the validator returned without raising
    # ValidationError.
    dsa_public = get_data('data/test_dsa.pub')
    validate_ssh_public_key(dsa_public)
809
def test_does_not_validate_random_data(self):
810
key_string = factory.getRandomString()
812
ValidationError, validate_ssh_public_key, key_string)
814
def test_does_not_validate_wrongly_padded_data(self):
815
key_string = 'ssh-dss %s %s@%s' % (
816
factory.getRandomString(), factory.getRandomString(),
817
factory.getRandomString())
819
ValidationError, validate_ssh_public_key, key_string)
821
def test_does_not_validate_rsa_private_key(self):
822
key_string = get_data('data/test_rsa')
824
ValidationError, validate_ssh_public_key, key_string)
826
def test_does_not_validate_dsa_private_key(self):
827
key_string = get_data('data/test_dsa')
829
ValidationError, validate_ssh_public_key, key_string)
832
class GetHTMLDisplayForKeyTest(TestCase):
833
"""Testing for the method `get_html_display_for_key`."""
835
def make_comment(self, length):
836
"""Create a comment of the desired length.
838
The comment may contain spaces, but not begin or end in them. It
839
will be of the desired length both before and after stripping.
842
factory.getRandomString(1),
843
factory.getRandomString(max([length - 2, 0]), spaces=True),
844
factory.getRandomString(1),
847
def make_key(self, type_len=7, key_len=360, comment_len=None):
848
"""Produce a fake ssh public key containing arbitrary data.
850
:param type_len: The length of the "key type" field. (Default is
851
sized for the real-life "ssh-rsa").
852
:param key_len: Length of the key text. (With a roughly realistic
854
:param comment_len: Length of the comment field. The comment may
855
contain spaces. Leave it None to omit the comment.
856
:return: A string representing the combined key-file contents.
859
factory.getRandomString(type_len),
860
factory.getRandomString(key_len),
862
if comment_len is not None:
863
fields.append(self.make_comment(comment_len))
864
return " ".join(fields)
866
def test_display_returns_unchanged_if_unknown_and_small(self):
    # A key that lacks the usual three space-separated parts comes
    # back untouched when it is shorter than the size limit.
    limit = random.randint(101, 200)
    raw_key = factory.getRandomString(limit - 100)
    shown = get_html_display_for_key(raw_key, limit)
    self.assertTrue(len(shown) < limit)
    self.assertEqual(raw_key, shown)
876
def test_display_returns_cropped_if_unknown_and_large(self):
877
# If the key does not look like a normal key (with three parts
878
# separated by spaces, it's returned cropped if its size is >
880
size = random.randint(20, 100) # size cannot be < len(HELLIPSIS).
881
key = factory.getRandomString(size + 1)
882
display = get_html_display_for_key(key, size)
883
self.assertEqual(size, len(display))
885
'%.*s%s' % (size - len(HELLIPSIS), key, HELLIPSIS), display)
887
def test_display_escapes_commentless_key_for_html(self):
    # The key may contain characters that are not safe for including
    # in HTML, and so get_html_display_for_key escapes the text.
    # There are several code paths in get_html_display_for_key; this
    # test is for the case where the key has no comment, and is
    # brief enough to fit into the allotted space.
    #
    # The expected value is the HTML-escaped form of the input.
    # Expecting the raw "<type> <text>" back would assert that nothing
    # was escaped, the opposite of what this test is for.
    self.assertEqual(
        "&lt;type&gt; &lt;text&gt;",
        get_html_display_for_key("<type> <text>", 100))
898
def test_display_escapes_short_key_for_html(self):
    # The key's comment may contain characters that are not safe for
    # including in HTML, and so get_html_display_for_key escapes the
    # text.
    # There are several code paths in get_html_display_for_key; this
    # test is for the case where the whole key is short enough to
    # fit completely into the output.
    key = "<type> <text> <comment>"
    display = get_html_display_for_key(key, 100)
    # This also verifies that the entire key fits into the string.
    # Otherwise we might accidentally get one of the other cases.
    # The suffix has to be the *escaped* comment: a display ending in
    # a literal "<comment>" could never pass the angle-bracket checks
    # below.
    self.assertThat(display, EndsWith("&lt;comment&gt;"))
    # And of course the check also implies that the text is
    # HTML-safe.
    self.assertNotIn("<", display)
    self.assertNotIn(">", display)
915
def test_display_escapes_long_key_for_html(self):
    # The key's comment may contain characters that are not safe for
    # including in HTML, and so get_html_display_for_key escapes the
    # text.
    # There are several code paths in get_html_display_for_key; this
    # test is for the case where the comment is short enough to fit
    # completely into the output.
    key = "<type> %s <comment>" % ("<&>" * 50)
    display = get_html_display_for_key(key, 50)
    # This verifies that we are indeed getting an abbreviated
    # display.  Otherwise we might accidentally get one of the other
    # cases.  HELLIPSIS is the marker the other cropping tests in this
    # file expect in the output.
    self.assertIn(HELLIPSIS, display)
    self.assertIn("comment", display)
    # And now, on to checking that the text is HTML-safe.
    self.assertNotIn("<", display)
    self.assertNotIn(">", display)
    # The suffix must be the escaped comment: a literal "<comment>"
    # would contradict the two assertions just above.
    self.assertThat(display, EndsWith("&lt;comment&gt;"))
934
def test_display_limits_size_with_large_comment(self):
935
# If the key has a large 'comment' part, the key is simply
936
# cropped and HELLIPSIS appended to it.
937
key = self.make_key(10, 10, 100)
938
display = get_html_display_for_key(key, 50)
939
self.assertEqual(50, len(display))
941
'%.*s%s' % (50 - len(HELLIPSIS), key, HELLIPSIS), display)
943
def test_display_limits_size_with_large_key_type(self):
944
# If the key has a large 'key_type' part, the key is simply
945
# cropped and HELLIPSIS appended to it.
946
key = self.make_key(100, 10, 10)
947
display = get_html_display_for_key(key, 50)
948
self.assertEqual(50, len(display))
950
'%.*s%s' % (50 - len(HELLIPSIS), key, HELLIPSIS), display)
952
def test_display_cropped_key(self):
# Checks the display of a key whose middle (key_string) part is the long one.
# NOTE(review): type_len and comment_len are used below but their
# assignments, and the opening of the final assertion, are not visible in
# this excerpt -- confirm them against the complete file.
953
# If the key has a small key_type, a small comment and a large
954
# key_string (which is the 'normal' case), the key_string part
958
key = self.make_key(type_len, 100, comment_len)
959
# Split the key into its three space-separated parts; maxsplit=2 keeps any
# further spaces inside the comment.
key_type, key_string, comment = key.split(' ', 2)
960
display = get_html_display_for_key(key, 50)
961
# The result is expected to be exactly as long as the requested size.
self.assertEqual(50, len(display))
965
# Width left over for key_string: the requested 50 minus the key type, the
# ellipsis marker, the comment and 2 more characters (presumably the two
# separating spaces -- TODO confirm).
50 - (type_len + len(HELLIPSIS) + comment_len + 2),
966
key_string, HELLIPSIS, comment),
970
class SSHKeyTest(TestCase):
    """Testing for the :class:`SSHKey`."""

    def test_sshkey_validation_with_valid_key(self):
        key_string = get_data('data/test_rsa0.pub')
        user = factory.make_user()
        key = SSHKey(key=key_string, user=user)
        # Validation has to actually run for this test to mean anything;
        # getting past this call without an exception is the assertion.
        key.full_clean()
        # No ValidationError.

    def test_sshkey_validation_fails_if_key_is_invalid(self):
        key_string = factory.getRandomString()
        user = factory.make_user()
        key = SSHKey(key=key_string, user=user)
        self.assertRaises(ValidationError, key.full_clean)

    def test_sshkey_display_with_real_life_key(self):
        # With a real-life ssh-rsa key, the key_string part is cropped.
        key_string = get_data('data/test_rsa0.pub')
        user = factory.make_user()
        key = SSHKey(key=key_string, user=user)
        display = key.display_html()
        # The ellipsis marker is spelled with the HELLIPSIS constant so
        # that this source file stays ASCII-only.
        self.assertEqual(
            'ssh-rsa AAAAB3NzaC1yc2E%s ubuntu@server-7476' % HELLIPSIS,
            display)

    def test_sshkey_display_is_marked_as_HTML_safe(self):
        key_string = get_data('data/test_rsa0.pub')
        user = factory.make_user()
        key = SSHKey(key=key_string, user=user)
        display = key.display_html()
        self.assertIsInstance(display, SafeUnicode)

    def test_sshkey_user_and_key_unique_together(self):
        key_string = get_data('data/test_rsa0.pub')
        user = factory.make_user()
        key = SSHKey(key=key_string, user=user)
        # The first key must be stored, or there is nothing for the
        # second one to clash with.
        key.save()
        key2 = SSHKey(key=key_string, user=user)
        self.assertRaises(ValidationError, key2.full_clean)

    def test_sshkey_user_and_key_unique_together_db_level(self):
        key_string = get_data('data/test_rsa0.pub')
        user = factory.make_user()
        key = SSHKey(key=key_string, user=user)
        key.save()
        key2 = SSHKey(key=key_string, user=user)
        # skip_check=True is passed through to save() so that the
        # duplicate reaches the database, which must reject it itself.
        self.assertRaises(IntegrityError, key2.save, skip_check=True)

    def test_sshkey_same_key_can_be_used_by_different_users(self):
        key_string = get_data('data/test_rsa0.pub')
        user = factory.make_user()
        key = SSHKey(key=key_string, user=user)
        key.save()
        user2 = factory.make_user()
        key2 = SSHKey(key=key_string, user=user2)
        key2.full_clean()
        # No ValidationError.
1032
class SSHKeyManagerTest(TestCase):
    """Testing for the :class:`SSHKeyManager` model manager."""

    def test_get_keys_for_user_no_keys(self):
        # A freshly made user has no keys, so the lookup comes back empty.
        keyless_user = factory.make_user()
        self.assertItemsEqual(
            [], SSHKey.objects.get_keys_for_user(keyless_user))

    def test_get_keys_for_user_with_keys(self):
        owner, owner_keys = factory.make_user_with_keys(
            n_keys=3, username='user1')
        # A second user with keys of their own; those keys must not show
        # up in the first user's result.
        factory.make_user_with_keys(n_keys=2)
        expected = [sshkey.key for sshkey in owner_keys]
        found = SSHKey.objects.get_keys_for_user(owner)
        self.assertItemsEqual(expected, found)
1049
class FileStorageTest(TestCase):
    """Testing of the :class:`FileStorage` model."""

    def make_upload_dir(self):
        """Create the upload directory, and arrange for eventual deletion.

        The directory must not already exist.  If it does, this method will
        fail rather than arrange for deletion of a directory that may
        contain meaningful data.

        :return: Path to the `FileStorage` upload directory, i.e.
            `FileStorage.upload_dir` inside `settings.MEDIA_ROOT`.  This
            is the directory where the actual files are stored.
        """
        media_root = settings.MEDIA_ROOT
        self.assertFalse(os.path.exists(media_root), "See media/README")
        # Register the cleanup before creating anything, so that a
        # failure half-way through still gets tidied up.
        self.addCleanup(shutil.rmtree, media_root, ignore_errors=True)
        os.mkdir(media_root)
        upload_dir = os.path.join(media_root, FileStorage.upload_dir)
        os.mkdir(upload_dir)
        return upload_dir

    def get_media_path(self, filename):
        """Get the path to a given stored file, relative to MEDIA_ROOT."""
        return os.path.join(FileStorage.upload_dir, filename)

    def make_data(self, including_text='data'):
        """Return arbitrary data.

        :param including_text: Text to include in the data.  Leave something
            here to make failure messages more recognizable.
        :type including_text: basestring
        :return: A string of bytes, including `including_text`.
        """
        # Note that this won't automatically insert any non-ASCII bytes.
        # Proper handling of real binary data is tested separately.
        text = "%s %s" % (including_text, factory.getRandomString())
        return text.encode('ascii')

    def age_file(self, path, seconds=None):
        """Make the file at `path` look like it hasn't been touched recently.

        Decrements the file's mtime by `seconds`; its atime is left alone.

        :param path: Path of the file to age.
        :param seconds: Number of seconds to subtract from the mtime.  If
            not given, the file is aged by one second more than the
            garbage collector's grace time.
        """
        if seconds is None:
            seconds = FileStorage.objects.grace_time + 1
        stat_result = os.stat(path)
        atime = stat_result.st_atime
        mtime = stat_result.st_mtime
        os.utime(path, (atime, mtime - seconds))

    def test_get_existing_storage_returns_None_if_none_found(self):
        nonexistent_file = factory.getRandomString()
        self.assertIsNone(
            FileStorage.objects.get_existing_storage(nonexistent_file))

    def test_get_existing_storage_finds_FileStorage(self):
        self.make_upload_dir()
        storage = factory.make_file_storage()
        self.assertEqual(
            storage,
            FileStorage.objects.get_existing_storage(storage.filename))

    def test_save_file_creates_storage(self):
        self.make_upload_dir()
        filename = factory.getRandomString()
        data = self.make_data()
        storage = FileStorage.objects.save_file(filename, BytesIO(data))
        self.assertEqual(
            (filename, data),
            (storage.filename, storage.data.read()))

    def test_storage_can_be_retrieved(self):
        self.make_upload_dir()
        filename = factory.getRandomString()
        data = self.make_data()
        factory.make_file_storage(filename=filename, data=data)
        storage = FileStorage.objects.get(filename=filename)
        self.assertEqual(
            (filename, data),
            (storage.filename, storage.data.read()))

    def test_stores_binary_data(self):
        self.make_upload_dir()

        # This horrible binary data could never, ever, under any
        # encoding known to man be interpreted as text(1).  Switch the
        # bytes of the byte-order mark around and by design you get an
        # invalid codepoint; put a byte with the high bit set between bytes
        # that have it cleared, and you have a guaranteed non-UTF-8
        # sequence.
        #
        # (1) Provided, of course, that man know only about ASCII and
        # UTF.
        binary_data = codecs.BOM64_LE + codecs.BOM64_BE + b'\x00\xff\x00'

        # And yet, because FileStorage supports binary data, it comes
        # out intact.
        storage = factory.make_file_storage(filename="x", data=binary_data)
        self.assertEqual(binary_data, storage.data.read())

    def test_overwrites_file(self):
        # If a file of the same name has already been stored, the
        # reference to the old data gets overwritten with one to the new
        # data.  They are actually different files on the filesystem.
        self.make_upload_dir()
        filename = 'filename-%s' % factory.getRandomString()
        old_storage = factory.make_file_storage(
            filename=filename, data=self.make_data('old data'))
        new_data = self.make_data('new-data')
        new_storage = factory.make_file_storage(
            filename=filename, data=new_data)
        self.assertNotEqual(old_storage.data.name, new_storage.data.name)
        self.assertEqual(
            new_data, FileStorage.objects.get(filename=filename).data.read())

    def test_list_stored_files_lists_files(self):
        upload_dir = self.make_upload_dir()
        filename = factory.getRandomString()
        # make_data() returns bytes, hence binary mode.
        with open(os.path.join(upload_dir, filename), 'wb') as f:
            f.write(self.make_data())
        self.assertIn(
            self.get_media_path(filename),
            FileStorage.objects.list_stored_files())

    def test_list_stored_files_includes_referenced_files(self):
        self.make_upload_dir()
        storage = factory.make_file_storage()
        self.assertIn(
            storage.data.name, FileStorage.objects.list_stored_files())

    def test_list_referenced_files_lists_FileStorage_files(self):
        self.make_upload_dir()
        storage = factory.make_file_storage()
        self.assertIn(
            storage.data.name, FileStorage.objects.list_referenced_files())

    def test_list_referenced_files_excludes_unreferenced_files(self):
        upload_dir = self.make_upload_dir()
        filename = factory.getRandomString()
        # Written straight to disk: no FileStorage refers to this file.
        with open(os.path.join(upload_dir, filename), 'wb') as f:
            f.write(self.make_data())
        self.assertNotIn(
            self.get_media_path(filename),
            FileStorage.objects.list_referenced_files())

    def test_list_referenced_files_uses_file_name_not_FileStorage_name(self):
        self.make_upload_dir()
        filename = factory.getRandomString()
        # The filename we're going to use is already taken.  The file
        # we'll be looking at will have to have a different name.
        factory.make_file_storage(filename=filename)
        storage = factory.make_file_storage(filename=filename)
        # It's the name of the file, not the FileStorage.filename, that
        # is in list_referenced_files.
        self.assertIn(
            storage.data.name, FileStorage.objects.list_referenced_files())

    def test_is_old_returns_False_for_recent_file(self):
        upload_dir = self.make_upload_dir()
        filename = factory.getRandomString()
        path = os.path.join(upload_dir, filename)
        with open(path, 'wb') as f:
            f.write(self.make_data())
        # Aged, but still a minute inside the grace period.
        self.age_file(path, FileStorage.objects.grace_time - 60)
        self.assertFalse(
            FileStorage.objects.is_old(self.get_media_path(filename)))

    def test_is_old_returns_True_for_old_file(self):
        upload_dir = self.make_upload_dir()
        filename = factory.getRandomString()
        path = os.path.join(upload_dir, filename)
        with open(path, 'wb') as f:
            f.write(self.make_data())
        self.age_file(path, FileStorage.objects.grace_time + 1)
        self.assertTrue(
            FileStorage.objects.is_old(self.get_media_path(filename)))

    def test_collect_garbage_deletes_garbage(self):
        upload_dir = self.make_upload_dir()
        filename = factory.getRandomString()
        path = os.path.join(upload_dir, filename)
        with open(path, 'wb') as f:
            f.write(self.make_data())
        # Only files past the grace time count as garbage; a file that
        # was just written is expected to be left alone.
        self.age_file(path)
        FileStorage.objects.collect_garbage()
        self.assertFalse(
            FileStorage.storage.exists(self.get_media_path(filename)))

    def test_grace_time_is_generous_but_not_unlimited(self):
        # Grace time for garbage collection is long enough that it won't
        # expire while the request that wrote it is still being handled.
        # But it won't keep a file around for ages.  For instance, it'll
        # be more than 20 seconds, but less than a day.
        self.assertThat(FileStorage.objects.grace_time, GreaterThan(20))
        self.assertThat(FileStorage.objects.grace_time, LessThan(24 * 60 * 60))

    def test_collect_garbage_leaves_recent_files_alone(self):
        upload_dir = self.make_upload_dir()
        filename = factory.getRandomString()
        with open(os.path.join(upload_dir, filename), 'wb') as f:
            f.write(self.make_data())
        FileStorage.objects.collect_garbage()
        self.assertTrue(
            FileStorage.storage.exists(self.get_media_path(filename)))

    def test_collect_garbage_leaves_referenced_files_alone(self):
        self.make_upload_dir()
        storage = factory.make_file_storage()
        self.age_file(storage.data.path)
        FileStorage.objects.collect_garbage()
        self.assertTrue(FileStorage.storage.exists(storage.data.name))

    def test_collect_garbage_tolerates_missing_upload_dir(self):
        # When MAAS is freshly installed, the upload directory is still
        # absent.  This test deliberately does not create it, and yet...
        FileStorage.objects.collect_garbage()
        # ...we get through garbage collection without breakage.
1270
class ConfigDefaultTest(TestCase, TestWithFixtures):
    """Test config default values."""

    def test_default_config_maas_name(self):
        # The 'maas_name' default is expected to be this machine's hostname.
        maas_name = get_default_config()['maas_name']
        self.assertEqual(gethostname(), maas_name)
1279
class Listener(object):
    """A utility class which tracks the calls to its 'call' method and
    stores the arguments given to 'call' in 'self.calls'.

    Each recorded call is a two-item list: the tuple of positional
    arguments followed by the dict of keyword arguments.
    """

    def __init__(self):
        # One [args, kwargs] entry per invocation of `call`, oldest first.
        self.calls = []

    def call(self, *args, **kwargs):
        """Record one invocation together with the arguments it received."""
        self.calls.append([args, kwargs])
1290
class ConfigTest(TestCase):
    """Testing of the :class:`Config` model and its related manager class."""

    def test_manager_get_config_found(self):
        Config.objects.create(name='name', value='config')
        config = Config.objects.get_config('name')
        self.assertEqual('config', config)

    def test_manager_get_config_not_found(self):
        config = Config.objects.get_config('name', 'default value')
        self.assertEqual('default value', config)

    def test_manager_get_config_not_found_none(self):
        config = Config.objects.get_config('name')
        self.assertIsNone(config)

    def test_manager_get_config_not_found_in_default_config(self):
        name = factory.getRandomString()
        value = factory.getRandomString()
        # DEFAULT_CONFIG is shared by every test; take the entry out
        # again afterwards so that it cannot leak into other tests.
        self.addCleanup(DEFAULT_CONFIG.pop, name, None)
        DEFAULT_CONFIG[name] = value
        config = Config.objects.get_config(name, None)
        self.assertEqual(value, config)

    def test_default_config_cannot_be_changed(self):
        name = factory.getRandomString()
        self.addCleanup(DEFAULT_CONFIG.pop, name, None)
        DEFAULT_CONFIG[name] = {'key': 'value'}
        config = Config.objects.get_config(name)
        # Changing the value we were handed must not affect what later
        # lookups return.
        config.update({'key2': 'value2'})

        self.assertEqual({'key': 'value'}, Config.objects.get_config(name))

    def test_manager_get_config_list_returns_config_list(self):
        Config.objects.create(name='name', value='config1')
        Config.objects.create(name='name', value='config2')
        config_list = Config.objects.get_config_list('name')
        self.assertItemsEqual(['config1', 'config2'], config_list)

    def test_manager_set_config_creates_config(self):
        Config.objects.set_config('name', 'config1')
        Config.objects.set_config('name', 'config2')
        # NOTE(review): the expected value assumes that set_config
        # replaces an existing entry of the same name instead of adding
        # a second one -- confirm against the manager's implementation.
        self.assertSequenceEqual(
            ['config2'],
            [config.value for config in Config.objects.filter(name='name')])

    def test_manager_config_changed_connect_connects(self):
        listener = Listener()
        name = factory.getRandomString()
        value = factory.getRandomString()
        Config.objects.config_changed_connect(name, listener.call)
        Config.objects.set_config(name, value)
        config = Config.objects.get(name=name)

        self.assertEqual(1, len(listener.calls))
        # listener.calls[0][0] holds the positional arguments of the
        # first (and only) recorded call.
        self.assertEqual((Config, config, True), listener.calls[0][0])

    def test_manager_config_changed_connect_connects_multiple(self):
        listener = Listener()
        listener2 = Listener()
        name = factory.getRandomString()
        value = factory.getRandomString()
        Config.objects.config_changed_connect(name, listener.call)
        Config.objects.config_changed_connect(name, listener2.call)
        Config.objects.set_config(name, value)

        self.assertEqual(1, len(listener.calls))
        self.assertEqual(1, len(listener2.calls))

    def test_manager_config_changed_connect_connects_multiple_same(self):
        # If the same method is connected twice, it will only get called
        # once.
        listener = Listener()
        name = factory.getRandomString()
        value = factory.getRandomString()
        Config.objects.config_changed_connect(name, listener.call)
        Config.objects.config_changed_connect(name, listener.call)
        Config.objects.set_config(name, value)

        self.assertEqual(1, len(listener.calls))

    def test_manager_config_changed_connect_connects_by_config_name(self):
        listener = Listener()
        name = factory.getRandomString()
        value = factory.getRandomString()
        Config.objects.config_changed_connect(name, listener.call)
        # Setting a differently named item must not notify the listener.
        another_name = factory.getRandomString()
        Config.objects.set_config(another_name, value)

        self.assertEqual(0, len(listener.calls))