74
87
flags.DEFINE_float('xenapi_task_poll_interval',
76
89
'The interval used for polling of remote tasks '
77
'(Async.VM.start, etc). Used only if '
90
'(Async.VM.start, etc). Used only if '
78
91
'connection_type=xenapi.')
92
flags.DEFINE_string('xenapi_image_service',
94
'Where to get VM images: glance or objectstore.')
95
flags.DEFINE_float('xenapi_vhd_coalesce_poll_interval',
97
'The interval used for polling of coalescing vhds.'
98
' Used only if connection_type=xenapi.')
99
flags.DEFINE_integer('xenapi_vhd_coalesce_max_attempts',
101
'Max number of times to poll for VHD to coalesce.'
102
' Used only if connection_type=xenapi.')
103
flags.DEFINE_string('target_host',
106
flags.DEFINE_string('target_port',
108
'iSCSI Target Port, 3260 Default')
109
flags.DEFINE_string('iqn_prefix',
110
'iqn.2010-10.org.openstack',
83
114
def get_connection(_):
84
115
"""Note that XenAPI doesn't have a read-only connection mode, so
85
116
the read_only parameter is ignored."""
86
# This is loaded late so that there's no need to install this
87
# library when not using XenAPI.
90
XenAPI = __import__('XenAPI')
91
117
url = FLAGS.xenapi_connection_url
92
118
username = FLAGS.xenapi_connection_username
93
119
password = FLAGS.xenapi_connection_password
94
120
if not url or password is None:
95
raise Exception('Must specify xenapi_connection_url, '
96
'xenapi_connection_username (optionally), and '
97
'xenapi_connection_password to use '
98
'connection_type=xenapi')
121
raise Exception(_('Must specify xenapi_connection_url, '
122
'xenapi_connection_username (optionally), and '
123
'xenapi_connection_password to use '
124
'connection_type=xenapi'))
99
125
return XenAPIConnection(url, username, password)
102
128
class XenAPIConnection(object):
103
""" A connection to XenServer or Xen Cloud Platform """
129
"""A connection to XenServer or Xen Cloud Platform"""
104
131
def __init__(self, url, user, pw):
105
132
session = XenAPISession(url, user, pw)
106
133
self._vmops = VMOps(session)
107
134
self._volumeops = VolumeOps(session)
137
#FIXME(armando): implement this
138
#NOTE(armando): would we need a method
139
#to call when shutting down the host?
140
#e.g. to do session logout?
109
143
def list_instances(self):
110
""" List VM instances """
144
"""List VM instances"""
111
145
return self._vmops.list_instances()
113
147
def spawn(self, instance):
114
""" Create VM instance """
148
"""Create VM instance"""
115
149
self._vmops.spawn(instance)
151
def snapshot(self, instance, image_id):
152
""" Create snapshot from a running VM instance """
153
self._vmops.snapshot(instance, image_id)
117
155
def reboot(self, instance):
118
""" Reboot VM instance """
156
"""Reboot VM instance"""
119
157
self._vmops.reboot(instance)
159
def set_admin_password(self, instance, new_pass):
160
"""Set the root/admin password on the VM instance"""
161
self._vmops.set_admin_password(instance, new_pass)
121
163
def destroy(self, instance):
122
""" Destroy VM instance """
164
"""Destroy VM instance"""
123
165
self._vmops.destroy(instance)
167
def pause(self, instance, callback):
168
"""Pause VM instance"""
169
self._vmops.pause(instance, callback)
171
def unpause(self, instance, callback):
172
"""Unpause paused VM instance"""
173
self._vmops.unpause(instance, callback)
175
def suspend(self, instance, callback):
176
"""suspend the specified instance"""
177
self._vmops.suspend(instance, callback)
179
def resume(self, instance, callback):
180
"""resume the specified instance"""
181
self._vmops.resume(instance, callback)
125
183
def get_info(self, instance_id):
126
""" Return data about VM instance """
184
"""Return data about VM instance"""
127
185
return self._vmops.get_info(instance_id)
129
def get_diagnostics(self, instance_id):
187
def get_diagnostics(self, instance):
130
188
"""Return data about VM diagnostics"""
131
return self._vmops.get_diagnostics(instance_id)
189
return self._vmops.get_diagnostics(instance)
133
191
def get_console_output(self, instance):
134
""" Return snapshot of console """
192
"""Return snapshot of console"""
135
193
return self._vmops.get_console_output(instance)
195
def get_ajax_console(self, instance):
196
"""Return link to instance's ajax console"""
197
return self._vmops.get_ajax_console(instance)
137
199
def attach_volume(self, instance_name, device_path, mountpoint):
138
""" Attach volume storage to VM instance """
200
"""Attach volume storage to VM instance"""
139
201
return self._volumeops.attach_volume(instance_name,
143
205
def detach_volume(self, instance_name, mountpoint):
144
""" Detach volume storage to VM instance """
206
"""Detach volume storage to VM instance"""
145
207
return self._volumeops.detach_volume(instance_name, mountpoint)
209
def get_console_pool_info(self, console_type):
210
xs_url = urlparse.urlparse(FLAGS.xenapi_connection_url)
211
return {'address': xs_url.netloc,
212
'username': FLAGS.xenapi_connection_username,
213
'password': FLAGS.xenapi_connection_password}
148
216
class XenAPISession(object):
149
""" The session to invoke XenAPI SDK calls """
217
"""The session to invoke XenAPI SDK calls"""
150
219
def __init__(self, url, user, pw):
151
self._session = XenAPI.Session(url)
220
self.XenAPI = self.get_imported_xenapi()
221
self._session = self._create_session(url)
152
222
self._session.login_with_password(user, pw)
225
def get_imported_xenapi(self):
226
"""Stubout point. This can be replaced with a mock xenapi module."""
227
return __import__('XenAPI')
154
229
def get_xenapi(self):
155
""" Return the xenapi object """
230
"""Return the xenapi object"""
156
231
return self._session.xenapi
158
233
def get_xenapi_host(self):
159
""" Return the xenapi host """
234
"""Return the xenapi host"""
160
235
return self._session.xenapi.session.get_this_host(self._session.handle)
162
@utils.deferredToThread
163
237
def call_xenapi(self, method, *args):
164
"""Call the specified XenAPI method on a background thread. Returns
165
a Deferred for the result."""
238
"""Call the specified XenAPI method on a background thread."""
166
239
f = self._session.xenapi
167
240
for m in method.split('.'):
168
241
f = f.__getattr__(m)
171
@utils.deferredToThread
242
return tpool.execute(f, *args)
244
def call_xenapi_request(self, method, *args):
245
"""Some interactions with dom0, such as interacting with xenstore's
246
param record, require using the xenapi_request method of the session
247
object. This wraps that call on a background thread.
249
f = self._session.xenapi_request
250
return tpool.execute(f, method, *args)
172
252
def async_call_plugin(self, plugin, fn, args):
173
"""Call Async.host.call_plugin on a background thread. Returns a
174
Deferred with the task reference."""
175
return _unwrap_plugin_exceptions(
176
self._session.xenapi.Async.host.call_plugin,
177
self.get_xenapi_host(), plugin, fn, args)
179
def wait_for_task(self, task):
180
"""Return a Deferred that will give the result of the given task.
181
The task is polled until it completes."""
183
reactor.callLater(0, self._poll_task, task, d)
186
@utils.deferredToThread
187
def _poll_task(self, task, deferred):
188
"""Poll the given XenAPI task, and fire the given Deferred if we
253
"""Call Async.host.call_plugin on a background thread."""
254
return tpool.execute(self._unwrap_plugin_exceptions,
255
self._session.xenapi.Async.host.call_plugin,
256
self.get_xenapi_host(), plugin, fn, args)
258
def wait_for_task(self, id, task):
259
"""Return the result of the given task. The task is polled
260
until it completes. Not re-entrant."""
262
self.loop = utils.LoopingCall(self._poll_task, id, task, done)
263
self.loop.start(FLAGS.xenapi_task_poll_interval, now=True)
268
def _stop_loop(self):
269
"""Stop polling for task to finish."""
270
#NOTE(sandy-walsh) Had to break this call out to support unit tests.
274
def _create_session(self, url):
275
"""Stubout point. This can be replaced with a mock session."""
276
return self.XenAPI.Session(url)
278
def _poll_task(self, id, task, done):
279
"""Poll the given XenAPI task, and fire the given action if we
191
#logging.debug('Polling task %s...', task)
283
name = self._session.xenapi.task.get_name_label(task)
192
284
status = self._session.xenapi.task.get_status(task)
193
if status == 'pending':
194
reactor.callLater(FLAGS.xenapi_task_poll_interval,
195
self._poll_task, task, deferred)
196
elif status == 'success':
287
action=name[0:255], # Ensure action is never > 255
289
if status == "pending":
291
elif status == "success":
197
292
result = self._session.xenapi.task.get_result(task)
198
logging.info('Task %s status: success. %s', task, result)
199
deferred.callback(_parse_xmlrpc_value(result))
293
LOG.info(_("Task [%s] %s status: success %s") % (
297
done.send(_parse_xmlrpc_value(result))
201
299
error_info = self._session.xenapi.task.get_error_info(task)
202
logging.warn('Task %s status: %s. %s', task, status,
204
deferred.errback(XenAPI.Failure(error_info))
205
#logging.debug('Polling task %s done.', task)
206
except XenAPI.Failure, exc:
208
deferred.errback(exc)
211
def _unwrap_plugin_exceptions(func, *args, **kwargs):
212
""" Parse exception details """
214
return func(*args, **kwargs)
215
except XenAPI.Failure, exc:
216
logging.debug("Got exception: %s", exc)
217
if (len(exc.details) == 4 and
218
exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
219
exc.details[2] == 'Failure'):
222
params = eval(exc.details[3])
225
raise XenAPI.Failure(params)
300
action["error"] = str(error_info)
301
LOG.warn(_("Task [%s] %s status: %s %s") % (
306
done.send_exception(self.XenAPI.Failure(error_info))
307
db.instance_action_create(context.get_admin_context(), action)
308
except self.XenAPI.Failure, exc:
310
done.send_exception(*sys.exc_info())
313
def _unwrap_plugin_exceptions(self, func, *args, **kwargs):
314
"""Parse exception details"""
316
return func(*args, **kwargs)
317
except self.XenAPI.Failure, exc:
318
LOG.debug(_("Got exception: %s"), exc)
319
if (len(exc.details) == 4 and
320
exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
321
exc.details[2] == 'Failure'):
324
params = eval(exc.details[3])
327
raise self.XenAPI.Failure(params)
330
except xmlrpclib.ProtocolError, exc:
331
LOG.debug(_("Got exception: %s"), exc)
228
except xmlrpclib.ProtocolError, exc:
229
logging.debug("Got exception: %s", exc)
233
335
def _parse_xmlrpc_value(val):
234
"""Parse the given value as if it were an XML-RPC value. This is
336
"""Parse the given value as if it were an XML-RPC value. This is
235
337
sometimes used as the format for the task.result field."""