~0x44/nova/bug838466

« back to all changes in this revision

Viewing changes to smoketests/test_netadmin.py

  • Committer: Eric Day
  • Date: 2010-10-21 18:49:51 UTC
  • mto: This revision was merged to the branch mainline in revision 377.
  • Revision ID: eday@oddments.org-20101021184951-x0vs3s8y7mc0aeyy
PEP8 and pylint cleanup. There should be no functional changes here, just style changes to get violations down.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
# Copyright 2010 United States Government as represented by the
4
 
# Administrator of the National Aeronautics and Space Administration.
5
 
# All Rights Reserved.
6
 
#
7
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
8
 
#    not use this file except in compliance with the License. You may obtain
9
 
#    a copy of the License at
10
 
#
11
 
#         http://www.apache.org/licenses/LICENSE-2.0
12
 
#
13
 
#    Unless required by applicable law or agreed to in writing, software
14
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
 
#    License for the specific language governing permissions and limitations
17
 
#    under the License.
18
 
 
19
 
import commands
20
 
import os
21
 
import random
22
 
import sys
23
 
import time
24
 
 
25
 
# If ../nova/__init__.py exists, add ../ to Python search path, so that
26
 
# it will override what happens to be installed in /usr/(local/)lib/python...
27
 
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
28
 
                                   os.pardir,
29
 
                                   os.pardir))
30
 
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
31
 
    sys.path.insert(0, possible_topdir)
32
 
 
33
 
from smoketests import flags
34
 
from smoketests import base
35
 
 
36
 
 
37
 
FLAGS = flags.FLAGS
38
 
 
39
 
TEST_PREFIX = 'test%s' % int(random.random() * 1000000)
40
 
TEST_BUCKET = '%s_bucket' % TEST_PREFIX
41
 
TEST_KEY = '%s_key' % TEST_PREFIX
42
 
TEST_GROUP = '%s_group' % TEST_PREFIX
43
 
 
44
 
 
45
 
class AddressTests(base.UserSmokeTestCase):
46
 
    def test_000_setUp(self):
47
 
        self.create_key_pair(self.conn, TEST_KEY)
48
 
        reservation = self.conn.run_instances(FLAGS.test_image,
49
 
                                              instance_type='m1.tiny',
50
 
                                              key_name=TEST_KEY)
51
 
        self.data['instance'] = reservation.instances[0]
52
 
        if not self.wait_for_running(self.data['instance']):
53
 
            self.fail('instance failed to start')
54
 
        self.data['instance'].update()
55
 
        if not self.wait_for_ping(self.data['instance'].private_dns_name):
56
 
            self.fail('could not ping instance')
57
 
        if not self.wait_for_ssh(self.data['instance'].private_dns_name,
58
 
                                 TEST_KEY):
59
 
            self.fail('could not ssh to instance')
60
 
 
61
 
    def test_001_can_allocate_floating_ip(self):
62
 
        result = self.conn.allocate_address()
63
 
        self.assertTrue(hasattr(result, 'public_ip'))
64
 
        self.data['public_ip'] = result.public_ip
65
 
 
66
 
    def test_002_can_associate_ip_with_instance(self):
67
 
        result = self.conn.associate_address(self.data['instance'].id,
68
 
                                             self.data['public_ip'])
69
 
        self.assertTrue(result)
70
 
 
71
 
    def test_003_can_ssh_with_public_ip(self):
72
 
        ssh_authorized = False
73
 
        groups = self.conn.get_all_security_groups(['default'])
74
 
        for rule in groups[0].rules:
75
 
            if (rule.ip_protocol == 'tcp' and
76
 
                int(rule.from_port) <= 22 and
77
 
                int(rule.to_port) >= 22):
78
 
                ssh_authorized = True
79
 
                break
80
 
        if not ssh_authorized:
81
 
            self.conn.authorize_security_group('default',
82
 
                                               ip_protocol='tcp',
83
 
                                               from_port=22,
84
 
                                               to_port=22)
85
 
        try:
86
 
            if not self.wait_for_ssh(self.data['public_ip'], TEST_KEY):
87
 
                self.fail('could not ssh to public ip')
88
 
        finally:
89
 
            if not ssh_authorized:
90
 
                self.conn.revoke_security_group('default',
91
 
                                                ip_protocol='tcp',
92
 
                                                from_port=22,
93
 
                                                to_port=22)
94
 
 
95
 
    def test_004_can_disassociate_ip_from_instance(self):
96
 
        result = self.conn.disassociate_address(self.data['public_ip'])
97
 
        self.assertTrue(result)
98
 
 
99
 
    def test_005_can_deallocate_floating_ip(self):
100
 
        result = self.conn.release_address(self.data['public_ip'])
101
 
        self.assertTrue(result)
102
 
 
103
 
    def test_999_tearDown(self):
104
 
        self.delete_key_pair(self.conn, TEST_KEY)
105
 
        self.conn.terminate_instances([self.data['instance'].id])
106
 
 
107
 
 
108
 
class SecurityGroupTests(base.UserSmokeTestCase):
109
 
 
110
 
    def __get_metadata_item(self, category):
111
 
        id_url = "latest/meta-data/%s" % category
112
 
        options = "-f -s --max-time 1"
113
 
        command = "curl %s %s/%s" % (options, self.data['public_ip'], id_url)
114
 
        status, output = commands.getstatusoutput(command)
115
 
        value = output.strip()
116
 
        if status > 0:
117
 
            return False
118
 
        return value
119
 
 
120
 
    def __public_instance_is_accessible(self):
121
 
        instance_id = self.__get_metadata_item('instance-id')
122
 
        if not instance_id:
123
 
            return False
124
 
        if instance_id != self.data['instance'].id:
125
 
            raise Exception("Wrong instance id. Expected: %s, Got: %s" %
126
 
                               (self.data['instance'].id, instance_id))
127
 
        return True
128
 
 
129
 
    def test_001_can_create_security_group(self):
130
 
        self.conn.create_security_group(TEST_GROUP, description='test')
131
 
 
132
 
        groups = self.conn.get_all_security_groups()
133
 
        self.assertTrue(TEST_GROUP in [group.name for group in groups])
134
 
 
135
 
    def test_002_can_launch_instance_in_security_group(self):
136
 
        with open("proxy.sh") as f:
137
 
            user_data = f.read()
138
 
        self.create_key_pair(self.conn, TEST_KEY)
139
 
        reservation = self.conn.run_instances(FLAGS.test_image,
140
 
                                              key_name=TEST_KEY,
141
 
                                              security_groups=[TEST_GROUP],
142
 
                                              user_data=user_data,
143
 
                                              instance_type='m1.tiny')
144
 
 
145
 
        self.data['instance'] = reservation.instances[0]
146
 
        if not self.wait_for_running(self.data['instance']):
147
 
            self.fail('instance failed to start')
148
 
        self.data['instance'].update()
149
 
 
150
 
    def test_003_can_authorize_security_group_ingress(self):
151
 
        self.assertTrue(self.conn.authorize_security_group(TEST_GROUP,
152
 
                                                           ip_protocol='tcp',
153
 
                                                           from_port=80,
154
 
                                                           to_port=80))
155
 
 
156
 
    def test_004_can_access_metadata_over_public_ip(self):
157
 
        result = self.conn.allocate_address()
158
 
        self.assertTrue(hasattr(result, 'public_ip'))
159
 
        self.data['public_ip'] = result.public_ip
160
 
 
161
 
        result = self.conn.associate_address(self.data['instance'].id,
162
 
                                             self.data['public_ip'])
163
 
        start_time = time.time()
164
 
        try:
165
 
            while not self.__public_instance_is_accessible():
166
 
                # 1 minute to launch
167
 
                if time.time() - start_time > 60:
168
 
                    raise Exception("Timeout")
169
 
                time.sleep(1)
170
 
        finally:
171
 
            result = self.conn.disassociate_address(self.data['public_ip'])
172
 
 
173
 
    def test_005_validate_metadata(self):
174
 
 
175
 
        instance = self.data['instance']
176
 
        self.assertTrue(instance.instance_type,
177
 
                            self.__get_metadata_item("instance-type"))
178
 
        #FIXME(dprince): validate more metadata here
179
 
 
180
 
    def test_006_can_revoke_security_group_ingress(self):
181
 
        self.assertTrue(self.conn.revoke_security_group(TEST_GROUP,
182
 
                                                        ip_protocol='tcp',
183
 
                                                        from_port=80,
184
 
                                                        to_port=80))
185
 
        start_time = time.time()
186
 
        while self.__public_instance_is_accessible():
187
 
            # 1 minute to teardown
188
 
            if time.time() - start_time > 60:
189
 
                raise Exception("Timeout")
190
 
            time.sleep(1)
191
 
 
192
 
    def test_999_tearDown(self):
193
 
        self.conn.delete_key_pair(TEST_KEY)
194
 
        self.conn.delete_security_group(TEST_GROUP)
195
 
        groups = self.conn.get_all_security_groups()
196
 
        self.assertFalse(TEST_GROUP in [group.name for group in groups])
197
 
        self.conn.terminate_instances([self.data['instance'].id])
198
 
        self.assertTrue(self.conn.release_address(self.data['public_ip']))