~barry/mailman/events-and-web

« back to all changes in this revision

Viewing changes to mailman/queue/__init__.py

  • Committer: Barry Warsaw
  • Date: 2008-10-07 02:07:04 UTC
  • mfrom: (6643.2.7 reorg2)
  • Revision ID: barry@list.org-20081007020704-2oyjxp08f54ap7f1
branch merge

Show diffs side-by-side

added added

removed removed

Lines of Context:
69
69
    implements(ISwitchboard)
70
70
 
71
71
    def __init__(self, whichq, slice=None, numslices=1, recover=False):
 
72
        """Create a switchboard object.
 
73
 
 
74
        :param whichq: The queue directory.
 
75
        :type whichq: str
 
76
        :param slice: The slice number for this switchboard, or None.  If not
 
77
            None, it must be [0..`numslices`).
 
78
        :type slice: int or None
 
79
        :param numslices: The total number of slices to split this queue
 
80
            directory into.  It must be a power of 2.
 
81
        :type numslices: int
 
82
        :param recover: True if backup files should be recovered.
 
83
        :type recover: bool
 
84
        """
72
85
        self._whichq = whichq
73
86
        # Create the directory if it doesn't yet exist.
74
87
        Utils.makedirs(self._whichq, 0770)
84
97
 
85
98
    @property
86
99
    def queue_directory(self):
 
100
        """See `ISwitchboard`."""
87
101
        return self._whichq
88
102
 
89
103
    def enqueue(self, _msg, _metadata=None, **_kws):
 
104
        """See `ISwitchboard`."""
90
105
        if _metadata is None:
91
106
            _metadata = {}
92
107
        # Calculate the SHA hexdigest of the message to get a unique base
133
148
        return filebase
134
149
 
135
150
    def dequeue(self, filebase):
 
151
        """See `ISwitchboard`."""
136
152
        # Calculate the filename from the given filebase.
137
153
        filename = os.path.join(self._whichq, filebase + '.pck')
138
154
        backfile = os.path.join(self._whichq, filebase + '.bak')
174
190
 
175
191
    @property
176
192
    def files(self):
 
193
        """See `ISwitchboard`."""
177
194
        return self.get_files()
178
195
 
179
196
    def get_files(self, extension='.pck'):
 
197
        """See `ISwitchboard`."""
180
198
        times = {}
181
199
        lower = self._lower
182
200
        upper = self._upper
199
217
        return [times[key] for key in sorted(times)]
200
218
 
201
219
    def recover_backup_files(self):
 
220
        """See `ISwitchboard`."""
202
221
        # Move all .bak files in our slice to .pck.  It's impossible for both
203
222
        # to exist at the same time, so the move is enough to ensure that our
204
223
        # normal dequeuing process will handle them.
216
235
    SLEEPTIME = None
217
236
 
218
237
    def __init__(self, slice=None, numslices=1):
219
 
        self._kids = {}
 
238
        """Create a queue runner.
 
239
 
 
240
        :param slice: The slice number for this queue runner.  This is passed
 
241
            directly to the underlying `ISwitchboard` object.
 
242
        :type slice: int or None
 
243
        :param numslices: The number of slices for this queue.  Must be a
 
244
            power of 2.
 
245
        :type numslices: int
 
246
        """
220
247
        # Create our own switchboard.  Don't use the switchboard cache because
221
248
        # we want to provide slice and numslice arguments.
222
249
        self._switchboard = Switchboard(self.QDIR, slice, numslices, True)
230
257
        return '<%s at %s>' % (self.__class__.__name__, id(self))
231
258
 
232
259
    def stop(self):
 
260
        """See `IRunner`."""
233
261
        self._stop = True
234
262
 
235
263
    def run(self):
 
264
        """See `IRunner`."""
236
265
        # Start the main loop for this queue runner.
237
266
        try:
238
 
            try:
239
 
                while True:
240
 
                    # Once through the loop that processes all the files in
241
 
                    # the queue directory.
242
 
                    filecnt = self._oneloop()
243
 
                    # Do the periodic work for the subclass.  BAW: this
244
 
                    # shouldn't be called here.  There should be one more
245
 
                    # _doperiodic() call at the end of the _oneloop() loop.
246
 
                    self._doperiodic()
247
 
                    # If the stop flag is set, we're done.
248
 
                    if self._stop:
249
 
                        break
250
 
                    # Give the runner an opportunity to snooze for a while,
251
 
                    # but pass it the file count so it can decide whether to
252
 
                    # do more work now or not.
253
 
                    self._snooze(filecnt)
254
 
            except KeyboardInterrupt:
255
 
                pass
 
267
            while True:
 
268
                # Once through the loop that processes all the files in the
 
269
                # queue directory.
 
270
                filecnt = self._one_iteration()
 
271
                # Do the periodic work for the subclass.
 
272
                self._do_periodic()
 
273
                # If the stop flag is set, we're done.
 
274
                if self._stop:
 
275
                    break
 
276
                # Give the runner an opportunity to snooze for a while, but
 
277
                # pass it the file count so it can decide whether to do more
 
278
                # work now or not.
 
279
                self._snooze(filecnt)
 
280
        except KeyboardInterrupt:
 
281
            pass
256
282
        finally:
257
 
            # We've broken out of our main loop, so we want to reap all the
258
 
            # subprocesses we've created and do any other necessary cleanups.
259
 
            self._cleanup()
 
283
            self._clean_up()
260
284
 
261
 
    def _oneloop(self):
 
285
    def _one_iteration(self):
 
286
        """See `IRunner`."""
262
287
        me = self.__class__.__name__
263
288
        dlog.debug('[%s] starting oneloop', me)
264
 
        # First, list all the files in our queue directory.
265
 
        # Switchboard.files() is guaranteed to hand us the files in FIFO
266
 
        # order.  Return an integer count of the number of files that were
267
 
        # available for this qrunner to process.
 
289
        # List all the files in our queue directory.  The switchboard is
 
290
        # guaranteed to hand us the files in FIFO order.
268
291
        files = self._switchboard.files
269
292
        for filebase in files:
270
293
            dlog.debug('[%s] processing filebase: %s', me, filebase)
271
294
            try:
272
295
                # Ask the switchboard for the message and metadata objects
273
 
                # associated with this filebase.
 
296
                # associated with this queue file.
274
297
                msg, msgdata = self._switchboard.dequeue(filebase)
275
298
            except Exception, e:
276
 
                # This used to just catch email.Errors.MessageParseError,
277
 
                # but other problems can occur in message parsing, e.g.
278
 
                # ValueError, and exceptions can occur in unpickling too.
279
 
                # We don't want the runner to die, so we just log and skip
280
 
                # this entry, but preserve it for analysis.
 
299
                # This used to just catch email.Errors.MessageParseError, but
 
300
                # other problems can occur in message parsing, e.g.
 
301
                # ValueError, and exceptions can occur in unpickling too.  We
 
302
                # don't want the runner to die, so we just log and skip this
 
303
                # entry, but preserve it for analysis.
281
304
                self._log(e)
282
305
                elog.error('Skipping and preserving unparseable message: %s',
283
306
                           filebase)
286
309
                continue
287
310
            try:
288
311
                dlog.debug('[%s] processing onefile', me)
289
 
                self._onefile(msg, msgdata)
 
312
                self._process_one_file(msg, msgdata)
290
313
                dlog.debug('[%s] finishing filebase: %s', me, filebase)
291
314
                self._switchboard.finish(filebase)
292
315
            except Exception, e:
297
320
                # cause the message to be stored in the shunt queue for human
298
321
                # intervention.
299
322
                self._log(e)
300
 
                # Put a marker in the metadata for unshunting
 
323
                # Put a marker in the metadata for unshunting.
301
324
                msgdata['whichq'] = self._switchboard.queue_directory
302
325
                # It is possible that shunting can throw an exception, e.g. a
303
326
                # permissions problem or a MemoryError due to a really large
317
340
                    self._switchboard.finish(filebase, preserve=True)
318
341
                config.db.abort()
319
342
            # Other work we want to do each time through the loop.
320
 
            dlog.debug('[%s] reaping', me)
321
 
            Utils.reap(self._kids, once=True)
322
343
            dlog.debug('[%s] doing periodic', me)
323
 
            self._doperiodic()
 
344
            self._do_periodic()
324
345
            dlog.debug('[%s] checking short circuit', me)
325
 
            if self._shortcircuit():
 
346
            if self._short_curcuit():
326
347
                dlog.debug('[%s] short circuiting', me)
327
348
                break
328
349
            dlog.debug('[%s] commiting', me)
330
351
        dlog.debug('[%s] ending oneloop: %s', me, len(files))
331
352
        return len(files)
332
353
 
333
 
    def _onefile(self, msg, msgdata):
 
354
    def _process_one_file(self, msg, msgdata):
 
355
        """See `IRunner`."""
334
356
        # Do some common sanity checking on the message metadata.  It's got to
335
357
        # be destined for a particular mailing list.  This switchboard is used
336
358
        # to shunt off badly formatted messages.  We don't want to just trash
337
359
        # them because they may be fixable with human intervention.  Just get
338
 
        # them out of our site though.
 
360
        # them out of our sight.
339
361
        #
340
362
        # Find out which mailing list this message is destined for.
341
363
        listname = msgdata.get('listname')
342
364
        mlist = config.db.list_manager.get(listname)
343
 
        if not mlist:
 
365
        if mlist is None:
344
366
            elog.error('Dequeuing message destined for missing list: %s',
345
367
                       listname)
346
368
            self._shunt.enqueue(msg, msgdata)
347
369
            return
348
 
        # Now process this message, keeping track of any subprocesses that may
349
 
        # have been spawned.  We'll reap those later.
350
 
        #
351
 
        # We also want to set up the language context for this message.  The
352
 
        # context will be the preferred language for the user if a member of
353
 
        # the list, or the list's preferred language.  However, we must take
 
370
        # Now process this message.  We also want to set up the language
 
371
        # context for this message.  The context will be the preferred
 
372
        # language for the user if the sender is a member of the list, or it
 
373
        # will be the list's preferred language.  However, we must take
354
374
        # special care to reset the defaults, otherwise subsequent messages
355
 
        # may be translated incorrectly.  BAW: I'm not sure I like this
356
 
        # approach, but I can't think of anything better right now.
 
375
        # may be translated incorrectly.
357
376
        sender = msg.get_sender()
358
377
        member = mlist.members.get_member(sender)
359
 
        if member:
360
 
            lang = member.preferred_language
361
 
        else:
362
 
            lang = mlist.preferred_language
363
 
        with i18n.using_language(lang):
364
 
            msgdata['lang'] = lang
 
378
        language = (member.preferred_language
 
379
                    if member is not None
 
380
                    else mlist.preferred_language)
 
381
        with i18n.using_language(language):
 
382
            msgdata['lang'] = language
365
383
            keepqueued = self._dispose(mlist, msg, msgdata)
366
 
        # Keep tabs on any child processes that got spawned.
367
 
        kids = msgdata.get('_kids')
368
 
        if kids:
369
 
            self._kids.update(kids)
370
384
        if keepqueued:
371
385
            self._switchboard.enqueue(msg, msgdata)
372
386
 
376
390
        traceback.print_exc(file=s)
377
391
        elog.error('%s', s.getvalue())
378
392
 
379
 
    #
380
 
    # Subclasses can override these methods.
381
 
    #
382
 
    def _cleanup(self):
383
 
        """Clean up upon exit from the main processing loop.
384
 
 
385
 
        Called when the Runner's main loop is stopped, this should perform
386
 
        any necessary resource deallocation.  Its return value is irrelevant.
387
 
        """
388
 
        Utils.reap(self._kids)
 
393
    def _clean_up(self):
 
394
        """See `IRunner`."""
389
395
 
390
396
    def _dispose(self, mlist, msg, msgdata):
391
 
        """Dispose of a single message destined for a mailing list.
392
 
 
393
 
        Called for each message that the Runner is responsible for, this is
394
 
        the primary overridable method for processing each message.
395
 
        Subclasses, must provide implementation for this method.
396
 
 
397
 
        mlist is the IMailingList instance this message is destined for.
398
 
 
399
 
        msg is the Message object representing the message.
400
 
 
401
 
        msgdata is a dictionary of message metadata.
402
 
        """
 
397
        """See `IRunner`."""
403
398
        raise NotImplementedError
404
399
 
405
 
    def _doperiodic(self):
406
 
        """Do some processing `every once in a while'.
407
 
 
408
 
        Called every once in a while both from the Runner's main loop, and
409
 
        from the Runner's hash slice processing loop.  You can do whatever
410
 
        special periodic processing you want here, and the return value is
411
 
        irrelevant.
412
 
        """
 
400
    def _do_periodic(self):
 
401
        """See `IRunner`."""
413
402
        pass
414
403
 
415
404
    def _snooze(self, filecnt):
416
 
        """Sleep for a little while.
417
 
 
418
 
        filecnt is the number of messages in the queue the last time through.
419
 
        Sub-runners can decide to continue to do work, or sleep for a while
420
 
        based on this value.  By default, we only snooze if there was nothing
421
 
        to do last time around.
422
 
        """
 
405
        """See `IRunner`."""
423
406
        if filecnt or float(self.SLEEPTIME) <= 0:
424
407
            return
425
408
        time.sleep(float(self.SLEEPTIME))
426
409
 
427
 
    def _shortcircuit(self):
428
 
        """Return a true value if the individual file processing loop should
429
 
        exit before it's finished processing each message in the current slice
430
 
        of hash space.  A false value tells _oneloop() to continue processing
431
 
        until the current snapshot of hash space is exhausted.
432
 
 
433
 
        You could, for example, implement a throttling algorithm here.
434
 
        """
 
410
    def _short_curcuit(self):
 
411
        """See `IRunner`."""
435
412
        return self._stop