~james-w/udd/management-commands

« back to all changes in this revision

Viewing changes to udd/scripts/mass_import.py

  • Committer: James Westby
  • Date: 2011-12-13 21:09:23 UTC
  • mfrom: (557.1.1 drop_email_failures)
  • Revision ID: james.westby@canonical.com-20111213210923-tfrirlx3xbwmi70u
Merged drop_email_failures into management-commands.

Show diffs side-by-side

added added

removed removed

Lines of Context:
7
7
from stat import ST_DEV, ST_INO
8
8
import sys
9
9
 
 
10
from bzrlib import cmdline
10
11
from launchpadlib.errors import HTTPError
11
12
 
12
13
from udd import (
15
16
    iconfig,
16
17
    threads,
17
18
    )
18
 
from udd.paths import paths
19
19
 
20
20
 
21
21
class WatchedFileHandler(logging.FileHandler):
86
86
    logger = logging.getLogger(__name__)
87
87
    logger.setLevel(logging.DEBUG)
88
88
 
89
 
    conf = iconfig.Iconfig()
90
 
    log_debug_path = conf.get('pkgimport.driver.log.debug')
91
 
    log_progress_path = conf.get('pkgimport.driver.log.progress')
 
89
    conf = iconfig.ImporterStack()
 
90
    log_debug_path = conf.get('pi.driver.log.debug')
 
91
    log_progress_path = conf.get('pi.driver.log.progress')
92
92
    for p in (log_debug_path, log_progress_path):
93
93
        dir_path = os.path.dirname(p)
94
94
        icommon.ensure_directory(dir_path)
110
110
    return logger
111
111
 
112
112
 
113
 
 
114
113
def report_exception(logger, component, exc):
115
114
    import traceback
116
115
    exc_class, exc_value, exc_tb = exc
121
120
 
122
121
class Importer(threads.SubprocessMonitor):
123
122
 
124
 
    def __init__(self, logger, args, package_name, job_id):
 
123
    def __init__(self, logger, package_name, job_id, _command=None):
125
124
        """Spawn and monitor an import.
126
125
 
127
 
        :param args: The script and args to be used to do the import (including
128
 
            the package name).
129
 
 
130
126
        :param package_name: The name of the package to import (for display
131
127
            purposes).
132
128
 
133
129
        :param job_id: The status db job id for this import.
134
130
        """
 
131
        imp_conf = iconfig.ImporterStack()
 
132
        if _command is None:
 
133
            _command = imp_conf.get('pi.import_command', expand=False)
 
134
        _command = imp_conf.expand_options(_command,
 
135
                                           env=dict(package=package_name))
135
136
        conf = iconfig.PackageStack(package_name)
136
137
        super(Importer, self).__init__(
137
 
            args, max_duration=conf.get('pkgimport.max_duration'),
138
 
            grace_period=conf.get('pkgimport.kill.grace_period'))
 
138
            cmdline.split(_command),
 
139
            max_duration=conf.get('pi.max_duration'),
 
140
            grace_period=conf.get('pi.kill.grace_period'))
139
141
        self.logger = logger
140
142
        self.package_name = package_name
141
143
        self.job_id = job_id
142
 
        self.status_db = icommon.StatusDatabase(paths.sqlite_file)
 
144
        self.status_db = icommon.StatusDatabase(
 
145
            iconfig.ImporterStack().get('pi.sqlite_file'))
143
146
        self.success = None
144
147
        self.failure_sig = None
145
148
 
182
185
    sure that it is needed.
183
186
    """
184
187
 
185
 
    def __init__(self, logger, script_path):
 
188
    def __init__(self, logger):
186
189
        """Construct an `AllQueue`.
187
190
 
188
191
        :param script_path: The path to the script to run to import the
190
193
        """
191
194
        Queue.Queue.__init__(self)
192
195
        self.logger = logger
193
 
        self.script_path = script_path
194
196
        self.tried = set()
195
 
        self.packages_db = icommon.PackageDatabase(paths.sqlite_package_file)
196
 
        self.status_db = icommon.StatusDatabase(paths.sqlite_file)
 
197
        conf = iconfig.ImporterStack()
 
198
        self.packages_db = icommon.PackageDatabase(
 
199
            conf.get('pi.sqlite_package_file'))
 
200
        self.status_db = icommon.StatusDatabase(conf.get('pi.sqlite_file'))
197
201
 
198
202
    def get_nowait(self):
199
203
        # FIXME: It's bogus to implement get_nowait() this way (without
205
209
        except Queue.Empty:
206
210
            self.unfinished_tasks += 1
207
211
            job_id, package_name = self.next_job()
208
 
        return Importer(self.logger, [self.script_path, package_name],
209
 
                        package_name, job_id)
 
212
        return Importer(self.logger, package_name, job_id)
210
213
 
211
214
    def next_job(self):
212
215
        while True:
255
258
    def __init__(self, logger, threshold, delay):
256
259
        super(LaunchpadCircuitBreaker, self).__init__(threshold, delay)
257
260
        self.logger = logger
258
 
        self.status_db = icommon.StatusDatabase(paths.sqlite_file)
 
261
        conf = iconfig.ImporterStack()
 
262
        self.status_db = icommon.StatusDatabase(conf.get('pi.sqlite_file'))
259
263
        # For log purposes, not to be confused with the breaker state itself
260
264
        # which is authoritative.
261
265
        self.lp_state = LP_UP
320
324
 
321
325
    def before_start(self):
322
326
        super(ImportDriver, self).before_start()
323
 
        # XXX: Probably not the best place to put this code, but what the
324
 
        # heck! jml
325
 
        conf = iconfig.Iconfig()
326
 
        script_path = conf.get('pkgimport.import_script')
327
 
        self.queue = AllQueue(self.logger, script_path)
 
327
        self.queue = AllQueue(self.logger)
328
328
        # Launchpad downtimes should be less than 5 minutes (300 seconds).
329
329
        self.circuit_breaker = LaunchpadCircuitBreaker(self.logger, 0, 300)
330
330
 
381
381
    """Monitor the ImportDriver and interpret signals."""
382
382
 
383
383
    def __init__(self, logger):
384
 
        self.status_db = icommon.StatusDatabase(paths.sqlite_file)
 
384
        self.conf = iconfig.ImporterStack()
 
385
        self.status_db = icommon.StatusDatabase(self.conf.get('pi.sqlite_file'))
385
386
        # Start with an 8-way Driver
386
387
        self.driver = ImportDriver(logger, 8)
387
388
        self.logger = logger
388
 
 
389
 
    def run(self, retry_all=False):
390
 
        lock = icommon.lock_main()
 
389
        self.stop_file = self.conf.get('pi.stop_file')
 
390
        self.max_threads_file = self.conf.get('pi.max_threads_file')
 
391
 
 
392
 
 
393
    def run(self):
 
394
        lock = icommon.lock_path(self.conf.get('pi.base_dir'), 'mass_import')
391
395
        if lock is None:
392
396
            self.logger.info("Another main process is running")
393
397
            raise Stop
394
398
        try:
395
 
            if retry_all:
 
399
            if self.conf.get('pi.retry_all_failed_jobs'):
396
400
                # We have been configured to retry _all_ failed jobs.
397
401
                self.status_db.add_failed_jobs()
398
402
            else:
411
415
                if self.driver.stopped.isSet():
412
416
                    break
413
417
                # Then check the graceful stop file
414
 
                if (os.path.exists(paths.stop_file)
 
418
                if (os.path.exists(self.stop_file)
415
419
                    and not self.driver.queue_closed.isSet()):
416
420
                    self.driver.queue_closed.set()
417
421
                    self.logger.info('Driver would not process new requests')
420
424
                nb_threads = self.get_max_threads()
421
425
                if self.driver.max_threads != nb_threads:
422
426
                    self.logger.info('Read %d in %s'
423
 
                                     % (nb_threads, paths.max_threads_file))
 
427
                                     % (nb_threads, self.max_threads_file))
424
428
                    self.driver.max_threads = nb_threads
425
429
            # Catch driver exception if any
426
430
            self.report_driver_exception()
432
436
    def get_max_threads(self):
433
437
        # By default, keep the existing one
434
438
        nb_threads = self.driver.max_threads
435
 
        if os.path.exists(paths.max_threads_file):
436
 
            f = open(paths.max_threads_file)
437
 
            try:
 
439
        try:
 
440
            with open(self.max_threads_file) as f:
438
441
                line = f.readline().strip()
439
442
                if line:
440
443
                   nb_threads = int(line)
441
 
            except Exception, e:
442
 
                self.logger.warning('Error reading max threads file %s: %s'
443
 
                                    % (paths.max_threads_file, str(e)))
444
 
            finally:
445
 
                f.close()
 
444
        except Exception, e:
 
445
            self.logger.warning('Error accessing max threads file %s: %s'
 
446
                                % (self.max_threads_file, str(e)))
446
447
        return nb_threads
447
448
 
448
449
    def report_driver_exception(self):
479
480
    signal.signal(signal.SIGINT, handler)
480
481
    signal.signal(signal.SIGTERM, handler)
481
482
 
482
 
    config = iconfig.Iconfig()
483
 
    retry_all = config.get('pkgimport.retry_all_failed_jobs') == 'true'
484
483
    # Run the controller
485
484
    try:
486
 
        controller.run(retry_all)
 
485
        controller.run()
487
486
    except HTTPError, e:
488
487
        logger.critical(e.content)
489
488
        raise