71
72
return {'private_key': private_key, 'fingerprint': fingerprint}
75
def ec2_id_to_internal_id(ec2_id):
    """Convert an ec2 ID (i-[base 36 number]) to an internal id (int).

    The first two characters of ``ec2_id`` (the "i-" prefix) are dropped
    without being checked, and the remainder is parsed as a base 36
    number.  ``int()`` raises ValueError when that remainder is empty or
    is not a valid base 36 literal.
    """
    return int(ec2_id[2:], 36)
80
def internal_id_to_ec2_id(internal_id):
    """Convert an internal ID (int) to an ec2 ID (i-[base 36 number]).

    The number is rendered with the digits 0-9 followed by lowercase a-z,
    most significant digit first, and prefixed with "i-".  Zero is
    rendered as "i-0" so that the result can always be parsed back with
    ``int(ec2_id[2:], 36)``.

    Raises ValueError for a negative ``internal_id``: floor division never
    brings a negative number to zero, so the conversion loop below would
    not terminate.
    """
    if internal_id < 0:
        raise ValueError("internal_id must be non-negative, got %r"
                         % (internal_id,))
    if internal_id == 0:
        # The loop below emits no digits for zero; return the digit
        # explicitly instead of the unparseable "i-".
        return "i-0"
    digits = []
    while internal_id != 0:
        # Peel off the least significant base 36 digit each iteration.
        internal_id, remainder = divmod(internal_id, 36)
        digits.append('0123456789abcdefghijklmnopqrstuvwxyz'[remainder])
    # Digits were collected least significant first, so reverse them.
    return "i-%s" % ''.join(reversed(digits))
74
89
class CloudController(object):
75
90
""" CloudController provides the critical dispatch between
76
91
inbound API calls through the endpoint and messages
102
117
def _get_mpi_data(self, project_id):
104
for instance in db.instance_get_by_project(None, project_id):
119
for instance in db.instance_get_all_by_project(None, project_id):
105
120
if instance['fixed_ip']:
106
line = '%s slots=%d' % (instance['fixed_ip']['str_id'],
121
line = '%s slots=%d' % (instance['fixed_ip']['address'],
107
122
INSTANCE_TYPES[instance['instance_type']]['vcpus'])
108
123
key = str(instance['key_name'])
109
124
if key in result:
144
159
'hostname': hostname,
145
160
'instance-action': 'none',
146
'instance-id': instance_ref['str_id'],
161
'instance-id': internal_id_to_ec2_id(instance_ref['internal_id']),
147
162
'instance-type': instance_ref['instance_type'],
148
163
'local-hostname': hostname,
149
164
'local-ipv4': address,
243
258
def delete_security_group(self, context, group_name, **kwargs):
246
def get_console_output(self, context, instance_id, **kwargs):
247
# instance_id is passed in as a list of instances
248
instance_ref = db.instance_get_by_str(context, instance_id[0])
261
def get_console_output(self, context, ec2_id_list, **kwargs):
262
# ec2_id_list is passed in as a list of instances
263
ec2_id = ec2_id_list[0]
264
internal_id = ec2_id_to_internal_id(ec2_id)
265
instance_ref = db.instance_get_by_internal_id(context, internal_id)
249
266
return rpc.call('%s.%s' % (FLAGS.compute_topic,
250
267
instance_ref['host']),
251
268
{"method": "get_console_output",
256
273
if context.user.is_admin():
257
274
volumes = db.volume_get_all(context)
259
volumes = db.volume_get_by_project(context, context.project.id)
276
volumes = db.volume_get_all_by_project(context, context.project.id)
261
278
volumes = [self._format_volume(context, v) for v in volumes]
265
282
def _format_volume(self, context, volume):
267
v['volumeId'] = volume['str_id']
284
v['volumeId'] = volume['ec2_id']
268
285
v['status'] = volume['status']
269
286
v['size'] = volume['size']
270
287
v['availabilityZone'] = volume['availability_zone']
282
299
'device': volume['mountpoint'],
283
300
'instanceId': volume['instance_id'],
284
301
'status': 'attached',
285
'volume_id': volume['str_id']}]
302
'volume_id': volume['ec2_id']}]
287
304
v['attachmentSet'] = [{}]
306
v['display_name'] = volume['display_name']
307
v['display_description'] = volume['display_description']
290
310
def create_volume(self, context, size, **kwargs):
302
322
vol['availability_zone'] = FLAGS.storage_availability_zone
303
323
vol['status'] = "creating"
304
324
vol['attach_status'] = "detached"
325
vol['display_name'] = kwargs.get('display_name')
326
vol['display_description'] = kwargs.get('display_description')
305
327
volume_ref = db.volume_create(context, vol)
307
329
rpc.cast(FLAGS.scheduler_topic,
316
338
def attach_volume(self, context, volume_id, instance_id, device, **kwargs):
317
volume_ref = db.volume_get_by_str(context, volume_id)
339
volume_ref = db.volume_get_by_ec2_id(context, volume_id)
318
340
# TODO(vish): abstract status checking?
319
341
if volume_ref['status'] != "available":
320
342
raise exception.ApiError("Volume status must be available")
321
343
if volume_ref['attach_status'] == "attached":
322
344
raise exception.ApiError("Volume is already attached")
323
instance_ref = db.instance_get_by_str(context, instance_id)
345
internal_id = ec2_id_to_internal_id(instance_id)
346
instance_ref = db.instance_get_by_internal_id(context, internal_id)
324
347
host = instance_ref['host']
325
348
rpc.cast(db.queue_get_for(context, FLAGS.compute_topic, host),
326
349
{"method": "attach_volume",
336
359
'volumeId': volume_ref['id']}
338
361
def detach_volume(self, context, volume_id, **kwargs):
339
volume_ref = db.volume_get_by_str(context, volume_id)
362
volume_ref = db.volume_get_by_ec2_id(context, volume_id)
340
363
instance_ref = db.volume_get_instance(context, volume_ref['id'])
341
364
if not instance_ref:
342
365
raise exception.ApiError("Volume isn't attached to anything!")
354
377
# If the instance doesn't exist anymore,
355
378
# then we need to call detach blind
356
379
db.volume_detached(context)
380
internal_id = instance_ref['internal_id']
381
ec2_id = internal_id_to_ec2_id(internal_id)
357
382
return {'attachTime': volume_ref['attach_time'],
358
383
'device': volume_ref['mountpoint'],
359
'instanceId': instance_ref['str_id'],
384
'instanceId': internal_id,
360
385
'requestId': context.request_id,
361
386
'status': volume_ref['attach_status'],
362
387
'volumeId': volume_ref['id']}
369
394
return [{label: x} for x in lst]
396
def update_volume(self, context, volume_id, **kwargs):
397
updatable_fields = ['display_name', 'display_description']
399
for field in updatable_fields:
401
changes[field] = kwargs[field]
403
db.volume_update(context, volume_id, kwargs)
371
406
def describe_instances(self, context, **kwargs):
372
407
return self._format_describe_instances(context)
382
417
def _format_instances(self, context, reservation_id=None):
383
418
reservations = {}
384
419
if reservation_id:
385
instances = db.instance_get_by_reservation(context,
420
instances = db.instance_get_all_by_reservation(context,
388
423
if context.user.is_admin():
389
424
instances = db.instance_get_all(context)
391
instances = db.instance_get_by_project(context,
426
instances = db.instance_get_all_by_project(context,
393
428
for instance in instances:
394
429
if not context.user.is_admin():
395
430
if instance['image_id'] == FLAGS.vpn_image_id:
398
i['instanceId'] = instance['str_id']
433
internal_id = instance['internal_id']
434
ec2_id = internal_id_to_ec2_id(internal_id)
435
i['instanceId'] = ec2_id
399
436
i['imageId'] = instance['image_id']
400
437
i['instanceState'] = {
401
438
'code': instance['state'],
404
441
fixed_addr = None
405
442
floating_addr = None
406
443
if instance['fixed_ip']:
407
fixed_addr = instance['fixed_ip']['str_id']
444
fixed_addr = instance['fixed_ip']['address']
408
445
if instance['fixed_ip']['floating_ips']:
409
446
fixed = instance['fixed_ip']
410
floating_addr = fixed['floating_ips'][0]['str_id']
447
floating_addr = fixed['floating_ips'][0]['address']
411
448
i['privateDnsName'] = fixed_addr
412
449
i['publicDnsName'] = floating_addr
413
450
i['dnsName'] = i['publicDnsName'] or i['privateDnsName']
420
457
i['instanceType'] = instance['instance_type']
421
458
i['launchTime'] = instance['created_at']
422
459
i['amiLaunchIndex'] = instance['launch_index']
460
i['displayName'] = instance['display_name']
461
i['displayDescription'] = instance['display_description']
423
462
if not reservations.has_key(instance['reservation_id']):
425
464
r['reservationId'] = instance['reservation_id']
439
478
if context.user.is_admin():
440
479
iterator = db.floating_ip_get_all(context)
442
iterator = db.floating_ip_get_by_project(context,
481
iterator = db.floating_ip_get_all_by_project(context,
444
483
for floating_ip_ref in iterator:
445
address = floating_ip_ref['str_id']
484
address = floating_ip_ref['address']
446
485
instance_id = None
447
486
if (floating_ip_ref['fixed_ip']
448
487
and floating_ip_ref['fixed_ip']['instance']):
449
instance_id = floating_ip_ref['fixed_ip']['instance']['str_id']
488
internal_id = floating_ip_ref['fixed_ip']['instance']['ec2_id']
489
ec2_id = internal_id_to_ec2_id(internal_id)
450
490
address_rv = {'public_ip': address,
451
'instance_id': instance_id}
491
'instance_id': ec2_id}
452
492
if context.user.is_admin():
453
493
details = "%s (%s)" % (address_rv['instance_id'],
454
494
floating_ip_ref['project_id'])
477
517
rpc.cast(network_topic,
478
518
{"method": "deallocate_floating_ip",
479
519
"args": {"context": None,
480
"floating_address": floating_ip_ref['str_id']}})
520
"floating_address": floating_ip_ref['address']}})
481
521
return {'releaseResponse': ["Address released."]}
483
def associate_address(self, context, instance_id, public_ip, **kwargs):
484
instance_ref = db.instance_get_by_str(context, instance_id)
485
fixed_ip_ref = db.fixed_ip_get_by_instance(context, instance_ref['id'])
523
def associate_address(self, context, ec2_id, public_ip, **kwargs):
524
internal_id = ec2_id_to_internal_id(ec2_id)
525
instance_ref = db.instance_get_by_internal_id(context, internal_id)
526
fixed_address = db.instance_get_fixed_address(context,
486
528
floating_ip_ref = db.floating_ip_get_by_address(context, public_ip)
487
529
network_topic = self._get_network_topic(context)
488
530
rpc.cast(network_topic,
489
531
{"method": "associate_floating_ip",
490
532
"args": {"context": None,
491
"floating_address": floating_ip_ref['str_id'],
492
"fixed_address": fixed_ip_ref['str_id']}})
533
"floating_address": floating_ip_ref['address'],
534
"fixed_address": fixed_address}})
493
535
return {'associateResponse': ["Address associated."]}
495
537
def disassociate_address(self, context, public_ip, **kwargs):
498
540
rpc.cast(network_topic,
499
541
{"method": "disassociate_floating_ip",
500
542
"args": {"context": None,
501
"floating_address": floating_ip_ref['str_id']}})
543
"floating_address": floating_ip_ref['address']}})
502
544
return {'disassociateResponse': ["Address disassociated."]}
504
546
def _get_network_topic(self, context):
576
618
base_options['user_data'] = kwargs.get('user_data', '')
577
619
base_options['security_group'] = security_group
578
620
base_options['instance_type'] = instance_type
621
base_options['display_name'] = kwargs.get('display_name')
622
base_options['display_description'] = kwargs.get('display_description')
580
624
type_data = INSTANCE_TYPES[instance_type]
581
625
base_options['memory_mb'] = type_data['memory_mb']
590
634
inst['mac_address'] = utils.generate_mac()
591
635
inst['launch_index'] = num
592
inst['hostname'] = instance_ref['str_id']
636
internal_id = instance_ref['internal_id']
637
ec2_id = internal_id_to_ec2_id(internal_id)
638
inst['hostname'] = ec2_id
593
639
db.instance_update(context, inst_id, inst)
594
640
address = self.network_manager.allocate_fixed_ip(context,
613
659
return self._format_run_instances(context, reservation_id)
616
def terminate_instances(self, context, instance_id, **kwargs):
662
def terminate_instances(self, context, ec2_id_list, **kwargs):
617
663
logging.debug("Going to start terminating instances")
618
for id_str in instance_id:
664
for id_str in ec2_id_list:
665
internal_id = ec2_id_to_internal_id(id_str)
619
666
logging.debug("Going to try and terminate %s" % id_str)
621
instance_ref = db.instance_get_by_str(context, id_str)
668
instance_ref = db.instance_get_by_internal_id(context,
622
670
except exception.NotFound:
623
671
logging.warning("Instance %s was not found during terminate"
640
688
rpc.cast(network_topic,
641
689
{"method": "disassociate_floating_ip",
642
690
"args": {"context": None,
643
"address": address}})
691
"floating_address": address}})
645
693
address = db.instance_get_fixed_address(context,
646
694
instance_ref['id'])
664
712
def reboot_instances(self, context, instance_id, **kwargs):
665
713
"""instance_id is a list of instance ids"""
666
714
for id_str in instance_id:
667
instance_ref = db.instance_get_by_str(context, id_str)
668
host = instance_ref['host']
669
rpc.cast(db.queue_get_for(context, FLAGS.compute_topic, host),
670
{"method": "reboot_instance",
671
"args": {"context": None,
672
"instance_id": instance_ref['id']}})
715
cloud.reboot(id_str, context=context)
718
def update_instance(self, context, ec2_id, **kwargs):
719
updatable_fields = ['display_name', 'display_description']
721
for field in updatable_fields:
723
changes[field] = kwargs[field]
726
internal_id = ec2_id_to_internal_id(ec2_id)
727
inst = db.instance_get_by_internal_id(db_context, internal_id)
728
db.instance_update(db_context, inst['id'], kwargs)
675
731
def delete_volume(self, context, volume_id, **kwargs):
676
732
# TODO: return error if not authorized
677
volume_ref = db.volume_get_by_str(context, volume_id)
733
volume_ref = db.volume_get_by_ec2_id(context, volume_id)
678
734
if volume_ref['status'] != "available":
679
735
raise exception.ApiError("Volume status must be available")
680
736
now = datetime.datetime.utcnow()
727
783
if not operation_type in ['add', 'remove']:
728
784
raise exception.ApiError('operation_type must be add or remove')
729
785
return images.modify(context, image_id, operation_type)
787
def update_image(self, context, image_id, **kwargs):
788
result = images.update(context, image_id, dict(kwargs))