~lutostag/ubuntu/utopic/maas/1.5.2

« back to all changes in this revision

Viewing changes to src/maasserver/tests/test_models.py

  • Committer: Package Import Robot
  • Author(s): Andres Rodriguez
  • Date: 2012-03-07 12:46:17 UTC
  • mto: (20.1.1 quantal) (1.2.1)
  • mto: This revision was merged to the branch mainline in revision 4.
  • Revision ID: package-import@ubuntu.com-20120307124617-tdctc6l9bur4f2ci
Tags: upstream-0.1+bzr232+dfsg
Import upstream version 0.1+bzr232+dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
__all__ = []
13
13
 
14
14
import codecs
 
15
from io import BytesIO
15
16
import os
16
17
import shutil
17
18
 
18
19
from django.conf import settings
19
20
from django.contrib.auth.models import User
20
21
from django.core.exceptions import ValidationError
 
22
from fixtures import (
 
23
    EnvironmentVariableFixture,
 
24
    TestWithFixtures,
 
25
    )
21
26
from maasserver.exceptions import (
22
27
    CannotDeleteUserException,
23
28
    PermissionDenied,
26
31
    Config,
27
32
    create_auth_token,
28
33
    DEFAULT_CONFIG,
 
34
    FileStorage,
29
35
    GENERIC_CONSUMER,
30
36
    get_auth_tokens,
 
37
    get_default_config,
31
38
    MACAddress,
32
39
    Node,
33
40
    NODE_STATUS,
37
44
    )
38
45
from maasserver.testing import TestCase
39
46
from maasserver.testing.factory import factory
 
47
from metadataserver.models import NodeUserData
40
48
from piston.models import (
41
49
    Consumer,
42
50
    KEY_SIZE,
43
51
    SECRET_SIZE,
44
52
    Token,
45
53
    )
 
54
from testtools.matchers import (
 
55
    GreaterThan,
 
56
    LessThan,
 
57
    )
46
58
 
47
59
 
48
60
class NodeTest(TestCase):
84
96
        self.assertEqual(user, node.owner)
85
97
        self.assertEqual(NODE_STATUS.ALLOCATED, node.status)
86
98
 
 
99
    def test_release(self):
 
100
        node = factory.make_node(
 
101
            status=NODE_STATUS.ALLOCATED, owner=factory.make_user())
 
102
        node.release()
 
103
        self.assertEqual((NODE_STATUS.READY, None), (node.status, node.owner))
 
104
 
87
105
 
88
106
class NodeManagerTest(TestCase):
89
107
 
95
113
            status = NODE_STATUS.ALLOCATED
96
114
        return factory.make_node(set_hostname=True, status=status, owner=user)
97
115
 
 
116
    def make_user_data(self):
 
117
        """Create a blob of arbitrary user-data."""
 
118
        return factory.getRandomString().encode('ascii')
 
119
 
98
120
    def test_filter_by_ids_filters_nodes_by_ids(self):
99
121
        nodes = [factory.make_node() for counter in range(5)]
100
122
        ids = [node.system_id for node in nodes]
204
226
        user = factory.make_user()
205
227
        available_status = NODE_STATUS.READY
206
228
        unavailable_statuses = (
207
 
            set(NODE_STATUS_CHOICES_DICT.keys()) - set([available_status]))
 
229
            set(NODE_STATUS_CHOICES_DICT) - set([available_status]))
208
230
        for status in unavailable_statuses:
209
231
            factory.make_node(status=status)
210
232
        self.assertEqual(
254
276
            [startable_node],
255
277
            Node.objects.start_nodes(ids, startable_node.owner))
256
278
 
 
279
    def test_start_nodes_stores_user_data(self):
 
280
        node = factory.make_node(owner=factory.make_user())
 
281
        user_data = self.make_user_data()
 
282
        Node.objects.start_nodes(
 
283
            [node.system_id], node.owner, user_data=user_data)
 
284
        self.assertEqual(user_data, NodeUserData.objects.get_user_data(node))
 
285
 
 
286
    def test_start_nodes_does_not_store_user_data_for_uneditable_nodes(self):
 
287
        node = factory.make_node(owner=factory.make_user())
 
288
        original_user_data = self.make_user_data()
 
289
        NodeUserData.objects.set_user_data(node, original_user_data)
 
290
        Node.objects.start_nodes(
 
291
            [node.system_id], factory.make_user(),
 
292
            user_data=self.make_user_data())
 
293
        self.assertEqual(
 
294
            original_user_data, NodeUserData.objects.get_user_data(node))
 
295
 
 
296
    def test_start_nodes_without_user_data_leaves_existing_data_alone(self):
 
297
        node = factory.make_node(owner=factory.make_user())
 
298
        user_data = self.make_user_data()
 
299
        NodeUserData.objects.set_user_data(node, user_data)
 
300
        Node.objects.start_nodes([node.system_id], node.owner, user_data=None)
 
301
        self.assertEqual(user_data, NodeUserData.objects.get_user_data(node))
 
302
 
 
303
    def test_start_nodes_with_user_data_overwrites_existing_data(self):
 
304
        node = factory.make_node(owner=factory.make_user())
 
305
        NodeUserData.objects.set_user_data(node, self.make_user_data())
 
306
        user_data = self.make_user_data()
 
307
        Node.objects.start_nodes(
 
308
            [node.system_id], node.owner, user_data=user_data)
 
309
        self.assertEqual(user_data, NodeUserData.objects.get_user_data(node))
 
310
 
257
311
 
258
312
class MACAddressTest(TestCase):
259
313
 
366
420
        profile = factory.make_user().get_profile()
367
421
        token_ids = []
368
422
        consumer_ids = []
369
 
        for i in xrange(3):
 
423
        for i in range(3):
370
424
            token, consumer = profile.create_authorisation_token()
371
425
            token_ids.append(token.id)
372
426
            consumer_ids.append(consumer.id)
381
435
        self.assertRaises(CannotDeleteUserException, profile.delete)
382
436
 
383
437
    def test_manager_all_users(self):
384
 
        users = set(factory.make_user() for i in xrange(3))
 
438
        users = set(factory.make_user() for i in range(3))
385
439
        all_users = set(UserProfile.objects.all_users())
386
440
        self.assertEqual(users, all_users)
387
441
 
398
452
 
399
453
    FILEPATH = settings.MEDIA_ROOT
400
454
 
401
 
    def setUp(self):
402
 
        super(FileStorageTest, self).setUp()
 
455
    def make_upload_dir(self):
 
456
        """Create the upload directory, and arrange for eventual deletion.
 
457
 
 
458
        The directory must not already exist.  If it does, this method will
 
459
        fail rather than arrange for deletion of a directory that may
 
460
        contain meaningful data.
 
461
 
 
462
        :return: Absolute path to the `FileStorage` upload directory.  This
 
463
            is the directory where the actual files are stored.
 
464
        """
 
465
        # These will blow up if either directory already exists.  Which
 
466
        # is brittle, but probably for the best since we ruthlessly
 
467
        # delete them on cleanup!
403
468
        os.mkdir(self.FILEPATH)
 
469
        upload_dir = os.path.join(self.FILEPATH, FileStorage.upload_dir)
 
470
        os.mkdir(upload_dir)
404
471
        self.addCleanup(shutil.rmtree, self.FILEPATH)
405
 
 
406
 
    def test_creation(self):
407
 
        storage = factory.make_file_storage(filename="myfile", data=b"mydata")
408
 
        expected = ["myfile", "mydata"]
409
 
        actual = [storage.filename, storage.data.read()]
410
 
        self.assertEqual(expected, actual)
411
 
 
412
 
    def test_creation_writes_a_file(self):
413
 
        # The development settings say to write a file starting at
414
 
        # /var/tmp/maas, so check one is actually written there.  The field
415
 
        # itself is hard-coded to make a directory called "storage".
416
 
        factory.make_file_storage(filename="myfile", data=b"mydata")
417
 
 
418
 
        expected_filename = os.path.join(
419
 
            self.FILEPATH, "storage", "myfile")
420
 
 
421
 
        with open(expected_filename) as f:
422
 
            self.assertEqual("mydata", f.read())
 
472
        return upload_dir
 
473
 
 
474
    def get_media_path(self, filename):
 
475
        """Get the path to a given stored file, relative to MEDIA_ROOT."""
 
476
        return os.path.join(FileStorage.upload_dir, filename)
 
477
 
 
478
    def make_data(self, including_text='data'):
 
479
        """Return arbitrary data.
 
480
 
 
481
        :param including_text: Text to include in the data.  Leave something
 
482
            here to make failure messages more recognizable.
 
483
        :type including_text: basestring
 
484
        :return: A string of bytes, including `including_text`.
 
485
        :rtype: bytes
 
486
        """
 
487
        # Note that this won't automatically insert any non-ASCII bytes.
 
488
        # Proper handling of real binary data is tested separately.
 
489
        text = "%s %s" % (including_text, factory.getRandomString())
 
490
        return text.encode('ascii')
 
491
 
 
492
    def age_file(self, path, seconds=None):
 
493
        """Make the file at `path` look like it hasn't been touched recently.
 
494
 
 
495
        Decrements the file's mtime by `seconds` (default: grace time + 1).
 
496
        """
 
497
        if seconds is None:
 
498
            seconds = FileStorage.objects.grace_time + 1
 
499
        stat_result = os.stat(path)
 
500
        atime = stat_result.st_atime
 
501
        mtime = stat_result.st_mtime
 
502
        os.utime(path, (atime, mtime - seconds))
 
503
 
 
504
    def test_get_existing_storage_returns_None_if_none_found(self):
 
505
        nonexistent_file = factory.getRandomString()
 
506
        self.assertIsNone(
 
507
            FileStorage.objects.get_existing_storage(nonexistent_file))
 
508
 
 
509
    def test_get_existing_storage_finds_FileStorage(self):
 
510
        self.make_upload_dir()
 
511
        storage = factory.make_file_storage()
 
512
        self.assertEqual(
 
513
            storage,
 
514
            FileStorage.objects.get_existing_storage(storage.filename))
 
515
 
 
516
    def test_save_file_creates_storage(self):
 
517
        self.make_upload_dir()
 
518
        filename = factory.getRandomString()
 
519
        data = self.make_data()
 
520
        storage = FileStorage.objects.save_file(filename, BytesIO(data))
 
521
        self.assertEqual(
 
522
            (filename, data),
 
523
            (storage.filename, storage.data.read()))
 
524
 
 
525
    def test_storage_can_be_retrieved(self):
 
526
        self.make_upload_dir()
 
527
        filename = factory.getRandomString()
 
528
        data = self.make_data()
 
529
        factory.make_file_storage(filename=filename, data=data)
 
530
        storage = FileStorage.objects.get(filename=filename)
 
531
        self.assertEqual(
 
532
            (filename, data),
 
533
            (storage.filename, storage.data.read()))
423
534
 
424
535
    def test_stores_binary_data(self):
 
536
        self.make_upload_dir()
 
537
 
425
538
        # This horrible binary data could never, ever, under any
426
 
        # encoding known to man be intepreted as text.  Switch the bytes
427
 
        # of the byte-order mark around and by design you get an invalid
428
 
        # codepoint; put a byte with the high bit set between bytes that
429
 
        # have it cleared, and you have a guaranteed non-UTF-8 sequence.
 
539
        # encoding known to man be interpreted as text(1).  Switch the
 
540
        # bytes of the byte-order mark around and by design you get an
 
541
        # invalid codepoint; put a byte with the high bit set between bytes
 
542
        # that have it cleared, and you have a guaranteed non-UTF-8
 
543
        # sequence.
 
544
        #
 
545
        # (1) Provided, of course, that man know only about ASCII and
 
546
        # UTF.
430
547
        binary_data = codecs.BOM64_LE + codecs.BOM64_BE + b'\x00\xff\x00'
 
548
 
431
549
        # And yet, because FileStorage supports binary data, it comes
432
550
        # out intact.
433
551
        storage = factory.make_file_storage(filename="x", data=binary_data)
434
552
        self.assertEqual(binary_data, storage.data.read())
435
553
 
 
554
    def test_overwrites_file(self):
 
555
        # If a file of the same name has already been stored, the
 
556
        # reference to the old data gets overwritten with one to the new
 
557
        # data.  They are actually different files on the filesystem.
 
558
        self.make_upload_dir()
 
559
        filename = 'filename-%s' % factory.getRandomString()
 
560
        old_storage = factory.make_file_storage(
 
561
            filename=filename, data=self.make_data('old data'))
 
562
        new_data = self.make_data('new-data')
 
563
        new_storage = factory.make_file_storage(
 
564
            filename=filename, data=new_data)
 
565
        self.assertNotEqual(old_storage.data.name, new_storage.data.name)
 
566
        self.assertEqual(
 
567
            new_data, FileStorage.objects.get(filename=filename).data.read())
 
568
 
 
569
    def test_list_stored_files_lists_files(self):
 
570
        upload_dir = self.make_upload_dir()
 
571
        filename = factory.getRandomString()
 
572
        with open(os.path.join(upload_dir, filename), 'w') as f:
 
573
            f.write(self.make_data())
 
574
        self.assertIn(
 
575
            self.get_media_path(filename),
 
576
            FileStorage.objects.list_stored_files())
 
577
 
 
578
    def test_list_stored_files_includes_referenced_files(self):
 
579
        self.make_upload_dir()
 
580
        storage = factory.make_file_storage()
 
581
        self.assertIn(
 
582
            storage.data.name, FileStorage.objects.list_stored_files())
 
583
 
 
584
    def test_list_referenced_files_lists_FileStorage_files(self):
 
585
        self.make_upload_dir()
 
586
        storage = factory.make_file_storage()
 
587
        self.assertIn(
 
588
            storage.data.name, FileStorage.objects.list_referenced_files())
 
589
 
 
590
    def test_list_referenced_files_excludes_unreferenced_files(self):
 
591
        upload_dir = self.make_upload_dir()
 
592
        filename = factory.getRandomString()
 
593
        with open(os.path.join(upload_dir, filename), 'w') as f:
 
594
            f.write(self.make_data())
 
595
        self.assertNotIn(
 
596
            self.get_media_path(filename),
 
597
            FileStorage.objects.list_referenced_files())
 
598
 
 
599
    def test_list_referenced_files_uses_file_name_not_FileStorage_name(self):
 
600
        self.make_upload_dir()
 
601
        filename = factory.getRandomString()
 
602
        # The filename we're going to use is already taken.  The file
 
603
        # we'll be looking at will have to have a different name.
 
604
        factory.make_file_storage(filename=filename)
 
605
        storage = factory.make_file_storage(filename=filename)
 
606
        # It's the name of the file, not the FileStorage.filename, that
 
607
        # is in list_referenced_files.
 
608
        self.assertIn(
 
609
            storage.data.name, FileStorage.objects.list_referenced_files())
 
610
 
 
611
    def test_is_old_returns_False_for_recent_file(self):
 
612
        upload_dir = self.make_upload_dir()
 
613
        filename = factory.getRandomString()
 
614
        path = os.path.join(upload_dir, filename)
 
615
        with open(path, 'w') as f:
 
616
            f.write(self.make_data())
 
617
        self.age_file(path, FileStorage.objects.grace_time - 60)
 
618
        self.assertFalse(
 
619
            FileStorage.objects.is_old(self.get_media_path(filename)))
 
620
 
 
621
    def test_is_old_returns_True_for_old_file(self):
 
622
        upload_dir = self.make_upload_dir()
 
623
        filename = factory.getRandomString()
 
624
        path = os.path.join(upload_dir, filename)
 
625
        with open(path, 'w') as f:
 
626
            f.write(self.make_data())
 
627
        self.age_file(path, FileStorage.objects.grace_time + 1)
 
628
        self.assertTrue(
 
629
            FileStorage.objects.is_old(self.get_media_path(filename)))
 
630
 
 
631
    def test_collect_garbage_deletes_garbage(self):
 
632
        upload_dir = self.make_upload_dir()
 
633
        filename = factory.getRandomString()
 
634
        path = os.path.join(upload_dir, filename)
 
635
        with open(path, 'w') as f:
 
636
            f.write(self.make_data())
 
637
        self.age_file(path)
 
638
        FileStorage.objects.collect_garbage()
 
639
        self.assertFalse(
 
640
            FileStorage.storage.exists(self.get_media_path(filename)))
 
641
 
 
642
    def test_grace_time_is_generous_but_not_unlimited(self):
 
643
        # Grace time for garbage collection is long enough that it won't
 
644
        # expire while the request that wrote it is still being handled.
 
645
        # But it won't keep a file around for ages.  For instance, it'll
 
646
        # be more than 20 seconds, but less than a day.
 
647
        self.assertThat(FileStorage.objects.grace_time, GreaterThan(20))
 
648
        self.assertThat(FileStorage.objects.grace_time, LessThan(24 * 60 * 60))
 
649
 
 
650
    def test_collect_garbage_leaves_recent_files_alone(self):
 
651
        upload_dir = self.make_upload_dir()
 
652
        filename = factory.getRandomString()
 
653
        with open(os.path.join(upload_dir, filename), 'w') as f:
 
654
            f.write(self.make_data())
 
655
        FileStorage.objects.collect_garbage()
 
656
        self.assertTrue(
 
657
            FileStorage.storage.exists(self.get_media_path(filename)))
 
658
 
 
659
    def test_collect_garbage_leaves_referenced_files_alone(self):
 
660
        self.make_upload_dir()
 
661
        storage = factory.make_file_storage()
 
662
        self.age_file(storage.data.path)
 
663
        FileStorage.objects.collect_garbage()
 
664
        self.assertTrue(FileStorage.storage.exists(storage.data.name))
 
665
 
 
666
    def test_collect_garbage_tolerates_missing_upload_dir(self):
 
667
        # When MaaS is freshly installed, the upload directory is still
 
668
        # missing.  But...
 
669
        FileStorage.objects.collect_garbage()
 
670
        # ...we get through garbage collection without breakage.
 
671
        pass
 
672
 
 
673
 
 
674
class ConfigDefaultTest(TestCase, TestWithFixtures):
 
675
    """Test config default values."""
 
676
 
 
677
    def test_default_config_maas_name(self):
 
678
        name = factory.getRandomString()
 
679
        fixture = EnvironmentVariableFixture('LOGNAME', name)
 
680
        self.useFixture(fixture)
 
681
        default_config = get_default_config()
 
682
        self.assertEqual(
 
683
            "%s's" % name.capitalize(), default_config['maas_name'])
 
684
 
436
685
 
437
686
class ConfigTest(TestCase):
438
687
    """Testing of the :class:`Config` model."""
457
706
        config = Config.objects.get_config(name, None)
458
707
        self.assertEqual(value, config)
459
708
 
 
709
    def test_default_config_cannot_be_changed(self):
 
710
        name = factory.getRandomString()
 
711
        DEFAULT_CONFIG[name] = {'key': 'value'}
 
712
        config = Config.objects.get_config(name)
 
713
        config.update({'key2': 'value2'})
 
714
 
 
715
        self.assertEqual({'key': 'value'}, Config.objects.get_config(name))
 
716
 
460
717
    def test_manager_get_config_list_returns_config_list(self):
461
718
        Config.objects.create(name='name', value='config1')
462
719
        Config.objects.create(name='name', value='config2')