~allenap/maas/neighbours-service-live

« back to all changes in this revision

Viewing changes to src/maasserver/utils/dbtasks.py

  • Committer: Gavin Panella
  • Date: 2015-10-14 19:48:39 UTC
  • mfrom: (3852.1.524 maas)
  • Revision ID: gavin.panella@canonical.com-20151014194839-xgjmom6qzxyj71pn
Merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# encoding: utf-8
 
2
# Copyright 2015 Canonical Ltd.  This software is licensed under the
 
3
# GNU Affero General Public License version 3 (see the file LICENSE).
 
4
 
 
5
"""Database Tasks Service.
 
6
 
 
7
A service that runs deferred database operations, and then ensures they're
 
8
finished before stopping.
 
9
"""
 
10
 
 
11
from __future__ import (
 
12
    absolute_import,
 
13
    print_function,
 
14
    unicode_literals,
 
15
    )
 
16
 
 
17
str = None
 
18
 
 
19
__metaclass__ = type
 
20
__all__ = [
 
21
    "DatabaseTaskAlreadyRunning",
 
22
    "DatabaseTasksService",
 
23
]
 
24
 
 
25
from maasserver.utils.threads import deferToDatabase
 
26
from provisioningserver.utils.twisted import (
 
27
    asynchronous,
 
28
    FOREVER,
 
29
)
 
30
from twisted.application.service import Service
 
31
from twisted.internet.defer import (
 
32
    Deferred,
 
33
    DeferredQueue,
 
34
)
 
35
from twisted.internet.task import cooperate
 
36
from twisted.python import log
 
37
 
 
38
 
 
39
class DatabaseTaskAlreadyRunning(Exception):
 
40
    """The database task is running and can no longer be cancelled."""
 
41
 
 
42
 
 
43
class DatabaseTasksService(Service, object):
 
44
    """Run deferred database operations one at a time.
 
45
 
 
46
    Once the service is started, `deferTask` and `addTask` can be used to
 
47
    queue up execution of a database task.
 
48
 
 
49
    The former — `deferTask` — will return a `Deferred` that fires with the
 
50
    result of the database task. Errors arising from this task become the
 
51
    responsibility of the caller.
 
52
 
 
53
    The latter — `addTask` — returns nothing, and will log errors arising from
 
54
    the database task.
 
55
 
 
56
    Before this service has been started, and as soon as shutdown has
 
57
    commenced, database tasks will be rejected by `deferTask` and `addTask`.
 
58
 
 
59
    """
 
60
 
 
61
    sentinel = object()
 
62
 
 
63
    def __init__(self, limit=100):
 
64
        """Initialise a new `DatabaseTasksService`.
 
65
 
 
66
        :param limit: The maximum number of database tasks to defer before
 
67
            rejecting additional tasks.
 
68
        """
 
69
        super(DatabaseTasksService, self).__init__()
 
70
        # Start with a queue that rejects puts.
 
71
        self.queue = DeferredQueue(size=0, backlog=1)
 
72
        self.limit = limit
 
73
 
 
74
    @asynchronous
 
75
    def deferTask(self, func, *args, **kwargs):
 
76
        """Schedules `func` to run later.
 
77
 
 
78
        :raise QueueOverflow: If the queue of tasks is full.
 
79
        :return: :class:`Deferred`, which fires with the result of the running
 
80
            the task in a database thread. This can be cancelled while the
 
81
            database task is still enqueued, but will refuse to cancel once
 
82
            the task is running, instead raising `DatabaseTaskAlreadyRunning`.
 
83
        """
 
84
        def cancel(done):
 
85
            if task in self.queue.pending:
 
86
                self.queue.pending.remove(task)
 
87
            else:
 
88
                raise DatabaseTaskAlreadyRunning()
 
89
 
 
90
        done = Deferred(cancel)
 
91
 
 
92
        def task():
 
93
            d = deferToDatabase(func, *args, **kwargs)
 
94
            d.chainDeferred(done)
 
95
            return d
 
96
 
 
97
        self.queue.put(task)
 
98
        return done
 
99
 
 
100
    @asynchronous(timeout=FOREVER)
 
101
    def addTask(self, func, *args, **kwargs):
 
102
        """Schedules `func` to run later.
 
103
 
 
104
        Failures arising from the running the task in a database thread will
 
105
        be logged.
 
106
 
 
107
        :raise QueueOverflow: If the queue of tasks is full.
 
108
        :return: `None`
 
109
        """
 
110
        done = self.deferTask(func, *args, **kwargs)
 
111
        done.addErrback(log.err, "Unhandled failure in database task.")
 
112
        return None
 
113
 
 
114
    @asynchronous
 
115
    def syncTask(self):
 
116
        """Schedules a "synchronise" task with the queue.
 
117
 
 
118
        Tasks are processed in order, so this is a convenient way to ensure
 
119
        that all previously added/deferred tasks have been processed.
 
120
 
 
121
        :raise QueueOverflow: If the queue of tasks is full.
 
122
        :return: :class:`Deferred` that will fire when this task is pulled out
 
123
            of the queue. Processing of the queue will continue without pause.
 
124
        """
 
125
        def cancel(done):
 
126
            if task in self.queue.pending:
 
127
                self.queue.pending.remove(task)
 
128
 
 
129
        done = Deferred(cancel)
 
130
 
 
131
        def task():
 
132
            done.callback(self)
 
133
 
 
134
        self.queue.put(task)
 
135
        return done
 
136
 
 
137
    @asynchronous(timeout=FOREVER)
 
138
    def startService(self):
 
139
        """Open the queue and start processing database tasks.
 
140
 
 
141
        :return: `None`
 
142
        """
 
143
        super(DatabaseTasksService, self).startService()
 
144
        self.queue.size = self.limit  # Open queue to puts.
 
145
        self.coop = cooperate(self._generateTasks())
 
146
 
 
147
    @asynchronous(timeout=FOREVER)
 
148
    def stopService(self):
 
149
        """Close the queue and finish processing outstanding database tasks.
 
150
 
 
151
        :return: :class:`Deferred` which fires once all tasks have been run.
 
152
        """
 
153
        super(DatabaseTasksService, self).stopService()
 
154
        # Feed the cooperative task so that it can shutdown.
 
155
        self.queue.size += 1  # Prevent QueueOverflow.
 
156
        self.queue.put(self.sentinel)  # See _generateTasks.
 
157
        self.queue.size = 0  # Now close queue to puts.
 
158
        # This service has stopped when the coop task is done.
 
159
        return self.coop.whenDone()
 
160
 
 
161
    def _generateTasks(self):
 
162
        """Feed the cooperator.
 
163
 
 
164
        This pulls tasks from the queue while this service is running and
 
165
        executes them. If no tasks are pending it will wait for more.
 
166
 
 
167
        Once shutdown of the service commences this will continue pulling and
 
168
        executing tasks while there are tasks actually pending; it will not
 
169
        wait for additional tasks to be enqueued.
 
170
        """
 
171
        queue = self.queue
 
172
        sentinel = self.sentinel
 
173
 
 
174
        def execute(task):
 
175
            if task is not sentinel:
 
176
                return task()
 
177
 
 
178
        # Execute tasks as long as we're running.
 
179
        while self.running:
 
180
            yield queue.get().addCallback(execute)
 
181
 
 
182
        # Execute all remaining tasks.
 
183
        while len(queue.pending) != 0:
 
184
            yield queue.get().addCallback(execute)