~ubuntu-branches/ubuntu/saucy/nova/saucy-proposed

« back to all changes in this revision

Viewing changes to nova/virt/xenapi_conn.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2012-05-24 13:12:53 UTC
  • mfrom: (1.1.55)
  • Revision ID: package-import@ubuntu.com-20120524131253-ommql08fg1en06ut
Tags: 2012.2~f1-0ubuntu1
* New upstream release.
* Prepare for quantal:
  - Dropped debian/patches/upstream/0006-Use-project_id-in-ec2.cloud._format_image.patch
  - Dropped debian/patches/upstream/0005-Populate-image-properties-with-project_id-again.patch
  - Dropped debian/patches/upstream/0004-Fixed-bug-962840-added-a-test-case.patch
  - Dropped debian/patches/upstream/0003-Allow-unprivileged-RADOS-users-to-access-rbd-volumes.patch
  - Dropped debian/patches/upstream/0002-Stop-libvirt-test-from-deleting-instances-dir.patch
  - Dropped debian/patches/upstream/0001-fix-bug-where-nova-ignores-glance-host-in-imageref.patch 
  - Dropped debian/patches/0001-fix-useexisting-deprecation-warnings.patch
* debian/control: Add python-keystone as a dependency. (LP: #907197)
* debian/patches/kombu_tests_timeout.patch: Refreshed.
* debian/nova.conf, debian/nova-common.postinst: Convert to new ini
  file configuration
* debian/patches/nova-manage_flagfile_location.patch: Refreshed

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
# Copyright (c) 2010 Citrix Systems, Inc.
4
 
# Copyright 2010 OpenStack LLC.
5
 
#
6
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
7
 
#    not use this file except in compliance with the License. You may obtain
8
 
#    a copy of the License at
9
 
#
10
 
#         http://www.apache.org/licenses/LICENSE-2.0
11
 
#
12
 
#    Unless required by applicable law or agreed to in writing, software
13
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15
 
#    License for the specific language governing permissions and limitations
16
 
#    under the License.
17
 
 
18
 
"""
19
 
A connection to XenServer or Xen Cloud Platform.
20
 
 
21
 
The concurrency model for this class is as follows:
22
 
 
23
 
All XenAPI calls are on a green thread (using eventlet's "tpool"
24
 
thread pool). They are remote calls, and so may hang for the usual
25
 
reasons.
26
 
 
27
 
**Related Flags**
28
 
 
29
 
:xenapi_connection_url:  URL for connection to XenServer/Xen Cloud Platform.
30
 
:xenapi_connection_username:  Username for connection to XenServer/Xen Cloud
31
 
                              Platform (default: root).
32
 
:xenapi_connection_password:  Password for connection to XenServer/Xen Cloud
33
 
                              Platform.
34
 
:target_host:                the iSCSI Target Host IP address, i.e. the IP
35
 
                             address for the nova-volume host
36
 
:target_port:                iSCSI Target Port, 3260 Default
37
 
:iqn_prefix:                 IQN Prefix, e.g. 'iqn.2010-10.org.openstack'
38
 
 
39
 
**Variable Naming Scheme**
40
 
 
41
 
- suffix "_ref" for opaque references
42
 
- suffix "_uuid" for UUIDs
43
 
- suffix "_rec" for record objects
44
 
"""
45
 
 
46
 
import contextlib
47
 
import time
48
 
import urlparse
49
 
import xmlrpclib
50
 
 
51
 
from eventlet import greenthread
52
 
from eventlet import queue
53
 
from eventlet import tpool
54
 
from eventlet import timeout
55
 
 
56
 
from nova import context
57
 
from nova import db
58
 
from nova import exception
59
 
from nova import flags
60
 
from nova import log as logging
61
 
from nova.openstack.common import cfg
62
 
from nova.virt import driver
63
 
from nova.virt.xenapi import host
64
 
from nova.virt.xenapi import pool
65
 
from nova.virt.xenapi import vmops
66
 
from nova.virt.xenapi import volumeops
67
 
 
68
 
 
69
 
LOG = logging.getLogger(__name__)
70
 
 
71
 
xenapi_opts = [
72
 
    cfg.StrOpt('xenapi_connection_url',
73
 
               default=None,
74
 
               help='URL for connection to XenServer/Xen Cloud Platform. '
75
 
                    'Required if connection_type=xenapi.'),
76
 
    cfg.StrOpt('xenapi_connection_username',
77
 
               default='root',
78
 
               help='Username for connection to XenServer/Xen Cloud Platform. '
79
 
                    'Used only if connection_type=xenapi.'),
80
 
    cfg.StrOpt('xenapi_connection_password',
81
 
               default=None,
82
 
               help='Password for connection to XenServer/Xen Cloud Platform. '
83
 
                    'Used only if connection_type=xenapi.'),
84
 
    cfg.IntOpt('xenapi_connection_concurrent',
85
 
               default=5,
86
 
               help='Maximum number of concurrent XenAPI connections. '
87
 
                    'Used only if connection_type=xenapi.'),
88
 
    cfg.FloatOpt('xenapi_vhd_coalesce_poll_interval',
89
 
                 default=5.0,
90
 
                 help='The interval used for polling of coalescing vhds. '
91
 
                      'Used only if connection_type=xenapi.'),
92
 
    cfg.IntOpt('xenapi_vhd_coalesce_max_attempts',
93
 
               default=5,
94
 
               help='Max number of times to poll for VHD to coalesce. '
95
 
                    'Used only if connection_type=xenapi.'),
96
 
    cfg.StrOpt('xenapi_agent_path',
97
 
               default='usr/sbin/xe-update-networking',
98
 
               help='Specifies the path in which the xenapi guest agent '
99
 
                    'should be located. If the agent is present, network '
100
 
                    'configuration is not injected into the image. '
101
 
                    'Used if connection_type=xenapi and flat_injected=True'),
102
 
    cfg.StrOpt('xenapi_sr_base_path',
103
 
               default='/var/run/sr-mount',
104
 
               help='Base path to the storage repository'),
105
 
    cfg.StrOpt('target_host',
106
 
               default=None,
107
 
               help='iSCSI Target Host'),
108
 
    cfg.StrOpt('target_port',
109
 
               default='3260',
110
 
               help='iSCSI Target Port, 3260 Default'),
111
 
    cfg.StrOpt('iqn_prefix',
112
 
               default='iqn.2010-10.org.openstack',
113
 
               help='IQN Prefix'),
114
 
    # NOTE(sirp): This is a work-around for a bug in Ubuntu Maverick,
115
 
    # when we pull support for it, we should remove this
116
 
    cfg.BoolOpt('xenapi_remap_vbd_dev',
117
 
                default=False,
118
 
                help='Used to enable the remapping of VBD dev '
119
 
                     '(Works around an issue in Ubuntu Maverick)'),
120
 
    cfg.StrOpt('xenapi_remap_vbd_dev_prefix',
121
 
               default='sd',
122
 
               help='Specify prefix to remap VBD dev to '
123
 
                    '(ex. /dev/xvdb -> /dev/sdb)'),
124
 
    cfg.IntOpt('xenapi_login_timeout',
125
 
               default=10,
126
 
               help='Timeout in seconds for XenAPI login.'),
127
 
    ]
128
 
 
129
 
FLAGS = flags.FLAGS
130
 
FLAGS.register_opts(xenapi_opts)
131
 
 
132
 
 
133
 
def get_connection(_read_only):
134
 
    """Note that XenAPI doesn't have a read-only connection mode, so
135
 
    the read_only parameter is ignored."""
136
 
    url = FLAGS.xenapi_connection_url
137
 
    username = FLAGS.xenapi_connection_username
138
 
    password = FLAGS.xenapi_connection_password
139
 
    if not url or password is None:
140
 
        raise Exception(_('Must specify xenapi_connection_url, '
141
 
                          'xenapi_connection_username (optionally), and '
142
 
                          'xenapi_connection_password to use '
143
 
                          'connection_type=xenapi'))
144
 
    return XenAPIConnection(url, username, password)
145
 
 
146
 
 
147
 
class XenAPIConnection(driver.ComputeDriver):
148
 
    """A connection to XenServer or Xen Cloud Platform"""
149
 
 
150
 
    def __init__(self, url, user, pw):
151
 
        super(XenAPIConnection, self).__init__()
152
 
        self._session = XenAPISession(url, user, pw)
153
 
        self._volumeops = volumeops.VolumeOps(self._session)
154
 
        self._host_state = None
155
 
        self._host = host.Host(self._session)
156
 
        self._product_version = self._session.get_product_version()
157
 
        self._vmops = vmops.VMOps(self._session, self._product_version)
158
 
        self._initiator = None
159
 
        self._pool = pool.ResourcePool(self._session)
160
 
 
161
 
    @property
162
 
    def host_state(self):
163
 
        if not self._host_state:
164
 
            self._host_state = host.HostState(self._session)
165
 
        return self._host_state
166
 
 
167
 
    def init_host(self, host):
168
 
        #FIXME(armando): implement this
169
 
        #NOTE(armando): would we need a method
170
 
        #to call when shutting down the host?
171
 
        #e.g. to do session logout?
172
 
        pass
173
 
 
174
 
    def list_instances(self):
175
 
        """List VM instances"""
176
 
        return self._vmops.list_instances()
177
 
 
178
 
    def list_instances_detail(self):
179
 
        return self._vmops.list_instances_detail()
180
 
 
181
 
    def spawn(self, context, instance, image_meta,
182
 
              network_info=None, block_device_info=None):
183
 
        """Create VM instance"""
184
 
        self._vmops.spawn(context, instance, image_meta, network_info)
185
 
 
186
 
    def confirm_migration(self, migration, instance, network_info):
187
 
        """Confirms a resize, destroying the source VM"""
188
 
        # TODO(Vek): Need to pass context in for access to auth_token
189
 
        self._vmops.confirm_migration(migration, instance, network_info)
190
 
 
191
 
    def finish_revert_migration(self, instance, network_info):
192
 
        """Finish reverting a resize, powering back on the instance"""
193
 
        # NOTE(vish): Xen currently does not use network info.
194
 
        self._vmops.finish_revert_migration(instance)
195
 
 
196
 
    def finish_migration(self, context, migration, instance, disk_info,
197
 
                         network_info, image_meta, resize_instance=False):
198
 
        """Completes a resize, turning on the migrated instance"""
199
 
        self._vmops.finish_migration(context, migration, instance, disk_info,
200
 
                                     network_info, image_meta, resize_instance)
201
 
 
202
 
    def snapshot(self, context, instance, image_id):
203
 
        """ Create snapshot from a running VM instance """
204
 
        self._vmops.snapshot(context, instance, image_id)
205
 
 
206
 
    def reboot(self, instance, network_info, reboot_type):
207
 
        """Reboot VM instance"""
208
 
        self._vmops.reboot(instance, reboot_type)
209
 
 
210
 
    def set_admin_password(self, instance, new_pass):
211
 
        """Set the root/admin password on the VM instance"""
212
 
        self._vmops.set_admin_password(instance, new_pass)
213
 
 
214
 
    def inject_file(self, instance, b64_path, b64_contents):
215
 
        """Create a file on the VM instance. The file path and contents
216
 
        should be base64-encoded.
217
 
        """
218
 
        self._vmops.inject_file(instance, b64_path, b64_contents)
219
 
 
220
 
    def destroy(self, instance, network_info, block_device_info=None):
221
 
        """Destroy VM instance"""
222
 
        self._vmops.destroy(instance, network_info)
223
 
 
224
 
    def pause(self, instance):
225
 
        """Pause VM instance"""
226
 
        self._vmops.pause(instance)
227
 
 
228
 
    def unpause(self, instance):
229
 
        """Unpause paused VM instance"""
230
 
        self._vmops.unpause(instance)
231
 
 
232
 
    def migrate_disk_and_power_off(self, context, instance, dest,
233
 
                                   instance_type, network_info):
234
 
        """Transfers the VHD of a running instance to another host, then shuts
235
 
        off the instance copies over the COW disk"""
236
 
        # NOTE(vish): Xen currently does not use network info.
237
 
        return self._vmops.migrate_disk_and_power_off(context, instance,
238
 
                                                      dest, instance_type)
239
 
 
240
 
    def suspend(self, instance):
241
 
        """suspend the specified instance"""
242
 
        self._vmops.suspend(instance)
243
 
 
244
 
    def resume(self, instance):
245
 
        """resume the specified instance"""
246
 
        self._vmops.resume(instance)
247
 
 
248
 
    def rescue(self, context, instance, network_info, image_meta):
249
 
        """Rescue the specified instance"""
250
 
        self._vmops.rescue(context, instance, network_info, image_meta)
251
 
 
252
 
    def unrescue(self, instance, network_info):
253
 
        """Unrescue the specified instance"""
254
 
        self._vmops.unrescue(instance)
255
 
 
256
 
    def power_off(self, instance):
257
 
        """Power off the specified instance"""
258
 
        self._vmops.power_off(instance)
259
 
 
260
 
    def power_on(self, instance):
261
 
        """Power on the specified instance"""
262
 
        self._vmops.power_on(instance)
263
 
 
264
 
    def poll_rebooting_instances(self, timeout):
265
 
        """Poll for rebooting instances"""
266
 
        self._vmops.poll_rebooting_instances(timeout)
267
 
 
268
 
    def poll_rescued_instances(self, timeout):
269
 
        """Poll for rescued instances"""
270
 
        self._vmops.poll_rescued_instances(timeout)
271
 
 
272
 
    def poll_unconfirmed_resizes(self, resize_confirm_window):
273
 
        """Poll for unconfirmed resizes"""
274
 
        self._vmops.poll_unconfirmed_resizes(resize_confirm_window)
275
 
 
276
 
    def reset_network(self, instance):
277
 
        """reset networking for specified instance"""
278
 
        self._vmops.reset_network(instance)
279
 
 
280
 
    def inject_network_info(self, instance, network_info):
281
 
        """inject network info for specified instance"""
282
 
        self._vmops.inject_network_info(instance, network_info)
283
 
 
284
 
    def plug_vifs(self, instance_ref, network_info):
285
 
        """Plug VIFs into networks."""
286
 
        self._vmops.plug_vifs(instance_ref, network_info)
287
 
 
288
 
    def unplug_vifs(self, instance_ref, network_info):
289
 
        """Unplug VIFs from networks."""
290
 
        self._vmops.unplug_vifs(instance_ref, network_info)
291
 
 
292
 
    def get_info(self, instance):
293
 
        """Return data about VM instance"""
294
 
        return self._vmops.get_info(instance)
295
 
 
296
 
    def get_diagnostics(self, instance):
297
 
        """Return data about VM diagnostics"""
298
 
        return self._vmops.get_diagnostics(instance)
299
 
 
300
 
    def get_all_bw_usage(self, start_time, stop_time=None):
301
 
        """Return bandwidth usage info for each interface on each
302
 
           running VM"""
303
 
        bwusage = []
304
 
        start_time = time.mktime(start_time.timetuple())
305
 
        if stop_time:
306
 
            stop_time = time.mktime(stop_time.timetuple())
307
 
        for iusage in self._vmops.get_all_bw_usage(start_time,
308
 
                                                   stop_time).values():
309
 
            for macaddr, usage in iusage.iteritems():
310
 
                bwusage.append(dict(mac_address=macaddr,
311
 
                                    bw_in=usage['bw_in'],
312
 
                                    bw_out=usage['bw_out']))
313
 
        return bwusage
314
 
 
315
 
    def get_console_output(self, instance):
316
 
        """Return snapshot of console"""
317
 
        return self._vmops.get_console_output(instance)
318
 
 
319
 
    def get_vnc_console(self, instance):
320
 
        """Return link to instance's VNC console"""
321
 
        return self._vmops.get_vnc_console(instance)
322
 
 
323
 
    def get_volume_connector(self, _instance):
324
 
        """Return volume connector information"""
325
 
        if not self._initiator:
326
 
            stats = self.get_host_stats(refresh=True)
327
 
            try:
328
 
                self._initiator = stats['host_other-config']['iscsi_iqn']
329
 
            except (TypeError, KeyError):
330
 
                LOG.warn(_('Could not determine iscsi initiator name'))
331
 
                self._initiator = None
332
 
        return {
333
 
            'ip': self.get_host_ip_addr(),
334
 
            'initiator': self._initiator
335
 
        }
336
 
 
337
 
    @staticmethod
338
 
    def get_host_ip_addr():
339
 
        xs_url = urlparse.urlparse(FLAGS.xenapi_connection_url)
340
 
        return xs_url.netloc
341
 
 
342
 
    def attach_volume(self, connection_info, instance_name, mountpoint):
343
 
        """Attach volume storage to VM instance"""
344
 
        return self._volumeops.attach_volume(connection_info,
345
 
                                             instance_name,
346
 
                                             mountpoint)
347
 
 
348
 
    def detach_volume(self, connection_info, instance_name, mountpoint):
349
 
        """Detach volume storage to VM instance"""
350
 
        return self._volumeops.detach_volume(connection_info,
351
 
                                             instance_name,
352
 
                                             mountpoint)
353
 
 
354
 
    def get_console_pool_info(self, console_type):
355
 
        xs_url = urlparse.urlparse(FLAGS.xenapi_connection_url)
356
 
        return  {'address': xs_url.netloc,
357
 
                 'username': FLAGS.xenapi_connection_username,
358
 
                 'password': FLAGS.xenapi_connection_password}
359
 
 
360
 
    def update_available_resource(self, ctxt, host):
361
 
        """Updates compute manager resource info on ComputeNode table.
362
 
 
363
 
        This method is called when nova-compute launches, and
364
 
        whenever admin executes "nova-manage service update_resource".
365
 
 
366
 
        :param ctxt: security context
367
 
        :param host: hostname that compute manager is currently running
368
 
 
369
 
        """
370
 
        try:
371
 
            service_ref = db.service_get_all_compute_by_host(ctxt, host)[0]
372
 
        except exception.NotFound:
373
 
            raise exception.ComputeServiceUnavailable(host=host)
374
 
 
375
 
        host_stats = self.get_host_stats(refresh=True)
376
 
 
377
 
        # Updating host information
378
 
        total_ram_mb = host_stats['host_memory_total'] / (1024 * 1024)
379
 
        free_ram_mb = host_stats['host_memory_free'] / (1024 * 1024)
380
 
        total_disk_gb = host_stats['disk_total'] / (1024 * 1024 * 1024)
381
 
        used_disk_gb = host_stats['disk_used'] / (1024 * 1024 * 1024)
382
 
 
383
 
        dic = {'vcpus': 0,
384
 
               'memory_mb': total_ram_mb,
385
 
               'local_gb': total_disk_gb,
386
 
               'vcpus_used': 0,
387
 
               'memory_mb_used': total_ram_mb - free_ram_mb,
388
 
               'local_gb_used': used_disk_gb,
389
 
               'hypervisor_type': 'xen',
390
 
               'hypervisor_version': 0,
391
 
               'hypervisor_hostname': host_stats['host_hostname'],
392
 
               'service_id': service_ref['id'],
393
 
               'cpu_info': host_stats['host_cpu_info']['cpu_count']}
394
 
 
395
 
        compute_node_ref = service_ref['compute_node']
396
 
        if not compute_node_ref:
397
 
            LOG.info(_('Compute_service record created for %s ') % host)
398
 
            db.compute_node_create(ctxt, dic)
399
 
        else:
400
 
            LOG.info(_('Compute_service record updated for %s ') % host)
401
 
            db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic)
402
 
 
403
 
    def compare_cpu(self, xml):
404
 
        """This method is supported only by libvirt."""
405
 
        raise NotImplementedError('This method is supported only by libvirt.')
406
 
 
407
 
    def ensure_filtering_rules_for_instance(self, instance_ref, network_info):
408
 
        """This method is supported only libvirt."""
409
 
        # NOTE(salvatore-orlando): it enforces security groups on
410
 
        # host initialization and live migration.
411
 
        # Live migration is not supported by XenAPI (as of 2011-11-09)
412
 
        # In XenAPI we do not assume instances running upon host initialization
413
 
        return
414
 
 
415
 
    def live_migration(self, context, instance_ref, dest,
416
 
                       post_method, recover_method, block_migration=False):
417
 
        """This method is supported only by libvirt."""
418
 
        return
419
 
 
420
 
    def unfilter_instance(self, instance_ref, network_info):
421
 
        """Removes security groups configured for an instance."""
422
 
        return self._vmops.unfilter_instance(instance_ref, network_info)
423
 
 
424
 
    def refresh_security_group_rules(self, security_group_id):
425
 
        """ Updates security group rules for all instances
426
 
            associated with a given security group
427
 
            Invoked when security group rules are updated
428
 
        """
429
 
        return self._vmops.refresh_security_group_rules(security_group_id)
430
 
 
431
 
    def refresh_security_group_members(self, security_group_id):
432
 
        """ Updates security group rules for all instances
433
 
            associated with a given security group
434
 
            Invoked when instances are added/removed to a security group
435
 
        """
436
 
        return self._vmops.refresh_security_group_members(security_group_id)
437
 
 
438
 
    def refresh_provider_fw_rules(self):
439
 
        return self._vmops.refresh_provider_fw_rules()
440
 
 
441
 
    def update_host_status(self):
442
 
        """Update the status info of the host, and return those values
443
 
            to the calling program."""
444
 
        return self.host_state.update_status()
445
 
 
446
 
    def get_host_stats(self, refresh=False):
447
 
        """Return the current state of the host. If 'refresh' is
448
 
           True, run the update first."""
449
 
        return self.host_state.get_host_stats(refresh=refresh)
450
 
 
451
 
    def host_power_action(self, host, action):
452
 
        """The only valid values for 'action' on XenServer are 'reboot' or
453
 
        'shutdown', even though the API also accepts 'startup'. As this is
454
 
        not technically possible on XenServer, since the host is the same
455
 
        physical machine as the hypervisor, if this is requested, we need to
456
 
        raise an exception.
457
 
        """
458
 
        if action in ("reboot", "shutdown"):
459
 
            return self._host.host_power_action(host, action)
460
 
        else:
461
 
            msg = _("Host startup on XenServer is not supported.")
462
 
            raise NotImplementedError(msg)
463
 
 
464
 
    def set_host_enabled(self, host, enabled):
465
 
        """Sets the specified host's ability to accept new instances."""
466
 
        return self._host.set_host_enabled(host, enabled)
467
 
 
468
 
    def host_maintenance_mode(self, host, mode):
469
 
        """Start/Stop host maintenance window. On start, it triggers
470
 
        guest VMs evacuation."""
471
 
        return self._host.host_maintenance_mode(host, mode)
472
 
 
473
 
    def add_to_aggregate(self, context, aggregate, host, **kwargs):
474
 
        """Add a compute host to an aggregate."""
475
 
        return self._pool.add_to_aggregate(context, aggregate, host, **kwargs)
476
 
 
477
 
    def remove_from_aggregate(self, context, aggregate, host, **kwargs):
478
 
        """Remove a compute host from an aggregate."""
479
 
        return self._pool.remove_from_aggregate(context,
480
 
                                                aggregate, host, **kwargs)
481
 
 
482
 
 
483
 
class XenAPISession(object):
484
 
    """The session to invoke XenAPI SDK calls"""
485
 
 
486
 
    def __init__(self, url, user, pw):
487
 
        self.XenAPI = self.get_imported_xenapi()
488
 
        self._sessions = queue.Queue()
489
 
        self.host_uuid = None
490
 
        self.is_slave = False
491
 
        exception = self.XenAPI.Failure(_("Unable to log in to XenAPI "
492
 
                                          "(is the Dom0 disk full?)"))
493
 
        url = self._create_first_session(url, user, pw, exception)
494
 
        self._populate_session_pool(url, user, pw, exception)
495
 
        self._populate_host_uuid()
496
 
 
497
 
    def _create_first_session(self, url, user, pw, exception):
498
 
        try:
499
 
            session = self._create_session(url)
500
 
            with timeout.Timeout(FLAGS.xenapi_login_timeout, exception):
501
 
                session.login_with_password(user, pw)
502
 
        except self.XenAPI.Failure, e:
503
 
            # if user and pw of the master are different, we're doomed!
504
 
            if e.details[0] == 'HOST_IS_SLAVE':
505
 
                master = e.details[1]
506
 
                url = pool.swap_xapi_host(url, master)
507
 
                session = self.XenAPI.Session(url)
508
 
                session.login_with_password(user, pw)
509
 
                self.is_slave = True
510
 
            else:
511
 
                raise
512
 
        self._sessions.put(session)
513
 
        return url
514
 
 
515
 
    def _populate_session_pool(self, url, user, pw, exception):
516
 
        for i in xrange(FLAGS.xenapi_connection_concurrent - 1):
517
 
            session = self._create_session(url)
518
 
            with timeout.Timeout(FLAGS.xenapi_login_timeout, exception):
519
 
                session.login_with_password(user, pw)
520
 
            self._sessions.put(session)
521
 
 
522
 
    def _populate_host_uuid(self):
523
 
        if self.is_slave:
524
 
            try:
525
 
                aggr = db.aggregate_get_by_host(context.get_admin_context(),
526
 
                                                FLAGS.host)
527
 
                self.host_uuid = aggr.metadetails[FLAGS.host]
528
 
            except exception.AggregateHostNotFound:
529
 
                LOG.exception(_('Host is member of a pool, but DB '
530
 
                                'says otherwise'))
531
 
                raise
532
 
        else:
533
 
            with self._get_session() as session:
534
 
                host_ref = session.xenapi.session.get_this_host(session.handle)
535
 
                self.host_uuid = session.xenapi.host.get_uuid(host_ref)
536
 
 
537
 
    def get_product_version(self):
538
 
        """Return a tuple of (major, minor, rev) for the host version"""
539
 
        host = self.get_xenapi_host()
540
 
        software_version = self.call_xenapi('host.get_software_version',
541
 
                                            host)
542
 
        product_version = software_version['product_version']
543
 
        return tuple(int(part) for part in product_version.split('.'))
544
 
 
545
 
    def get_imported_xenapi(self):
546
 
        """Stubout point. This can be replaced with a mock xenapi module."""
547
 
        return __import__('XenAPI')
548
 
 
549
 
    def get_session_id(self):
550
 
        """Return a string session_id.  Used for vnc consoles."""
551
 
        with self._get_session() as session:
552
 
            return str(session._session)
553
 
 
554
 
    @contextlib.contextmanager
555
 
    def _get_session(self):
556
 
        """Return exclusive session for scope of with statement"""
557
 
        session = self._sessions.get()
558
 
        try:
559
 
            yield session
560
 
        finally:
561
 
            self._sessions.put(session)
562
 
 
563
 
    def get_xenapi_host(self):
564
 
        """Return the xenapi host on which nova-compute runs on."""
565
 
        with self._get_session() as session:
566
 
            return session.xenapi.host.get_by_uuid(self.host_uuid)
567
 
 
568
 
    def call_xenapi(self, method, *args):
569
 
        """Call the specified XenAPI method on a background thread."""
570
 
        with self._get_session() as session:
571
 
            f = session.xenapi
572
 
            for m in method.split('.'):
573
 
                f = getattr(f, m)
574
 
            return tpool.execute(f, *args)
575
 
 
576
 
    def call_xenapi_request(self, method, *args):
577
 
        """Some interactions with dom0, such as interacting with xenstore's
578
 
        param record, require using the xenapi_request method of the session
579
 
        object. This wraps that call on a background thread.
580
 
        """
581
 
        with self._get_session() as session:
582
 
            f = session.xenapi_request
583
 
            return tpool.execute(f, method, *args)
584
 
 
585
 
    def call_plugin(self, plugin, fn, args):
586
 
        """Call host.call_plugin on a background thread."""
587
 
        # NOTE(johannes): Fetch host before we acquire a session. Since
588
 
        # get_xenapi_host() acquires a session too, it can result in a
589
 
        # deadlock if multiple greenthreads race with each other. See
590
 
        # bug 924918
591
 
        host = self.get_xenapi_host()
592
 
 
593
 
        # NOTE(armando): pass the host uuid along with the args so that
594
 
        # the plugin gets executed on the right host when using XS pools
595
 
        args['host_uuid'] = self.host_uuid
596
 
 
597
 
        with self._get_session() as session:
598
 
            return tpool.execute(self._unwrap_plugin_exceptions,
599
 
                                 session.xenapi.host.call_plugin,
600
 
                                 host, plugin, fn, args)
601
 
 
602
 
    def _create_session(self, url):
603
 
        """Stubout point. This can be replaced with a mock session."""
604
 
        return self.XenAPI.Session(url)
605
 
 
606
 
    def _unwrap_plugin_exceptions(self, func, *args, **kwargs):
607
 
        """Parse exception details"""
608
 
        try:
609
 
            return func(*args, **kwargs)
610
 
        except self.XenAPI.Failure, exc:
611
 
            LOG.debug(_("Got exception: %s"), exc)
612
 
            if (len(exc.details) == 4 and
613
 
                exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
614
 
                exc.details[2] == 'Failure'):
615
 
                params = None
616
 
                try:
617
 
                    params = eval(exc.details[3])
618
 
                except Exception:
619
 
                    raise exc
620
 
                raise self.XenAPI.Failure(params)
621
 
            else:
622
 
                raise
623
 
        except xmlrpclib.ProtocolError, exc:
624
 
            LOG.debug(_("Got exception: %s"), exc)
625
 
            raise
626
 
 
627
 
 
628
 
def _parse_xmlrpc_value(val):
629
 
    """Parse the given value as if it were an XML-RPC value. This is
630
 
    sometimes used as the format for the task.result field."""
631
 
    if not val:
632
 
        return val
633
 
    x = xmlrpclib.loads(
634
 
        '<?xml version="1.0"?><methodResponse><params><param>' +
635
 
        val +
636
 
        '</param></params></methodResponse>')
637
 
    return x[0][0]