1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
# Copyright [2010] [Anso Labs, LLC]
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
8
# http://www.apache.org/licenses/LICENSE-2.0
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
19
Runs on each compute node, managing the
20
hypervisor using libvirt.
32
from nova import vendor
33
from twisted.internet import defer
34
from twisted.internet import task
35
from twisted.application import service
39
except Exception, err:
40
logging.warning('no libvirt found')
42
from nova import exception
43
from nova import fakevirt
44
from nova import flags
45
from nova import process
46
from nova import utils
47
from nova.compute import disk
48
from nova.compute import model
49
from nova.compute import network
50
from nova.objectstore import image # for image_path flag
53
# Configuration flags defined by this module: FLAGS.libvirt_xml_template,
# FLAGS.use_s3, FLAGS.instances_path and FLAGS.instances_prefix.
# Template file that Instance reads and %-formats with its settings to
# produce the libvirt domain XML.
# NOTE(review): the help text 'Network XML Template' does not match this
# use (a domain template, not a network template); consider correcting it.
flags.DEFINE_string('libvirt_xml_template',
54
utils.abspath('compute/libvirt.xml.template'),
55
'Network XML Template')
56
# Per the help text: choose between fetching images from S3 and using a
# local copy.
flags.DEFINE_bool('use_s3', True,
57
'whether to get images from s3 or use local copy')
58
# Parent directory for per-instance directories; Instance uses
# <instances_path>/<name> when the instance data has no 'basepath'.
flags.DEFINE_string('instances_path', utils.abspath('../instances'),
59
'where instances are stored on disk')
60
# NOTE(review): instances_prefix is not referenced by any visible code.
flags.DEFINE_string('instances_prefix', 'compute-',
61
'prefix for keepers for instances')
64
# Instance flavors keyed by type name. memory_mb is RAM in MiB (Instance
# converts it to memory_kb), vcpus is the virtual CPU count, and local_gb is
# the local disk size in GiB (_create_image multiplies it by 1024**3 to get
# the partition size in bytes).
# NOTE(review): assumes INSTANCE_TYPES was created as an empty dict just
# above -- that line is not part of this listing.
INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0}
65
INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10}
66
INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10}
67
INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10}
68
INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10}
69
INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10}
71
# The number of processes to start in our process pool
72
# TODO(termie): this should probably be a flag and the pool should probably
77
# Compute-node service: owns the hypervisor connection, a process pool and
# the persistent instance directory, and handles instance lifecycle and
# volume requests.
# NOTE(review): with `object` listed before service.Service, Python raises a
# TypeError (inconsistent MRO) if service.Service is a new-style class; this
# only works while Twisted's Service is old-style -- confirm for the Twisted
# version in use, and consider `class Node(service.Service, object)`.
class Node(object, service.Service):
79
Manages the running instances.
82
""" load configuration options for this node and connect to libvirt """
83
super(Node, self).__init__()
85
# State shared by the methods below: the hypervisor connection, the process
# pool that is handed to every Instance (it runs _create_image), and the
# persistent instance directory.
self._conn = self._get_connection()
86
self._pool = process.Pool(PROCESS_POOL_SIZE)
87
self.instdir = model.InstanceDirectory()
88
# TODO(joshua): This needs to ensure system state, specifically: modprobe aoe
90
def _get_connection(self):
91
""" returns a libvirt connection object """
92
# TODO(termie): maybe lazy load after initial check for permissions
93
# TODO(termie): check whether we can be disconnected
94
# FLAGS.fake_libvirt selects the fakevirt singleton; otherwise open the
# qemu:///system URI with libvirt.openAuth using the `auth` list below.
# A failed open is logged as an error.
if FLAGS.fake_libvirt:
95
conn = fakevirt.FakeVirtConnection.instance()
97
auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT],
100
conn = libvirt.openAuth('qemu:///system', auth, 0)
102
logging.error('Failed to open connection to the hypervisor')
107
""" simple test of an AMQP message call """
108
# Liveness check: always answers with an already-fired Deferred of 'PONG'.
return defer.succeed('PONG')
110
# Build an Instance for instance_id from the instance directory when the id
# exists there. The callers below raise exception.Error for an unknown
# instance. NOTE(review): the value returned for an unknown id is not shown
# here -- presumably None.
def get_instance(self, instance_id):
111
# inst = self.instdir.get(instance_id)
113
if self.instdir.exists(instance_id):
114
return Instance.fromName(self._conn, self._pool, instance_id)
117
@exception.wrap_exception
118
def adopt_instances(self):
119
""" if there are instances already running, adopt them """
120
# NOTE(review): this early return makes adoption a no-op, and every line
# below it in this method is unreachable. If it is re-enabled, note that
# self._instances is not assigned anywhere in the visible code.
return defer.succeed(0)
121
instance_names = [self._conn.lookupByID(x).name()
122
for x in self._conn.listDomainsID()]
123
for name in instance_names:
125
new_inst = Instance.fromName(self._conn, self._pool, name)
126
new_inst.update_state()
129
return defer.succeed(len(self._instances))
131
@exception.wrap_exception
132
def describe_instances(self):
134
# Map instance_id -> Instance for every directory entry assigned to this
# node (FLAGS.node_name).
for inst in self.instdir.by_node(FLAGS.node_name):
135
retval[inst['instance_id']] = (Instance.fromName(self._conn, self._pool, inst['instance_id']))
138
@defer.inlineCallbacks
139
def report_state(self):
140
logging.debug("Reporting State")
143
@exception.wrap_exception
144
def run_instance(self, instance_id, **_kwargs):
145
""" launch a new instance with specified options """
146
# Claim the stored instance record for this node, wrap it in an Instance,
# and refuse (exception.Error) if that instance is already running.
logging.debug("Starting instance %s..." % (instance_id))
147
inst = self.instdir.get(instance_id)
148
inst['node_name'] = FLAGS.node_name
150
# TODO(vish) check to make sure the availability zone matches
151
new_inst = Instance(self._conn, name=instance_id,
152
pool=self._pool, data=inst)
153
if new_inst.is_running():
154
raise exception.Error("Instance is already running")
158
@exception.wrap_exception
159
def terminate_instance(self, instance_id):
160
""" terminate an instance on this machine """
161
# Unknown ids raise exception.Error; known ones start Instance.destroy().
# The commented-out `inst` lines are leftover dead code.
logging.debug("Got told to terminate instance %s" % instance_id)
162
instance = self.get_instance(instance_id)
163
# inst = self.instdir.get(instance_id)
165
raise exception.Error(
166
'trying to terminate unknown instance: %s' % instance_id)
167
d = instance.destroy()
168
# d.addCallback(lambda x: inst.destroy())
171
@exception.wrap_exception
172
def reboot_instance(self, instance_id):
173
""" reboot an instance on this server
174
KVM doesn't support reboot, so we terminate and restart """
175
# Unknown ids raise exception.Error; otherwise delegate to Instance.reboot.
instance = self.get_instance(instance_id)
177
raise exception.Error(
178
'trying to reboot unknown instance: %s' % instance_id)
179
return instance.reboot()
181
@defer.inlineCallbacks
182
@exception.wrap_exception
183
def get_console_output(self, instance_id):
184
""" send the console output for an instance """
185
# Result (via the Deferred) is a dict whose "output" value is the
# base64-encoded console text.
logging.debug("Getting console output for %s" % (instance_id))
186
# NOTE(review): `inst` is not used by any visible line of this method.
inst = self.instdir.get(instance_id)
187
instance = self.get_instance(instance_id)
189
raise exception.Error(
190
'trying to get console log for unknown: %s' % instance_id)
191
rv = yield instance.console_output()
192
# TODO(termie): this stuff belongs in the API layer, no need to
193
# munge the data we send to ourselves
194
output = {"InstanceId" : instance_id,
196
"output" : base64.b64encode(rv)}
197
defer.returnValue(output)
199
# Attach AoE device /dev/etherd/<aoe_device> to the domain with
# `virsh attach-disk`, using the last path component of mountpoint as the
# target device name.
# NOTE(review): @defer.inlineCallbacks expects a generator function, but this
# body never yields; depending on the Twisted version this fails when called.
# The method already returns a Deferred via defer.succeed, so the decorator
# looks unnecessary.
# NOTE(review): mountpoint defaults to None, yet .split() is called on it
# unconditionally.
# NOTE(review): instance_id, aoe_device and mountpoint are interpolated into
# a sudo command string; if utils.runthis executes it through a shell these
# values need validation -- confirm.
@defer.inlineCallbacks
200
@exception.wrap_exception
201
def attach_volume(self, instance_id = None,
202
aoe_device = None, mountpoint = None):
203
utils.runthis("Attached Volume: %s",
204
"sudo virsh attach-disk %s /dev/etherd/%s %s"
205
% (instance_id, aoe_device, mountpoint.split("/")[-1]))
206
return defer.succeed(True)
209
# Refresh AoE (ATA over Ethernet) device discovery, then log its status.
utils.runthis("Doin an AoE discover, returns %s", "sudo aoe-discover")
210
utils.runthis("Doin an AoE stat, returns %s", "sudo aoe-stat")
212
@exception.wrap_exception
213
def detach_volume(self, instance_id, mountpoint):
214
""" detach a volume from an instance """
215
# despite the documentation, virsh detach-disk just wants the device
216
# name without the leading /dev/
217
# rpartition returns the whole string when '/dev/' is absent.
# NOTE(review): same shell-string concern as attach_volume.
target = mountpoint.rpartition('/dev/')[2]
218
utils.runthis("Detached Volume: %s", "sudo virsh detach-disk %s %s "
219
% (instance_id, target))
220
return defer.succeed(True)
224
# Initializer that stores group_id unchanged on the new object.
# NOTE(review): the class statement owning this initializer is not present
# in this listing.
def __init__(self, group_id):
225
self.group_id = group_id
228
class ProductCode(object):
    """Plain value holder for a single product code.

    The value is exposed as ``product_code`` exactly as it was passed in;
    no validation or conversion is performed.
    """

    def __init__(self, product_code):
        """Remember ``product_code`` verbatim on the instance."""
        self.product_code = product_code
233
# Prepare an instance directory under data['basepath']:
# - create the directory;
# - open libvirt.xml there for writing;
# - obtain the disk, kernel and ramdisk images, either fetched with
#   utils.fetchfile from the S3 image URL or copied with shutil from
#   FLAGS.images_path (the condition selecting between the two is not
#   visible here -- presumably FLAGS.use_s3);
# - inject data['key_data'] into disk-raw;
# - rebuild 'disk' from 'disk-raw' via disk.partition, sized from the
#   instance type's local_gb.
# This function is run through the process pool (Instance.spawn calls
# self._pool.apply(_create_image, ...)). Errors are returned as
# {'exception': ex} instead of propagating, and spawn's _launch callback
# re-raises them.
def _create_image(data, libvirt_xml):
234
""" create libvirt.xml and copy files into instance path """
235
# Absolute path of a file inside this instance's directory.
def basepath(path=''):
236
return os.path.abspath(os.path.join(data['basepath'], path))
238
# Path of a file inside the local image store (FLAGS.images_path).
def imagepath(path=''):
239
return os.path.join(FLAGS.images_path, path)
242
# S3 location host:port/_images/<path>, used for the fetchfile calls below.
return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path)
244
logging.info(basepath('disk'))
246
os.makedirs(data['basepath'])
247
# NOTE(review): 0777 makes the instance directory world-writable; confirm
# this is really required (e.g. for the hypervisor's user) and narrow it.
os.chmod(data['basepath'], 0777)
249
# TODO: there is already an instance with this name, do something
252
logging.info('Creating image for: %s', data['instance_id'])
253
f = open(basepath('libvirt.xml'), 'w')
256
if not FLAGS.fake_libvirt:
258
# Each image is only obtained if it is not already present.
if not os.path.exists(basepath('disk')):
259
utils.fetchfile(image_url("%s/image" % data['image_id']),
260
basepath('disk-raw'))
261
if not os.path.exists(basepath('kernel')):
262
utils.fetchfile(image_url("%s/image" % data['kernel_id']),
264
if not os.path.exists(basepath('ramdisk')):
265
utils.fetchfile(image_url("%s/image" % data['ramdisk_id']),
268
if not os.path.exists(basepath('disk')):
269
shutil.copyfile(imagepath("%s/image" % data['image_id']),
270
basepath('disk-raw'))
271
if not os.path.exists(basepath('kernel')):
272
shutil.copyfile(imagepath("%s/image" % data['kernel_id']),
274
if not os.path.exists(basepath('ramdisk')):
275
shutil.copyfile(imagepath("%s/image" %
279
logging.info('Injecting key data into image %s' %
281
disk.inject_key(data['key_data'], basepath('disk-raw'))
282
# Any previous 'disk' is removed and rebuilt from 'disk-raw'.
if os.path.exists(basepath('disk')):
283
os.remove(basepath('disk'))
284
# local_gb (GiB) -> bytes. NOTE(review): `bytes` shadows the builtin name.
bytes = INSTANCE_TYPES[data['instance_type']]['local_gb'] * 1024 * 1024 * 1024
285
disk.partition(basepath('disk-raw'), basepath('disk'), bytes)
286
logging.info('Done create image for: %s', data['instance_id'])
287
except Exception as ex:
288
# Returned (not raised) so the failure survives the trip back from the pool.
return {'exception': ex}
291
# One hypervisor domain plus its persisted datamodel record. self._s holds
# the settings used to fill the libvirt XML template. It is also passed to
# _create_image through the process pool, so it must stay picklable (see
# NOTE(termie) below).
class Instance(object):
301
# NOTE(review): state is compared both to class constants (e.g.
# Instance.NOSTATE) and to strings ('pending', 'running'). self._s['state']
# can come from the datamodel (__init__) or from libvirt info (update_state)
# -- confirm both representations are really possible.
def is_pending(self):
302
return (self.state == Instance.NOSTATE or self.state == 'pending')
304
def is_destroyed(self):
305
return self.state == Instance.SHUTOFF
307
def is_running(self):
308
logging.debug("Instance state is: %s" % self.state)
309
return (self.state == Instance.RUNNING or self.state == 'running')
311
def __init__(self, conn, pool, name, data):
312
# TODO(termie): pool should probably be a singleton instead of being passed
313
# here and in the classmethods
314
""" spawn an instance with a given name """
315
# TODO(termie): pool should probably be a singleton instead of being passed
316
# here and in the classmethods
319
self.datamodel = data
322
# NOTE(termie): to be passed to multiprocess self._s must be
323
# pickle-able by cPickle
326
# TODO(termie): is instance_type that actual name for this?
327
# Unknown instance types are rejected with exception.Error.
size = data.get('instance_type', FLAGS.default_instance_type)
328
if size not in INSTANCE_TYPES:
329
raise exception.Error('invalid instance type: %s' % size)
331
# Flavor values (memory_mb, vcpus, local_gb) first; each remaining setting
# comes from data when present, otherwise from the default shown.
self._s.update(INSTANCE_TYPES[size])
333
self._s['name'] = name
334
self._s['instance_id'] = name
335
self._s['instance_type'] = size
336
# NOTE(review): every instance without a mac_address in data gets the same
# fixed MAC 'df:df:df:df:df:df'; generate_mac() below is not called here.
self._s['mac_address'] = data.get(
337
'mac_address', 'df:df:df:df:df:df')
338
# self.name reads self._s['name'], which was set a few lines above.
self._s['basepath'] = data.get(
339
'basepath', os.path.abspath(
340
os.path.join(FLAGS.instances_path, self.name)))
341
self._s['memory_kb'] = int(self._s['memory_mb']) * 1024
342
# TODO(joshua) - Get this from network directory controller later
343
self._s['bridge_name'] = data.get('bridge_name', 'br0')
344
self._s['image_id'] = data.get('image_id', FLAGS.default_image)
345
self._s['kernel_id'] = data.get('kernel_id', FLAGS.default_kernel)
346
self._s['ramdisk_id'] = data.get('ramdisk_id', FLAGS.default_ramdisk)
347
self._s['owner_id'] = data.get('owner_id', '')
348
self._s['node_name'] = data.get('node_name', '')
349
self._s['user_data'] = data.get('user_data', '')
350
self._s['ami_launch_index'] = data.get('ami_launch_index', None)
351
self._s['launch_time'] = data.get('launch_time', None)
352
self._s['reservation_id'] = data.get('reservation_id', None)
353
# self._s['state'] = Instance.NOSTATE
354
self._s['state'] = data.get('state', Instance.NOSTATE)
356
self._s['key_data'] = data.get('key_data', None)
358
# TODO: we may not need to save the next few
359
self._s['groups'] = data.get('security_group', ['default'])
360
self._s['product_codes'] = data.get('product_code', [])
361
self._s['key_name'] = data.get('key_name', None)
362
self._s['addressing_type'] = data.get('addressing_type', None)
363
self._s['availability_zone'] = data.get('availability_zone', 'fixme')
365
#TODO: put real dns items here
366
self._s['private_dns_name'] = data.get('private_dns_name', 'fixme')
367
self._s['dns_name'] = data.get('dns_name',
368
self._s['private_dns_name'])
369
logging.debug("Finished init of Instance with id of %s" % name)
372
# TODO(termie): cache?
373
# Render the domain XML: %-format the template file with a copy of self._s,
# which also carries the full settings JSON-encoded under the 'nova' key.
# NOTE(review): the template file object is never closed explicitly.
logging.debug("Starting the toXML method")
374
libvirt_xml = open(FLAGS.libvirt_xml_template).read()
375
xml_info = self._s.copy()
376
#xml_info.update(self._s)
378
# TODO(termie): lazy lazy hack because xml is annoying
379
xml_info['nova'] = json.dumps(self._s)
380
libvirt_xml = libvirt_xml % xml_info
381
logging.debug("Finished the toXML method")
386
# Alternate constructor: rebuild an Instance from the record that
# model.InstanceDirectory stores under `name`.
def fromName(cls, conn, pool, name):
387
""" use the saved data for reloading the instance """
388
# if FLAGS.fake_libvirt:
389
# raise Exception('this is a bit useless, eh?')
391
instdir = model.InstanceDirectory()
392
instance = instdir.get(name)
393
return cls(conn=conn, pool=pool, name=name, data=instance)
397
return self._s['state']
401
return self._s['name']
407
# Query the hypervisor domain by name and return its info() values as a dict
# (keys include 'state' and 'cpu_time').
logging.debug("Getting info for dom %s" % self.name)
408
virt_dom = self._conn.lookupByName(self.name)
409
(state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info()
410
return {'state': state,
414
'cpu_time': cpu_time}
416
# Copy the current domain state into self._s and the datamodel, record this
# node as the owner, and persist the datamodel.
def update_state(self):
418
self._s['state'] = info['state']
419
self.datamodel['state'] = info['state']
420
self.datamodel['node_name'] = FLAGS.node_name
421
self.datamodel.save()
423
# Destroy flow:
# - an already-destroyed instance has its datamodel record deleted and
#   exception.Error is raised;
# - otherwise it is marked SHUTDOWN / 'shutting_down' and saved, the domain
#   is looked up and torn down (errors tolerated), the datamodel record is
#   deleted once `d` fires, and the state is polled every 0.5s with a
#   LoopingCall.
@exception.wrap_exception
425
if self.is_destroyed():
426
self.datamodel.destroy()
427
raise exception.Error('trying to destroy already destroyed'
428
' instance: %s' % self.name)
430
self._s['state'] = Instance.SHUTDOWN
431
self.datamodel['state'] = 'shutting_down'
432
self.datamodel.save()
434
virt_dom = self._conn.lookupByName(self.name)
436
except Exception, _err:
438
# If the instance is already terminated, we're still happy
440
d.addCallback(lambda x: self.datamodel.destroy())
441
# TODO(termie): short-circuit me for tests
442
# WE'LL save this for when we do shutdown,
443
# instead of destroy - but destroy returns immediately
444
timer = task.LoopingCall(f=None)
445
def _wait_for_shutdown():
448
if info['state'] == Instance.SHUTDOWN:
449
self._s['state'] = Instance.SHUTDOWN
450
#self.datamodel['state'] = 'shutdown'
451
#self.datamodel.save()
455
self._s['state'] = Instance.SHUTDOWN
458
timer.f = _wait_for_shutdown
459
timer.start(interval=0.5, now=True)
462
# Reboot by destroying the domain and recreating it from toXml(). The state
# is set to 'running' without confirming the boot (see TODO below).
@defer.inlineCallbacks
463
@exception.wrap_exception
465
# if not self.is_running():
466
# raise exception.Error(
467
# 'trying to reboot a non-running'
468
# 'instance: %s (state: %s)' % (self.name, self.state))
470
# NOTE(review): if destroy() returns a plain value (as libvirt's does),
# this yield does not wait on anything; inlineCallbacks hands the value back.
yield self._conn.lookupByName(self.name).destroy()
471
self.datamodel['state'] = 'rebooting'
472
self.datamodel.save()
473
self._s['state'] = Instance.NOSTATE
474
self._conn.createXML(self.toXml(), 0)
475
# TODO(termie): this should actually register a callback to check
476
# for successful boot
477
self.datamodel['state'] = 'running'
478
self.datamodel.save()
479
self._s['state'] = Instance.RUNNING
480
logging.debug('rebooted instance %s' % self.name)
481
defer.returnValue(None)
483
# Spawn flow:
# - the datamodel goes 'spawning' -> 'launching' -> 'running';
# - _create_image runs in the process pool and _launch then creates the
#   domain from toXml();
# - an {'exception': ...} result from the pool is re-raised in _launch;
# - on failure the datamodel state is set to 'shutdown'.
@exception.wrap_exception
485
self.datamodel['state'] = "spawning"
486
self.datamodel.save()
487
logging.debug("Starting spawn in Instance")
489
def _launch(retvals):
490
self.datamodel['state'] = 'launching'
491
self.datamodel.save()
493
logging.debug("Arrived in _launch")
494
if retvals and 'exception' in retvals:
495
raise retvals['exception']
496
self._conn.createXML(self.toXml(), 0)
497
# TODO(termie): this should actually register
498
# a callback to check for successful boot
499
self._s['state'] = Instance.RUNNING
500
self.datamodel['state'] = 'running'
501
self.datamodel.save()
502
logging.debug("Instance is running")
503
except Exception as ex:
505
self.datamodel['state'] = 'shutdown'
506
self.datamodel.save()
509
d = self._pool.apply(_create_image, self._s, xml)
510
d.addCallback(_launch)
513
# Return an already-fired Deferred with the console text: the contents of
# <basepath>/console.log for real libvirt, or 'FAKE CONSOLE OUTPUT' when
# FLAGS.fake_libvirt is set.
@exception.wrap_exception
514
def console_output(self):
515
if not FLAGS.fake_libvirt:
516
fname = os.path.abspath(
517
os.path.join(self._s['basepath'], 'console.log'))
518
with open(fname, 'r') as f:
521
console = 'FAKE CONSOLE OUTPUT'
522
return defer.succeed(console)
524
# Random MAC address with the fixed 00:16:3e prefix. The fourth octet is
# limited to 0x00-0x7f and the last two octets are fully random. The result
# is formatted as lowercase colon-separated hex.
# NOTE(review): not called by any visible code.
def generate_mac(self):
525
mac = [0x00, 0x16, 0x3e, random.randint(0x00, 0x7f),
526
random.randint(0x00, 0xff), random.randint(0x00, 0xff)
528
return ':'.join(map(lambda x: "%02x" % x, mac))
532
class NetworkNode(Node):
    """Compute node that also brings up each instance's virtual network.

    Before delegating to ``Node.run_instance`` it reads the instance
    record's ``network_str`` field, a JSON object whose keys are passed as
    keyword arguments to ``network.VirtNetwork``. It builds that network
    and calls its ``express()`` method.
    """

    def __init__(self, **kwargs):
        super(NetworkNode, self).__init__(**kwargs)
        # Networks created by add_network, keyed by VirtNetwork.name. This
        # must exist before the first add_network call, which writes into it.
        self.virtNets = {}

    def add_network(self, net_dict):
        """Create, register and express a VirtNetwork.

        ``net_dict`` is passed as keyword arguments to
        ``network.VirtNetwork``. The network is stored in ``self.virtNets``
        under its name, replacing any previous entry with the same name, and
        then ``express()`` is called on it.

        Returns a Deferred that has already fired with
        ``{'retval': 'network added'}``.
        """
        net = network.VirtNetwork(**net_dict)
        self.virtNets[net.name] = net
        net.express()
        return defer.succeed({'retval': 'network added'})

    @exception.wrap_exception
    def run_instance(self, instance_id, **kwargs):
        """Set up the instance's network, then launch it via Node.run_instance.

        The network description is the JSON in the instance record's
        ``network_str`` field. A missing field is treated as ``"{}"``, i.e.
        ``VirtNetwork`` is called with no arguments.
        """
        inst = self.instdir.get(instance_id)
        net_dict = json.loads(inst.get('network_str', "{}"))
        self.add_network(net_dict)
        return super(NetworkNode, self).run_instance(instance_id, **kwargs)