~ubuntu-branches/ubuntu/quantal/nova/quantal-proposed


Viewing changes to nova/compute/api.py

  • Committer: Bazaar Package Importer
  • Author(s): Chuck Short
  • Date: 2011-01-21 11:48:06 UTC
  • mto: This revision was merged to the branch mainline in revision 9.
  • Revision ID: james.westby@ubuntu.com-20110121114806-v8fvnnl6az4m4ohv
Tags: upstream-2011.1~bzr597
Import upstream version 2011.1~bzr597

@@ -17 +17 @@
 #    under the License.

 """
-Handles all API requests relating to instances (guest vms).
+Handles all requests relating to instances (guest vms).
 """

 import datetime
-import logging
+import re
 import time

 from nova import db
 from nova import exception
 from nova import flags
+from nova import log as logging
+from nova import network
 from nova import quota
 from nova import rpc
 from nova import utils
+from nova import volume
 from nova.compute import instance_types
 from nova.db import base

 FLAGS = flags.FLAGS
-
-
-def generate_default_hostname(internal_id):
+LOG = logging.getLogger('nova.compute.api')
+
+
+def generate_default_hostname(instance_id):
     """Default function to generate a hostname given an instance reference."""
-    return str(internal_id)
-
-
-class ComputeAPI(base.Base):
+    return str(instance_id)
+
+
+class API(base.Base):
     """API for interacting with the compute manager."""

-    def __init__(self, network_manager=None, image_service=None, **kwargs):
-        if not network_manager:
-            network_manager = utils.import_object(FLAGS.network_manager)
-        self.network_manager = network_manager
+    def __init__(self, image_service=None, network_api=None,
+                 volume_api=None, hostname_factory=generate_default_hostname,
+                 **kwargs):
         if not image_service:
             image_service = utils.import_object(FLAGS.image_service)
         self.image_service = image_service
-        super(ComputeAPI, self).__init__(**kwargs)
-
-    def create_instances(self, context, instance_type, image_id, min_count=1,
-                         max_count=1, kernel_id=None, ramdisk_id=None,
-                         display_name='', description='', key_name=None,
-                         key_data=None, security_group='default',
-                         generate_hostname=generate_default_hostname):
-        """Create the number of instances requested if quote and
+        if not network_api:
+            network_api = network.API()
+        self.network_api = network_api
+        if not volume_api:
+            volume_api = volume.API()
+        self.volume_api = volume_api
+        self.hostname_factory = hostname_factory
+        super(API, self).__init__(**kwargs)
+
+    def get_network_topic(self, context, instance_id):
+        """Get the network topic for an instance."""
+        try:
+            instance = self.get(context, instance_id)
+        except exception.NotFound as e:
+            LOG.warning(_("Instance %d was not found in get_network_topic"),
+                        instance_id)
+            raise e
+
+        host = instance['host']
+        if not host:
+            raise exception.Error(_("Instance %d has no host") % instance_id)
+        topic = self.db.queue_get_for(context, FLAGS.compute_topic, host)
+        return rpc.call(context,
+                        topic,
+                        {"method": "get_network_topic", "args": {'fake': 1}})
+
+    def create(self, context, instance_type,
+               image_id, kernel_id=None, ramdisk_id=None,
+               min_count=1, max_count=1,
+               display_name='', display_description='',
+               key_name=None, key_data=None, security_group='default',
+               availability_zone=None, user_data=None):
+        """Create the number of instances requested if quota and
         other arguments check out ok."""

-        num_instances = quota.allowed_instances(context, max_count,
-                                                instance_type)
+        type_data = instance_types.INSTANCE_TYPES[instance_type]
+        num_instances = quota.allowed_instances(context, max_count, type_data)
         if num_instances < min_count:
-            logging.warn("Quota exceeeded for %s, tried to run %s instances",
-                         context.project_id, min_count)
-            raise quota.QuotaError("Instance quota exceeded. You can only "
-                                   "run %s more instances of this type." %
+            LOG.warn(_("Quota exceeeded for %s, tried to run %s instances"),
+                     context.project_id, min_count)
+            raise quota.QuotaError(_("Instance quota exceeded. You can only "
+                                     "run %s more instances of this type.") %
                                    num_instances, "InstanceLimitExceeded")

         is_vpn = image_id == FLAGS.vpn_image_id
         if not is_vpn:
             image = self.image_service.show(context, image_id)
             if kernel_id is None:
-                kernel_id = image.get('kernelId', FLAGS.default_kernel)
+                kernel_id = image.get('kernelId', None)
             if ramdisk_id is None:
-                ramdisk_id = image.get('ramdiskId', FLAGS.default_ramdisk)
-
-            # Make sure we have access to kernel and ramdisk
-            self.image_service.show(context, kernel_id)
-            self.image_service.show(context, ramdisk_id)
+                ramdisk_id = image.get('ramdiskId', None)
+            # No kernel and ramdisk for raw images
+            if kernel_id == str(FLAGS.null_kernel):
+                kernel_id = None
+                ramdisk_id = None
+                LOG.debug(_("Creating a raw instance"))
+            # Make sure we have access to kernel and ramdisk (if not raw)
+            logging.debug("Using Kernel=%s, Ramdisk=%s" %
+                           (kernel_id, ramdisk_id))
+            if kernel_id:
+                self.image_service.show(context, kernel_id)
+            if ramdisk_id:
+                self.image_service.show(context, ramdisk_id)

         if security_group is None:
             security_group = ['default']
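
Note: the reworked __init__ above swaps the hard-wired network_manager for injectable collaborators (image_service, network_api, volume_api) plus a hostname_factory callable; later hunks then call self.hostname_factory(instance_id) instead of the old generate_hostname(internal_id). A minimal usage sketch, assuming a configured nova 2011.1~bzr597 environment; the my_hostname helper is hypothetical:

    # Illustrative only: assumes nova 2011.1~bzr597 is importable and
    # FLAGS.image_service points at a usable image service.
    from nova.compute import api as compute_api

    def my_hostname(instance_id):
        # hypothetical factory: prefix the numeric instance id
        return "guest-%s" % instance_id

    # image_service, network_api and volume_api fall back to the configured
    # image service, network.API() and volume.API() when omitted.
    compute = compute_api.API(hostname_factory=my_hostname)
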
@@ -99 +135 @@
             key_pair = db.key_pair_get(context, context.user_id, key_name)
             key_data = key_pair['public_key']

-        type_data = instance_types.INSTANCE_TYPES[instance_type]
         base_options = {
             'reservation_id': utils.generate_uid('r'),
             'image_id': image_id,
-            'kernel_id': kernel_id,
-            'ramdisk_id': ramdisk_id,
+            'kernel_id': kernel_id or '',
+            'ramdisk_id': ramdisk_id or '',
             'state_description': 'scheduling',
             'user_id': context.user_id,
             'project_id': context.project_id,
@@ -114 +149 @@
             'vcpus': type_data['vcpus'],
             'local_gb': type_data['local_gb'],
             'display_name': display_name,
-            'display_description': description,
+            'display_description': display_description,
+            'user_data': user_data or '',
             'key_name': key_name,
-            'key_data': key_data}
+            'key_data': key_data,
+            'locked': False,
+            'availability_zone': availability_zone}

         elevated = context.elevated()
         instances = []
-        logging.debug("Going to run %s instances...", num_instances)
+        LOG.debug(_("Going to run %s instances..."), num_instances)
         for num in range(num_instances):
             instance = dict(mac_address=utils.generate_mac(),
                             launch_index=num,
                             **base_options)
             instance = self.db.instance_create(context, instance)
             instance_id = instance['id']
-            internal_id = instance['internal_id']

             elevated = context.elevated()
             if not security_groups:
@@ -138 +175 @@
                                                     security_group_id)

             # Set sane defaults if not specified
-            updates = dict(hostname=generate_hostname(internal_id))
-            if 'display_name' not in instance:
-                updates['display_name'] = "Server %s" % internal_id
+            updates = dict(hostname=self.hostname_factory(instance_id))
+            if (not hasattr(instance, 'display_name') or
+                    instance.display_name == None):
+                updates['display_name'] = "Server %s" % instance_id

-            instance = self.update_instance(context, instance_id, **updates)
+            instance = self.update(context, instance_id, **updates)
             instances.append(instance)

-            # TODO(vish): This probably should be done in the scheduler
-            #             or in compute as a call.  The network should be
-            #             allocated after the host is assigned and setup
-            #             can happen at the same time.
-            address = self.network_manager.allocate_fixed_ip(context,
-                                                             instance_id,
-                                                             is_vpn)
-            rpc.cast(elevated,
-                     self._get_network_topic(context),
-                     {"method": "setup_fixed_ip",
-                      "args": {"address": address}})
-
-            logging.debug("Casting to scheduler for %s/%s's instance %s",
+            LOG.debug(_("Casting to scheduler for %s/%s's instance %s"),
                           context.project_id, context.user_id, instance_id)
             rpc.cast(context,
                      FLAGS.scheduler_topic,
                      {"method": "run_instance",
                       "args": {"topic": FLAGS.compute_topic,
-                               "instance_id": instance_id}})
-
-        return instances
+                               "instance_id": instance_id,
+                               "availability_zone": availability_zone}})
+
+        for group_id in security_groups:
+            self.trigger_security_group_members_refresh(elevated, group_id)
+
+        return [dict(x.iteritems()) for x in instances]

     def ensure_default_security_group(self, context):
         """ Create security group for the security context if it
@@ -184 +214 @@
                       'project_id': context.project_id}
             db.security_group_create(context, values)

-    def update_instance(self, context, instance_id, **kwargs):
+    def trigger_security_group_rules_refresh(self, context, security_group_id):
+        """Called when a rule is added to or removed from a security_group"""
+
+        security_group = self.db.security_group_get(context, security_group_id)
+
+        hosts = set()
+        for instance in security_group['instances']:
+            if instance['host'] is not None:
+                hosts.add(instance['host'])
+
+        for host in hosts:
+            rpc.cast(context,
+                     self.db.queue_get_for(context, FLAGS.compute_topic, host),
+                     {"method": "refresh_security_group_rules",
+                      "args": {"security_group_id": security_group.id}})
+
+    def trigger_security_group_members_refresh(self, context, group_id):
+        """Called when a security group gains a new or loses a member
+
+        Sends an update request to each compute node for whom this is
+        relevant."""
+
+        # First, we get the security group rules that reference this group as
+        # the grantee..
+        security_group_rules = \
+                self.db.security_group_rule_get_by_security_group_grantee(
+                                                                     context,
+                                                                     group_id)
+
+        # ..then we distill the security groups to which they belong..
+        security_groups = set()
+        for rule in security_group_rules:
+            security_group = self.db.security_group_get(
+                                                    context,
+                                                    rule['parent_group_id'])
+            security_groups.add(security_group)
+
+        # ..then we find the instances that are members of these groups..
+        instances = set()
+        for security_group in security_groups:
+            for instance in security_group['instances']:
+                instances.add(instance)
+
+        # ...then we find the hosts where they live...
+        hosts = set()
+        for instance in instances:
+            if instance['host']:
+                hosts.add(instance['host'])
+
+        # ...and finally we tell these nodes to refresh their view of this
+        # particular security group.
+        for host in hosts:
+            rpc.cast(context,
+                     self.db.queue_get_for(context, FLAGS.compute_topic, host),
+                     {"method": "refresh_security_group_members",
+                      "args": {"security_group_id": group_id}})
+
+    def update(self, context, instance_id, **kwargs):
         """Updates the instance in the datastore.

         :param context: The security context
@@ -196 +283 @@
         :retval None

         """
-        return self.db.instance_update(context, instance_id, kwargs)
+        rv = self.db.instance_update(context, instance_id, kwargs)
+        return dict(rv.iteritems())

-    def delete_instance(self, context, instance_id):
-        logging.debug("Going to try and terminate %d" % instance_id)
+    def delete(self, context, instance_id):
+        LOG.debug(_("Going to try to terminate %s"), instance_id)
         try:
-            instance = self.db.instance_get_by_internal_id(context,
-                                                           instance_id)
+            instance = self.get(context, instance_id)
         except exception.NotFound as e:
-            logging.warning("Instance %d was not found during terminate",
-                            instance_id)
+            LOG.warning(_("Instance %d was not found during terminate"),
+                        instance_id)
             raise e

         if (instance['state_description'] == 'terminating'):
-            logging.warning("Instance %d is already being terminated",
-                            instance_id)
+            LOG.warning(_("Instance %d is already being terminated"),
+                        instance_id)
             return

-        self.update_instance(context,
-                             instance['id'],
-                             state_description='terminating',
-                             state=0,
-                             terminated_at=datetime.datetime.utcnow())
-
-        # FIXME(ja): where should network deallocate occur?
-        address = self.db.instance_get_floating_address(context,
-                                                        instance['id'])
-        if address:
-            logging.debug("Disassociating address %s" % address)
-            # NOTE(vish): Right now we don't really care if the ip is
-            #             disassociated.  We may need to worry about
-            #             checking this later.  Perhaps in the scheduler?
-            rpc.cast(context,
-                     self._get_network_topic(context),
-                     {"method": "disassociate_floating_ip",
-                      "args": {"floating_address": address}})
-
-        address = self.db.instance_get_fixed_address(context, instance['id'])
-        if address:
-            logging.debug("Deallocating address %s" % address)
-            # NOTE(vish): Currently, nothing needs to be done on the
-            #             network node until release. If this changes,
-            #             we will need to cast here.
-            self.network_manager.deallocate_fixed_ip(context.elevated(),
-                                                     address)
+        self.update(context,
+                    instance['id'],
+                    state_description='terminating',
+                    state=0,
+                    terminated_at=datetime.datetime.utcnow())

         host = instance['host']
         if host:
-            rpc.cast(context,
-                     self.db.queue_get_for(context, FLAGS.compute_topic, host),
-                     {"method": "terminate_instance",
-                      "args": {"instance_id": instance['id']}})
+            self._cast_compute_message('terminate_instance', context,
+                    instance_id, host)
         else:
-            self.db.instance_destroy(context, instance['id'])
-
-    def get_instances(self, context, project_id=None):
-        """Get all instances, possibly filtered by project ID or
-        user ID. If there is no filter and the context is an admin,
-        it will retreive all instances in the system."""
+            self.db.instance_destroy(context, instance_id)
+
+    def get(self, context, instance_id):
+        """Get a single instance with the given ID."""
+        rv = self.db.instance_get_by_id(context, instance_id)
+        return dict(rv.iteritems())
+
+    def get_all(self, context, project_id=None, reservation_id=None,
+                fixed_ip=None):
+        """Get all instances, possibly filtered by one of the
+        given parameters. If there is no filter and the context is
+        an admin, it will retreive all instances in the system."""
+        if reservation_id is not None:
+            return self.db.instance_get_all_by_reservation(context,
+                                                             reservation_id)
+        if fixed_ip is not None:
+            return self.db.fixed_ip_get_instance(context, fixed_ip)
         if project_id or not context.is_admin:
             if not context.project:
                 return self.db.instance_get_all_by_user(context,
                                                         context.user_id)
             if project_id is None:
                 project_id = context.project_id
-            return self.db.instance_get_all_by_project(context, project_id)
+            return self.db.instance_get_all_by_project(context,
+            project_id)
         return self.db.instance_get_all(context)

-    def get_instance(self, context, instance_id):
-        return self.db.instance_get_by_internal_id(context, instance_id)
+    def _cast_compute_message(self, method, context, instance_id, host=None,
+                              params=None):
+        """Generic handler for RPC casts to compute.
+
+        :param params: Optional dictionary of arguments to be passed to the
+                       compute worker
+
+        :retval None
+        """
+        if not params:
+            params = {}
+        if not host:
+            instance = self.get(context, instance_id)
+            host = instance['host']
+        queue = self.db.queue_get_for(context, FLAGS.compute_topic, host)
+        params['instance_id'] = instance_id
+        kwargs = {'method': method, 'args': params}
+        rpc.cast(context, queue, kwargs)
+
+    def _call_compute_message(self, method, context, instance_id, host=None,
+                              params=None):
+        """Generic handler for RPC calls to compute.
+
+        :param params: Optional dictionary of arguments to be passed to the
+                       compute worker
+
+        :retval: Result returned by compute worker
+        """
+        if not params:
+            params = {}
+        if not host:
+            instance = self.get(context, instance_id)
+            host = instance["host"]
+        queue = self.db.queue_get_for(context, FLAGS.compute_topic, host)
+        params['instance_id'] = instance_id
+        kwargs = {'method': method, 'args': params}
+        return rpc.call(context, queue, kwargs)
+
+    def snapshot(self, context, instance_id, name):
+        """Snapshot the given instance.
+
+        :retval: A dict containing image metadata
+        """
+        data = {'name': name, 'is_public': False}
+        image_meta = self.image_service.create(context, data)
+        params = {'image_id': image_meta['id']}
+        self._cast_compute_message('snapshot_instance', context, instance_id,
+                                   params=params)
+        return image_meta

     def reboot(self, context, instance_id):
         """Reboot the given instance."""
-        instance = self.db.instance_get_by_internal_id(context, instance_id)
-        host = instance['host']
-        rpc.cast(context,
-                 self.db.queue_get_for(context, FLAGS.compute_topic, host),
-                 {"method": "reboot_instance",
-                  "args": {"instance_id": instance['id']}})
+        self._cast_compute_message('reboot_instance', context, instance_id)
+
+    def pause(self, context, instance_id):
+        """Pause the given instance."""
+        self._cast_compute_message('pause_instance', context, instance_id)
+
+    def unpause(self, context, instance_id):
+        """Unpause the given instance."""
+        self._cast_compute_message('unpause_instance', context, instance_id)
+
+    def get_diagnostics(self, context, instance_id):
+        """Retrieve diagnostics for the given instance."""
+        return self._call_compute_message(
+            "get_diagnostics",
+            context,
+            instance_id)
+
+    def get_actions(self, context, instance_id):
+        """Retrieve actions for the given instance."""
+        return self.db.instance_get_actions(context, instance_id)
+
+    def suspend(self, context, instance_id):
+        """suspend the instance with instance_id"""
+        self._cast_compute_message('suspend_instance', context, instance_id)
+
+    def resume(self, context, instance_id):
+        """resume the instance with instance_id"""
+        self._cast_compute_message('resume_instance', context, instance_id)

     def rescue(self, context, instance_id):
         """Rescue the given instance."""
-        instance = self.db.instance_get_by_internal_id(context, instance_id)
-        host = instance['host']
-        rpc.cast(context,
-                 self.db.queue_get_for(context, FLAGS.compute_topic, host),
-                 {"method": "rescue_instance",
-                  "args": {"instance_id": instance['id']}})
+        self._cast_compute_message('rescue_instance', context, instance_id)

     def unrescue(self, context, instance_id):
         """Unrescue the given instance."""
-        instance = self.db.instance_get_by_internal_id(context, instance_id)
-        host = instance['host']
-        rpc.cast(context,
-                 self.db.queue_get_for(context, FLAGS.compute_topic, host),
-                 {"method": "unrescue_instance",
-                  "args": {"instance_id": instance['id']}})
-
-    def _get_network_topic(self, context):
-        """Retrieves the network host for a project"""
-        network_ref = self.network_manager.get_network(context)
-        host = network_ref['host']
-        if not host:
-            host = rpc.call(context,
-                            FLAGS.network_topic,
-                            {"method": "set_network_host",
-                             "args": {"network_id": network_ref['id']}})
-        return self.db.queue_get_for(context, FLAGS.network_topic, host)
+        self._cast_compute_message('unrescue_instance', context, instance_id)
+
+    def set_admin_password(self, context, instance_id):
+        """Set the root/admin password for the given instance."""
+        self._cast_compute_message('set_admin_password', context, instance_id)
+
+    def get_ajax_console(self, context, instance_id):
+        """Get a url to an AJAX Console"""
+        instance = self.get(context, instance_id)
+        output = self._call_compute_message('get_ajax_console',
+                                            context,
+                                            instance_id)
+        rpc.cast(context, '%s' % FLAGS.ajax_console_proxy_topic,
+                 {'method': 'authorize_ajax_console',
+                  'args': {'token': output['token'], 'host': output['host'],
+                  'port': output['port']}})
+        return {'url': '%s?token=%s' % (FLAGS.ajax_console_proxy_url,
+                output['token'])}
+
+    def get_console_output(self, context, instance_id):
+        """Get console output for an an instance"""
+        return self._call_compute_message('get_console_output',
+                                          context,
+                                          instance_id)
+
+    def lock(self, context, instance_id):
+        """lock the instance with instance_id"""
+        self._cast_compute_message('lock_instance', context, instance_id)
+
+    def unlock(self, context, instance_id):
+        """unlock the instance with instance_id"""
+        self._cast_compute_message('unlock_instance', context, instance_id)
+
+    def get_lock(self, context, instance_id):
+        """return the boolean state of (instance with instance_id)'s lock"""
+        instance = self.get(context, instance_id)
+        return instance['locked']
+
+    def attach_volume(self, context, instance_id, volume_id, device):
+        if not re.match("^/dev/[a-z]d[a-z]+$", device):
+            raise exception.ApiError(_("Invalid device specified: %s. "
+                                     "Example device: /dev/vdb") % device)
+        self.volume_api.check_attach(context, volume_id)
+        instance = self.get(context, instance_id)
+        host = instance['host']
+        rpc.cast(context,
+                 self.db.queue_get_for(context, FLAGS.compute_topic, host),
+                 {"method": "attach_volume",
+                  "args": {"volume_id": volume_id,
+                           "instance_id": instance_id,
+                           "mountpoint": device}})
+
+    def detach_volume(self, context, volume_id):
+        instance = self.db.volume_get_instance(context.elevated(), volume_id)
+        if not instance:
+            raise exception.ApiError(_("Volume isn't attached to anything!"))
+        self.volume_api.check_detach(context, volume_id)
+        host = instance['host']
+        rpc.cast(context,
+                 self.db.queue_get_for(context, FLAGS.compute_topic, host),
+                 {"method": "detach_volume",
+                  "args": {"instance_id": instance['id'],
+                           "volume_id": volume_id}})
+        return instance
+
+    def associate_floating_ip(self, context, instance_id, address):
+        instance = self.get(context, instance_id)
+        self.network_api.associate_floating_ip(context, address,
+                                               instance['fixed_ip'])
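
Note: most of the per-instance operations added in this revision (reboot, pause, suspend, rescue, lock, set_admin_password) delegate to _cast_compute_message, which resolves the instance's host, derives that host's compute queue and fires a one-way cast; _call_compute_message does the same routing but waits for the worker's reply via rpc.call. A standalone sketch of that routing pattern, using stand-in names rather than nova's real rpc/db modules:

    # Stand-in sketch (not nova code): resolve the instance's host, build a
    # per-host queue name, and send a one-way message, as
    # _cast_compute_message does above.
    sent = []  # stands in for the AMQP bus behind rpc.cast

    def queue_get_for(topic, host):
        # nova's db.queue_get_for returns a per-host queue; this sketch
        # simply joins topic and host
        return "%s.%s" % (topic, host)

    def cast_compute_message(method, instance, params=None, host=None):
        params = dict(params or {})
        host = host or instance['host']     # nova falls back to a DB lookup
        params['instance_id'] = instance['id']
        queue = queue_get_for('compute', host)
        sent.append((queue, {'method': method, 'args': params}))

    # reboot()/pause()/rescue() in the diff each reduce to one such cast:
    cast_compute_message('reboot_instance', {'id': 42, 'host': 'node-1'})
    assert sent == [('compute.node-1',
                     {'method': 'reboot_instance',
                      'args': {'instance_id': 42}})]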