1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2010 United States Government as represented by the
4
# Administrator of the National Aeronautics and Space Administration.
7
# Licensed under the Apache License, Version 2.0 (the "License"); you may
8
# not use this file except in compliance with the License. You may obtain
9
# a copy of the License at
11
# http://www.apache.org/licenses/LICENSE-2.0
13
# Unless required by applicable law or agreed to in writing, software
14
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
# License for the specific language governing permissions and limitations
# under the License.
19
from datetime import datetime, timedelta
23
from nova import flags
25
from nova import utils
26
from nova.compute import model
32
class ModelTestCase(test.TrialTestCase):
34
super(ModelTestCase, self).setUp()
35
self.flags(connection_type='fake',
39
model.Instance('i-test').destroy()
40
model.Host('testhost').destroy()
41
model.Daemon('testhost', 'nova-testdaemon').destroy()
43
def create_instance(self):
    """Build, store and return the 'i-test' instance record used by the tests.

    The tests dereference the returned object (``instance.identifier``,
    ``instance['private_dns_name']``) and expect a later load to find the
    record, so the helper has to persist it and hand it back.
    """
    inst = model.Instance('i-test')
    inst['reservation_id'] = 'r-test'
    inst['launch_time'] = '10'
    inst['user_id'] = 'fake'
    inst['project_id'] = 'fake'
    inst['instance_type'] = 'm1.tiny'
    inst['mac_address'] = utils.generate_mac()
    inst['ami_launch_index'] = 0
    inst['private_dns_name'] = '10.0.0.1'
    # NOTE(review): assumes the model's persistence call is save() -- confirm
    # against nova.compute.model.
    inst.save()
    return inst
56
def create_host(self):
    """Store and return the 'testhost' host record used by the tests.

    Callers read ``identifier`` from the result and expect a reload to find
    the record, so it is persisted and returned.
    """
    host = model.Host('testhost')
    # NOTE(review): assumes the model's persistence call is save() -- confirm.
    host.save()
    return host
61
def create_daemon(self):
    """Store and return the 'testhost'/'nova-testdaemon' daemon record.

    Callers index into the result (``d['updated_at']``) and expect a reload
    to find the record, so it is persisted and returned.
    """
    daemon = model.Daemon('testhost', 'nova-testdaemon')
    # NOTE(review): assumes the model's persistence call is save() -- confirm.
    daemon.save()
    return daemon
66
def create_session_token(self):
    """Store and return session token 'tk12341234' owned by 'testuser'.

    Callers read ``token`` / ``identifier`` from the result and expect a
    reload to find the record, so it is persisted and returned.
    """
    session_token = model.SessionToken('tk12341234')
    session_token['user'] = 'testuser'
    # NOTE(review): assumes the model's persistence call is save() -- confirm.
    session_token.save()
    return session_token
72
def test_create_instance(self):
    """Store with create_instance, then check that a fresh load finds it."""
    created = self.create_instance()
    reloaded = model.Instance(created.identifier)
    # A record that already exists in the store must not report as new.
    self.assertFalse(reloaded.is_new_record())
78
def test_delete_instance(self):
    """Create, then destroy, then make sure a load yields a new record."""
    instance = self.create_instance()
    # Without this step the reload below would still find the stored record.
    instance.destroy()
    newinst = model.Instance('i-test')
    self.assertTrue(newinst.is_new_record())
85
def test_instance_added_to_set(self):
    """Create, then check that it is listed in the global instance set."""
    self.create_instance()
    listed = any(x.identifier == 'i-test'
                 for x in model.InstanceDirectory().all)
    self.assertTrue(listed)
94
def test_instance_associates_project(self):
    """Create, then check that it is listed for its project."""
    instance = self.create_instance()
    by_project = model.InstanceDirectory().by_project(instance.project)
    listed = any(x.identifier == 'i-test' for x in by_project)
    self.assertTrue(listed)
103
def test_instance_associates_ip(self):
    """Create, then check that the instance can be found by its IP."""
    created = self.create_instance()
    address = created['private_dns_name']
    match = model.InstanceDirectory().by_ip(address)
    self.assertEqual(match.identifier, 'i-test')
110
def test_instance_associates_node(self):
    """Create, then check that it is listed for the node_name.

    The instance must not be listed under FLAGS.node_name at first; after
    its node_name is set to 'test_node' it must be listed under that node.
    """
    def listed_on(node):
        # True when 'i-test' appears among the instances for ``node``.
        return any(x.identifier == 'i-test'
                   for x in model.InstanceDirectory().by_node(node))

    instance = self.create_instance()
    self.assertFalse(listed_on(FLAGS.node_name))
    instance['node_name'] = 'test_node'
    # NOTE(review): assumes save() is what persists the new association --
    # confirm against nova.compute.model.
    instance.save()
    self.assertTrue(listed_on('test_node'))
126
def test_host_class_finds_hosts(self):
    """A stored host is returned by Host.lookup under its identifier."""
    self.create_host()
    looked_up = model.Host.lookup('testhost')
    self.assertEqual('testhost', looked_up.identifier)
130
def test_host_class_doesnt_find_missing_hosts(self):
    """Host.lookup yields None for a host that was never stored."""
    self.assertEqual(None, model.Host.lookup('woahnelly'))
134
def test_create_host(self):
    """Store with create_host, then check that a fresh load finds it."""
    created = self.create_host()
    reloaded = model.Host(created.identifier)
    self.assertFalse(reloaded.is_new_record())
140
def test_delete_host(self):
    """Create, then destroy, then make sure a load yields a new record."""
    host = self.create_host()
    # Without this step the reload below would still find the stored record.
    host.destroy()
    newhost = model.Host('testhost')
    self.assertTrue(newhost.is_new_record())
147
def test_host_added_to_set(self):
    """Create, then check that it is included in the list of all hosts."""
    self.create_host()
    listed = any(x.identifier == 'testhost' for x in model.Host.all())
    self.assertTrue(listed)
156
def test_create_daemon_two_args(self):
    """Create a daemon with two arguments and reload it the same way."""
    self.create_daemon()
    reloaded = model.Daemon('testhost', 'nova-testdaemon')
    self.assertFalse(reloaded.is_new_record())
162
def test_create_daemon_single_arg(self):
    """Create a daemon using the combined host:bin format."""
    d = model.Daemon("testhost:nova-testdaemon")
    # The record has to be stored, or the reload below is a new record.
    # NOTE(review): assumes the model's persistence call is save() -- confirm.
    d.save()
    d = model.Daemon('testhost:nova-testdaemon')
    self.assertFalse(d.is_new_record())
169
def test_equality_of_daemon_single_and_double_args(self):
    """Create a daemon using the combined host:bin arg, find with 2."""
    d = model.Daemon("testhost:nova-testdaemon")
    # The record has to be stored, or the reload below is a new record.
    # NOTE(review): assumes the model's persistence call is save() -- confirm.
    d.save()
    d = model.Daemon('testhost', 'nova-testdaemon')
    self.assertFalse(d.is_new_record())
176
def test_equality_daemon_of_double_and_single_args(self):
    """Create a daemon using 2 args, find with the combined host:bin arg."""
    self.create_daemon()
    reloaded = model.Daemon('testhost:nova-testdaemon')
    self.assertFalse(reloaded.is_new_record())
182
def test_delete_daemon(self):
    """Create, then destroy, then make sure a load yields a new record."""
    daemon = self.create_daemon()
    # Without this step the reload below would still find the stored record.
    daemon.destroy()
    newdaemon = model.Daemon('testhost', 'nova-testdaemon')
    self.assertTrue(newdaemon.is_new_record())
189
def test_daemon_heartbeat(self):
    """Create a daemon, sleep, heartbeat, check for update.

    The 'updated_at' value read from a fresh load after the heartbeat must
    compare greater than the value captured right after creation.
    """
    import time

    d = self.create_daemon()
    ts = d['updated_at']
    # NOTE(review): presumably timestamps have one-second resolution, so
    # wait long enough for the value to change -- confirm the format.
    time.sleep(2)
    # NOTE(review): assumes the daemon model exposes heartbeat() -- confirm.
    d.heartbeat()
    d2 = model.Daemon('testhost', 'nova-testdaemon')
    ts2 = d2['updated_at']
    self.assertTrue(ts2 > ts)
199
def test_daemon_added_to_set(self):
    """Create, then check that it is included in the list of all daemons."""
    self.create_daemon()
    listed = any(x.identifier == 'testhost:nova-testdaemon'
                 for x in model.Daemon.all())
    self.assertTrue(listed)
208
def test_daemon_associates_host(self):
    """Create, then check that it is listed for the host."""
    self.create_daemon()
    found = any(x.identifier == 'testhost:nova-testdaemon'
                for x in model.Daemon.by_host('testhost'))
    self.assertTrue(found)
217
def test_create_session_token(self):
    """Store a session token, then check that a fresh load finds it."""
    created = self.create_session_token()
    reloaded = model.SessionToken(created.token)
    self.assertFalse(reloaded.is_new_record())
223
def test_delete_session_token(self):
    """Create, then destroy, then make sure a load yields a new record."""
    token = self.create_session_token()
    # Without this step the reload below would still find the stored record.
    token.destroy()
    newtoken = model.SessionToken(token.token)
    self.assertTrue(newtoken.is_new_record())
230
def test_session_token_added_to_set(self):
    """Create, then check that it is included in the list of all tokens."""
    instance = self.create_session_token()
    listed = any(x.identifier == instance.token
                 for x in model.SessionToken.all())
    self.assertTrue(listed)
239
def test_session_token_associates_user(self):
    """Create, then check that it is listed for the user."""
    instance = self.create_session_token()
    for_user = model.SessionToken.associated_to('user', 'testuser')
    found = any(x.identifier == instance.identifier for x in for_user)
    self.assertTrue(found)
248
def test_session_token_generation(self):
    """A token returned by SessionToken.generate is not a new record."""
    generated = model.SessionToken.generate('username', 'TokenType')
    self.assertFalse(generated.is_new_record())
252
def test_find_generated_session_token(self):
    """A generated token can be found again through SessionToken.lookup."""
    instance = model.SessionToken.generate('username', 'TokenType')
    found = model.SessionToken.lookup(instance.identifier)
    # The lookup result has to be checked, or the test verifies nothing.
    self.assertTrue(found)
257
def test_update_session_token_expiry(self):
    """update_expiry moves an expiry of 'now' to a time later than now."""
    instance = model.SessionToken('tk12341234')
    oldtime = datetime.utcnow()
    instance['expiry'] = oldtime.strftime(utils.TIME_FORMAT)
    instance.update_expiry()
    expiry = utils.parse_isotime(instance['expiry'])
    # assertTrue matches the rest of this file; assert_ is a deprecated alias.
    self.assertTrue(expiry > datetime.utcnow())
265
def test_session_token_lookup_when_expired(self):
    """lookup returns a false value once the stored token has expired."""
    instance = model.SessionToken.generate("testuser")
    instance['expiry'] = datetime.utcnow().strftime(utils.TIME_FORMAT)
    # The changed expiry must reach the store before looking the token up.
    # NOTE(review): assumes the model's persistence call is save() -- confirm.
    instance.save()
    inst = model.SessionToken.lookup(instance.identifier)
    self.assertFalse(inst)
272
def test_session_token_lookup_when_not_expired(self):
    """lookup returns the token while it has not expired."""
    instance = model.SessionToken.generate("testuser")
    inst = model.SessionToken.lookup(instance.identifier)
    # The lookup result has to be checked, or the test verifies nothing.
    self.assertTrue(inst)
277
def test_session_token_is_expired_when_expired(self):
    """is_expired is true once the expiry has been set to the current time."""
    instance = model.SessionToken.generate("testuser")
    instance['expiry'] = datetime.utcnow().strftime(utils.TIME_FORMAT)
    # assertTrue matches the rest of this file; assert_ is a deprecated alias.
    self.assertTrue(instance.is_expired())
282
def test_session_token_is_expired_when_not_expired(self):
    """A freshly generated token does not report itself as expired."""
    fresh = model.SessionToken.generate("testuser")
    self.assertFalse(fresh.is_expired())
286
def test_session_token_ttl(self):
    """ttl() is within 5 seconds of FLAGS.auth_token_ttl for a 1 hour expiry."""
    instance = model.SessionToken.generate("testuser")
    now = datetime.utcnow()
    # NOTE(review): presumably FLAGS.auth_token_ttl equals one hour in
    # seconds, matching this delta -- confirm the flag's default.
    delta = timedelta(hours=1)
    instance['expiry'] = (now + delta).strftime(utils.TIME_FORMAT)
    # give 5 seconds of fuzziness
    drift = abs(instance.ttl() - FLAGS.auth_token_ttl)
    self.assertTrue(drift < 5)