12
12
from landscape.broker.remote import RemoteBroker
15
class PackageTaskError(Exception):
    """Raised when a task hasn't been successfully completed."""
15
19
class PackageTaskHandlerConfiguration(Configuration):
16
20
"""Specialized configuration for L{PackageTaskHandler}s."""
66
70
self._facade = package_facade
67
71
self._broker = remote_broker
68
72
self._config = config
69
self._channels_reloaded = False
71
def ensure_channels_reloaded(self):
    """Reload the package facade's channels, at most once per handler.

    The first call flips C{self._channels_reloaded} to C{True} and then
    asks C{self._facade} to reload its channels; later calls do nothing.
    """
    if self._channels_reloaded:
        return
    # The flag is raised *before* reloading, so a failing reload is not
    # retried by a subsequent call.
    self._channels_reloaded = True
    self._facade.reload_channels()
77
76
return self.handle_tasks()
79
78
def handle_tasks(self):
    """Handle the tasks in the queue.

    The tasks will be handed over one by one to L{handle_task} until the
    queue is empty or a task fails.

    @return: Whatever C{_handle_next_task} returns when started with no
        previous result and no previous task.
    """
    return self._handle_next_task(None)
82
88
def _handle_next_task(self, result, last_task=None):
89
"""Pick the next task from the queue and pass it to C{handle_task}."""
83
91
if last_task is not None:
84
92
# Last task succeeded. We can safely kill it now.
87
96
task = self._store.get_next_task(self.queue_name)
90
99
# We have another task. Let's handle it.
91
100
result = self.handle_task(task)
92
result.addCallback(self._handle_next_task, task)
101
result.addCallback(self._handle_next_task, last_task=task)
102
result.addErrback(self._handle_task_failure)
96
106
# No more tasks! We're done!
97
107
return succeed(None)
109
def _handle_task_failure(self, failure):
    """Gracefully handle a L{PackageTaskError} and stop handling tasks.

    @param failure: The failure produced while handling a task.
        Presumably a Twisted C{Failure}; only its C{trap} method is used.
    """
    # C{trap} is expected to re-raise anything that is not a
    # L{PackageTaskError}; a matching failure is absorbed here because
    # nothing is returned or raised afterwards.
    failure.trap(PackageTaskError)
99
113
def handle_task(self, task):
    """Handle a single task.

    Sub-classes must override this method in order to trigger task-specific
    behaviour; this base implementation ignores C{task} and simply
    returns C{succeed(None)}.

    This method must return a L{Deferred} firing the task result. If the
    deferred is successful the task will be removed from the queue and the
    next one will be picked. If the task can't be completed, this method
    must raise a L{PackageTaskError}, in this case the handler will stop
    processing tasks and the failed task won't be removed from the queue.

    @param task: The task to handle.
    @return: The result of C{succeed(None)}, presumably an already-fired
        L{Deferred} carrying C{None}.
    """
    return succeed(None)
128
def handled_tasks_count(self):
130
Return the number of tasks that have been successfully handled so far.
102
134
def use_hash_id_db(self):
104
136
Attach the appropriate pre-canned hash=>id database to our store.