~ubuntu-branches/ubuntu/quantal/nova/quantal-proposed

« back to all changes in this revision

Viewing changes to nova/virt/xenapi_conn.py

  • Committer: Bazaar Package Importer
  • Author(s): Chuck Short
  • Date: 2011-01-21 11:48:06 UTC
  • mto: This revision was merged to the branch mainline in revision 9.
  • Revision ID: james.westby@ubuntu.com-20110121114806-v8fvnnl6az4m4ohv
Tags: upstream-2011.1~bzr597
ImportĀ upstreamĀ versionĀ 2011.1~bzr597

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
2
 
3
3
# Copyright (c) 2010 Citrix Systems, Inc.
 
4
# Copyright 2010 OpenStack LLC.
4
5
#
5
6
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
6
7
#    not use this file except in compliance with the License. You may obtain
19
20
 
20
21
The concurrency model for this class is as follows:
21
22
 
22
 
All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
23
 
deferredToThread).  They are remote calls, and so may hang for the usual
24
 
reasons.  They should not be allowed to block the reactor thread.
 
23
All XenAPI calls are on a green thread (using eventlet's "tpool"
 
24
thread pool). They are remote calls, and so may hang for the usual
 
25
reasons.
25
26
 
26
27
All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
27
 
(using XenAPI.VM.async_start etc).  These return a task, which can then be
28
 
polled for completion.  Polling is handled using reactor.callLater.
 
28
(using XenAPI.VM.async_start etc). These return a task, which can then be
 
29
polled for completion.
29
30
 
30
 
This combination of techniques means that we don't block the reactor thread at
 
31
This combination of techniques means that we don't block the main thread at
31
32
all, and at the same time we don't hold lots of threads waiting for
32
33
long-running operations.
33
34
 
44
45
:xenapi_task_poll_interval:  The interval (seconds) used for polling of
45
46
                             remote tasks (Async.VM.start, etc)
46
47
                             (default: 0.5).
47
 
 
 
48
:target_host:                the iSCSI Target Host IP address, i.e. the IP
 
49
                             address for the nova-volume host
 
50
:target_port:                iSCSI Target Port, 3260 Default
 
51
:iqn_prefix:                 IQN Prefix, e.g. 'iqn.2010-10.org.openstack'
48
52
"""
49
53
 
50
 
import logging
 
54
import sys
 
55
import urlparse
51
56
import xmlrpclib
52
57
 
53
 
from twisted.internet import defer
54
 
from twisted.internet import reactor
 
58
from eventlet import event
 
59
from eventlet import tpool
55
60
 
 
61
from nova import context
 
62
from nova import db
56
63
from nova import utils
57
64
from nova import flags
 
65
from nova import log as logging
58
66
from nova.virt.xenapi.vmops import VMOps
59
67
from nova.virt.xenapi.volumeops import VolumeOps
60
68
 
 
69
 
 
70
LOG = logging.getLogger("nova.virt.xenapi")
 
71
 
 
72
 
61
73
FLAGS = flags.FLAGS
 
74
 
62
75
flags.DEFINE_string('xenapi_connection_url',
63
76
                    None,
64
77
                    'URL for connection to XenServer/Xen Cloud Platform.'
74
87
flags.DEFINE_float('xenapi_task_poll_interval',
75
88
                   0.5,
76
89
                   'The interval used for polling of remote tasks '
77
 
                   '(Async.VM.start, etc).  Used only if '
 
90
                   '(Async.VM.start, etc). Used only if '
78
91
                   'connection_type=xenapi.')
79
 
 
80
 
XenAPI = None
 
92
flags.DEFINE_string('xenapi_image_service',
 
93
                    'glance',
 
94
                    'Where to get VM images: glance or objectstore.')
 
95
flags.DEFINE_float('xenapi_vhd_coalesce_poll_interval',
 
96
                   5.0,
 
97
                   'The interval used for polling of coalescing vhds.'
 
98
                   '  Used only if connection_type=xenapi.')
 
99
flags.DEFINE_integer('xenapi_vhd_coalesce_max_attempts',
 
100
                     5,
 
101
                     'Max number of times to poll for VHD to coalesce.'
 
102
                     '  Used only if connection_type=xenapi.')
 
103
flags.DEFINE_string('target_host',
 
104
                    None,
 
105
                    'iSCSI Target Host')
 
106
flags.DEFINE_string('target_port',
 
107
                    '3260',
 
108
                    'iSCSI Target Port, 3260 Default')
 
109
flags.DEFINE_string('iqn_prefix',
 
110
                    'iqn.2010-10.org.openstack',
 
111
                    'IQN Prefix')
81
112
 
82
113
 
83
114
def get_connection(_):
84
115
    """Note that XenAPI doesn't have a read-only connection mode, so
85
116
    the read_only parameter is ignored."""
86
 
    # This is loaded late so that there's no need to install this
87
 
    # library when not using XenAPI.
88
 
    global XenAPI
89
 
    if XenAPI is None:
90
 
        XenAPI = __import__('XenAPI')
91
117
    url = FLAGS.xenapi_connection_url
92
118
    username = FLAGS.xenapi_connection_username
93
119
    password = FLAGS.xenapi_connection_password
94
120
    if not url or password is None:
95
 
        raise Exception('Must specify xenapi_connection_url, '
96
 
                        'xenapi_connection_username (optionally), and '
97
 
                        'xenapi_connection_password to use '
98
 
                        'connection_type=xenapi')
 
121
        raise Exception(_('Must specify xenapi_connection_url, '
 
122
                          'xenapi_connection_username (optionally), and '
 
123
                          'xenapi_connection_password to use '
 
124
                          'connection_type=xenapi'))
99
125
    return XenAPIConnection(url, username, password)
100
126
 
101
127
 
102
128
class XenAPIConnection(object):
103
 
    """ A connection to XenServer or Xen Cloud Platform """
 
129
    """A connection to XenServer or Xen Cloud Platform"""
 
130
 
104
131
    def __init__(self, url, user, pw):
105
132
        session = XenAPISession(url, user, pw)
106
133
        self._vmops = VMOps(session)
107
134
        self._volumeops = VolumeOps(session)
108
135
 
 
136
    def init_host(self):
 
137
        #FIXME(armando): implement this
 
138
        #NOTE(armando): would we need a method
 
139
        #to call when shutting down the host?
 
140
        #e.g. to do session logout?
 
141
        pass
 
142
 
109
143
    def list_instances(self):
110
 
        """ List VM instances """
 
144
        """List VM instances"""
111
145
        return self._vmops.list_instances()
112
146
 
113
147
    def spawn(self, instance):
114
 
        """ Create VM instance """
 
148
        """Create VM instance"""
115
149
        self._vmops.spawn(instance)
116
150
 
 
151
    def snapshot(self, instance, image_id):
 
152
        """ Create snapshot from a running VM instance """
 
153
        self._vmops.snapshot(instance, image_id)
 
154
 
117
155
    def reboot(self, instance):
118
 
        """ Reboot VM instance """
 
156
        """Reboot VM instance"""
119
157
        self._vmops.reboot(instance)
120
158
 
 
159
    def set_admin_password(self, instance, new_pass):
 
160
        """Set the root/admin password on the VM instance"""
 
161
        self._vmops.set_admin_password(instance, new_pass)
 
162
 
121
163
    def destroy(self, instance):
122
 
        """ Destroy VM instance """
 
164
        """Destroy VM instance"""
123
165
        self._vmops.destroy(instance)
124
166
 
 
167
    def pause(self, instance, callback):
 
168
        """Pause VM instance"""
 
169
        self._vmops.pause(instance, callback)
 
170
 
 
171
    def unpause(self, instance, callback):
 
172
        """Unpause paused VM instance"""
 
173
        self._vmops.unpause(instance, callback)
 
174
 
 
175
    def suspend(self, instance, callback):
 
176
        """suspend the specified instance"""
 
177
        self._vmops.suspend(instance, callback)
 
178
 
 
179
    def resume(self, instance, callback):
 
180
        """resume the specified instance"""
 
181
        self._vmops.resume(instance, callback)
 
182
 
125
183
    def get_info(self, instance_id):
126
 
        """ Return data about VM instance """
 
184
        """Return data about VM instance"""
127
185
        return self._vmops.get_info(instance_id)
128
186
 
129
 
    def get_diagnostics(self, instance_id):
 
187
    def get_diagnostics(self, instance):
130
188
        """Return data about VM diagnostics"""
131
 
        return self._vmops.get_diagnostics(instance_id)
 
189
        return self._vmops.get_diagnostics(instance)
132
190
 
133
191
    def get_console_output(self, instance):
134
 
        """ Return snapshot of console """
 
192
        """Return snapshot of console"""
135
193
        return self._vmops.get_console_output(instance)
136
194
 
 
195
    def get_ajax_console(self, instance):
 
196
        """Return link to instance's ajax console"""
 
197
        return self._vmops.get_ajax_console(instance)
 
198
 
137
199
    def attach_volume(self, instance_name, device_path, mountpoint):
138
 
        """ Attach volume storage to VM instance """
 
200
        """Attach volume storage to VM instance"""
139
201
        return self._volumeops.attach_volume(instance_name,
140
202
                                               device_path,
141
203
                                               mountpoint)
142
204
 
143
205
    def detach_volume(self, instance_name, mountpoint):
144
 
        """ Detach volume storage to VM instance """
 
206
        """Detach volume storage to VM instance"""
145
207
        return self._volumeops.detach_volume(instance_name, mountpoint)
146
208
 
 
209
    def get_console_pool_info(self, console_type):
 
210
        xs_url = urlparse.urlparse(FLAGS.xenapi_connection_url)
 
211
        return  {'address': xs_url.netloc,
 
212
                 'username': FLAGS.xenapi_connection_username,
 
213
                 'password': FLAGS.xenapi_connection_password}
 
214
 
147
215
 
148
216
class XenAPISession(object):
149
 
    """ The session to invoke XenAPI SDK calls """
 
217
    """The session to invoke XenAPI SDK calls"""
 
218
 
150
219
    def __init__(self, url, user, pw):
151
 
        self._session = XenAPI.Session(url)
 
220
        self.XenAPI = self.get_imported_xenapi()
 
221
        self._session = self._create_session(url)
152
222
        self._session.login_with_password(user, pw)
 
223
        self.loop = None
 
224
 
 
225
    def get_imported_xenapi(self):
 
226
        """Stubout point. This can be replaced with a mock xenapi module."""
 
227
        return __import__('XenAPI')
153
228
 
154
229
    def get_xenapi(self):
155
 
        """ Return the xenapi object """
 
230
        """Return the xenapi object"""
156
231
        return self._session.xenapi
157
232
 
158
233
    def get_xenapi_host(self):
159
 
        """ Return the xenapi host """
 
234
        """Return the xenapi host"""
160
235
        return self._session.xenapi.session.get_this_host(self._session.handle)
161
236
 
162
 
    @utils.deferredToThread
163
237
    def call_xenapi(self, method, *args):
164
 
        """Call the specified XenAPI method on a background thread.  Returns
165
 
        a Deferred for the result."""
 
238
        """Call the specified XenAPI method on a background thread."""
166
239
        f = self._session.xenapi
167
240
        for m in method.split('.'):
168
241
            f = f.__getattr__(m)
169
 
        return f(*args)
170
 
 
171
 
    @utils.deferredToThread
 
242
        return tpool.execute(f, *args)
 
243
 
 
244
    def call_xenapi_request(self, method, *args):
 
245
        """Some interactions with dom0, such as interacting with xenstore's
 
246
        param record, require using the xenapi_request method of the session
 
247
        object. This wraps that call on a background thread.
 
248
        """
 
249
        f = self._session.xenapi_request
 
250
        return tpool.execute(f, method, *args)
 
251
 
172
252
    def async_call_plugin(self, plugin, fn, args):
173
 
        """Call Async.host.call_plugin on a background thread.  Returns a
174
 
        Deferred with the task reference."""
175
 
        return _unwrap_plugin_exceptions(
176
 
            self._session.xenapi.Async.host.call_plugin,
177
 
            self.get_xenapi_host(), plugin, fn, args)
178
 
 
179
 
    def wait_for_task(self, task):
180
 
        """Return a Deferred that will give the result of the given task.
181
 
        The task is polled until it completes."""
182
 
        d = defer.Deferred()
183
 
        reactor.callLater(0, self._poll_task, task, d)
184
 
        return d
185
 
 
186
 
    @utils.deferredToThread
187
 
    def _poll_task(self, task, deferred):
188
 
        """Poll the given XenAPI task, and fire the given Deferred if we
189
 
        get a result."""
 
253
        """Call Async.host.call_plugin on a background thread."""
 
254
        return tpool.execute(self._unwrap_plugin_exceptions,
 
255
                             self._session.xenapi.Async.host.call_plugin,
 
256
                             self.get_xenapi_host(), plugin, fn, args)
 
257
 
 
258
    def wait_for_task(self, id, task):
 
259
        """Return the result of the given task. The task is polled
 
260
        until it completes. Not re-entrant."""
 
261
        done = event.Event()
 
262
        self.loop = utils.LoopingCall(self._poll_task, id, task, done)
 
263
        self.loop.start(FLAGS.xenapi_task_poll_interval, now=True)
 
264
        rv = done.wait()
 
265
        self.loop.stop()
 
266
        return rv
 
267
 
 
268
    def _stop_loop(self):
 
269
        """Stop polling for task to finish."""
 
270
        #NOTE(sandy-walsh) Had to break this call out to support unit tests.
 
271
        if self.loop:
 
272
            self.loop.stop()
 
273
 
 
274
    def _create_session(self, url):
 
275
        """Stubout point. This can be replaced with a mock session."""
 
276
        return self.XenAPI.Session(url)
 
277
 
 
278
    def _poll_task(self, id, task, done):
 
279
        """Poll the given XenAPI task, and fire the given action if we
 
280
        get a result.
 
281
        """
190
282
        try:
191
 
            #logging.debug('Polling task %s...', task)
 
283
            name = self._session.xenapi.task.get_name_label(task)
192
284
            status = self._session.xenapi.task.get_status(task)
193
 
            if status == 'pending':
194
 
                reactor.callLater(FLAGS.xenapi_task_poll_interval,
195
 
                                  self._poll_task, task, deferred)
196
 
            elif status == 'success':
 
285
            action = dict(
 
286
                instance_id=int(id),
 
287
                action=name[0:255],  # Ensure action is never > 255
 
288
                error=None)
 
289
            if status == "pending":
 
290
                return
 
291
            elif status == "success":
197
292
                result = self._session.xenapi.task.get_result(task)
198
 
                logging.info('Task %s status: success.  %s', task, result)
199
 
                deferred.callback(_parse_xmlrpc_value(result))
 
293
                LOG.info(_("Task [%s] %s status: success    %s") % (
 
294
                    name,
 
295
                    task,
 
296
                    result))
 
297
                done.send(_parse_xmlrpc_value(result))
200
298
            else:
201
299
                error_info = self._session.xenapi.task.get_error_info(task)
202
 
                logging.warn('Task %s status: %s.  %s', task, status,
203
 
                             error_info)
204
 
                deferred.errback(XenAPI.Failure(error_info))
205
 
            #logging.debug('Polling task %s done.', task)
206
 
        except XenAPI.Failure, exc:
207
 
            logging.warn(exc)
208
 
            deferred.errback(exc)
209
 
 
210
 
 
211
 
def _unwrap_plugin_exceptions(func, *args, **kwargs):
212
 
    """ Parse exception details """
213
 
    try:
214
 
        return func(*args, **kwargs)
215
 
    except XenAPI.Failure, exc:
216
 
        logging.debug("Got exception: %s", exc)
217
 
        if (len(exc.details) == 4 and
218
 
            exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
219
 
            exc.details[2] == 'Failure'):
220
 
            params = None
221
 
            try:
222
 
                params = eval(exc.details[3])
223
 
            except:
224
 
                raise exc
225
 
            raise XenAPI.Failure(params)
226
 
        else:
 
300
                action["error"] = str(error_info)
 
301
                LOG.warn(_("Task [%s] %s status: %s    %s") % (
 
302
                    name,
 
303
                    task,
 
304
                    status,
 
305
                    error_info))
 
306
                done.send_exception(self.XenAPI.Failure(error_info))
 
307
            db.instance_action_create(context.get_admin_context(), action)
 
308
        except self.XenAPI.Failure, exc:
 
309
            LOG.warn(exc)
 
310
            done.send_exception(*sys.exc_info())
 
311
        self._stop_loop()
 
312
 
 
313
    def _unwrap_plugin_exceptions(self, func, *args, **kwargs):
 
314
        """Parse exception details"""
 
315
        try:
 
316
            return func(*args, **kwargs)
 
317
        except self.XenAPI.Failure, exc:
 
318
            LOG.debug(_("Got exception: %s"), exc)
 
319
            if (len(exc.details) == 4 and
 
320
                exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
 
321
                exc.details[2] == 'Failure'):
 
322
                params = None
 
323
                try:
 
324
                    params = eval(exc.details[3])
 
325
                except:
 
326
                    raise exc
 
327
                raise self.XenAPI.Failure(params)
 
328
            else:
 
329
                raise
 
330
        except xmlrpclib.ProtocolError, exc:
 
331
            LOG.debug(_("Got exception: %s"), exc)
227
332
            raise
228
 
    except xmlrpclib.ProtocolError, exc:
229
 
        logging.debug("Got exception: %s", exc)
230
 
        raise
231
333
 
232
334
 
233
335
def _parse_xmlrpc_value(val):
234
 
    """Parse the given value as if it were an XML-RPC value.  This is
 
336
    """Parse the given value as if it were an XML-RPC value. This is
235
337
    sometimes used as the format for the task.result field."""
236
338
    if not val:
237
339
        return val