~mojo-maintainers/mojo/trunk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
from __future__ import division, print_function
import datetime
import logging
import os
import subprocess
import time
import yaml

from collections import Counter

from .wait import wait, JujuWaitException
from ..shutil_which import get_command

# We can't use the implementation in .utils due to a circular import
juju_path, _ = get_command("juju")

# Detect the locally installed juju once at import time. `version` keeps the raw command output and
# `major_version` its leading numeric component; both are None when detection is not possible.
try:
    with open(os.devnull, "w") as devnull:
        version = subprocess.check_output([juju_path, "version"], stderr=devnull, universal_newlines=True)
    # Parse the whole leading component so that a multi-digit major version is not truncated to
    # its first character.
    major_version = int(version.strip().split(".")[0])
except (subprocess.CalledProcessError, OSError, ValueError):
    # In cases where the Juju Status yaml is provided by a file this may legitimately fail:
    # the command can exit non-zero, the binary may not be runnable at all (OSError), or its
    # output may not start with a number (ValueError).
    version = None
    major_version = None


class JujuStatusError(Exception):
    """Base error raised by this module when a juju status check fails."""


class JujuStatusCommandTimedOutError(JujuStatusError):
    """Raised when the ``juju status`` command does not finish within its allowed time."""


def check_output_with_timeout(command, command_timeout, timeout_exception):
    """Run COMMAND and return its stdout decoded as UTF-8, optionally bounded by timeout(1).

    Args:
        command: Argument list of the command to execute.
        command_timeout: Seconds to allow before /usr/bin/timeout stops the command (truncated to an
            integer); None runs the command without a time limit.
        timeout_exception: Exception class raised, with a descriptive message, when the limit is hit.

    Raises:
        timeout_exception: The wrapped command exited with 124 or 137 while a time limit was set.
        subprocess.CalledProcessError: Any other non-zero exit status.
    """
    limited = command_timeout is not None
    wrapper = ["/usr/bin/timeout", "--kill-after", "5", str(int(command_timeout))] if limited else []
    argv = wrapper + list(command)

    # magic numbers here as per timeout(1): 124 means it exited w/o a fuss, 128 + 9 means SIGKILL was sent
    timeout_messages = {
        124: '"{}" timed out after {} seconds',
        128 + 9: '"{}" timed out after {} seconds (killed)',
    }

    try:
        raw_output = subprocess.check_output(argv)
    except subprocess.CalledProcessError as cpe:
        if limited and cpe.returncode in timeout_messages:
            raise timeout_exception(timeout_messages[cpe.returncode].format(" ".join(command), command_timeout))
        raise
    return raw_output.decode("utf-8")


class JujuStatusBase(object):
    """Common interface shared by the version-specific juju status readers.

    Helpers that only need the parsed status are implemented here. Every method that depends on the
    Juju version raises NotImplementedError and has to be overridden by a subclass.
    """

    def __init__(self, environment=None):
        """Remember which environment the status belongs to (None leaves it unspecified)."""
        self.environment = environment

    def check_and_wait(self, wait_for_steady=False, timeout=1800, max_wait=None, additional_ready_states=None):
        """Wait for the environment to settle. Must be implemented by subclasses."""
        raise NotImplementedError()

    def machine_ids_list(self):
        """Returns: The sorted keys of the ``machines`` section of the parsed status."""
        machines = self.yaml_status()["machines"]
        return sorted(machines.keys())

    def machine_and_container_ids_list(self):
        """Returns: Machine ids in sorted order, each immediately followed by its sorted container ids."""
        ids = []
        for machine_id in self.machine_ids_list():
            try:
                container_ids = sorted(self.yaml_status()["machines"][machine_id]["containers"].keys())
            except KeyError:
                # A machine entry without a "containers" key contributes only its own id.
                container_ids = []
            ids += [machine_id] + container_ids
        return ids

    def machine_instance_id(self, machine_num):
        """Look up the instance id of a machine. Must be implemented by subclasses."""
        raise NotImplementedError()

    def service_units(self, service):
        """List the units of a service. Must be implemented by subclasses."""
        raise NotImplementedError()

    def services_list(self, exclude_subordinates=True):
        """List the deployed services. Must be implemented by subclasses."""
        raise NotImplementedError()

    def service_machine_numbers(self, service):
        """List the machines hosting a service. Must be implemented by subclasses."""
        raise NotImplementedError()

    def status(self, force_update=False):
        """Must be implemented by subclasses.

        Args:
            force_update: If true update from Juju

        Returns: The raw output of the `juju status --format=yaml` command as a string.
        """
        raise NotImplementedError()

    def yaml_status(self, force_update=False):
        """Must be implemented by subclasses.

        Args:
            force_update: If true update from Juju
        Returns: The output `juju status --format=yaml` after parsing by pyyaml.
        """
        raise NotImplementedError()


class Juju1Status(JujuStatusBase):
    # Juju 1 state names grouped by how the wait logic treats them: a "bad" state makes
    # _error_check report an error, a "transitional" state makes _all_stable report instability,
    # and "good" collects the ready states (extended through additional_ready_states in __init__).
    JUJU_STATUSES = {
        "good": ["ACTIVE", "started"],
        "bad": ["error"],
        "transitional": ["pending", "down", "installed", "stopped"],
    }

    def __init__(self, environment=None, additional_ready_states=None, command_timeout=600):
        """
        Args:
            environment: Juju environment name passed to ``juju status -e``; None uses the default one.
            additional_ready_states: Optional state names to treat as "good" for this instance. Each
                one is also removed from the "bad" and "transitional" groups.
            command_timeout: Seconds after which ``juju status`` is killed; None disables the limit.
        """
        JujuStatusBase.__init__(self, environment)
        self._status_output = None
        self._command_timeout = command_timeout

        # Work on a per-instance copy of the state table. The class-level dict is shared by every
        # instance, so editing it in place would leak one caller's additional_ready_states into all
        # instances created afterwards.
        self.JUJU_STATUSES = {kind: list(states) for kind, states in type(self).JUJU_STATUSES.items()}

        # Update our good, bad, transitional states if appropriate
        for state in additional_ready_states or []:
            if state not in self.JUJU_STATUSES["good"]:
                self.JUJU_STATUSES["good"].append(state)
            for error_state in ("bad", "transitional"):
                if state in self.JUJU_STATUSES[error_state]:
                    self.JUJU_STATUSES[error_state].remove(state)

    def _get_status(self, force_update=False):
        if self._status_output and not force_update:
            return self._status_output
        cmd = [juju_path, "status", "--format=yaml"]
        if self.environment:
            cmd.extend(["-e", self.environment])
        self._status_output = check_output_with_timeout(cmd, self._command_timeout, JujuStatusCommandTimedOutError)
        return self._status_output

    def _get_machine_state(self, state_type):
        states = Counter()
        for machine_no in self.yaml_status()["machines"]:
            if state_type in self.yaml_status()["machines"][machine_no]:
                state = self.yaml_status()["machines"][machine_no][state_type]
            else:
                state = "unknown"
            states[state] += 1
        return states

    def _get_machine_agent_states(self):
        return self._get_machine_state("agent-state")

    def _get_machine_instance_states(self):
        return self._get_machine_state("instance-state")

    def _get_service_agent_states(self):
        service_state = Counter()
        for service in self.yaml_status()["services"]:
            if "units" in self.yaml_status()["services"][service]:
                for unit in self.yaml_status()["services"][service]["units"]:
                    unit_info = self.yaml_status()["services"][service]["units"][unit]
                    service_state[unit_info["agent-state"]] += 1
                    if "subordinates" in unit_info:
                        for sub_unit in unit_info["subordinates"]:
                            sub_sstate = unit_info["subordinates"][sub_unit]["agent-state"]
                            service_state[sub_sstate] += 1
        return service_state

    def _get_units_in_state(self, state):
        unit_names = []
        for service in self.yaml_status()["services"]:
            if "units" in self.yaml_status()["services"][service]:
                for unit in self.yaml_status()["services"][service]["units"]:
                    unit_info = self.yaml_status()["services"][service]["units"][unit]
                    if unit_info["agent-state"] == state:
                        unit_names.append(unit)
                    if "subordinates" in unit_info:
                        for sub_unit in unit_info["subordinates"]:
                            sub_sstate = unit_info["subordinates"][sub_unit]["agent-state"]
                            if sub_sstate == state:
                                unit_names.append(sub_unit)
        return unit_names

    def check_and_wait(
        self, wait_for_steady=False, timeout=1800, max_wait=None, additional_ready_states=None, wait_for_workload=False
    ):
        if additional_ready_states is None:
            additional_ready_states = []
        checks = {
            "Machines": [
                {"Heading": "Instance State", "check_func": self._get_machine_instance_states},
                {"Heading": "Agent State", "check_func": self._get_machine_agent_states},
            ],
            "Services": [{"Heading": "Agent State", "check_func": self._get_service_agent_states}],
        }
        check_and_wait_start = datetime.datetime.now()
        while True:
            check_and_wait_running = datetime.datetime.now() - check_and_wait_start
            if check_and_wait_running.total_seconds() > int(timeout):
                logging.error(subprocess.check_output([juju_path, "status", "--format=tabular"]).decode("utf-8"))
                raise JujuStatusError("Timed out checking Juju status for stable state")
            stable_state = []
            for check_info in checks.values():
                for check in check_info:
                    check_function = check["check_func"]
                    states = check_function()
                    if self._error_check(states):
                        logging.error(
                            subprocess.check_output([juju_path, "status", "--format=tabular"]).decode("utf-8")
                        )
                        for unit in self._get_units_in_state("error"):
                            logging.error("Last 200 lines in %s unit log", unit)
                            logging.error(
                                subprocess.check_output(
                                    [
                                        juju_path,
                                        "ssh",
                                        unit,
                                        "sudo",
                                        "tail",
                                        "-200",
                                        "/var/log/juju/unit-%s.log" % unit.replace("/", "-"),
                                    ]
                                ).decode("utf-8")
                            )
                        raise JujuStatusError(
                            "Error in juju status on {}".format(", ".join(self._get_units_in_state("error")))
                        )
                    stable_state.append(self._all_stable(states))
            if False not in stable_state:
                break
            time.sleep(5)
            self._get_status(force_update=True)
        for juju_objtype, check_info in checks.items():
            for check in check_info:
                check_function = check["check_func"]
                states = check_function()
                self._summary(juju_objtype, check["Heading"], states)

        if wait_for_steady:
            logging.info("Waiting for environment to reach steady state")
            wait(max_wait=max_wait, wait_for_workload=wait_for_workload)
            logging.info("Environment has reached steady state")

    def _all_stable(self, states):
        for state in states:
            if state in self.JUJU_STATUSES["transitional"]:
                logging.info("Some statuses are in a transitional state")
                return False
        logging.info("Statuses are in a stable state")
        return True

    def _error_check(self, states):
        for state in states:
            if state in self.JUJU_STATUSES["bad"]:
                logging.error("Some statuses are in a bad state")
                return True
        logging.info("No statuses are in a bad state")
        return False

    def _summary(self, heading, statetype, states):
        output = [heading]
        output.append("   " + statetype)
        for state in states:
            output.append("    %s: %i" % (state, states[state]))
        logging.info("\n".join(output))

    def _service_info(self, svc):
        if svc == "ALL":
            units = {}
            for service in self.yaml_status()["services"].values():
                for unit, data in service.get("units", {}).items():
                    units[unit] = data
        elif svc not in self.yaml_status()["services"]:
            return None
        else:
            service = self.yaml_status()["services"][svc]
            units = service.get("units", {})
        return units

    def charm_location(self, service):
        """Get charm location.

        Not implemented for Juju 1."""
        return None

    def machine(self, machine_num):
        if machine_num not in self.yaml_status()["machines"]:
            return None
        return self.yaml_status()["machines"][machine_num]

    def machine_ip(self, machine_num):
        machine = self.machine(machine_num)
        if machine is None:
            return None
        return machine.get("dns-name")

    def machine_instance_id(self, machine_num):
        machine = self.machine(machine_num)
        if machine is None:
            return None
        return machine.get("instance-id")

    def service_instance_ids(self, service):
        machine_numbers = self.service_machine_numbers(service)
        if machine_numbers is None:
            return None
        elif not machine_numbers:
            return []
        return [self.machine_instance_id(m) for m in machine_numbers]

    def service_ips(self, service):
        units = self._service_info(service)
        if units:
            return [u.get("public-address") for u in units.values() if u.get("public-address")]
        return units

    def service_units(self, service):
        units = self._service_info(service)
        if units:
            return list(units.keys())
        return units

    def service_machine_numbers(self, service):
        units = self._service_info(service)
        if units:
            return [u.get("machine") for u in units.values() if u.get("machine")]
        return units

    def services_list(self, exclude_subordinates=True):
        services_list = []
        for service in self.yaml_status()["services"]:
            if not exclude_subordinates or "units" in self.yaml_status()["services"][service]:
                services_list.append(service)
        return sorted(services_list)

    def status(self, force_update=False):
        return self._get_status(force_update)

    def yaml_status(self, force_update=False):
        yaml_source = self._get_status(force_update)
        if getattr(self, "_status_output_yaml_source", None) != yaml_source:
            self._status_output_yaml = yaml.safe_load(yaml_source)
            self._status_output_yaml_source = yaml_source
        return self._status_output_yaml


class Juju2Status(JujuStatusBase):
    # From https://github.com/juju/juju/blob/master/status/status.go
    # Status values grouped by how _check_ready treats them: "error" values raise JujuStatusError,
    # "busy" values mean not ready yet, and "ready" values are accepted. A value found in none of the
    # groups is logged as unknown and treated as ready.
    JUJU_STATUS = {
        "ready": ["active", "idle", "running", "started", "unknown"],
        "busy": [
            "allocating",
            "lost",
            "down",
            "blocked",
            "executing",
            "maintenance",
            "pending",
            "rebooting",
            "waiting",
        ],
        "error": ["error", "failed", "provisioning error", "stopped", "terminated"],
    }

    def __init__(self, environment=None, status_yaml=None, additional_ready_states=None, command_timeout=600):
        """
        Args:
            environment: Known in Juju2 as a model but kept as environment during init to retain compatibility.
            status_yaml: Often the status is pulled from the local Juju model but the output of
                         'juju status --format=yaml' can be optionally specified.
            additional_ready_states: per pad.lv/1645784 sometimes we want to consider blocked state as ready
            command_timeout: Kill "juju status" and give up if this amount of time elapses.

        Raises:
            JujuStatusError: If the Juju version is older than 2 or cannot be determined.
        """
        JujuStatusBase.__init__(self, environment)
        self.model = self.environment
        self._raw_status = status_yaml
        self._command_timeout = command_timeout
        self._cached_status = None

        # Work on a per-instance copy of the status table. The class-level dict is shared by every
        # instance, so editing it in place would leak one caller's additional_ready_states into all
        # instances created afterwards.
        self.JUJU_STATUS = {kind: list(states) for kind, states in type(self).JUJU_STATUS.items()}

        # Verify using Juju2+. With a supplied status the version comes from the status itself,
        # otherwise from the locally installed juju detected at import time.
        if status_yaml is None:
            detected_major = major_version
        else:
            try:
                # Compare numerically on the full leading component; comparing the first character
                # as a string would misread multi-digit major versions.
                detected_major = int(str(self.model_version()).strip().split(".")[0])
            except ValueError:
                detected_major = None
        if detected_major is None:
            # Comparing None with an int raises TypeError on Python 3, so report the problem explicitly.
            raise JujuStatusError("Unable to determine the Juju version; this class only supports Juju version 2+")
        if detected_major < 2:
            raise JujuStatusError("This class only supports Juju version 2+")

        # Update our ready, busy, error states if appropriate
        for state in additional_ready_states or []:
            if state not in self.JUJU_STATUS["ready"]:
                self.JUJU_STATUS["ready"].append(state)
            for error_state in ("busy", "error"):
                if state in self.JUJU_STATUS[error_state]:
                    self.JUJU_STATUS[error_state].remove(state)

    def _check_ready(self, status, status_name):
        """Check a status value against self.JUJU_STATUS and raise JujuStatusError if it is an error.
        Args:
            status: The status value to check
            status_name: The name of the status value, used in logging and exceptions.
        Returns: True if status is ready or unknown, false if status is busy.
        """
        if status in self.JUJU_STATUS["error"]:
            raise JujuStatusError("{} is in status {}".format(status_name, status))
        if status in self.JUJU_STATUS["busy"]:
            return False
        if status not in self.JUJU_STATUS["ready"]:
            logging.warning("{} status {} is unknown".format(status_name, status))

        return True

    def _get_units(self, application):
        app = self.yaml_status()["applications"].get(application)
        if app is None:
            return None

        units = app.get("units")

        return units

    def application_ips(self, application):
        units = self._get_units(application)
        if units is None:
            return []

        return [u.get("public-address") for u in units.values() if u.get("public-address")]

    def application_machine_numbers(self, application):
        units = self._get_units(application)
        if units is None:
            return None

        return [u.get("machine") for u in units.values() if u.get("machine")]

    def applications_list(self, exclude_subordinates=True):
        applications = []
        for app in self.yaml_status()["applications"].keys():
            if exclude_subordinates and "subordinate-to" in self.yaml_status()["applications"][app]:
                continue
            applications.append(app)

        return sorted(applications)

    def application_units(self, application):
        units = self._get_units(application)
        if units is None:
            return None

        return list(units.keys())

    def applications_ready(self):
        """Check status of all applications, raise JujuStatusError for an error state.
        Returns: True if the applications in the status are in a good state, False if they are busy.
        """
        applications = self.yaml_status()["applications"]

        for name in applications:
            application = applications[name]
            try:
                if not self._check_ready(application["application-status"]["current"], name):
                    return False
            except KeyError:
                # Juju may at times return `application-status: {}` per lp#1847117
                return False

        return True

    def charm_location(self, application):
        """Get the location of the charm if possible, or return None."""
        app = self.yaml_status()["applications"].get(application)
        if app is None:
            return None

        if app["charm"].startswith("cs:"):
            return app["charm"]
        return None

    def check_and_wait(self, wait_for_steady=False, max_wait=None, timeout=1800, wait_for_workload=False):
        """Wait for the current status of Juju to become ready (e.g. not blocked or in maintenance) and raise
            JujuStatusError on error state. Optionally at the end wait for the current status to become steady
            (i.e. not running any hook).
        Args:
            wait_for_steady: Whether or not to wait for steadiness at the end.
            max_wait: If defined used instead of timeout when waiting for steadiness.
            timeout: Max amount of time to wait for readiness. Raise JujuWaitException after timeout. Also Max
                amount of time to wait for steadiness if max_wait is None.
            wait_for_workload: Whether to wait for the workload to reach an active state.
        Returns: None
        """
        logging.info(
            "Waiting up to {} seconds for environment to become ready"
            " (not blocked or in maintenance)".format(timeout)
        )
        check_and_wait_start = datetime.datetime.now()
        while True:
            check_and_wait_running = datetime.datetime.now() - check_and_wait_start
            if check_and_wait_running.total_seconds() > int(timeout):
                raise JujuWaitException("Timed out checking Juju status for stable state")
            self.status(force_update=True)
            if self.ready():  # self.ready() will raise exceptions on error states
                break
            time.sleep(5)
        logging.info("Environment is ready (not blocked or in maintenance)")

        if not wait_for_steady:
            logging.info("Not waiting for environment to reach steady state (not running any hook)")
            return

        if max_wait is None:
            max_wait = timeout

        logging.info(
            "Waiting up to {} seconds for environment to reach steady state (not running any hook), "
            "with wait-for-workload set to {}".format(max_wait, wait_for_workload)
        )
        wait(max_wait=max_wait, wait_for_workload=wait_for_workload)
        logging.info("Environment has reached steady state (not running any hook)")

    def controller_version(self):
        """Parses the output of 'juju controllers' returning the controller version for the current controller.
        Returns: The controller version reported by Juju
        """
        listing = subprocess.check_output([juju_path, "controllers", "--format", "yaml"], universal_newlines=True)
        controllers = yaml.safe_load(listing)
        current_controller = controllers["current-controller"]
        details = controllers["controllers"][current_controller]
        return details["agent-version"]

    def machine_instance_id(self, machine_num):
        machine = self.yaml_status()["machines"].get(machine_num)
        if machine is None:
            return None
        return machine.get("instance-id")

    def machines_ready(self):
        """Check status of all machines, raise JujuStatusError for an error state.
        Returns: True if the machines in the status are in a good state, False if they are busy.
        """
        machines = self.yaml_status()["machines"]

        for num in machines:
            machine = machines[num]
            if not self._check_ready(machine["machine-status"]["current"], "machine {} machine-status".format(num)):
                return False

            if not self._check_ready(machine["juju-status"]["current"], "machine {} juju-status".format(num)):
                return False

        return True

    def model_version(self):
        return self.yaml_status()["model"]["version"]

    def ready(self):
        """Check the current status of Juju raise JujuStatusError on error state.
        Returns: True if all statuses are in a good state, False if they are busy.
        """
        machines_ready = self.machines_ready()
        applications_ready = self.applications_ready()
        units_ready = self.units_ready()
        subordinates_ready = self.subordinates_ready()

        return machines_ready and applications_ready and units_ready and subordinates_ready

    def services_list(self, exclude_subordinates=True):
        """In Juju2 a service is now known as an application, keeping this method for compatibility."""
        return self.applications_list(exclude_subordinates)

    def service_machine_numbers(self, service):
        """In Juju2 a service is now known as an application, keeping this method for compatibility."""
        return self.application_machine_numbers(service)

    def service_units(self, service):
        """In Juju2 a service is now known as an application, keeping this method for compatibility."""
        return self.application_units(service)

    def status(self, force_update=False):
        """
        Args:
            force_update: If true update from Juju.
        Returns: The raw output of the `juju status --format=yaml` command as a string.
        """
        if self._raw_status is None or force_update:
            cmd = [juju_path, "status", "--format=yaml"]
            if self.model is not None:
                cmd.extend(["-m", self.model])
            self._raw_status = check_output_with_timeout(cmd, self._command_timeout, JujuStatusCommandTimedOutError)
            # Clear cached status, so subsequent calls to yaml_status will get
            # updated information.
            self._cached_status = None
        return self._raw_status

    def subordinates_ready(self):
        """Check status of all subordinate units, raise JujuStatusError for an error state.
        Returns: True if the subordinate in the status are in a good state, False if they are busy.
        """
        applications = self.yaml_status()["applications"]

        for app in applications:
            units = self._get_units(app)
            if units is None:
                continue
            subordinates = {}
            for unit_name in units:
                s = units[unit_name].get("subordinates")
                if s is not None:
                    subordinates.update(s)

            for name in subordinates:
                subordinate = subordinates[name]

                try:
                    ws = subordinate["workload-status"]["current"]
                except KeyError:
                    # Juju may at times return `workload-status: {}` per lp#1851439
                    return False
                if not self._check_ready(ws, "Unit {} workload-status".format(name)):
                    return False

                if not self._check_ready(subordinate["juju-status"]["current"], "Unit {} juju-status".format(name)):
                    return False

        return True

    def units_ready(self):
        """Check status of all units, raise JujuStatusError for an error state.
        Returns: True if the units in the status are in a good state, False if they are busy.
        """
        applications = self.yaml_status()["applications"]

        for app in applications:
            units = self._get_units(app)
            if units is None:
                continue
            for name in units:
                unit = units[name]

                if not self._check_ready(unit["workload-status"]["current"], "Unit {} workload-status".format(name)):
                    return False

                if not self._check_ready(unit["juju-status"]["current"], "Unit {} juju-status".format(name)):
                    return False

        return True

    def yaml_status(self, force_update=False):
        """
        Args:
            force_update: If true update from Juju.
        Returns: The output `juju status --format=yaml` after parsing by pyyaml.
        """
        if not self._cached_status or force_update:
            self._cached_status = yaml.safe_load(self.status(force_update))
        return self._cached_status


# Default implementation for the locally detected juju: major version 1 gets Juju1Status, and
# anything else (including an undetected version) gets Juju2Status.
if major_version == 1:
    Status = Juju1Status
else:
    Status = Juju2Status