~barry/mailman/events-and-web

« back to all changes in this revision

Viewing changes to src/mailman/core/runner.py

  • Committer: klm
  • Date: 1998-01-07 21:21:35 UTC
  • Revision ID: vcs-imports@canonical.com-19980107212135-sv0y521ps0xye37r
Initial revision

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2001-2012 by the Free Software Foundation, Inc.
2
 
#
3
 
# This file is part of GNU Mailman.
4
 
#
5
 
# GNU Mailman is free software: you can redistribute it and/or modify it under
6
 
# the terms of the GNU General Public License as published by the Free
7
 
# Software Foundation, either version 3 of the License, or (at your option)
8
 
# any later version.
9
 
#
10
 
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11
 
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12
 
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
13
 
# more details.
14
 
#
15
 
# You should have received a copy of the GNU General Public License along with
16
 
# GNU Mailman.  If not, see <http://www.gnu.org/licenses/>.
17
 
 
18
 
"""The process runner base class."""
19
 
 
20
 
from __future__ import absolute_import, print_function, unicode_literals
21
 
 
22
 
__metaclass__ = type
23
 
__all__ = [
24
 
    'Runner',
25
 
    ]
26
 
 
27
 
 
28
 
import time
29
 
import logging
30
 
import traceback
31
 
 
32
 
from cStringIO import StringIO
33
 
from lazr.config import as_boolean, as_timedelta
34
 
from zope.component import getUtility
35
 
from zope.event import notify
36
 
from zope.interface import implementer
37
 
 
38
 
from mailman.config import config
39
 
from mailman.core.i18n import _
40
 
from mailman.core.switchboard import Switchboard
41
 
from mailman.interfaces.languages import ILanguageManager
42
 
from mailman.interfaces.listmanager import IListManager
43
 
from mailman.interfaces.runner import IRunner, RunnerCrashEvent
44
 
from mailman.utilities.string import expand
45
 
 
46
 
 
47
 
dlog = logging.getLogger('mailman.debug')
48
 
elog = logging.getLogger('mailman.error')
49
 
 
50
 
 
51
 
 
52
 
@implementer(IRunner)
53
 
class Runner:
54
 
    intercept_signals = True
55
 
 
56
 
    def __init__(self, name, slice=None):
57
 
        """Create a runner.
58
 
 
59
 
        :param slice: The slice number for this runner.  This is passed
60
 
            directly to the underlying `ISwitchboard` object.  This is ignored
61
 
            for runners that don't manage a queue.
62
 
        :type slice: int or None
63
 
        """
64
 
        # Grab the configuration section.
65
 
        self.name = name
66
 
        section = getattr(config, 'runner.' + name)
67
 
        substitutions = config.paths
68
 
        substitutions['name'] = name
69
 
        self.queue_directory = expand(section.path, substitutions)
70
 
        numslices = int(section.instances)
71
 
        self.switchboard = Switchboard(
72
 
            name, self.queue_directory, slice, numslices, True)
73
 
        self.sleep_time = as_timedelta(section.sleep_time)
74
 
        # sleep_time is a timedelta; turn it into a float for time.sleep().
75
 
        self.sleep_float = (86400 * self.sleep_time.days +
76
 
                            self.sleep_time.seconds +
77
 
                            self.sleep_time.microseconds / 1.0e6)
78
 
        self.max_restarts = int(section.max_restarts)
79
 
        self.start = as_boolean(section.start)
80
 
        self._stop = False
81
 
 
82
 
    def __repr__(self):
83
 
        return '<{0} at {1:#x}>'.format(self.__class__.__name__, id(self))
84
 
 
85
 
    def stop(self):
86
 
        """See `IRunner`."""
87
 
        self._stop = True
88
 
 
89
 
    def run(self):
90
 
        """See `IRunner`."""
91
 
        # Start the main loop for this runner.
92
 
        try:
93
 
            while True:
94
 
                # Once through the loop that processes all the files in the
95
 
                # queue directory.
96
 
                filecnt = self._one_iteration()
97
 
                # Do the periodic work for the subclass.
98
 
                self._do_periodic()
99
 
                # If the stop flag is set, we're done.
100
 
                if self._stop:
101
 
                    break
102
 
                # Give the runner an opportunity to snooze for a while, but
103
 
                # pass it the file count so it can decide whether to do more
104
 
                # work now or not.
105
 
                self._snooze(filecnt)
106
 
        except KeyboardInterrupt:
107
 
            pass
108
 
        finally:
109
 
            self._clean_up()
110
 
 
111
 
    def _one_iteration(self):
112
 
        """See `IRunner`."""
113
 
        me = self.__class__.__name__
114
 
        dlog.debug('[%s] starting oneloop', me)
115
 
        # List all the files in our queue directory.  The switchboard is
116
 
        # guaranteed to hand us the files in FIFO order.
117
 
        files = self.switchboard.files
118
 
        for filebase in files:
119
 
            dlog.debug('[%s] processing filebase: %s', me, filebase)
120
 
            try:
121
 
                # Ask the switchboard for the message and metadata objects
122
 
                # associated with this queue file.
123
 
                msg, msgdata = self.switchboard.dequeue(filebase)
124
 
            except Exception as error:
125
 
                # This used to just catch email.Errors.MessageParseError, but
126
 
                # other problems can occur in message parsing, e.g.
127
 
                # ValueError, and exceptions can occur in unpickling too.  We
128
 
                # don't want the runner to die, so we just log and skip this
129
 
                # entry, but preserve it for analysis.
130
 
                self._log(error)
131
 
                elog.error('Skipping and preserving unparseable message: %s',
132
 
                           filebase)
133
 
                self.switchboard.finish(filebase, preserve=True)
134
 
                config.db.abort()
135
 
                continue
136
 
            try:
137
 
                dlog.debug('[%s] processing onefile', me)
138
 
                self._process_one_file(msg, msgdata)
139
 
                dlog.debug('[%s] finishing filebase: %s', me, filebase)
140
 
                self.switchboard.finish(filebase)
141
 
            except Exception as error:
142
 
                # All runners that implement _dispose() must guarantee that
143
 
                # exceptions are caught and dealt with properly.  Still, there
144
 
                # may be a bug in the infrastructure, and we do not want those
145
 
                # to cause messages to be lost.  Any uncaught exceptions will
146
 
                # cause the message to be stored in the shunt queue for human
147
 
                # intervention.
148
 
                self._log(error)
149
 
                # Put a marker in the metadata for unshunting.
150
 
                msgdata['whichq'] = self.switchboard.name
151
 
                # It is possible that shunting can throw an exception, e.g. a
152
 
                # permissions problem or a MemoryError due to a really large
153
 
                # message.  Try to be graceful.
154
 
                try:
155
 
                    shunt = config.switchboards['shunt']
156
 
                    new_filebase = shunt.enqueue(msg, msgdata)
157
 
                    elog.error('SHUNTING: %s', new_filebase)
158
 
                    self.switchboard.finish(filebase)
159
 
                except Exception as error:
160
 
                    # The message wasn't successfully shunted.  Log the
161
 
                    # exception and try to preserve the original queue entry
162
 
                    # for possible analysis.
163
 
                    self._log(error)
164
 
                    elog.error(
165
 
                        'SHUNTING FAILED, preserving original entry: %s',
166
 
                        filebase)
167
 
                    self.switchboard.finish(filebase, preserve=True)
168
 
                config.db.abort()
169
 
            # Other work we want to do each time through the loop.
170
 
            dlog.debug('[%s] doing periodic', me)
171
 
            self._do_periodic()
172
 
            dlog.debug('[%s] committing transaction', me)
173
 
            config.db.commit()
174
 
            dlog.debug('[%s] checking short circuit', me)
175
 
            if self._short_circuit():
176
 
                dlog.debug('[%s] short circuiting', me)
177
 
                break
178
 
        dlog.debug('[%s] ending oneloop: %s', me, len(files))
179
 
        return len(files)
180
 
 
181
 
    def _process_one_file(self, msg, msgdata):
182
 
        """See `IRunner`."""
183
 
        # Do some common sanity checking on the message metadata.  It's got to
184
 
        # be destined for a particular mailing list.  This switchboard is used
185
 
        # to shunt off badly formatted messages.  We don't want to just trash
186
 
        # them because they may be fixable with human intervention.  Just get
187
 
        # them out of our sight.
188
 
        #
189
 
        # Find out which mailing list this message is destined for.
190
 
        missing = object()
191
 
        listname = msgdata.get('listname', missing)
192
 
        mlist = (None
193
 
                 if listname is missing
194
 
                 else getUtility(IListManager).get(unicode(listname)))
195
 
        if mlist is None:
196
 
            elog.error(
197
 
                '%s runner "%s" shunting message for missing list: %s',
198
 
                msg['message-id'], self.name,
199
 
                ('n/a' if listname is missing else listname))
200
 
            config.switchboards['shunt'].enqueue(msg, msgdata)
201
 
            return
202
 
        # Now process this message.  We also want to set up the language
203
 
        # context for this message.  The context will be the preferred
204
 
        # language for the user if the sender is a member of the list, or it
205
 
        # will be the list's preferred language.  However, we must take
206
 
        # special care to reset the defaults, otherwise subsequent messages
207
 
        # may be translated incorrectly.
208
 
        if mlist is None:
209
 
            language_manager = getUtility(ILanguageManager)
210
 
            language = language_manager[config.mailman.default_language]
211
 
        elif msg.sender:
212
 
            member = mlist.members.get_member(msg.sender)
213
 
            language = (member.preferred_language
214
 
                        if member is not None
215
 
                        else mlist.preferred_language)
216
 
        else:
217
 
            language = mlist.preferred_language
218
 
        with _.using(language.code):
219
 
            msgdata['lang'] = language.code
220
 
            try:
221
 
                keepqueued = self._dispose(mlist, msg, msgdata)
222
 
            except Exception as error:
223
 
                # Trigger the Zope event and re-raise
224
 
                notify(RunnerCrashEvent(self, mlist, msg, msgdata, error))
225
 
                raise
226
 
        if keepqueued:
227
 
            self.switchboard.enqueue(msg, msgdata)
228
 
 
229
 
    def _log(self, exc):
230
 
        elog.error('Uncaught runner exception: %s', exc)
231
 
        s = StringIO()
232
 
        traceback.print_exc(file=s)
233
 
        elog.error('%s', s.getvalue())
234
 
 
235
 
    def _clean_up(self):
236
 
        """See `IRunner`."""
237
 
        pass
238
 
 
239
 
    def _dispose(self, mlist, msg, msgdata):
240
 
        """See `IRunner`."""
241
 
        raise NotImplementedError
242
 
 
243
 
    def _do_periodic(self):
244
 
        """See `IRunner`."""
245
 
        pass
246
 
 
247
 
    def _snooze(self, filecnt):
248
 
        """See `IRunner`."""
249
 
        if filecnt or self.sleep_float <= 0:
250
 
            return
251
 
        time.sleep(self.sleep_float)
252
 
 
253
 
    def _short_circuit(self):
254
 
        """See `IRunner`."""
255
 
        return self._stop