~divmod-dev/divmod.org/dangling-1091

« back to all changes in this revision

Viewing changes to Axiom/axiom/batch.py

  • Committer: exarkun
  • Date: 2006-06-05 13:27:26 UTC
  • Revision ID: svn-v4:866e43f7-fbfc-0310-8f2a-ec88d1da2979:trunk:6995
Merge reliable-remote-listeners-1078

Author: exarkun
Reviewer: glyph
Fixes: #1078

Refactor error handling in the batch code so that it is shared by local and remote
processing.  Add tests for the remote case.

Show diffs side-by-side

added added

removed removed

Lines of Context:
41
41
 
42
42
class _ProcessingFailure(Exception):
43
43
    """
44
 
    Raised when processItem raises any exception.
 
44
    Raised when processItem raises any exception.  This is never raised
 
45
    directly, but instances of the three subclasses are.
45
46
    """
46
47
    def __init__(self, reliableListener, workUnit, failure):
47
48
        Exception.__init__(self)
55
56
        self.failure.cleanFailure()
56
57
 
57
58
 
 
59
    def mark(self):
 
60
        """
 
61
        Mark the unit of work as failed in the database and update the listener
 
62
        so as to skip it next time.
 
63
        """
 
64
        self.reliableListener.lastRun = extime.Time()
 
65
        BatchProcessingError(
 
66
            store=self.reliableListener.store,
 
67
            processor=self.reliableListener.processor,
 
68
            listener=self.reliableListener.listener,
 
69
            item=self.workUnit,
 
70
            error=self.failure.getErrorMessage())
 
71
 
 
72
 
58
73
 
59
74
class _ForwardProcessingFailure(_ProcessingFailure):
60
75
    """
62
77
    from the mark.
63
78
    """
64
79
 
 
80
    def mark(self):
 
81
        _ProcessingFailure.mark(self)
 
82
        self.reliableListener.forwardMark = self.workUnit.storeID
 
83
 
65
84
 
66
85
 
67
86
class _BackwardProcessingFailure(_ProcessingFailure):
69
88
    An error occurred in a reliable listener while processing items backwards
70
89
    from the mark.
71
90
    """
 
91
    def mark(self):
 
92
        _ProcessingFailure.mark(self)
 
93
        self.reliableListener.backwardMark = self.workUnit.storeID
72
94
 
73
95
 
74
96
 
286
308
 
287
309
    def timedEventErrorHandler(self, timedEvent, failureObj):
288
310
        failureObj.trap(_ProcessingFailure)
289
 
        workUnit = failureObj.value.workUnit
290
 
        listener = failureObj.value.reliableListener
291
 
        processingFailure = failureObj.value.failure
292
 
 
293
311
        log.msg("Batch processing failure")
294
 
        log.err(processingFailure)
295
 
        BatchProcessingError(
296
 
            store=self.store,
297
 
            processor=listener.processor,
298
 
            listener=listener.listener,
299
 
            item=workUnit,
300
 
            error=processingFailure.getErrorMessage())
301
 
 
302
 
        if failureObj.check(_ForwardProcessingFailure):
303
 
            listener.forwardMark = workUnit.storeID
304
 
        elif failureObj.check(_BackwardProcessingFailure):
305
 
            listener.backwardMark = workUnit.storeID
306
 
 
 
312
        log.err(failureObj.value.failure)
 
313
        failureObj.value.mark()
307
314
        return extime.Time() + datetime.timedelta(milliseconds=self.busyInterval)
308
315
 
309
316
 
1047
1054
        return self.store.powerupsFor(iaxiom.IBatchProcessor)
1048
1055
 
1049
1056
 
 
1057
    def processWhileRunning(self):
 
1058
        """
 
1059
        Run tasks until stopService is called.
 
1060
        """
 
1061
        task = self.step()
 
1062
        for result, more in task:
 
1063
            yield result
 
1064
            if not self.running:
 
1065
                break
 
1066
            if more:
 
1067
                delay = 0.1
 
1068
            else:
 
1069
                delay = 10.0
 
1070
            yield self.deferLater(delay)
 
1071
 
 
1072
 
1050
1073
    def step(self):
1051
 
        while self.running:
 
1074
        while True:
1052
1075
            items = list(self.items())
1053
1076
 
1054
1077
            if VERBOSE:
1055
1078
                log.msg("Found %d processors for %s" % (len(items), self.store))
1056
1079
 
 
1080
            ran = False
1057
1081
            more = False
1058
 
            while items and self.running:
 
1082
            while items:
 
1083
                ran = True
1059
1084
                item = items.pop()
1060
1085
                if VERBOSE:
1061
1086
                    log.msg("Stepping processor %r (suspended is %r)" % (item, self.suspended))
1064
1089
                except _ProcessingFailure, e:
1065
1090
                    log.msg("%r failed while processing %r:" % (e.reliableListener, e.workUnit))
1066
1091
                    log.err(e.failure)
 
1092
                    e.mark()
1067
1093
 
1068
1094
                    # _Fuck_.  /Fuck/.  If user-code in or below (*fuck*)
1069
1095
                    # item.step creates a Failure on any future iteration
1076
1102
                else:
1077
1103
                    if itemHasMore:
1078
1104
                        more = True
1079
 
                yield None
1080
 
            yield self.deferLater([10.0, 0.1][more])
1081
 
 
1082
 
 
1083
 
    def ensafen(self, iterator, onError):
1084
 
        """
1085
 
        Create an iterator which yields the same elements C{iterator} would
1086
 
        have produced, but catch any exceptions it raises (except for
1087
 
        C{StopIteration}) and invoke C{onError} to handle them.
1088
 
        """
1089
 
        while True:
1090
 
            try:
1091
 
                yield iterator.next()
1092
 
            except StopIteration:
1093
 
                break
1094
 
            except:
1095
 
                f = failure.Failure()
1096
 
                onError(f)
1097
 
 
1098
 
 
1099
 
    def _desist(self, err):
1100
 
        log.msg("Batch processor for %r encountered an error" % (self.store,))
1101
 
        log.err(err)
1102
 
        self.disownServiceParent()
1103
 
        raise StopIteration
 
1105
                yield None, more
 
1106
            if not ran:
 
1107
                yield None, more
1104
1108
 
1105
1109
 
1106
1110
    def startService(self):
1107
1111
        service.Service.startService(self)
1108
 
        self.parent.cooperator.coiterate(self.ensafen(self.step(), self._desist))
 
1112
        self.parent.cooperator.coiterate(self.processWhileRunning())
1109
1113
 
1110
1114
 
1111
1115
    def stopService(self):