86
86
logger = logging.getLogger(__name__)
87
87
logger.setLevel(logging.DEBUG)
89
conf = iconfig.Iconfig()
90
log_debug_path = conf.get('pkgimport.driver.log.debug')
91
log_progress_path = conf.get('pkgimport.driver.log.progress')
89
conf = iconfig.ImporterStack()
90
log_debug_path = conf.get('pi.driver.log.debug')
91
log_progress_path = conf.get('pi.driver.log.progress')
92
92
for p in (log_debug_path, log_progress_path):
93
93
dir_path = os.path.dirname(p)
94
94
icommon.ensure_directory(dir_path)
122
121
class Importer(threads.SubprocessMonitor):
124
def __init__(self, logger, args, package_name, job_id):
123
def __init__(self, logger, package_name, job_id, _command=None):
125
124
"""Spawn and monitor an import.
127
:param args: The script and args to be used to do the import (including
130
126
:param package_name: The name of the package to import (for display
133
129
:param job_id: The status db job id for this import.
131
imp_conf = iconfig.ImporterStack()
133
_command = imp_conf.get('pi.import_command', expand=False)
134
_command = imp_conf.expand_options(_command,
135
env=dict(package=package_name))
135
136
conf = iconfig.PackageStack(package_name)
136
137
super(Importer, self).__init__(
137
args, max_duration=conf.get('pkgimport.max_duration'),
138
grace_period=conf.get('pkgimport.kill.grace_period'))
138
cmdline.split(_command),
139
max_duration=conf.get('pi.max_duration'),
140
grace_period=conf.get('pi.kill.grace_period'))
139
141
self.logger = logger
140
142
self.package_name = package_name
141
143
self.job_id = job_id
142
self.status_db = icommon.StatusDatabase(paths.sqlite_file)
144
self.status_db = icommon.StatusDatabase(
145
iconfig.ImporterStack().get('pi.sqlite_file'))
143
146
self.success = None
144
147
self.failure_sig = None
182
185
sure that it is needed.
185
def __init__(self, logger, script_path):
188
def __init__(self, logger):
186
189
"""Construct an `AllQueue`.
188
191
:param script_path: The path to the script to run to import the
191
194
Queue.Queue.__init__(self)
192
195
self.logger = logger
193
self.script_path = script_path
194
196
self.tried = set()
195
self.packages_db = icommon.PackageDatabase(paths.sqlite_package_file)
196
self.status_db = icommon.StatusDatabase(paths.sqlite_file)
197
conf = iconfig.ImporterStack()
198
self.packages_db = icommon.PackageDatabase(
199
conf.get('pi.sqlite_package_file'))
200
self.status_db = icommon.StatusDatabase(conf.get('pi.sqlite_file'))
198
202
def get_nowait(self):
199
203
# FIXME: It's bogus to implement get_nowait() this way (without
205
209
except Queue.Empty:
206
210
self.unfinished_tasks += 1
207
211
job_id, package_name = self.next_job()
208
return Importer(self.logger, [self.script_path, package_name],
209
package_name, job_id)
212
return Importer(self.logger, package_name, job_id)
211
214
def next_job(self):
255
258
def __init__(self, logger, threshold, delay):
256
259
super(LaunchpadCircuitBreaker, self).__init__(threshold, delay)
257
260
self.logger = logger
258
self.status_db = icommon.StatusDatabase(paths.sqlite_file)
261
conf = iconfig.ImporterStack()
262
self.status_db = icommon.StatusDatabase(conf.get('pi.sqlite_file'))
259
263
# For log purposes, not to be confused with the breaker state itself
260
264
# which is authoritative.
261
265
self.lp_state = LP_UP
321
325
def before_start(self):
322
326
super(ImportDriver, self).before_start()
323
# XXX: Probably not the best place to put this code, but what the
325
conf = iconfig.Iconfig()
326
script_path = conf.get('pkgimport.import_script')
327
self.queue = AllQueue(self.logger, script_path)
327
self.queue = AllQueue(self.logger)
328
328
# Launchpad downtimes should be less than 5 minutes (300 seconds).
329
329
self.circuit_breaker = LaunchpadCircuitBreaker(self.logger, 0, 300)
381
381
"""Monitor the ImportDriver and interpret signals."""
383
383
def __init__(self, logger):
384
self.status_db = icommon.StatusDatabase(paths.sqlite_file)
384
self.conf = iconfig.ImporterStack()
385
self.status_db = icommon.StatusDatabase(self.conf.get('pi.sqlite_file'))
385
386
# Start with an 8-way Driver
386
387
self.driver = ImportDriver(logger, 8)
387
388
self.logger = logger
389
def run(self, retry_all=False):
390
lock = icommon.lock_main()
389
self.stop_file = self.conf.get('pi.stop_file')
390
self.max_threads_file = self.conf.get('pi.max_threads_file')
394
lock = icommon.lock_path(self.conf.get('pi.base_dir'), 'mass_import')
392
396
self.logger.info("Another main process is running")
399
if self.conf.get('pi.retry_all_failed_jobs'):
396
400
# We have been configured to retry _all_ failed jobs.
397
401
self.status_db.add_failed_jobs()
411
415
if self.driver.stopped.isSet():
413
417
# Then check the graceful stop file
414
if (os.path.exists(paths.stop_file)
418
if (os.path.exists(self.stop_file)
415
419
and not self.driver.queue_closed.isSet()):
416
420
self.driver.queue_closed.set()
417
421
self.logger.info('Driver would not process new requests')
420
424
nb_threads = self.get_max_threads()
421
425
if self.driver.max_threads != nb_threads:
422
426
self.logger.info('Read %d in %s'
423
% (nb_threads, paths.max_threads_file))
427
% (nb_threads, self.max_threads_file))
424
428
self.driver.max_threads = nb_threads
425
429
# Catch driver exception if any
426
430
self.report_driver_exception()
432
436
def get_max_threads(self):
433
437
# By default, keep the existing one
434
438
nb_threads = self.driver.max_threads
435
if os.path.exists(paths.max_threads_file):
436
f = open(paths.max_threads_file)
440
with open(self.max_threads_file) as f:
438
441
line = f.readline().strip()
440
443
nb_threads = int(line)
442
self.logger.warning('Error reading max threads file %s: %s'
443
% (paths.max_threads_file, str(e)))
445
self.logger.warning('Error accessing max threads file %s: %s'
446
% (self.max_threads_file, str(e)))
446
447
return nb_threads
448
449
def report_driver_exception(self):
479
480
signal.signal(signal.SIGINT, handler)
480
481
signal.signal(signal.SIGTERM, handler)
482
config = iconfig.Iconfig()
483
retry_all = config.get('pkgimport.retry_all_failed_jobs') == 'true'
484
483
# Run the controller
486
controller.run(retry_all)
487
486
except HTTPError, e:
488
487
logger.critical(e.content)