2
# Copyright 2015 Canonical Ltd. This software is licensed under the
3
# GNU Affero General Public License version 3 (see the file LICENSE).
5
"""Database Tasks Service.
7
A service that runs deferred database operations, and then ensures they're
8
finished before stopping.
11
from __future__ import (
21
"DatabaseTaskAlreadyRunning",
22
"DatabaseTasksService",
25
from maasserver.utils.threads import deferToDatabase
26
from provisioningserver.utils.twisted import (
30
from twisted.application.service import Service
31
from twisted.internet.defer import (
35
from twisted.internet.task import cooperate
36
from twisted.python import log
39
class DatabaseTaskAlreadyRunning(Exception):
40
"""The database task is running and can no longer be cancelled."""
43
class DatabaseTasksService(Service, object):
44
"""Run deferred database operations one at a time.
46
Once the service is started, `deferTask` and `addTask` can be used to
47
queue up execution of a database task.
49
The former — `deferTask` — will return a `Deferred` that fires with the
50
result of the database task. Errors arising from this task become the
51
responsibility of the caller.
53
The latter — `addTask` — returns nothing, and will log errors arising from
56
Before this service has been started, and as soon as shutdown has
57
commenced, database tasks will be rejected by `deferTask` and `addTask`.
63
def __init__(self, limit=100):
64
"""Initialise a new `DatabaseTasksService`.
66
:param limit: The maximum number of database tasks to defer before
67
rejecting additional tasks.
69
super(DatabaseTasksService, self).__init__()
70
# Start with a queue that rejects puts.
71
self.queue = DeferredQueue(size=0, backlog=1)
75
def deferTask(self, func, *args, **kwargs):
76
"""Schedules `func` to run later.
78
:raise QueueOverflow: If the queue of tasks is full.
79
:return: :class:`Deferred`, which fires with the result of running
80
the task in a database thread. This can be cancelled while the
81
database task is still enqueued, but will refuse to cancel once
82
the task is running, instead raising `DatabaseTaskAlreadyRunning`.
85
if task in self.queue.pending:
86
self.queue.pending.remove(task)
88
raise DatabaseTaskAlreadyRunning()
90
done = Deferred(cancel)
93
d = deferToDatabase(func, *args, **kwargs)
100
@asynchronous(timeout=FOREVER)
101
def addTask(self, func, *args, **kwargs):
102
"""Schedules `func` to run later.
104
Failures arising from running the task in a database thread will
107
:raise QueueOverflow: If the queue of tasks is full.
110
done = self.deferTask(func, *args, **kwargs)
111
done.addErrback(log.err, "Unhandled failure in database task.")
116
"""Schedules a "synchronise" task with the queue.
118
Tasks are processed in order, so this is a convenient way to ensure
119
that all previously added/deferred tasks have been processed.
121
:raise QueueOverflow: If the queue of tasks is full.
122
:return: :class:`Deferred` that will fire when this task is pulled out
123
of the queue. Processing of the queue will continue without pause.
126
if task in self.queue.pending:
127
self.queue.pending.remove(task)
129
done = Deferred(cancel)
137
@asynchronous(timeout=FOREVER)
138
def startService(self):
139
"""Open the queue and start processing database tasks.
143
super(DatabaseTasksService, self).startService()
144
self.queue.size = self.limit # Open queue to puts.
145
self.coop = cooperate(self._generateTasks())
147
@asynchronous(timeout=FOREVER)
148
def stopService(self):
149
"""Close the queue and finish processing outstanding database tasks.
151
:return: :class:`Deferred` which fires once all tasks have been run.
153
super(DatabaseTasksService, self).stopService()
154
# Feed the cooperative task so that it can shut down.
155
self.queue.size += 1 # Prevent QueueOverflow.
156
self.queue.put(self.sentinel) # See _generateTasks.
157
self.queue.size = 0 # Now close queue to puts.
158
# This service has stopped when the coop task is done.
159
return self.coop.whenDone()
161
def _generateTasks(self):
162
"""Feed the cooperator.
164
This pulls tasks from the queue while this service is running and
165
executes them. If no tasks are pending it will wait for more.
167
Once shutdown of the service commences this will continue pulling and
168
executing tasks while there are tasks actually pending; it will not
169
wait for additional tasks to be enqueued.
172
sentinel = self.sentinel
175
if task is not sentinel:
178
# Execute tasks as long as we're running.
180
yield queue.get().addCallback(execute)
182
# Execute all remaining tasks.
183
while len(queue.pending) != 0:
184
yield queue.get().addCallback(execute)