~abentley/juju-ci-tools/client-from-config-4

« back to all changes in this revision

Viewing changes to substrate.py

  • Committer: Aaron Bentley
  • Date: 2014-02-24 17:18:29 UTC
  • mto: This revision was merged to the branch mainline in revision 252.
  • Revision ID: aaron.bentley@canonical.com-20140224171829-sz644yhoygu7m9dm
Use tags to identify and shut down instances.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
from contextlib import (
2
 
    contextmanager,
3
 
)
4
 
import json
5
 
import logging
6
 
import os
7
 
import subprocess
8
 
from time import sleep
9
 
import urlparse
10
 
 
11
 
from boto import ec2
12
 
from boto.exception import EC2ResponseError
13
 
 
14
 
import get_ami
15
 
from jujuconfig import (
16
 
    get_euca_env,
17
 
    translate_to_env,
18
 
)
19
 
from jujupy import (
20
 
    EnvJujuClient1X
21
 
)
22
 
from utility import (
23
 
    temp_dir,
24
 
    until_timeout,
25
 
)
26
 
import winazurearm
27
 
 
28
 
 
29
 
__metaclass__ = type
30
 
 
31
 
 
32
 
log = logging.getLogger("substrate")
33
 
 
34
 
 
35
 
LIBVIRT_DOMAIN_RUNNING = 'running'
36
 
LIBVIRT_DOMAIN_SHUT_OFF = 'shut off'
37
 
 
38
 
 
39
 
class StillProvisioning(Exception):
40
 
    """Attempted to terminate instances still provisioning."""
41
 
 
42
 
    def __init__(self, instance_ids):
43
 
        super(StillProvisioning, self).__init__(
44
 
            'Still provisioning: {}'.format(', '.join(instance_ids)))
45
 
        self.instance_ids = instance_ids
46
 
 
47
 
 
48
 
def terminate_instances(env, instance_ids):
49
 
    if len(instance_ids) == 0:
50
 
        log.info("No instances to delete.")
51
 
        return
52
 
    provider_type = env.config.get('type')
53
 
    environ = dict(os.environ)
54
 
    if provider_type == 'ec2':
55
 
        environ.update(get_euca_env(env.config))
56
 
        command_args = ['euca-terminate-instances'] + instance_ids
57
 
    elif provider_type in ('openstack', 'rackspace'):
58
 
        environ.update(translate_to_env(env.config))
59
 
        command_args = ['nova', 'delete'] + instance_ids
60
 
    elif provider_type == 'maas':
61
 
        with maas_account_from_config(env.config) as substrate:
62
 
            substrate.terminate_instances(instance_ids)
63
 
        return
64
 
    else:
65
 
        with make_substrate_manager(env.config) as substrate:
66
 
            if substrate is None:
67
 
                raise ValueError(
68
 
                    "This test does not support the %s provider"
69
 
                    % provider_type)
70
 
            return substrate.terminate_instances(instance_ids)
71
 
    log.info("Deleting %s." % ', '.join(instance_ids))
72
 
    subprocess.check_call(command_args, env=environ)
73
 
 
74
 
 
75
 
class AWSAccount:
76
 
    """Represent the credentials of an AWS account."""
77
 
 
78
 
    @classmethod
79
 
    @contextmanager
80
 
    def manager_from_config(cls, config, region=None):
81
 
        """Create an AWSAccount from a juju environment dict."""
82
 
        euca_environ = get_euca_env(config)
83
 
        if region is None:
84
 
            region = config["region"]
85
 
        client = ec2.connect_to_region(
86
 
            region, aws_access_key_id=euca_environ['EC2_ACCESS_KEY'],
87
 
            aws_secret_access_key=euca_environ['EC2_SECRET_KEY'])
88
 
        yield cls(euca_environ, region, client)
89
 
 
90
 
    def __init__(self, euca_environ, region, client):
91
 
        self.euca_environ = euca_environ
92
 
        self.region = region
93
 
        self.client = client
94
 
 
95
 
    def iter_security_groups(self):
96
 
        """Iterate through security groups created by juju in this account.
97
 
 
98
 
        :return: an iterator of (group-id, group-name) tuples.
99
 
        """
100
 
        groups = self.client.get_all_security_groups(
101
 
            filters={'description': 'juju group'})
102
 
        for group in groups:
103
 
            yield group.id, group.name
104
 
 
105
 
    def iter_instance_security_groups(self, instance_ids=None):
106
 
        """List the security groups used by instances in this account.
107
 
 
108
 
        :param instance_ids: If supplied, list only security groups used by
109
 
            the specified instances.
110
 
        :return: an iterator of (group-id, group-name) tuples.
111
 
        """
112
 
        log.info('Listing security groups in use.')
113
 
        reservations = self.client.get_all_instances(instance_ids=instance_ids)
114
 
        for reservation in reservations:
115
 
            for instance in reservation.instances:
116
 
                for group in instance.groups:
117
 
                    yield group.id, group.name
118
 
 
119
 
    def destroy_security_groups(self, groups):
120
 
        """Destroy the specified security groups.
121
 
 
122
 
        :return: a list of groups that could not be destroyed.
123
 
        """
124
 
        failures = []
125
 
        for group in groups:
126
 
            deleted = self.client.delete_security_group(name=group)
127
 
            if not deleted:
128
 
                failures.append(group)
129
 
        return failures
130
 
 
131
 
    def delete_detached_interfaces(self, security_groups):
132
 
        """Delete detached network interfaces for supplied groups.
133
 
 
134
 
        :param security_groups: A collection of security_group ids.
135
 
        :return: A collection of security groups which still have interfaces in
136
 
            them.
137
 
        """
138
 
        interfaces = self.client.get_all_network_interfaces(
139
 
            filters={'status': 'available'})
140
 
        unclean = set()
141
 
        for interface in interfaces:
142
 
            for group in interface.groups:
143
 
                if group.id in security_groups:
144
 
                    try:
145
 
                        interface.delete()
146
 
                    except EC2ResponseError as e:
147
 
                        if e.error_code not in (
148
 
                                'InvalidNetworkInterface.InUse',
149
 
                                'InvalidNetworkInterfaceID.NotFound'):
150
 
                            raise
151
 
                        log.info(
152
 
                            'Failed to delete interface {!r}. {}'.format(
153
 
                                interface.id, e.message))
154
 
                        unclean.update(g.id for g in interface.groups)
155
 
                    break
156
 
        return unclean
157
 
 
158
 
 
159
 
class OpenStackAccount:
160
 
    """Represent the credentials/region of an OpenStack account."""
161
 
 
162
 
    def __init__(self, username, password, tenant_name, auth_url, region_name):
163
 
        self._username = username
164
 
        self._password = password
165
 
        self._tenant_name = tenant_name
166
 
        self._auth_url = auth_url
167
 
        self._region_name = region_name
168
 
        self._client = None
169
 
 
170
 
    @classmethod
171
 
    @contextmanager
172
 
    def manager_from_config(cls, config):
173
 
        """Create an OpenStackAccount from a juju environment dict."""
174
 
        yield cls(
175
 
            config['username'], config['password'], config['tenant-name'],
176
 
            config['auth-url'], config['region'])
177
 
 
178
 
    def get_client(self):
179
 
        """Return a novaclient Client for this account."""
180
 
        from novaclient import client
181
 
        return client.Client(
182
 
            '1.1', self._username, self._password, self._tenant_name,
183
 
            self._auth_url, region_name=self._region_name,
184
 
            service_type='compute', insecure=False)
185
 
 
186
 
    @property
187
 
    def client(self):
188
 
        """A novaclient Client for this account.  May come from cache."""
189
 
        if self._client is None:
190
 
            self._client = self.get_client()
191
 
        return self._client
192
 
 
193
 
    def iter_security_groups(self):
194
 
        """Iterate through security groups created by juju in this account.
195
 
 
196
 
        :return: an iterator of (group-id, group-name) tuples.
197
 
        """
198
 
        return ((g.id, g.name) for g in self.client.security_groups.list()
199
 
                if g.description == 'juju group')
200
 
 
201
 
    def iter_instance_security_groups(self, instance_ids=None):
202
 
        """List the security groups used by instances in this account.
203
 
 
204
 
        :param instance_ids: If supplied, list only security groups used by
205
 
            the specified instances.
206
 
        :return: an iterator of (group-id, group-name) tuples.
207
 
        """
208
 
        group_names = set()
209
 
        for server in self.client.servers.list():
210
 
            if instance_ids is not None and server.id not in instance_ids:
211
 
                continue
212
 
            # A server that errors before security groups are assigned will
213
 
            # have no security_groups attribute.
214
 
            groups = (getattr(server, 'security_groups', []))
215
 
            group_names.update(group['name'] for group in groups)
216
 
        return ((k, v) for k, v in self.iter_security_groups()
217
 
                if v in group_names)
218
 
 
219
 
 
220
 
class JoyentAccount:
221
 
    """Represent a Joyent account."""
222
 
 
223
 
    def __init__(self, client):
224
 
        self.client = client
225
 
 
226
 
    @classmethod
227
 
    @contextmanager
228
 
    def manager_from_config(cls, config):
229
 
        """Create a ContextManager for a JoyentAccount.
230
 
 
231
 
         Using a juju environment dict, the private key is written to a
232
 
         tmp file. Then, the Joyent client is inited with the path to the
233
 
         tmp key. The key is removed when done.
234
 
         """
235
 
        from joyent import Client
236
 
        with temp_dir() as key_dir:
237
 
            key_path = os.path.join(key_dir, 'joyent.key')
238
 
            open(key_path, 'w').write(config['private-key'])
239
 
            client = Client(
240
 
                config['sdc-url'], config['manta-user'],
241
 
                config['manta-key-id'], key_path, '')
242
 
            yield cls(client)
243
 
 
244
 
    def terminate_instances(self, instance_ids):
245
 
        """Terminate the specified instances."""
246
 
        provisioning = []
247
 
        for instance_id in instance_ids:
248
 
            machine_info = self.client._list_machines(instance_id)
249
 
            if machine_info['state'] == 'provisioning':
250
 
                provisioning.append(instance_id)
251
 
                continue
252
 
            self._terminate_instance(instance_id)
253
 
        if len(provisioning) > 0:
254
 
            raise StillProvisioning(provisioning)
255
 
 
256
 
    def _terminate_instance(self, machine_id):
257
 
        log.info('Stopping instance {}'.format(machine_id))
258
 
        self.client.stop_machine(machine_id)
259
 
        for ignored in until_timeout(30):
260
 
            stopping_machine = self.client._list_machines(machine_id)
261
 
            if stopping_machine['state'] == 'stopped':
262
 
                break
263
 
            sleep(3)
264
 
        else:
265
 
            raise Exception('Instance did not stop: {}'.format(machine_id))
266
 
        log.info('Terminating instance {}'.format(machine_id))
267
 
        self.client.delete_machine(machine_id)
268
 
 
269
 
 
270
 
def convert_to_azure_ids(client, instance_ids):
271
 
    """Return a list of ARM ids from a list juju machine instance-ids.
272
 
 
273
 
    The Juju 2 machine instance-id is not an ARM VM id, it is the non-unique
274
 
    machine name. For any juju controller, there are 2 or more machines named
275
 
    0. Using the client, the machine ids machine names can be found.
276
 
 
277
 
    See: https://bugs.launchpad.net/juju-core/+bug/1586089
278
 
 
279
 
    :param client: An EnvJujuClient instance.
280
 
    :param instance_ids: a list of Juju machine instance-ids
281
 
    :return: A list of ARM VM instance ids.
282
 
    """
283
 
    if isinstance(client, EnvJujuClient1X):
284
 
        # Juju 1.x reports the true vm instance-id.
285
 
        return instance_ids
286
 
    elif not instance_ids[0].startswith('machine'):
287
 
        log.info('Bug Lp 1586089 is fixed in {}.'.format(client.version))
288
 
        log.info('substrate.convert_to_azure_ids can be deleted.')
289
 
        return instance_ids
290
 
    models = client.get_models()['models']
291
 
    model = [m for m in models if m['name'] == client.model_name][0]
292
 
    resource_group = 'juju-{}-model-{}'.format(
293
 
        model['name'], model['model-uuid'])
294
 
    config = client.env.config
295
 
    arm_client = winazurearm.ARMClient(
296
 
        config['subscription-id'], config['application-id'],
297
 
        config['application-password'], config['tenant-id'])
298
 
    arm_client.init_services()
299
 
    resources = winazurearm.list_resources(
300
 
        arm_client, glob=resource_group, recursive=True)
301
 
    vm_ids = []
302
 
    for machine_name in instance_ids:
303
 
        rgd, vm = winazurearm.find_vm_instance(
304
 
            resources, machine_name, resource_group)
305
 
        vm_ids.append(vm.vm_id)
306
 
    return vm_ids
307
 
 
308
 
 
309
 
class AzureARMAccount:
310
 
    """Represent an Azure ARM Account."""
311
 
 
312
 
    def __init__(self, arm_client):
313
 
        """Constructor.
314
 
 
315
 
        :param arm_client: An instance of winazurearm.ARMClient.
316
 
        """
317
 
        self.arm_client = arm_client
318
 
 
319
 
    @classmethod
320
 
    @contextmanager
321
 
    def manager_from_config(cls, config):
322
 
        """A context manager for a Azure RM account.
323
 
 
324
 
        In the case of the Juju 1x, the ARM keys must be in the env's config.
325
 
        subscription_id is the same. The PEM for the SMS is ignored.
326
 
        """
327
 
        arm_client = winazurearm.ARMClient(
328
 
            config['subscription-id'], config['application-id'],
329
 
            config['application-password'], config['tenant-id'])
330
 
        arm_client.init_services()
331
 
        yield cls(arm_client)
332
 
 
333
 
    def terminate_instances(self, instance_ids):
334
 
        """Terminate the specified instances."""
335
 
        for instance_id in instance_ids:
336
 
            winazurearm.delete_instance(
337
 
                self.arm_client, instance_id, resource_group=None)
338
 
 
339
 
 
340
 
class AzureAccount:
341
 
    """Represent an Azure Account."""
342
 
 
343
 
    def __init__(self, service_client):
344
 
        """Constructor.
345
 
 
346
 
        :param service_client: An instance of
347
 
            azure.servicemanagement.ServiceManagementService.
348
 
        """
349
 
        self.service_client = service_client
350
 
 
351
 
    @classmethod
352
 
    @contextmanager
353
 
    def manager_from_config(cls, config):
354
 
        """A context manager for a AzureAccount.
355
 
 
356
 
        It writes the certificate to a temp file because the Azure client
357
 
        library requires it, then deletes the temp file when done.
358
 
        """
359
 
        from azure.servicemanagement import ServiceManagementService
360
 
        with temp_dir() as cert_dir:
361
 
            cert_file = os.path.join(cert_dir, 'azure.pem')
362
 
            open(cert_file, 'w').write(config['management-certificate'])
363
 
            service_client = ServiceManagementService(
364
 
                config['management-subscription-id'], cert_file)
365
 
            yield cls(service_client)
366
 
 
367
 
    @staticmethod
368
 
    def convert_instance_ids(instance_ids):
369
 
        """Convert juju instance ids into Azure service/role names.
370
 
 
371
 
        Return a dict mapping service name to role names.
372
 
        """
373
 
        services = {}
374
 
        for instance_id in instance_ids:
375
 
            service, role = instance_id.rsplit('-', 1)
376
 
            services.setdefault(service, set()).add(role)
377
 
        return services
378
 
 
379
 
    @contextmanager
380
 
    def terminate_instances_cxt(self, instance_ids):
381
 
        """Terminate instances in a context.
382
 
 
383
 
        This context manager requests termination, then allows the "with"
384
 
        block to happen.  When the block is exited, it waits until the
385
 
        operations complete.
386
 
 
387
 
        The strategy for terminating instances varies depending on whether all
388
 
        roles are being terminated.  If all roles are being terminated, the
389
 
        deployment and hosted service are deleted.  If not all roles are being
390
 
        terminated, the roles themselves are deleted.
391
 
        """
392
 
        converted = self.convert_instance_ids(instance_ids)
393
 
        requests = set()
394
 
        services_to_delete = set(converted.keys())
395
 
        for service, roles in converted.items():
396
 
            properties = self.service_client.get_hosted_service_properties(
397
 
                service, embed_detail=True)
398
 
            for deployment in properties.deployments:
399
 
                role_names = set(
400
 
                    d_role.role_name for d_role in deployment.role_list)
401
 
                if role_names.difference(roles) == set():
402
 
                    requests.add(self.service_client.delete_deployment(
403
 
                        service, deployment.name))
404
 
                else:
405
 
                    services_to_delete.discard(service)
406
 
                    for role in roles:
407
 
                        requests.add(
408
 
                            self.service_client.delete_role(
409
 
                                service, deployment.name, role))
410
 
        yield
411
 
        self.block_on_requests(requests)
412
 
        for service in services_to_delete:
413
 
            self.service_client.delete_hosted_service(service)
414
 
 
415
 
    def block_on_requests(self, requests):
416
 
        """Wait until the requests complete."""
417
 
        requests = set(requests)
418
 
        while len(requests) > 0:
419
 
            for request in list(requests):
420
 
                op = self.service_client.get_operation_status(
421
 
                    request.request_id)
422
 
                if op.status == 'Succeeded':
423
 
                    requests.remove(request)
424
 
 
425
 
    def terminate_instances(self, instance_ids):
426
 
        """Terminate the specified instances.
427
 
 
428
 
        See terminate_instances_cxt for details.
429
 
        """
430
 
        with self.terminate_instances_cxt(instance_ids):
431
 
            return
432
 
 
433
 
 
434
 
class MAASAccount:
435
 
    """Represent a MAAS 2.0 account."""
436
 
 
437
 
    _API_PATH = 'api/2.0/'
438
 
 
439
 
    def __init__(self, profile, url, oauth):
440
 
        self.profile = profile
441
 
        self.url = urlparse.urljoin(url, self._API_PATH)
442
 
        self.oauth = oauth
443
 
 
444
 
    def _maas(self, *args):
445
 
        """Call maas api with given arguments and parse json result."""
446
 
        output = subprocess.check_output(('maas',) + args)
447
 
        return json.loads(output)
448
 
 
449
 
    def login(self):
450
 
        """Login with the maas cli."""
451
 
        subprocess.check_call([
452
 
            'maas', 'login', self.profile, self.url, self.oauth])
453
 
 
454
 
    def logout(self):
455
 
        """Logout with the maas cli."""
456
 
        subprocess.check_call(['maas', 'logout', self.profile])
457
 
 
458
 
    def _machine_release_args(self, machine_id):
459
 
        return (self.profile, 'machine', 'release', machine_id)
460
 
 
461
 
    def terminate_instances(self, instance_ids):
462
 
        """Terminate the specified instances."""
463
 
        for instance in instance_ids:
464
 
            maas_system_id = instance.split('/')[5]
465
 
            log.info('Deleting %s.' % instance)
466
 
            self._maas(*self._machine_release_args(maas_system_id))
467
 
 
468
 
    def _list_allocated_args(self):
469
 
        return (self.profile, 'machines', 'list-allocated')
470
 
 
471
 
    def get_allocated_nodes(self):
472
 
        """Return a dict of allocated nodes with the hostname as keys."""
473
 
        nodes = self._maas(*self._list_allocated_args())
474
 
        allocated = {node['hostname']: node for node in nodes}
475
 
        return allocated
476
 
 
477
 
    def get_allocated_ips(self):
478
 
        """Return a dict of allocated ips with the hostname as keys.
479
 
 
480
 
        A maas node may have many ips. The method selects the first ip which
481
 
        is the address used for virsh access and ssh.
482
 
        """
483
 
        allocated = self.get_allocated_nodes()
484
 
        ips = {k: v['ip_addresses'][0] for k, v in allocated.items()
485
 
               if v['ip_addresses']}
486
 
        return ips
487
 
 
488
 
 
489
 
class MAAS1Account(MAASAccount):
490
 
    """Represent a MAAS 1.X account."""
491
 
 
492
 
    _API_PATH = 'api/1.0/'
493
 
 
494
 
    def _list_allocated_args(self):
495
 
        return (self.profile, 'nodes', 'list-allocated')
496
 
 
497
 
    def _machine_release_args(self, machine_id):
498
 
        return (self.profile, 'node', 'release', machine_id)
499
 
 
500
 
 
501
 
@contextmanager
502
 
def maas_account_from_config(config):
503
 
    """Create a ContextManager for either a MAASAccount or a MAAS1Account.
504
 
 
505
 
    As it's not possible to tell from the maas config which version of the api
506
 
    to use, try 2.0 and if that fails on login fallback to 1.0 instead.
507
 
    """
508
 
    args = (config['name'], config['maas-server'], config['maas-oauth'])
509
 
    manager = MAASAccount(*args)
510
 
    try:
511
 
        manager.login()
512
 
    except subprocess.CalledProcessError:
513
 
        log.info("Could not login with MAAS 2.0 API, trying 1.0")
514
 
        manager = MAAS1Account(*args)
515
 
        manager.login()
516
 
    yield manager
517
 
    manager.logout()
518
 
 
519
 
 
520
 
class LXDAccount:
521
 
    """Represent a LXD account."""
522
 
 
523
 
    def __init__(self, remote=None):
524
 
        self.remote = remote
525
 
 
526
 
    @classmethod
527
 
    @contextmanager
528
 
    def manager_from_config(cls, config):
529
 
        """Create a ContextManager for a LXDAccount."""
530
 
        remote = config.get('region', None)
531
 
        yield cls(remote=remote)
532
 
 
533
 
    def terminate_instances(self, instance_ids):
534
 
        """Terminate the specified instances."""
535
 
        for instance_id in instance_ids:
536
 
            subprocess.check_call(['lxc', 'stop', '--force', instance_id])
537
 
            if self.remote:
538
 
                instance_id = '{}:{}'.format(self.remote, instance_id)
539
 
            subprocess.check_call(['lxc', 'delete', '--force', instance_id])
540
 
 
541
 
 
542
 
@contextmanager
543
 
def make_substrate_manager(config):
544
 
    """A ContextManager that returns an Account for the config's substrate.
545
 
 
546
 
    Returns None if the substrate is not supported.
547
 
    """
548
 
    substrate_factory = {
549
 
        'ec2': AWSAccount.manager_from_config,
550
 
        'openstack': OpenStackAccount.manager_from_config,
551
 
        'rackspace': OpenStackAccount.manager_from_config,
552
 
        'joyent': JoyentAccount.manager_from_config,
553
 
        'azure': AzureAccount.manager_from_config,
554
 
        'azure-arm': AzureARMAccount.manager_from_config,
555
 
        'lxd': LXDAccount.manager_from_config,
556
 
    }
557
 
    substrate_type = config['type']
558
 
    if substrate_type == 'azure' and 'application-id' in config:
559
 
        substrate_type = 'azure-arm'
560
 
    factory = substrate_factory.get(substrate_type)
561
 
    if factory is None:
562
 
        yield None
563
 
    else:
564
 
        with factory(config) as substrate:
565
 
            yield substrate
566
 
 
567
 
 
568
 
def start_libvirt_domain(uri, domain):
569
 
    """Call virsh to start the domain.
570
 
 
571
 
    @Parms URI: The address of the libvirt service.
572
 
    @Parm domain: The name of the domain.
573
 
    """
574
 
 
575
 
    command = ['virsh', '-c', uri, 'start', domain]
576
 
    try:
577
 
        subprocess.check_output(command, stderr=subprocess.STDOUT)
578
 
    except subprocess.CalledProcessError as e:
579
 
        if 'already active' in e.output:
580
 
            return '%s is already running; nothing to do.' % domain
581
 
        raise Exception('%s failed:\n %s' % (command, e.output))
582
 
    sleep(30)
583
 
    for ignored in until_timeout(120):
584
 
        if verify_libvirt_domain(uri, domain, LIBVIRT_DOMAIN_RUNNING):
585
 
            return "%s is now running" % domain
586
 
        sleep(2)
587
 
    raise Exception('libvirt domain %s did not start.' % domain)
588
 
 
589
 
 
590
 
def stop_libvirt_domain(uri, domain):
591
 
    """Call virsh to shutdown the domain.
592
 
 
593
 
    @Parms URI: The address of the libvirt service.
594
 
    @Parm domain: The name of the domain.
595
 
    """
596
 
 
597
 
    command = ['virsh', '-c', uri, 'shutdown', domain]
598
 
    try:
599
 
        subprocess.check_output(command, stderr=subprocess.STDOUT)
600
 
    except subprocess.CalledProcessError as e:
601
 
        if 'domain is not running' in e.output:
602
 
            return ('%s is not running; nothing to do.' % domain)
603
 
        raise Exception('%s failed:\n %s' % (command, e.output))
604
 
    sleep(30)
605
 
    for ignored in until_timeout(120):
606
 
        if verify_libvirt_domain(uri, domain, LIBVIRT_DOMAIN_SHUT_OFF):
607
 
            return "%s is now shut off" % domain
608
 
        sleep(2)
609
 
    raise Exception('libvirt domain %s is not shut off.' % domain)
610
 
 
611
 
 
612
 
def verify_libvirt_domain(uri, domain, state=LIBVIRT_DOMAIN_RUNNING):
613
 
    """Returns a bool based on if the domain is in the given state.
614
 
 
615
 
    @Parms URI: The address of the libvirt service.
616
 
    @Parm domain: The name of the domain.
617
 
    @Parm state: The state to verify (e.g. "running or "shut off").
618
 
    """
619
 
 
620
 
    dom_status = get_libvirt_domstate(uri, domain)
621
 
    return state in dom_status
622
 
 
623
 
 
624
 
def get_libvirt_domstate(uri, domain):
625
 
    """Call virsh to get the state of the given domain.
626
 
 
627
 
    @Parms URI: The address of the libvirt service.
628
 
    @Parm domain: The name of the domain.
629
 
    """
630
 
 
631
 
    command = ['virsh', '-c', uri, 'domstate', domain]
632
 
    try:
633
 
        sub_output = subprocess.check_output(command)
634
 
    except subprocess.CalledProcessError:
635
 
        raise Exception('%s failed' % command)
636
 
    return sub_output
637
 
 
638
 
 
639
 
def parse_euca(euca_output):
640
 
    for line in euca_output.splitlines():
641
 
        fields = line.split('\t')
642
 
        if fields[0] != 'INSTANCE':
643
 
            continue
644
 
        yield fields[1], fields[3]
645
 
 
646
 
 
647
 
def run_instances(count, job_name, series, region=None):
648
 
    """create a number of instances in ec2 and tag them.
649
 
 
650
 
    :param count: The number of instances to create.
651
 
    :param job_name: The name of job that owns the instances (used as a tag).
652
 
    :param series: The series to run in the instance.
653
 
        If None, Precise will be used.
654
 
    """
655
 
    if series is None:
656
 
        series = 'precise'
657
 
    environ = dict(os.environ)
658
 
    ami = get_ami.query_ami(series, "amd64", region=region)
659
 
    command = [
660
 
        'euca-run-instances', '-k', 'id_rsa', '-n', '%d' % count,
661
 
        '-t', 'm1.large', '-g', 'manual-juju-test', ami]
662
 
    run_output = subprocess.check_output(command, env=environ).strip()
663
 
    machine_ids = dict(parse_euca(run_output)).keys()
664
 
    for remaining in until_timeout(300):
665
 
        try:
666
 
            names = dict(describe_instances(machine_ids, env=environ))
667
 
            if '' not in names.values():
668
 
                subprocess.check_call(
669
 
                    ['euca-create-tags', '--tag', 'job_name=%s' % job_name] +
670
 
                    machine_ids, env=environ)
671
 
                return names.items()
672
 
        except subprocess.CalledProcessError:
673
 
            subprocess.call(['euca-terminate-instances'] + machine_ids)
674
 
            raise
675
 
        sleep(1)
676
 
 
677
 
 
678
 
def describe_instances(instances=None, running=False, job_name=None,
679
 
                       env=None):
680
 
    command = ['euca-describe-instances']
681
 
    if job_name is not None:
682
 
        command.extend(['--filter', 'tag:job_name=%s' % job_name])
683
 
    if running:
684
 
        command.extend(['--filter', 'instance-state-name=running'])
685
 
    if instances is not None:
686
 
        command.extend(instances)
687
 
    log.info(' '.join(command))
688
 
    return parse_euca(subprocess.check_output(command, env=env))
689
 
 
690
 
 
691
 
def get_job_instances(job_name):
692
 
    description = describe_instances(job_name=job_name, running=True)
693
 
    return (machine_id for machine_id, name in description)
694
 
 
695
 
 
696
 
def destroy_job_instances(job_name):
697
 
    instances = list(get_job_instances(job_name))
698
 
    if len(instances) == 0:
699
 
        return
700
 
    subprocess.check_call(['euca-terminate-instances'] + instances)
701
 
 
702
 
 
703
 
def resolve_remote_dns_names(env, remote_machines):
704
 
    """Update addresses of given remote_machines as needed by providers."""
705
 
    if env.config['type'] != 'maas':
706
 
        # Only MAAS requires special handling at prsent.
707
 
        return
708
 
    # MAAS hostnames are not resolvable, but we can adapt them to IPs.
709
 
    with maas_account_from_config(env.config) as account:
710
 
        allocated_ips = account.get_allocated_ips()
711
 
    for remote in remote_machines:
712
 
        if remote.get_address() in allocated_ips:
713
 
            remote.update_address(allocated_ips[remote.address])