69
69
implements(ISwitchboard)
71
71
def __init__(self, whichq, slice=None, numslices=1, recover=False):
72
"""Create a switchboard object.
74
:param whichq: The queue directory.
76
:param slice: The slice number for this switchboard, or None. If not
77
None, it must be [0..`numslices`).
78
:type slice: int or None
79
:param numslices: The total number of slices to split this queue
80
directory into. It must be a power of 2.
82
:param recover: True if backup files should be recovered.
72
85
self._whichq = whichq
73
86
# Create the directory if it doesn't yet exist.
74
87
Utils.makedirs(self._whichq, 0770)
218
237
def __init__(self, slice=None, numslices=1):
238
"""Create a queue runner.
240
:param slice: The slice number for this queue runner. This is passed
241
directly to the underlying `ISwitchboard` object.
242
:type slice: int or None
243
:param numslices: The number of slices for this queue. Must be a
220
247
# Create our own switchboard. Don't use the switchboard cache because
221
248
# we want to provide slice and numslice arguments.
222
249
self._switchboard = Switchboard(self.QDIR, slice, numslices, True)
230
257
return '<%s at %s>' % (self.__class__.__name__, id(self))
233
261
self._stop = True
236
265
# Start the main loop for this queue runner.
240
# Once through the loop that processes all the files in
241
# the queue directory.
242
filecnt = self._oneloop()
243
# Do the periodic work for the subclass. BAW: this
244
# shouldn't be called here. There should be one more
245
# _doperiodic() call at the end of the _oneloop() loop.
247
# If the stop flag is set, we're done.
250
# Give the runner an opportunity to snooze for a while,
251
# but pass it the file count so it can decide whether to
252
# do more work now or not.
253
self._snooze(filecnt)
254
except KeyboardInterrupt:
268
# Once through the loop that processes all the files in the
270
filecnt = self._one_iteration()
271
# Do the periodic work for the subclass.
273
# If the stop flag is set, we're done.
276
# Give the runner an opportunity to snooze for a while, but
277
# pass it the file count so it can decide whether to do more
279
self._snooze(filecnt)
280
except KeyboardInterrupt:
257
# We've broken out of our main loop, so we want to reap all the
258
# subprocesses we've created and do any other necessary cleanups.
285
def _one_iteration(self):
262
287
me = self.__class__.__name__
263
288
dlog.debug('[%s] starting oneloop', me)
264
# First, list all the files in our queue directory.
265
# Switchboard.files() is guaranteed to hand us the files in FIFO
266
# order. Return an integer count of the number of files that were
267
# available for this qrunner to process.
289
# List all the files in our queue directory. The switchboard is
290
# guaranteed to hand us the files in FIFO order.
268
291
files = self._switchboard.files
269
292
for filebase in files:
270
293
dlog.debug('[%s] processing filebase: %s', me, filebase)
272
295
# Ask the switchboard for the message and metadata objects
273
# associated with this filebase.
296
# associated with this queue file.
274
297
msg, msgdata = self._switchboard.dequeue(filebase)
275
298
except Exception, e:
276
# This used to just catch email.Errors.MessageParseError,
277
# but other problems can occur in message parsing, e.g.
278
# ValueError, and exceptions can occur in unpickling too.
279
# We don't want the runner to die, so we just log and skip
280
# this entry, but preserve it for analysis.
299
# This used to just catch email.Errors.MessageParseError, but
300
# other problems can occur in message parsing, e.g.
301
# ValueError, and exceptions can occur in unpickling too. We
302
# don't want the runner to die, so we just log and skip this
303
# entry, but preserve it for analysis.
282
305
elog.error('Skipping and preserving unparseable message: %s',
317
340
self._switchboard.finish(filebase, preserve=True)
318
341
config.db.abort()
319
342
# Other work we want to do each time through the loop.
320
dlog.debug('[%s] reaping', me)
321
Utils.reap(self._kids, once=True)
322
343
dlog.debug('[%s] doing periodic', me)
324
345
dlog.debug('[%s] checking short circuit', me)
325
if self._shortcircuit():
346
if self._short_curcuit():
326
347
dlog.debug('[%s] short circuiting', me)
328
349
dlog.debug('[%s] commiting', me)
330
351
dlog.debug('[%s] ending oneloop: %s', me, len(files))
331
352
return len(files)
333
def _onefile(self, msg, msgdata):
354
def _process_one_file(self, msg, msgdata):
334
356
# Do some common sanity checking on the message metadata. It's got to
335
357
# be destined for a particular mailing list. This switchboard is used
336
358
# to shunt off badly formatted messages. We don't want to just trash
337
359
# them because they may be fixable with human intervention. Just get
338
# them out of our site though.
360
# them out of our sight.
340
362
# Find out which mailing list this message is destined for.
341
363
listname = msgdata.get('listname')
342
364
mlist = config.db.list_manager.get(listname)
344
366
elog.error('Dequeuing message destined for missing list: %s',
346
368
self._shunt.enqueue(msg, msgdata)
348
# Now process this message, keeping track of any subprocesses that may
349
# have been spawned. We'll reap those later.
351
# We also want to set up the language context for this message. The
352
# context will be the preferred language for the user if a member of
353
# the list, or the list's preferred language. However, we must take
370
# Now process this message. We also want to set up the language
371
# context for this message. The context will be the preferred
372
# language for the user if the sender is a member of the list, or it
373
# will be the list's preferred language. However, we must take
354
374
# special care to reset the defaults, otherwise subsequent messages
355
# may be translated incorrectly. BAW: I'm not sure I like this
356
# approach, but I can't think of anything better right now.
375
# may be translated incorrectly.
357
376
sender = msg.get_sender()
358
377
member = mlist.members.get_member(sender)
360
lang = member.preferred_language
362
lang = mlist.preferred_language
363
with i18n.using_language(lang):
364
msgdata['lang'] = lang
378
language = (member.preferred_language
379
if member is not None
380
else mlist.preferred_language)
381
with i18n.using_language(language):
382
msgdata['lang'] = language
365
383
keepqueued = self._dispose(mlist, msg, msgdata)
366
# Keep tabs on any child processes that got spawned.
367
kids = msgdata.get('_kids')
369
self._kids.update(kids)
371
385
self._switchboard.enqueue(msg, msgdata)
376
390
traceback.print_exc(file=s)
377
391
elog.error('%s', s.getvalue())
380
# Subclasses can override these methods.
383
"""Clean up upon exit from the main processing loop.
385
Called when the Runner's main loop is stopped, this should perform
386
any necessary resource deallocation. Its return value is irrelevant.
388
Utils.reap(self._kids)
390
396
def _dispose(self, mlist, msg, msgdata):
391
"""Dispose of a single message destined for a mailing list.
393
Called for each message that the Runner is responsible for, this is
394
the primary overridable method for processing each message.
395
Subclasses, must provide implementation for this method.
397
mlist is the IMailingList instance this message is destined for.
399
msg is the Message object representing the message.
401
msgdata is a dictionary of message metadata.
403
398
raise NotImplementedError
405
def _doperiodic(self):
406
"""Do some processing `every once in a while'.
408
Called every once in a while both from the Runner's main loop, and
409
from the Runner's hash slice processing loop. You can do whatever
410
special periodic processing you want here, and the return value is
400
def _do_periodic(self):
415
404
def _snooze(self, filecnt):
416
"""Sleep for a little while.
418
filecnt is the number of messages in the queue the last time through.
419
Sub-runners can decide to continue to do work, or sleep for a while
420
based on this value. By default, we only snooze if there was nothing
421
to do last time around.
423
406
if filecnt or float(self.SLEEPTIME) <= 0:
425
408
time.sleep(float(self.SLEEPTIME))
427
def _shortcircuit(self):
428
"""Return a true value if the individual file processing loop should
429
exit before it's finished processing each message in the current slice
430
of hash space. A false value tells _oneloop() to continue processing
431
until the current snapshot of hash space is exhausted.
433
You could, for example, implement a throttling algorithm here.
410
def _short_curcuit(self):
435
412
return self._stop