~martin-nowack/ubuntu/utopic/maas/bug-1425837

« back to all changes in this revision

Viewing changes to src/maasserver/tests/test_forms.py

  • Committer: Package Import Robot
  • Author(s): Julian Edwards, Andres Rodriguez
  • Date: 2014-08-21 18:38:27 UTC
  • mfrom: (1.2.34)
  • Revision ID: package-import@ubuntu.com-20140821183827-9xyb5u2o4l8g3zxj
Tags: 1.6.1+bzr2550-0ubuntu1
* New upstream bugfix release:
  - Auto-link node MACs to Networks (LP: #1341619)

[ Julian Edwards ]
* debian/maas-region-controller.postinst: Don't restart RabbitMQ on
  upgrades; just ensure it's running.  This should prevent a race with
  the cluster celery restarting.
* debian/rules: Pull upstream branch from the right place.

[ Andres Rodriguez ]
* debian/maas-region-controller.postinst: Ensure cluster celery is
  started if it also runs on the region.

Show diffs side-by-side

added

removed

Lines of Context:
14
14
__metaclass__ = type
15
15
__all__ = []
16
16
 
 
17
from cStringIO import StringIO
17
18
import json
 
19
import random
18
20
 
19
21
from django import forms
20
22
from django.conf import settings
 
23
from django.contrib import messages
21
24
from django.contrib.auth.models import User
22
25
from django.core.exceptions import ValidationError
23
 
from django.core.files.uploadedfile import SimpleUploadedFile
 
26
from django.core.files.uploadedfile import (
 
27
    InMemoryUploadedFile,
 
28
    SimpleUploadedFile,
 
29
    )
24
30
from django.core.validators import validate_email
25
31
from django.http import QueryDict
26
32
from maasserver.clusterrpc.power_parameters import get_power_type_choices
29
35
    NODEGROUP_STATUS,
30
36
    NODEGROUPINTERFACE_MANAGEMENT,
31
37
    )
 
38
from maasserver.exceptions import NodeActionError
32
39
from maasserver.forms import (
33
40
    AdminNodeForm,
34
41
    AdminNodeWithMACAddressesForm,
35
42
    BLANK_CHOICE,
 
43
    BootSourceForm,
 
44
    BootSourceSelectionForm,
36
45
    BulkNodeActionForm,
37
46
    CommissioningForm,
38
47
    CommissioningScriptForm,
39
48
    ConfigForm,
 
49
    DeployForm,
40
50
    DownloadProgressForm,
41
51
    EditUserForm,
 
52
    ERROR_MESSAGE_STATIC_IPS_OUTSIDE_RANGE,
 
53
    ERROR_MESSAGE_STATIC_RANGE_IN_USE,
42
54
    get_action_form,
43
55
    get_node_create_form,
44
56
    get_node_edit_form,
45
57
    initialize_node_group,
46
58
    InstanceListField,
47
59
    INTERFACES_VALIDATION_ERROR_MESSAGE,
 
60
    LicenseKeyForm,
48
61
    list_all_usable_architectures,
49
62
    MACAddressForm,
 
63
    MAX_MESSAGES,
 
64
    merge_error_messages,
50
65
    NetworkForm,
51
66
    NewUserCreationForm,
52
67
    NO_ARCHITECTURES_AVAILABLE,
53
68
    NodeActionForm,
54
69
    NodeForm,
 
70
    NodeGroupDefineForm,
55
71
    NodeGroupEdit,
56
72
    NodeGroupInterfaceForeignDHCPForm,
57
73
    NodeGroupInterfaceForm,
58
 
    NodeGroupWithInterfacesForm,
59
74
    NodeWithMACAddressesForm,
60
75
    pick_default_architecture,
61
76
    ProfileForm,
62
77
    remove_None_values,
63
78
    SetZoneBulkAction,
64
79
    UnconstrainedMultipleChoiceField,
 
80
    validate_new_static_ip_ranges,
65
81
    validate_nonoverlapping_networks,
66
82
    ValidatorMultipleChoiceField,
67
83
    ZoneForm,
68
84
    )
69
85
from maasserver.models import (
70
86
    Config,
 
87
    LicenseKey,
71
88
    MACAddress,
72
89
    Network,
73
90
    Node,
76
93
    Zone,
77
94
    )
78
95
from maasserver.models.config import DEFAULT_CONFIG
 
96
from maasserver.models.network import get_name_and_vlan_from_cluster_interface
 
97
from maasserver.models.staticipaddress import StaticIPAddress
79
98
from maasserver.node_action import (
80
99
    Commission,
81
100
    Delete,
89
108
    patch_usable_architectures,
90
109
    )
91
110
from maasserver.testing.factory import factory
 
111
from maasserver.testing.osystems import (
 
112
    make_osystem_with_releases,
 
113
    make_usable_osystem,
 
114
    patch_usable_osystems,
 
115
    )
92
116
from maasserver.testing.testcase import MAASServerTestCase
93
117
from maasserver.utils import map_enum
94
118
from maasserver.utils.forms import compose_invalid_choice_text
 
119
from maastesting.utils import sample_binary_data
95
120
from metadataserver.models import CommissioningScript
96
 
from netaddr import IPNetwork
 
121
from netaddr import (
 
122
    IPAddress,
 
123
    IPNetwork,
 
124
    IPRange,
 
125
    )
97
126
from provisioningserver import tasks
 
127
from provisioningserver.drivers.osystem import OperatingSystemRegistry
 
128
from provisioningserver.drivers.osystem.ubuntu import UbuntuOS
98
129
from testtools import TestCase
99
130
from testtools.matchers import (
100
131
    AllMatch,
101
132
    Contains,
102
133
    Equals,
 
134
    HasLength,
103
135
    MatchesAll,
104
136
    MatchesRegex,
105
137
    MatchesStructure,
109
141
 
110
142
class TestHelpers(MAASServerTestCase):
111
143
 
112
 
    def make_usable_boot_images(self, nodegroup=None, arch=None,
113
 
                                subarchitecture=None):
 
144
    def make_usable_boot_images(self, nodegroup=None, osystem=None,
 
145
                                arch=None, subarchitecture=None, release=None):
114
146
        """Create a set of boot images, so the architecture becomes "usable".
115
147
 
116
148
        This will make the images' architecture show up in the list of usable
120
152
        """
121
153
        if nodegroup is None:
122
154
            nodegroup = factory.make_node_group()
 
155
        if osystem is None:
 
156
            osystem = factory.make_name('os')
123
157
        if arch is None:
124
158
            arch = factory.make_name('arch')
125
159
        if subarchitecture is None:
126
160
            subarchitecture = factory.make_name('subarch')
 
161
        if release is None:
 
162
            release = factory.make_name('release')
127
163
        for purpose in ['install', 'commissioning']:
128
164
            factory.make_boot_image(
129
 
                nodegroup=nodegroup, architecture=arch,
130
 
                subarchitecture=subarchitecture, purpose=purpose)
 
165
                nodegroup=nodegroup, osystem=osystem, architecture=arch,
 
166
                subarchitecture=subarchitecture, release=release,
 
167
                purpose=purpose)
131
168
 
132
169
    def test_initialize_node_group_leaves_nodegroup_reference_intact(self):
133
170
        preselected_nodegroup = factory.make_node_group()
274
311
        self.assertFalse(form.is_valid())
275
312
        self.assertEqual(['mac_addresses'], list(form.errors))
276
313
        self.assertEqual(
277
 
            ['Enter a valid MAC address (e.g. AA:BB:CC:DD:EE:FF).'],
 
314
            ["'invalid' is not a valid MAC address."],
278
315
            form.errors['mac_addresses'])
279
316
 
280
317
    def test_NodeWithMACAddressesForm_multiple_invalid(self):
287
324
        self.assertFalse(form.is_valid())
288
325
        self.assertEqual(['mac_addresses'], list(form.errors))
289
326
        self.assertEqual(
290
 
            ['One or more MAC addresses is invalid.'],
 
327
            [
 
328
                "One or more MAC addresses is invalid. "
 
329
                "('invalid_1' is not a valid MAC address. \u2014"
 
330
                " 'invalid_2' is not a valid MAC address.)"
 
331
            ],
291
332
            form.errors['mac_addresses'])
292
333
 
293
334
    def test_NodeWithMACAddressesForm_empty(self):
415
456
            [
416
457
                'hostname',
417
458
                'architecture',
 
459
                'osystem',
418
460
                'distro_series',
 
461
                'license_key',
419
462
                'nodegroup',
420
463
            ], list(form.fields))
421
464
 
474
517
            [NO_ARCHITECTURES_AVAILABLE],
475
518
            form.errors['architecture'])
476
519
 
 
520
    def test_accepts_osystem(self):
 
521
        osystem = make_usable_osystem(self)
 
522
        form = NodeForm(data={
 
523
            'hostname': factory.make_name('host'),
 
524
            'architecture': make_usable_architecture(self),
 
525
            'osystem': osystem.name,
 
526
            })
 
527
        self.assertTrue(form.is_valid(), form._errors)
 
528
 
 
529
    def test_rejects_invalid_osystem(self):
 
530
        patch_usable_osystems(self)
 
531
        form = NodeForm(data={
 
532
            'hostname': factory.make_name('host'),
 
533
            'architecture': make_usable_architecture(self),
 
534
            'osystem': factory.make_name('os'),
 
535
            })
 
536
        self.assertFalse(form.is_valid())
 
537
        self.assertItemsEqual(['osystem'], form._errors.keys())
 
538
 
 
539
    def test_starts_with_default_osystem(self):
 
540
        osystems = [make_osystem_with_releases(self) for _ in range(5)]
 
541
        patch_usable_osystems(self, osystems)
 
542
        form = NodeForm()
 
543
        self.assertEqual(
 
544
            '',
 
545
            form.fields['osystem'].initial)
 
546
 
 
547
    def test_accepts_osystem_distro_series(self):
 
548
        osystem = make_usable_osystem(self)
 
549
        release = osystem.get_default_release()
 
550
        form = NodeForm(data={
 
551
            'hostname': factory.make_name('host'),
 
552
            'architecture': make_usable_architecture(self),
 
553
            'osystem': osystem.name,
 
554
            'distro_series': '%s/%s' % (osystem.name, release),
 
555
            })
 
556
        self.assertTrue(form.is_valid(), form._errors)
 
557
 
 
558
    def test_rejects_invalid_osystem_distro_series(self):
 
559
        osystem = make_usable_osystem(self)
 
560
        release = factory.make_name('release')
 
561
        form = NodeForm(data={
 
562
            'hostname': factory.make_name('host'),
 
563
            'architecture': make_usable_architecture(self),
 
564
            'osystem': osystem.name,
 
565
            'distro_series': '%s/%s' % (osystem.name, release),
 
566
            })
 
567
        self.assertFalse(form.is_valid())
 
568
        self.assertItemsEqual(['distro_series'], form._errors.keys())
 
569
 
 
570
    def test_starts_with_default_distro_series(self):
 
571
        osystems = [make_osystem_with_releases(self) for _ in range(5)]
 
572
        patch_usable_osystems(self, osystems)
 
573
        form = NodeForm()
 
574
        self.assertEqual(
 
575
            '',
 
576
            form.fields['distro_series'].initial)
 
577
 
 
578
    def test_rejects_mismatch_osystem_distro_series(self):
 
579
        osystem = make_usable_osystem(self)
 
580
        release = osystem.get_default_release()
 
581
        invalid = factory.make_name('invalid_os')
 
582
        form = NodeForm(data={
 
583
            'hostname': factory.make_name('host'),
 
584
            'architecture': make_usable_architecture(self),
 
585
            'osystem': osystem.name,
 
586
            'distro_series': '%s/%s' % (invalid, release),
 
587
            })
 
588
        self.assertFalse(form.is_valid())
 
589
        self.assertItemsEqual(['distro_series'], form._errors.keys())
 
590
 
 
591
    def test_rejects_missing_license_key(self):
 
592
        osystem = make_usable_osystem(self)
 
593
        release = osystem.get_default_release()
 
594
        self.patch(osystem, 'requires_license_key').return_value = True
 
595
        mock_validate = self.patch(osystem, 'validate_license_key')
 
596
        mock_validate.return_value = True
 
597
        form = NodeForm(data={
 
598
            'hostname': factory.make_name('host'),
 
599
            'architecture': make_usable_architecture(self),
 
600
            'osystem': osystem.name,
 
601
            'distro_series': '%s/%s*' % (osystem.name, release),
 
602
            })
 
603
        self.assertFalse(form.is_valid())
 
604
        self.assertItemsEqual(['license_key'], form._errors.keys())
 
605
 
 
606
    def test_calls_validate_license_key(self):
 
607
        osystem = make_usable_osystem(self)
 
608
        release = osystem.get_default_release()
 
609
        self.patch(osystem, 'requires_license_key').return_value = True
 
610
        mock_validate = self.patch(osystem, 'validate_license_key')
 
611
        mock_validate.return_value = True
 
612
        form = NodeForm(data={
 
613
            'hostname': factory.make_name('host'),
 
614
            'architecture': make_usable_architecture(self),
 
615
            'osystem': osystem.name,
 
616
            'distro_series': '%s/%s*' % (osystem.name, release),
 
617
            'license_key': factory.getRandomString(),
 
618
            })
 
619
        self.assertTrue(form.is_valid())
 
620
        mock_validate.assert_called_once()
 
621
 
477
622
 
478
623
class TestAdminNodeForm(MAASServerTestCase):
479
624
 
485
630
            [
486
631
                'hostname',
487
632
                'architecture',
 
633
                'osystem',
488
634
                'distro_series',
 
635
                'license_key',
489
636
                'power_type',
490
637
                'power_parameters',
491
638
                'cpu_count',
671
818
        self.assertIn(
672
819
            "is not one of the available choices.", form._errors['action'][0])
673
820
 
 
821
    def test_shows_error_message_for_NodeActionError(self):
 
822
        error_text = factory.getRandomString(prefix="NodeActionError")
 
823
        exc = NodeActionError(error_text)
 
824
        self.patch(StartNode, "execute").side_effect = exc
 
825
        user = factory.make_user()
 
826
        node = factory.make_node(status=NODE_STATUS.READY, owner=user)
 
827
        action = StartNode.name
 
828
        # Required for messages to work:
 
829
        request = factory.make_fake_request("/fake")
 
830
        form = get_action_form(user, request)(
 
831
            node, {NodeActionForm.input_name: action})
 
832
        form.save()
 
833
        [observed] = messages.get_messages(form.request)
 
834
        expected = (messages.ERROR, error_text, '')
 
835
        self.assertEqual(expected, observed)
 
836
 
674
837
 
675
838
class TestUniqueEmailForms(MAASServerTestCase):
676
839
 
782
945
            list(form.fields))
783
946
 
784
947
 
 
948
class TestMergeErrorMessages(MAASServerTestCase):
 
949
 
 
950
    def test_merge_error_messages_returns_summary_message(self):
 
951
        summary = factory.make_name('summary')
 
952
        errors = [factory.make_name('error') for _ in range(2)]
 
953
        result = merge_error_messages(summary, errors, 5)
 
954
        self.assertEqual(
 
955
            "%s (%s)" % (summary, ' \u2014 '.join(errors)), result)
 
956
 
 
957
    def test_merge_error_messages_includes_limited_number_of_msgs(self):
 
958
        summary = factory.make_name('summary')
 
959
        errors = [
 
960
            factory.make_name('error')
 
961
            for _ in range(MAX_MESSAGES + 2)]
 
962
        result = merge_error_messages(summary, errors)
 
963
        self.assertEqual(
 
964
            "%s (%s and 2 more errors)" % (
 
965
                summary, ' \u2014 '.join(errors[:MAX_MESSAGES])),
 
966
            result)
 
967
 
 
968
    def test_merge_error_messages_with_one_more_error(self):
 
969
        summary = factory.make_name('summary')
 
970
        errors = [
 
971
            factory.make_name('error')
 
972
            for _ in range(MAX_MESSAGES + 1)]
 
973
        result = merge_error_messages(summary, errors)
 
974
        self.assertEqual(
 
975
            "%s (%s and 1 more error)" % (
 
976
                summary, ' \u2014 '.join(errors[:MAX_MESSAGES])),
 
977
            result)
 
978
 
 
979
 
785
980
class TestMACAddressForm(MAASServerTestCase):
786
981
 
787
982
    def test_MACAddressForm_creates_mac_address(self):
827
1022
    if management is None:
828
1023
        management = factory.getRandomEnum(NODEGROUPINTERFACE_MANAGEMENT)
829
1024
    # Pick upper and lower boundaries of IP range, with upper > lower.
830
 
    ip_range_low, ip_range_high = factory.make_ip_range(network)
 
1025
    managed_ip_range = IPRange(*factory.make_ip_range(network))
 
1026
    if len(managed_ip_range) > 2:
 
1027
        dynamic_range = IPRange(
 
1028
            IPAddress(managed_ip_range.first),
 
1029
            IPAddress(managed_ip_range[len(managed_ip_range) // 2]))
 
1030
        static_range = IPRange(
 
1031
            IPAddress(managed_ip_range[(len(managed_ip_range) // 2) + 1]),
 
1032
            IPAddress(managed_ip_range.last))
 
1033
        static_low = unicode(IPAddress(static_range.first))
 
1034
        static_high = unicode(IPAddress(static_range.last))
 
1035
    else:
 
1036
        dynamic_range = managed_ip_range
 
1037
        static_low = ''
 
1038
        static_high = ''
831
1039
    return {
832
1040
        'ip': factory.getRandomIPInNetwork(network),
833
1041
        'interface': factory.make_name('interface'),
834
1042
        'subnet_mask': unicode(network.netmask),
835
1043
        'broadcast_ip': unicode(network.broadcast),
836
1044
        'router_ip': factory.getRandomIPInNetwork(network),
837
 
        'ip_range_low': unicode(ip_range_low),
838
 
        'ip_range_high': unicode(ip_range_high),
 
1045
        'ip_range_low': unicode(IPAddress(dynamic_range.first)),
 
1046
        'ip_range_high': unicode(IPAddress(dynamic_range.last)),
 
1047
        'static_ip_range_low': static_low,
 
1048
        'static_ip_range_high': static_high,
839
1049
        'management': management,
840
1050
    }
841
1051
 
842
1052
 
843
1053
nullable_fields = [
844
1054
    'subnet_mask', 'broadcast_ip', 'router_ip', 'ip_range_low',
845
 
    'ip_range_high']
 
1055
    'ip_range_high', 'static_ip_range_low', 'static_ip_range_high',
 
1056
    ]
 
1057
 
 
1058
 
 
1059
def make_ngi_instance(nodegroup=None):
 
1060
    """Create a `NodeGroupInterface` with nothing set but `nodegroup`.
 
1061
 
 
1062
    This is used by tests to instantiate the cluster interface form for
 
1063
    a given cluster.  We create an initial cluster interface object just
 
1064
    to tell it which cluster that is.
 
1065
    """
 
1066
    if nodegroup is None:
 
1067
        nodegroup = factory.make_node_group()
 
1068
    return NodeGroupInterface(nodegroup=nodegroup)
846
1069
 
847
1070
 
848
1071
class TestNodeGroupInterfaceForm(MAASServerTestCase):
849
1072
 
850
1073
    def test_NodeGroupInterfaceForm_validates_parameters(self):
851
 
        form = NodeGroupInterfaceForm(data={'ip': factory.getRandomString()})
 
1074
        form = NodeGroupInterfaceForm(
 
1075
            data={'ip': factory.getRandomString()},
 
1076
            instance=make_ngi_instance())
852
1077
        self.assertFalse(form.is_valid())
853
1078
        self.assertEquals(
854
1079
            {'ip': ['Enter a valid IPv4 or IPv6 address.']}, form._errors)
860
1085
            del int_settings[field_name]
861
1086
        nodegroup = factory.make_node_group()
862
1087
        form = NodeGroupInterfaceForm(
863
 
            data=int_settings,
864
 
            instance=NodeGroupInterface(nodegroup=nodegroup))
 
1088
            data=int_settings, instance=make_ngi_instance(nodegroup))
865
1089
        interface = form.save()
866
1090
        field_values = [
867
1091
            getattr(interface, field_name) for field_name in nullable_fields]
868
1092
        self.assertThat(field_values, AllMatch(Equals('')))
869
1093
 
 
1094
    def test_validates_new_static_ip_ranges(self):
 
1095
        network = IPNetwork("10.1.0.0/24")
 
1096
        nodegroup = factory.make_node_group(
 
1097
            status=NODEGROUP_STATUS.ACCEPTED,
 
1098
            management=NODEGROUPINTERFACE_MANAGEMENT.DHCP_AND_DNS,
 
1099
            network=network)
 
1100
        [interface] = nodegroup.get_managed_interfaces()
 
1101
        StaticIPAddress.objects.allocate_new(
 
1102
            interface.static_ip_range_low, interface.static_ip_range_high)
 
1103
        form = NodeGroupInterfaceForm(
 
1104
            data={'static_ip_range_low': '', 'static_ip_range_high': ''},
 
1105
            instance=interface)
 
1106
        self.assertFalse(form.is_valid())
 
1107
        self.assertEqual(
 
1108
            [ERROR_MESSAGE_STATIC_RANGE_IN_USE],
 
1109
            form._errors['static_ip_range_low'])
 
1110
        self.assertEqual(
 
1111
            [ERROR_MESSAGE_STATIC_RANGE_IN_USE],
 
1112
            form._errors['static_ip_range_high'])
 
1113
 
 
1114
 
 
1115
class TestNodeGroupInterfaceFormNetworkCreation(MAASServerTestCase):
 
1116
    """Tests for when NodeGroupInterfaceForm creates a Network."""
 
1117
 
 
1118
    def test_creates_network_name(self):
 
1119
        int_settings = factory.get_interface_fields()
 
1120
        int_settings['interface'] = 'eth0:1'
 
1121
        interface = make_ngi_instance()
 
1122
        form = NodeGroupInterfaceForm(data=int_settings, instance=interface)
 
1123
        form.save()
 
1124
        [network] = Network.objects.all()
 
1125
        expected, _ = get_name_and_vlan_from_cluster_interface(interface)
 
1126
        self.assertEqual(expected, network.name)
 
1127
 
 
1128
    def test_sets_vlan_tag(self):
 
1129
        int_settings = factory.get_interface_fields()
 
1130
        vlan_tag = random.randint(1, 10)
 
1131
        int_settings['interface'] = 'eth0.%s' % vlan_tag
 
1132
        interface = make_ngi_instance()
 
1133
        form = NodeGroupInterfaceForm(data=int_settings, instance=interface)
 
1134
        form.save()
 
1135
        [network] = Network.objects.all()
 
1136
        self.assertEqual(vlan_tag, network.vlan_tag)
 
1137
 
 
1138
    def test_vlan_tag_is_None_if_no_vlan(self):
 
1139
        int_settings = factory.get_interface_fields()
 
1140
        int_settings['interface'] = 'eth0:1'
 
1141
        interface = make_ngi_instance()
 
1142
        form = NodeGroupInterfaceForm(data=int_settings, instance=interface)
 
1143
        form.save()
 
1144
        [network] = Network.objects.all()
 
1145
        self.assertIs(None, network.vlan_tag)
 
1146
 
 
1147
    def test_sets_network_values(self):
 
1148
        int_settings = factory.get_interface_fields()
 
1149
        interface = make_ngi_instance()
 
1150
        form = NodeGroupInterfaceForm(data=int_settings, instance=interface)
 
1151
        form.save()
 
1152
        [network] = Network.objects.all()
 
1153
        expected_net_address = unicode(interface.network.network)
 
1154
        expected_netmask = unicode(interface.network.netmask)
 
1155
        self.assertThat(
 
1156
            network, MatchesStructure.byEquality(
 
1157
                ip=expected_net_address,
 
1158
                netmask=expected_netmask))
 
1159
 
 
1160
    def test_does_not_create_new_network_if_already_exists(self):
 
1161
        int_settings = factory.get_interface_fields()
 
1162
        interface = make_ngi_instance()
 
1163
        form = NodeGroupInterfaceForm(data=int_settings, instance=interface)
 
1164
        # The easiest way to pre-create the same network is just to save
 
1165
        # the form twice.
 
1166
        form.save()
 
1167
        [existing_network] = Network.objects.all()
 
1168
        form.save()
 
1169
        self.assertItemsEqual([existing_network], Network.objects.all())
 
1170
 
 
1171
    def test_creates_many_unique_networks(self):
 
1172
        names = ('eth0', 'eth0:1', 'eth0.1', 'eth0:1.2')
 
1173
        for name in names:
 
1174
            int_settings = factory.get_interface_fields()
 
1175
            int_settings['interface'] = name
 
1176
            interface = make_ngi_instance()
 
1177
            form = NodeGroupInterfaceForm(
 
1178
                data=int_settings, instance=interface)
 
1179
            form.save()
 
1180
 
 
1181
        self.assertEqual(len(names), len(Network.objects.all()))
 
1182
 
 
1183
 
 
1184
class TestValidateNewStaticIPRanges(MAASServerTestCase):
 
1185
    """Tests for `validate_new_static_ip_ranges`()."""
 
1186
 
 
1187
    def make_interface(self):
 
1188
        network = IPNetwork("10.1.0.0/24")
 
1189
        nodegroup = factory.make_node_group(
 
1190
            status=NODEGROUP_STATUS.ACCEPTED,
 
1191
            management=NODEGROUPINTERFACE_MANAGEMENT.DHCP_AND_DNS,
 
1192
            network=network)
 
1193
        [interface] = nodegroup.get_managed_interfaces()
 
1194
        interface.ip_range_low = '10.1.0.1'
 
1195
        interface.ip_range_high = '10.1.0.10'
 
1196
        interface.static_ip_range_low = '10.1.0.50'
 
1197
        interface.static_ip_range_high = '10.1.0.60'
 
1198
        interface.save()
 
1199
        return interface
 
1200
 
 
1201
    def test_raises_error_when_allocated_ips_fall_outside_new_range(self):
 
1202
        interface = self.make_interface()
 
1203
        StaticIPAddress.objects.allocate_new('10.1.0.56', '10.1.0.60')
 
1204
        error = self.assertRaises(
 
1205
            ValidationError,
 
1206
            validate_new_static_ip_ranges,
 
1207
            instance=interface,
 
1208
            static_ip_range_low='10.1.0.50',
 
1209
            static_ip_range_high='10.1.0.55')
 
1210
        self.assertEqual(
 
1211
            ERROR_MESSAGE_STATIC_IPS_OUTSIDE_RANGE,
 
1212
            error.message)
 
1213
 
 
1214
    def test_removing_static_range_raises_error_if_ips_allocated(self):
 
1215
        interface = self.make_interface()
 
1216
        StaticIPAddress.objects.allocate_new('10.1.0.56', '10.1.0.60')
 
1217
        error = self.assertRaises(
 
1218
            ValidationError,
 
1219
            validate_new_static_ip_ranges,
 
1220
            instance=interface,
 
1221
            static_ip_range_low='',
 
1222
            static_ip_range_high='')
 
1223
        self.assertEqual(
 
1224
            ERROR_MESSAGE_STATIC_RANGE_IN_USE,
 
1225
            error.message)
 
1226
 
 
1227
    def test_allows_range_expansion(self):
 
1228
        interface = self.make_interface()
 
1229
        StaticIPAddress.objects.allocate_new('10.1.0.56', '10.1.0.60')
 
1230
        is_valid = validate_new_static_ip_ranges(
 
1231
            interface, static_ip_range_low='10.1.0.40',
 
1232
            static_ip_range_high='10.1.0.100')
 
1233
        self.assertTrue(is_valid)
 
1234
 
 
1235
    def test_allows_allocated_ip_as_upper_bound(self):
 
1236
        interface = self.make_interface()
 
1237
        StaticIPAddress.objects.allocate_new('10.1.0.55', '10.1.0.55')
 
1238
        is_valid = validate_new_static_ip_ranges(
 
1239
            interface,
 
1240
            static_ip_range_low=interface.static_ip_range_low,
 
1241
            static_ip_range_high='10.1.0.55')
 
1242
        self.assertTrue(is_valid)
 
1243
 
 
1244
    def test_allows_allocated_ip_as_lower_bound(self):
 
1245
        interface = self.make_interface()
 
1246
        StaticIPAddress.objects.allocate_new('10.1.0.55', '10.1.0.55')
 
1247
        is_valid = validate_new_static_ip_ranges(
 
1248
            interface, static_ip_range_low='10.1.0.55',
 
1249
            static_ip_range_high=interface.static_ip_range_high)
 
1250
        self.assertTrue(is_valid)
 
1251
 
 
1252
    def test_ignores_unmanaged_interfaces(self):
 
1253
        interface = self.make_interface()
 
1254
        interface.management = NODEGROUPINTERFACE_MANAGEMENT.UNMANAGED
 
1255
        interface.save()
 
1256
        StaticIPAddress.objects.allocate_new(
 
1257
            interface.static_ip_range_low, interface.static_ip_range_high)
 
1258
        is_valid = validate_new_static_ip_ranges(
 
1259
            interface, static_ip_range_low='10.1.0.57',
 
1260
            static_ip_range_high='10.1.0.58')
 
1261
        self.assertTrue(is_valid)
 
1262
 
 
1263
    def test_ignores_interfaces_with_no_static_range(self):
 
1264
        interface = self.make_interface()
 
1265
        interface.static_ip_range_low = None
 
1266
        interface.static_ip_range_high = None
 
1267
        interface.save()
 
1268
        StaticIPAddress.objects.allocate_new('10.1.0.56', '10.1.0.60')
 
1269
        is_valid = validate_new_static_ip_ranges(
 
1270
            interface, static_ip_range_low='10.1.0.57',
 
1271
            static_ip_range_high='10.1.0.58')
 
1272
        self.assertTrue(is_valid)
 
1273
 
 
1274
    def test_ignores_unchanged_static_range(self):
 
1275
        interface = self.make_interface()
 
1276
        StaticIPAddress.objects.allocate_new(
 
1277
            interface.static_ip_range_low, interface.static_ip_range_high)
 
1278
        is_valid = validate_new_static_ip_ranges(
 
1279
            interface,
 
1280
            static_ip_range_low=interface.static_ip_range_low,
 
1281
            static_ip_range_high=interface.static_ip_range_high)
 
1282
        self.assertTrue(is_valid)
 
1283
 
870
1284
 
871
1285
class TestNodeGroupInterfaceForeignDHCPForm(MAASServerTestCase):
872
1286
 
873
1287
    def test_forms_saves_foreign_dhcp_ip(self):
874
1288
        nodegroup = factory.make_node_group()
875
 
        [interface] = nodegroup.get_managed_interfaces()
 
1289
        interface = factory.make_node_group_interface(nodegroup)
876
1290
        foreign_dhcp_ip = factory.getRandomIPAddress()
877
1291
        form = NodeGroupInterfaceForeignDHCPForm(
878
1292
            data={'foreign_dhcp_ip': foreign_dhcp_ip},
884
1298
 
885
1299
    def test_forms_validates_foreign_dhcp_ip(self):
886
1300
        nodegroup = factory.make_node_group()
887
 
        [interface] = nodegroup.get_managed_interfaces()
 
1301
        interface = factory.make_node_group_interface(nodegroup)
888
1302
        form = NodeGroupInterfaceForeignDHCPForm(
889
1303
            data={'foreign_dhcp_ip': 'invalid-ip'}, instance=interface)
890
1304
        self.assertFalse(form.is_valid())
892
1306
    def test_report_foreign_dhcp_does_not_trigger_update_signal(self):
893
1307
        self.patch(settings, "DHCP_CONNECT", False)
894
1308
        nodegroup = factory.make_node_group(status=NODEGROUP_STATUS.ACCEPTED)
895
 
        [interface] = nodegroup.get_managed_interfaces()
 
1309
        interface = factory.make_node_group_interface(
 
1310
            nodegroup, management=NODEGROUPINTERFACE_MANAGEMENT.DHCP)
896
1311
 
897
1312
        self.patch(settings, "DHCP_CONNECT", True)
898
1313
        self.patch(tasks, 'write_dhcp_config')
993
1408
        self.assertThat(error.messages[0], StartsWith("Conflicting networks"))
994
1409
 
995
1410
 
996
 
class TestNodeGroupWithInterfacesForm(MAASServerTestCase):
 
1411
class TestNodeGroupDefineForm(MAASServerTestCase):
997
1412
 
998
1413
    def test_creates_pending_nodegroup(self):
999
1414
        name = factory.make_name('name')
1000
1415
        uuid = factory.getRandomUUID()
1001
 
        form = NodeGroupWithInterfacesForm(
1002
 
            data={'name': name, 'uuid': uuid})
 
1416
        form = NodeGroupDefineForm(data={'name': name, 'uuid': uuid})
1003
1417
        self.assertTrue(form.is_valid(), form._errors)
1004
1418
        nodegroup = form.save()
1005
1419
        self.assertEqual(
1014
1428
    def test_creates_nodegroup_with_status(self):
1015
1429
        name = factory.make_name('name')
1016
1430
        uuid = factory.getRandomUUID()
1017
 
        form = NodeGroupWithInterfacesForm(
 
1431
        form = NodeGroupDefineForm(
1018
1432
            status=NODEGROUP_STATUS.ACCEPTED,
1019
1433
            data={'name': name, 'uuid': uuid})
1020
1434
        self.assertTrue(form.is_valid(), form._errors)
1024
1438
    def test_validates_parameters(self):
1025
1439
        name = factory.make_name('name')
1026
1440
        too_long_uuid = 'test' * 30
1027
 
        form = NodeGroupWithInterfacesForm(
 
1441
        form = NodeGroupDefineForm(
1028
1442
            data={'name': name, 'uuid': too_long_uuid})
1029
1443
        self.assertFalse(form.is_valid())
1030
1444
        self.assertEquals(
1036
1450
        name = factory.make_name('name')
1037
1451
        uuid = factory.getRandomUUID()
1038
1452
        invalid_interfaces = factory.make_name('invalid_json_interfaces')
1039
 
        form = NodeGroupWithInterfacesForm(
 
1453
        form = NodeGroupDefineForm(
1040
1454
            data={
1041
1455
                'name': name, 'uuid': uuid, 'interfaces': invalid_interfaces})
1042
1456
        self.assertFalse(form.is_valid())
1048
1462
        name = factory.make_name('name')
1049
1463
        uuid = factory.getRandomUUID()
1050
1464
        invalid_interfaces = json.dumps('invalid interface list')
1051
 
        form = NodeGroupWithInterfacesForm(
 
1465
        form = NodeGroupDefineForm(
1052
1466
            data={
1053
1467
                'name': name, 'uuid': uuid, 'interfaces': invalid_interfaces})
1054
1468
        self.assertFalse(form.is_valid())
1063
1477
        # Make the interface invalid.
1064
1478
        interface['ip_range_high'] = 'invalid IP address'
1065
1479
        interfaces = json.dumps([interface])
1066
 
        form = NodeGroupWithInterfacesForm(
 
1480
        form = NodeGroupDefineForm(
1067
1481
            data={'name': name, 'uuid': uuid, 'interfaces': interfaces})
1068
1482
        self.assertFalse(form.is_valid())
1069
1483
        self.assertIn(
1075
1489
        uuid = factory.getRandomUUID()
1076
1490
        interface = make_interface_settings()
1077
1491
        interfaces = json.dumps([interface])
1078
 
        form = NodeGroupWithInterfacesForm(
 
1492
        form = NodeGroupDefineForm(
1079
1493
            data={'name': name, 'uuid': uuid, 'interfaces': interfaces})
1080
1494
        self.assertTrue(form.is_valid(), form._errors)
1081
1495
        form.save()
1082
1496
        nodegroup = NodeGroup.objects.get(uuid=uuid)
 
1497
        # Replace empty strings with None as empty strings are converted into
 
1498
        # None for fields with null=True.
 
1499
        expected_result = {
 
1500
            key: (value if value != '' else None)
 
1501
            for key, value in interface.items()
 
1502
        }
1083
1503
        self.assertThat(
1084
1504
            nodegroup.nodegroupinterface_set.all()[0],
1085
 
            MatchesStructure.byEquality(**interface))
 
1505
            MatchesStructure.byEquality(**expected_result))
1086
1506
 
1087
1507
    def test_checks_against_conflicting_managed_networks(self):
1088
1508
        big_network = IPNetwork('10.0.0.0/255.255.0.0')
1089
1509
        nested_network = IPNetwork('10.0.100.0/255.255.255.0')
1090
1510
        managed = NODEGROUPINTERFACE_MANAGEMENT.DHCP
1091
 
        form = NodeGroupWithInterfacesForm(
 
1511
        form = NodeGroupDefineForm(
1092
1512
            data={
1093
1513
                'name': factory.make_name('cluster'),
1094
1514
                'uuid': factory.getRandomUUID(),
1110
1530
        nested_network = IPNetwork('10.100.100.0/255.255.255.0')
1111
1531
        managed = NODEGROUPINTERFACE_MANAGEMENT.DHCP
1112
1532
        unmanaged = NODEGROUPINTERFACE_MANAGEMENT.UNMANAGED
1113
 
        form = NodeGroupWithInterfacesForm(
 
1533
        form = NodeGroupDefineForm(
1114
1534
            data={
1115
1535
                'name': factory.make_name('cluster'),
1116
1536
                'uuid': factory.getRandomUUID(),
1133
1553
            make_interface_settings(management=management)
1134
1554
            for management in map_enum(NODEGROUPINTERFACE_MANAGEMENT).values()
1135
1555
            ]
1136
 
        form = NodeGroupWithInterfacesForm(
 
1556
        form = NodeGroupDefineForm(
1137
1557
            data={
1138
1558
                'name': name,
1139
1559
                'uuid': uuid,
1148
1568
    def test_populates_cluster_name_default(self):
1149
1569
        name = factory.make_name('name')
1150
1570
        uuid = factory.getRandomUUID()
1151
 
        form = NodeGroupWithInterfacesForm(
 
1571
        form = NodeGroupDefineForm(
1152
1572
            status=NODEGROUP_STATUS.ACCEPTED,
1153
1573
            data={'name': name, 'uuid': uuid})
1154
1574
        self.assertTrue(form.is_valid(), form._errors)
1158
1578
    def test_populates_cluster_name(self):
1159
1579
        cluster_name = factory.make_name('cluster_name')
1160
1580
        uuid = factory.getRandomUUID()
1161
 
        form = NodeGroupWithInterfacesForm(
 
1581
        form = NodeGroupDefineForm(
1162
1582
            status=NODEGROUP_STATUS.ACCEPTED,
1163
1583
            data={'cluster_name': cluster_name, 'uuid': uuid})
1164
1584
        self.assertTrue(form.is_valid(), form._errors)
1171
1591
        interface = make_interface_settings()
1172
1592
        del interface['management']
1173
1593
        interfaces = json.dumps([interface])
1174
 
        form = NodeGroupWithInterfacesForm(
 
1594
        form = NodeGroupDefineForm(
1175
1595
            data={'name': name, 'uuid': uuid, 'interfaces': interfaces})
1176
1596
        self.assertTrue(form.is_valid(), form._errors)
1177
1597
        form.save()
1183
1603
                uuid_nodegroup.nodegroupinterface_set.all()
1184
1604
            ])
1185
1605
 
 
1606
    def test_ensures_boot_sources_when_creating_cluster(self):
 
1607
        form = NodeGroupDefineForm(
 
1608
            data={
 
1609
                'name': factory.make_name('cluster'),
 
1610
                'uuid': factory.getRandomUUID(),
 
1611
            })
 
1612
        self.assertTrue(form.is_valid(), form._errors)
 
1613
        cluster = form.save()
 
1614
        self.assertThat(cluster.bootsource_set.all(), HasLength(1))
 
1615
 
 
1616
    def test_ensures_boot_sources_when_updating_cluster(self):
 
1617
        form = NodeGroupDefineForm(
 
1618
            instance=NodeGroup.objects.ensure_master(),
 
1619
            data={
 
1620
                'name': factory.make_name('cluster'),
 
1621
                'uuid': factory.getRandomUUID(),
 
1622
            })
 
1623
        self.assertTrue(form.is_valid(), form._errors)
 
1624
        cluster = form.save()
 
1625
        self.assertThat(cluster.bootsource_set.all(), HasLength(1))
 
1626
 
1186
1627
 
1187
1628
class TestNodeGroupEdit(MAASServerTestCase):
1188
1629
 
1374
1815
            [[], node3.system_id],
1375
1816
            [existing_nodes, node3_system_id])
1376
1817
 
 
1818
    def test_perform_action_catches_start_action_errors(self):
 
1819
        error_text = factory.getRandomString(prefix="NodeActionError")
 
1820
        exc = NodeActionError(error_text)
 
1821
        self.patch(StartNode, "execute").side_effect = exc
 
1822
        user = factory.make_user()
 
1823
        factory.make_sshkey(user)
 
1824
        node = factory.make_node(status=NODE_STATUS.READY, owner=user)
 
1825
        form = BulkNodeActionForm(
 
1826
            user=user,
 
1827
            data=dict(
 
1828
                action=StartNode.name,
 
1829
                system_id=[node.system_id]))
 
1830
 
 
1831
        self.assertTrue(form.is_valid(), form._errors)
 
1832
        done, not_actionable, not_permitted = form.save()
 
1833
        self.assertEqual(
 
1834
            [0, 1, 0],
 
1835
            [done, not_actionable, not_permitted])
 
1836
 
1377
1837
    def test_first_action_is_empty(self):
1378
1838
        form = BulkNodeActionForm(user=factory.make_admin())
1379
1839
        action = form.fields['action']
1713
2173
            list(form.fields['mac_addresses'].queryset))
1714
2174
 
1715
2175
    def test_macaddresses_widget_displays_MAC_and_node_hostname(self):
1716
 
        network = factory.make_network()
1717
 
        [
 
2176
        networks = factory.make_networks(3)
 
2177
        same_network = networks[0]
 
2178
        misc_networks = networks[1:]
 
2179
        for _ in range(3):
 
2180
            factory.make_mac_address(networks=[same_network])
 
2181
        # Create other MAC addresses.
 
2182
        for network in misc_networks:
1718
2183
            factory.make_mac_address(networks=[network])
1719
 
            for _ in range(3)]
1720
 
        # Create other MAC addresses.
1721
 
        for _ in range(2):
1722
 
            factory.make_mac_address(networks=[factory.make_network()])
1723
 
        form = NetworkForm(data={}, instance=network)
 
2184
        form = NetworkForm(data={}, instance=same_network)
1724
2185
        self.assertItemsEqual(
1725
2186
            [(mac.mac_address, "%s (%s)" % (
1726
2187
                mac.mac_address, mac.node.hostname))
1798
2259
            ValidationError,
1799
2260
            field.clean, [node.system_id for node in nodes] + ['unknown'])
1800
2261
        self.assertEquals(['Unknown node(s): unknown.'], error.messages)
 
2262
 
 
2263
 
 
2264
class TestBootSourceForm(MAASServerTestCase):
    """Tests for `BootSourceForm`."""

    def test_edits_boot_source_object(self):
        """Saving the form over an existing boot source updates it."""
        existing = factory.make_boot_source()
        new_values = {
            'url': 'http://example.com/',
            'keyring_filename': factory.make_name('keyring_filename'),
        }
        edit_form = BootSourceForm(instance=existing, data=new_values)
        self.assertTrue(edit_form.is_valid(), edit_form._errors)
        edit_form.save()
        # Re-read the object so the check sees what was actually stored.
        stored = reload_object(existing)
        self.assertAttributes(stored, new_values)

    def test_creates_boot_source_object_with_keyring_filename(self):
        """A boot source can be created from a URL and keyring filename."""
        cluster = factory.make_node_group()
        form_data = {
            'url': 'http://example.com/',
            'keyring_filename': factory.make_name('keyring_filename'),
        }
        create_form = BootSourceForm(nodegroup=cluster, data=form_data)
        self.assertTrue(create_form.is_valid(), create_form._errors)
        created = create_form.save()
        self.assertAttributes(created, form_data)

    def test_creates_boot_source_object_with_keyring_data(self):
        """Uploaded keyring bytes are stored on the created boot source."""
        cluster = factory.make_node_group()
        upload_name = factory.make_name('name')
        upload_field = factory.make_name('field-name')
        keyring_upload = InMemoryUploadedFile(
            StringIO(sample_binary_data),
            name=upload_name,
            field_name=upload_field,
            content_type='application/octet-stream',
            size=len(sample_binary_data),
            charset=None)
        form_data = {'url': 'http://example.com/'}
        create_form = BootSourceForm(
            nodegroup=cluster,
            data=form_data,
            files={'keyring_data': keyring_upload})
        self.assertTrue(create_form.is_valid(), create_form._errors)
        created = create_form.save()
        self.assertEqual(sample_binary_data, bytes(created.keyring_data))
        self.assertAttributes(created, form_data)
 
2306
 
 
2307
 
 
2308
class TestBootSourceSelectionForm(MAASServerTestCase):
    """Tests for `BootSourceSelectionForm`."""

    def make_selection_params(self):
        """Build form data for a boot source selection.

        The release is picked by `factory.getRandomRelease` for `UbuntuOS`;
        arches, subarches and labels each get two made-up names.
        """
        release = factory.getRandomRelease(UbuntuOS())
        return {
            'release': release,
            'arches': [factory.make_name('arch') for _ in range(2)],
            'subarches': [factory.make_name('subarch') for _ in range(2)],
            'labels': [factory.make_name('label') for _ in range(2)],
        }

    def test_edits_boot_source_selection_object(self):
        """Saving the form over an existing selection updates it."""
        existing = factory.make_boot_source_selection()
        new_values = self.make_selection_params()
        edit_form = BootSourceSelectionForm(
            instance=existing, data=new_values)
        self.assertTrue(edit_form.is_valid(), edit_form._errors)
        edit_form.save()
        # Re-read the object so the check sees what was actually stored.
        stored = reload_object(existing)
        self.assertAttributes(stored, new_values)

    def test_creates_boot_source_selection_object(self):
        """A selection can be created for a given boot source."""
        source = factory.make_boot_source()
        form_data = self.make_selection_params()
        create_form = BootSourceSelectionForm(
            boot_source=source, data=form_data)
        self.assertTrue(create_form.is_valid(), create_form._errors)
        created = create_form.save()
        self.assertAttributes(created, form_data)
 
2344
 
 
2345
 
 
2346
class TestDeployForm(MAASServerTestCase):
    """Tests for `DeployForm`."""

    def test_uses_live_data(self):
        # The DeployForm uses the database rather than just relying on
        # hard-coded stuff.
        osystem = make_usable_osystem(self)
        os_name = osystem.name
        release = factory.getRandomRelease(osystem)
        # The test expects release choices to be keyed as
        # "<osystem name>/<release>".
        release_choice = "%s/%s" % (os_name, release)
        deploy_form = DeployForm()
        os_choices = deploy_form.fields['default_osystem'].choices
        os_names = [name for name, _ in os_choices]
        release_choices = deploy_form.fields['default_distro_series'].choices
        release_names = [name for name, _ in release_choices]
        self.assertIn(os_name, os_names)
        self.assertIn(release_choice, release_names)

    def test_accepts_new_values(self):
        osystem = make_usable_osystem(self)
        os_name = osystem.name
        release = factory.getRandomRelease(osystem)
        params = {
            'default_osystem': os_name,
            'default_distro_series': "%s/%s" % (os_name, release),
            }
        form = DeployForm(data=params)
        # Pass the form's errors as the failure message so that a
        # validation failure shows which field was rejected.
        self.assertTrue(form.is_valid(), form._errors)
 
2374
 
 
2375
 
 
2376
class TestLicenseKeyForm(MAASServerTestCase):
    """Tests for `LicenseKeyForm`."""

    def make_os_with_license_key(self, osystem=None, has_key=True,
                                 key_is_valid=True, patch_registry=True):
        """Return an operating system with patched license-key behaviour.

        :param osystem: Operating system to patch.  When `None`, one is
            obtained from `make_usable_osystem`.
        :param has_key: Value that the patched `requires_license_key`
            returns.
        :param key_is_valid: Value that the patched `validate_license_key`
            returns.
        :param patch_registry: If true, also patch
            `OperatingSystemRegistry.get_item` to return this operating
            system.
        :return: The patched operating system.
        """
        if osystem is None:
            osystem = make_usable_osystem(self)
        self.patch(osystem, 'requires_license_key').return_value = has_key
        self.patch(osystem, 'validate_license_key').return_value = key_is_valid
        if patch_registry:
            self.patch(
                OperatingSystemRegistry, 'get_item').return_value = osystem
        return osystem

    def make_all_osystems_require_key(self, key_is_valid=True):
        """Patch every operating system in the registry to require a key.

        :param key_is_valid: Value that each patched
            `validate_license_key` returns.
        """
        for _, osystem in OperatingSystemRegistry:
            self.patch(
                osystem, 'requires_license_key').return_value = True
            self.patch(
                osystem, 'validate_license_key').return_value = key_is_valid

    def make_only_one_osystem_require_key(self, key_is_valid=True):
        """Patch one operating system to require a key, and all others in
        the registry to not require one.

        :param key_is_valid: Value that the chosen operating system's
            patched `validate_license_key` returns.
        :return: The operating system that requires a key, as picked by
            `factory.getRandomOS`.
        """
        osystem = factory.getRandomOS()
        self.patch(
            osystem, 'requires_license_key').return_value = True
        self.patch(
            osystem, 'validate_license_key').return_value = key_is_valid
        for _, other_osystem in OperatingSystemRegistry:
            if osystem == other_osystem:
                continue
            self.patch(
                other_osystem, 'requires_license_key').return_value = False
        return osystem

    def test_creates_license_key(self):
        osystem = self.make_os_with_license_key()
        series = factory.getRandomRelease(osystem)
        key = factory.make_name('key')
        definition = {
            'osystem': osystem.name,
            'distro_series': series,
            'license_key': key,
            }
        # The form is given distro_series as "<osystem>/<series>", while
        # the stored object is expected to hold the bare series.
        data = definition.copy()
        data['distro_series'] = '%s/%s' % (osystem.name, series)
        form = LicenseKeyForm(data=data)
        # Check validity first so that a failure reports the form errors.
        self.assertTrue(form.is_valid(), form.errors)
        form.save()
        license_key_obj = LicenseKey.objects.get(
            osystem=osystem.name, distro_series=series)
        self.assertAttributes(license_key_obj, definition)

    def test_updates_license_key(self):
        osystem = self.make_os_with_license_key()
        series = factory.getRandomRelease(osystem)
        license_key = factory.make_license_key(
            osystem=osystem.name, distro_series=series,
            license_key=factory.make_name('key'))
        new_key = factory.make_name('key')
        form = LicenseKeyForm(
            data={'license_key': new_key}, instance=license_key)
        # Check validity first so that a failure reports the form errors.
        self.assertTrue(form.is_valid(), form.errors)
        form.save()
        license_key = reload_object(license_key)
        self.assertEqual(new_key, license_key.license_key)

    def test_validates_license_key(self):
        # The patched validate_license_key returns False here.
        osystem = self.make_os_with_license_key(key_is_valid=False)
        series = factory.getRandomRelease(osystem)
        license_key = factory.make_license_key(
            osystem=osystem.name, distro_series=series,
            license_key=factory.make_name('key'))
        new_key = factory.make_name('key')
        form = LicenseKeyForm(
            data={'license_key': new_key}, instance=license_key)
        self.assertFalse(form.is_valid(), form.errors)
        self.assertEqual(
            {'__all__': ['Invalid license key.']},
            form.errors)

    def test_handles_missing_osystem_in_distro_series(self):
        osystem = self.make_os_with_license_key()
        series = factory.getRandomRelease(osystem)
        key = factory.make_name('key')
        definition = {
            'osystem': osystem.name,
            'distro_series': series,
            'license_key': key,
            }
        # Unlike test_creates_license_key, distro_series is submitted as
        # the bare series, without the "<osystem>/" prefix.
        form = LicenseKeyForm(data=definition.copy())
        # Check validity first so that a failure reports the form errors.
        self.assertTrue(form.is_valid(), form.errors)
        form.save()
        license_key_obj = LicenseKey.objects.get(
            osystem=osystem.name, distro_series=series)
        self.assertAttributes(license_key_obj, definition)

    def test_requires_all_fields(self):
        form = LicenseKeyForm(data={})
        self.assertFalse(form.is_valid(), form.errors)
        self.assertItemsEqual(
            ['osystem', 'distro_series', 'license_key'],
            form.errors.keys())

    def test_errors_on_not_unique(self):
        osystem = self.make_os_with_license_key()
        series = factory.getRandomRelease(osystem)
        key = factory.make_name('key')
        # Store a key first so that the form submission below duplicates
        # its operating system and distro series.
        factory.make_license_key(
            osystem=osystem.name, distro_series=series, license_key=key)
        definition = {
            'osystem': osystem.name,
            'distro_series': series,
            'license_key': key,
            }
        form = LicenseKeyForm(data=definition)
        self.assertFalse(form.is_valid(), form.errors)
        self.assertEqual({
            '__all__': ['%s %s' % (
                "License key with this operating system and distro series",
                "already exists.")]},
            form.errors)

    def test_doesnt_include_default_osystem(self):
        form = LicenseKeyForm()
        self.assertNotIn(('', 'Default OS'), form.fields['osystem'].choices)

    def test_includes_all_osystems(self):
        self.make_all_osystems_require_key()
        osystems = [osystem for _, osystem in OperatingSystemRegistry]
        expected = [
            (osystem.name, osystem.title)
            for osystem in osystems
            ]
        form = LicenseKeyForm()
        self.assertItemsEqual(expected, form.fields['osystem'].choices)

    def test_includes_all_osystems_sorted(self):
        self.make_all_osystems_require_key()
        osystems = [osystem for _, osystem in OperatingSystemRegistry]
        # The choices are expected in title order.
        osystems = sorted(osystems, key=lambda osystem: osystem.title)
        expected = [
            (osystem.name, osystem.title)
            for osystem in osystems
            ]
        form = LicenseKeyForm()
        self.assertEqual(expected, form.fields['osystem'].choices)

    def test_includes_only_osystems_that_require_license_keys(self):
        osystem = self.make_only_one_osystem_require_key()
        expected = [(osystem.name, osystem.title)]
        form = LicenseKeyForm()
        self.assertEqual(expected, form.fields['osystem'].choices)

    def test_doesnt_include_default_distro_series(self):
        form = LicenseKeyForm()
        self.assertNotIn(
            ('', 'Default OS Release'), form.fields['distro_series'].choices)

    def test_includes_all_distro_series(self):
        self.make_all_osystems_require_key()
        osystems = [osystem for _, osystem in OperatingSystemRegistry]
        expected = []
        for osystem in osystems:
            releases = osystem.get_supported_releases()
            for name, title in osystem.format_release_choices(releases):
                expected.append((
                    '%s/%s' % (osystem.name, name),
                    title
                    ))
        form = LicenseKeyForm()
        self.assertItemsEqual(expected, form.fields['distro_series'].choices)

    def test_includes_only_distro_series_that_require_license_keys(self):
        osystem = self.make_only_one_osystem_require_key()
        releases = osystem.get_supported_releases()
        expected = [
            ('%s/%s' % (osystem.name, name), title)
            for name, title in osystem.format_release_choices(releases)
            ]
        form = LicenseKeyForm()
        self.assertEqual(expected, form.fields['distro_series'].choices)