1
# Copyright (C) 2001-2012 by the Free Software Foundation, Inc.
3
# This file is part of GNU Mailman.
5
# GNU Mailman is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free
7
# Software Foundation, either version 3 of the License, or (at your option)
10
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
15
# You should have received a copy of the GNU General Public License along with
16
# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
18
"""The process runner base class."""
20
from __future__ import absolute_import, print_function, unicode_literals
32
from cStringIO import StringIO
33
from lazr.config import as_boolean, as_timedelta
34
from zope.component import getUtility
35
from zope.event import notify
36
from zope.interface import implementer
38
from mailman.config import config
39
from mailman.core.i18n import _
40
from mailman.core.switchboard import Switchboard
41
from mailman.interfaces.languages import ILanguageManager
42
from mailman.interfaces.listmanager import IListManager
43
from mailman.interfaces.runner import IRunner, RunnerCrashEvent
44
from mailman.utilities.string import expand
47
dlog = logging.getLogger('mailman.debug')
48
elog = logging.getLogger('mailman.error')
54
intercept_signals = True
56
def __init__(self, name, slice=None):
59
:param slice: The slice number for this runner. This is passed
60
directly to the underlying `ISwitchboard` object. This is ignored
61
for runners that don't manage a queue.
62
:type slice: int or None
64
# Grab the configuration section.
66
section = getattr(config, 'runner.' + name)
67
substitutions = config.paths
68
substitutions['name'] = name
69
self.queue_directory = expand(section.path, substitutions)
70
numslices = int(section.instances)
71
self.switchboard = Switchboard(
72
name, self.queue_directory, slice, numslices, True)
73
self.sleep_time = as_timedelta(section.sleep_time)
74
# sleep_time is a timedelta; turn it into a float for time.sleep().
75
self.sleep_float = (86400 * self.sleep_time.days +
76
self.sleep_time.seconds +
77
self.sleep_time.microseconds / 1.0e6)
78
self.max_restarts = int(section.max_restarts)
79
self.start = as_boolean(section.start)
83
return '<{0} at {1:#x}>'.format(self.__class__.__name__, id(self))
91
# Start the main loop for this runner.
94
# Once through the loop that processes all the files in the
96
filecnt = self._one_iteration()
97
# Do the periodic work for the subclass.
99
# If the stop flag is set, we're done.
102
# Give the runner an opportunity to snooze for a while, but
103
# pass it the file count so it can decide whether to do more
105
self._snooze(filecnt)
106
except KeyboardInterrupt:
111
def _one_iteration(self):
113
me = self.__class__.__name__
114
dlog.debug('[%s] starting oneloop', me)
115
# List all the files in our queue directory. The switchboard is
116
# guaranteed to hand us the files in FIFO order.
117
files = self.switchboard.files
118
for filebase in files:
119
dlog.debug('[%s] processing filebase: %s', me, filebase)
121
# Ask the switchboard for the message and metadata objects
122
# associated with this queue file.
123
msg, msgdata = self.switchboard.dequeue(filebase)
124
except Exception as error:
125
# This used to just catch email.Errors.MessageParseError, but
126
# other problems can occur in message parsing, e.g.
127
# ValueError, and exceptions can occur in unpickling too. We
128
# don't want the runner to die, so we just log and skip this
129
# entry, but preserve it for analysis.
131
elog.error('Skipping and preserving unparseable message: %s',
133
self.switchboard.finish(filebase, preserve=True)
137
dlog.debug('[%s] processing onefile', me)
138
self._process_one_file(msg, msgdata)
139
dlog.debug('[%s] finishing filebase: %s', me, filebase)
140
self.switchboard.finish(filebase)
141
except Exception as error:
142
# All runners that implement _dispose() must guarantee that
143
# exceptions are caught and dealt with properly. Still, there
144
# may be a bug in the infrastructure, and we do not want those
145
# to cause messages to be lost. Any uncaught exceptions will
146
# cause the message to be stored in the shunt queue for human
149
# Put a marker in the metadata for unshunting.
150
msgdata['whichq'] = self.switchboard.name
151
# It is possible that shunting can throw an exception, e.g. a
152
# permissions problem or a MemoryError due to a really large
153
# message. Try to be graceful.
155
shunt = config.switchboards['shunt']
156
new_filebase = shunt.enqueue(msg, msgdata)
157
elog.error('SHUNTING: %s', new_filebase)
158
self.switchboard.finish(filebase)
159
except Exception as error:
160
# The message wasn't successfully shunted. Log the
161
# exception and try to preserve the original queue entry
162
# for possible analysis.
165
'SHUNTING FAILED, preserving original entry: %s',
167
self.switchboard.finish(filebase, preserve=True)
169
# Other work we want to do each time through the loop.
170
dlog.debug('[%s] doing periodic', me)
172
dlog.debug('[%s] committing transaction', me)
174
dlog.debug('[%s] checking short circuit', me)
175
if self._short_circuit():
176
dlog.debug('[%s] short circuiting', me)
178
dlog.debug('[%s] ending oneloop: %s', me, len(files))
181
def _process_one_file(self, msg, msgdata):
183
# Do some common sanity checking on the message metadata. It's got to
184
# be destined for a particular mailing list. This switchboard is used
185
# to shunt off badly formatted messages. We don't want to just trash
186
# them because they may be fixable with human intervention. Just get
187
# them out of our sight.
189
# Find out which mailing list this message is destined for.
191
listname = msgdata.get('listname', missing)
193
if listname is missing
194
else getUtility(IListManager).get(unicode(listname)))
197
'%s runner "%s" shunting message for missing list: %s',
198
msg['message-id'], self.name,
199
('n/a' if listname is missing else listname))
200
config.switchboards['shunt'].enqueue(msg, msgdata)
202
# Now process this message. We also want to set up the language
203
# context for this message. The context will be the preferred
204
# language for the user if the sender is a member of the list, or it
205
# will be the list's preferred language. However, we must take
206
# special care to reset the defaults, otherwise subsequent messages
207
# may be translated incorrectly.
209
language_manager = getUtility(ILanguageManager)
210
language = language_manager[config.mailman.default_language]
212
member = mlist.members.get_member(msg.sender)
213
language = (member.preferred_language
214
if member is not None
215
else mlist.preferred_language)
217
language = mlist.preferred_language
218
with _.using(language.code):
219
msgdata['lang'] = language.code
221
keepqueued = self._dispose(mlist, msg, msgdata)
222
except Exception as error:
223
# Trigger the Zope event and re-raise
224
notify(RunnerCrashEvent(self, mlist, msg, msgdata, error))
227
self.switchboard.enqueue(msg, msgdata)
230
elog.error('Uncaught runner exception: %s', exc)
232
traceback.print_exc(file=s)
233
elog.error('%s', s.getvalue())
239
def _dispose(self, mlist, msg, msgdata):
241
raise NotImplementedError
243
def _do_periodic(self):
247
def _snooze(self, filecnt):
249
if filecnt or self.sleep_float <= 0:
251
time.sleep(self.sleep_float)
253
def _short_circuit(self):