~thisfred/ubuntuone-client/lp-737150

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/action_queue.py

Resolved two situations that hung some commands (LP: #724058)

Show diffs side-by-side

added added

removed removed

Lines of Context:
84
84
    return wrapper
85
85
 
86
86
 
 
87
class DeferredInterrupted(Exception):
 
88
    """To stop the run when pausing."""
 
89
 
 
90
 
 
91
class InterruptibleDeferred(defer.Deferred):
 
92
    """Receives a deferred, and wraps it, also behaving like a deferred.
 
93
 
 
94
    If the original deferred is triggered, that is passed, and can not be
 
95
    interrupted any more. If it's interrupted, then it silences the original
 
96
    deferred, no matter what.
 
97
    """
 
98
    def __init__(self, d):
 
99
        defer.Deferred.__init__(self)
 
100
        self.interrupted = False
 
101
 
 
102
        self.original_deferred = d
 
103
        d.addBoth(self.filter)
 
104
 
 
105
    def filter(self, result):
 
106
        """Pass the result if not interrupted."""
 
107
        if not self.interrupted:
 
108
            self.callback(result)
 
109
 
 
110
    def interrupt(self):
 
111
        """Interrupt only if original not called."""
 
112
        if not self.original_deferred.called:
 
113
            self.interrupted = True
 
114
            self.errback(DeferredInterrupted())
 
115
 
 
116
 
 
117
 
87
118
class PathLockingTree(object):
88
119
    """Tree that stores deferreds in the nodes."""
89
120
 
472
503
        """Stop the pool and cleanup the running commands."""
473
504
        self.active = False
474
505
        for command in self.waiting:
475
 
            command.cleanup()
 
506
            command.pause()
476
507
 
477
508
    def node_is_queued(self, cmdclass, share_id, node_id):
478
509
        """True if a command is queued for that node."""
1067
1098
 
1068
1099
    __slots__ = ('_queue', 'running', 'pathlock_release', 'log',
1069
1100
                 'markers_resolved_deferred', 'action_queue', 'cancelled',
1070
 
                 'wait_for_queue', 'wait_for_conditions')
 
1101
                 'wait_for_queue', 'wait_for_conditions', 'running_deferred')
1071
1102
 
1072
1103
    def __init__(self, request_queue):
1073
1104
        """Initialize a command instance."""
1081
1112
 
1082
1113
        self.wait_for_queue = None
1083
1114
        self.wait_for_conditions = None
 
1115
        self.running_deferred = None
1084
1116
 
1085
1117
    def to_dict(self):
1086
1118
        """Dump logged attributes to a dict."""
1151
1183
        appropriately on their own.  Note that this may be called more
1152
1184
        than once.
1153
1185
        """
1154
 
        self.log.debug('cleanup')
1155
1186
 
1156
1187
    def _start(self):
1157
1188
        """Do the specialized pre-run setup."""
1158
1189
        return defer.succeed(None)
1159
1190
 
 
1191
    def pause(self):
 
1192
        """Pause the command."""
 
1193
        self.log.debug('pausing')
 
1194
        if self.running_deferred is not None:
 
1195
            self.running_deferred.interrupt()
 
1196
        self.cleanup()
 
1197
 
1160
1198
    def resume(self):
1161
1199
        """Unlock the command because the queue is back alive."""
1162
1200
        if self.wait_for_queue is not None:
1234
1272
                yield self.markers_resolved_deferred
1235
1273
                self.log.debug('running')
1236
1274
                self.running = True
1237
 
                result = yield self._run()
 
1275
                d = self._run()
 
1276
                self.running_deferred = InterruptibleDeferred(d)
 
1277
                result = yield self.running_deferred
1238
1278
 
 
1279
            except DeferredInterrupted:
 
1280
                self.running_deferred = None
 
1281
                continue
1239
1282
            except Exception, exc:
 
1283
                self.running_deferred = None
1240
1284
                if self.cancelled:
1241
1285
                    self.log.debug('cancelled while running')
1242
1286
                    break
2093
2137
    def _start(self):
2094
2138
        """Just acquire the transfers semaphore."""
2095
2139
        self.tx_semaphore = yield self._queue.transfers_semaphore.acquire()
 
2140
        if self.cancelled:
 
2141
            # release the semaphore and stop working!
 
2142
            self.log.debug("semaphore released after acquiring, "
 
2143
                           "command cancelled")
 
2144
            self.tx_semaphore = self.tx_semaphore.release()
 
2145
            return
2096
2146
        self.log.debug('semaphore acquired')
2097
2147
 
2098
2148
    def finish(self):
2277
2327
 
2278
2328
    def cleanup(self):
2279
2329
        """Cleanup: stop the producer."""
2280
 
        super(Upload, self).cleanup()
 
2330
        self.log.debug('cleanup')
2281
2331
        if self.upload_req is not None and self.upload_req.producer is not None:
2282
2332
            self.log.debug('stopping the producer')
2283
2333
            self.upload_req.producer.stopProducing()
2286
2336
    def _start(self):
2287
2337
        """Do the specialized pre-run setup."""
2288
2338
        self.tx_semaphore = yield self._queue.transfers_semaphore.acquire()
 
2339
        if self.cancelled:
 
2340
            # release the semaphore and stop working!
 
2341
            self.log.debug("semaphore released after acquiring, "
 
2342
                           "command cancelled")
 
2343
            self.tx_semaphore = self.tx_semaphore.release()
 
2344
            return
2289
2345
        self.log.debug('semaphore acquired')
2290
2346
 
2291
2347
        yield self.action_queue.zip_queue.zip(self)