~techalchemy/mojo/python3-fixes

« back to all changes in this revision

Viewing changes to mojo/phase.py

  • Committer: daniel.manrique at canonical
  • Date: 2019-04-11 20:11:10 UTC
  • mto: This revision was merged to the branch mainline in revision 496.
  • Revision ID: daniel.manrique@canonical.com-20190411201110-z3kw6y370u2yixse
Parallel execution of nagios check tasks.

Instead of waiting for one machine to respond before moving on to the next, we dispatch
all the "juju ssh check-nagios-command" calls in parallel and report each result as it
arrives.

This is functionally equivalent to serial execution, except that results are reported in the order they complete rather than in unit order, and it is much faster.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
import string
13
13
import sys
14
14
import yaml
 
15
from concurrent.futures import ThreadPoolExecutor, as_completed
15
16
from .utils import (
16
17
    bicommand,
17
18
    chdir,
1251
1252
        all_services = juju_status.services_list()
1252
1253
        if not services:
1253
1254
            services = all_services
1254
 
        for service in services:
1255
 
            if service not in all_services:
1256
 
                logging.error("Service {} not found in {}".format(service, ", ".join(all_services)))
 
1255
        jobs = []
 
1256
        max_workers = sum([len(juju_status.service_machine_numbers(svc)) for svc in services]) 
 
1257
        logging.info("%s parallel nagios check workers", max_workers)
 
1258
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
 
1259
            for service in services:
 
1260
                if service not in all_services:
 
1261
                    logging.error("Service {} not found in {}".format(service, ", ".join(all_services)))
 
1262
                    nagios_error = True
 
1263
                    continue
 
1264
                service_machine_ids = juju_status.service_machine_numbers(service)
 
1265
                if service_machine_ids:
 
1266
                    machine_ids.extend(service_machine_ids)
 
1267
                    for unit in juju_status.service_units(service):
 
1268
                        jobs.append(executor.submit(self.run_nagios_checks, unit, skip_checks))
 
1269
                        print("Dispatched one job for %s" % unit)
 
1270
        for future in as_completed(jobs):
 
1271
            error_output, ok_output = future.result()
 
1272
            if ok_output:
 
1273
                logging.info(ok_output)
 
1274
            if error_output:
 
1275
                logging.error(error_output)
1257
1276
                nagios_error = True
1258
 
                continue
1259
 
            service_machine_ids = juju_status.service_machine_numbers(service)
1260
 
            if service_machine_ids:
1261
 
                machine_ids.extend(service_machine_ids)
1262
 
                for unit in juju_status.service_units(service):
1263
 
                    error_output, ok_output = self.run_nagios_checks(unit, skip_checks)
1264
 
                    if ok_output:
1265
 
                        logging.info(ok_output)
1266
 
                    if error_output:
1267
 
                        logging.error(error_output)
1268
 
                        nagios_error = True
1269
1277
        if services == all_services:
1270
1278
            # We're not looking at a specific set of services, so need to warn about machines we've not checked
1271
1279
            for juju_status_machine_id in juju_status.machine_ids_list():