~nataliabidart/ubuntuone-client/get-sharesdir

Viewing changes to ubuntuone/syncdaemon/action_queue.py

  • Committer: Tarmac
  • Author(s): Facundo Batista
  • Date: 2011-02-07 19:46:02 UTC
  • mfrom: (825.2.13 unleash-the-queues-6)
  • Revision ID: tarmac-20110207194602-5jbtu0r0vle924ka
Refactored the ActionQueueCommand lifecycle (LP: #704473)
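
This branch collapses the old queue()/locked_run()/end_callback machinery of
ActionQueueCommand into a single entry point, go(): the command builds its
logger, resolves markers, asks _should_be_queued() whether to enter the
queue, acquires the path lock, and then loops inside run() until it
finishes, exhausts a retryable failure, or is cancelled. A minimal,
self-contained sketch of that shape (ToyQueue and ToyCommand are made-up
stand-ins, not code from this branch; the real implementation is in the
ActionQueueCommand hunks below):

    # Toy sketch of the go()/run() lifecycle; illustrative names only.
    from twisted.internet import defer

    class ToyQueue(object):
        def __init__(self):
            self.waiting = []

    class ToyCommand(object):
        def __init__(self, queue):
            self._queue = queue
            self.cancelled = False

        def _should_be_queued(self):
            # subclasses may veto queueing (e.g. a duplicate GetDelta)
            return True

        @defer.inlineCallbacks
        def go(self):
            # single entry point: maybe queue, then run to completion
            if not self._should_be_queued():
                return
            self._queue.waiting.append(self)
            try:
                yield self.run()
            finally:
                self._queue.waiting.remove(self)

        @defer.inlineCallbacks
        def run(self):
            while not self.cancelled:
                try:
                    result = yield self._run()
                except ValueError:  # stand-in for retryable_errors
                    continue
                self.handle_success(result)
                return

        def _run(self):
            return defer.succeed('done')

        def handle_success(self, result):
            pass

The real go() also wraps run() in a path lock, and run() parks on deferreds
while the queue is inactive, as the hunks below show.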

@@ -84,23 +84,6 @@
     return wrapper
 
 
-class ClientNoLongerThere(Exception):
-    """AQ Client is no longer there.
-
-    This is used for Commands that have work to do after starting and before
-    actually using the client, and in this time frame the client can
-    disappear (connection lost, etc.).
-    """
-
-
-class UploadCompressionCancelled(Exception):
-    """Compression of a file for upload cancelled."""
-
-
-class RequestCleanedUp(Exception):
-    """The request was cancelled by ActionQueue.cleanup()."""
-
-
 class PathLockingTree(object):
     """Tree that stores deferreds in the nodes."""
 
@@ -369,7 +352,8 @@
 
         try:
             if upload.cancelled:
-                raise UploadCompressionCancelled("Cancelled")
+                # avoid compression if command already cancelled
+                return
             upload.log.debug('compressing: %r', filename)
             # we need to compress the file completely to figure out its
             # compressed size. So streaming is out :(
@@ -387,8 +371,6 @@
                     # anyway (being in /tmp and all)
                     break
                 f.write(zipper.compress(data))
-            if upload.cancelled:
-                raise UploadCompressionCancelled("Cancelled")
             upload.deflated_size = f.tell()
             # close the compressed file (thus, if you actually want to stream
             # it out, it must have a name so it can be reopnened)
@@ -413,8 +395,7 @@
                 upload.log.warn("Unable to build fileobj (%s: '%s') so "
                                 "cancelling the upload.", type(e), e)
                 upload.cancel()
-                raise UploadCompressionCancelled("Cancelled because of "
-                                                 "fileobj_factory failure.")
+                return
 
             reactor.callInThread(self._compress, deferred, upload, fileobj)
         finally:
@@ -460,8 +441,6 @@
         if first_added:
             self.action_queue.event_queue.push('SYS_QUEUE_WAITING')
 
-        command.locked_run()
-
     def unqueue(self, command):
         """Unqueue a command."""
         self.waiting.remove(command)
@@ -951,81 +930,81 @@
     def make_file(self, share_id, parent_id, name, marker, path):
         """See .interfaces.IMetaQueue."""
         return MakeFile(self.queue, share_id, parent_id,
-                        name, marker, path).queue()
+                        name, marker, path).go()
 
     def make_dir(self, share_id, parent_id, name, marker, path):
         """See .interfaces.IMetaQueue."""
         return MakeDir(self.queue, share_id, parent_id,
-                       name, marker, path).queue()
+                       name, marker, path).go()
 
     def move(self, share_id, node_id, old_parent_id, new_parent_id,
              new_name, path_from, path_to):
         """See .interfaces.IMetaQueue."""
         return Move(self.queue, share_id, node_id, old_parent_id,
-                    new_parent_id, new_name, path_from, path_to).queue()
+                    new_parent_id, new_name, path_from, path_to).go()
 
     def unlink(self, share_id, parent_id, node_id, path):
         """See .interfaces.IMetaQueue."""
-        return Unlink(self.queue, share_id, parent_id, node_id, path).queue()
+        return Unlink(self.queue, share_id, parent_id, node_id, path).go()
 
     def inquire_free_space(self, share_id):
         """See .interfaces.IMetaQueue."""
-        return FreeSpaceInquiry(self.queue, share_id).queue()
+        return FreeSpaceInquiry(self.queue, share_id).go()
 
     def inquire_account_info(self):
         """See .interfaces.IMetaQueue."""
-        return AccountInquiry(self.queue).queue()
+        return AccountInquiry(self.queue).go()
 
     def list_shares(self):
         """See .interfaces.IMetaQueue."""
-        return ListShares(self.queue).queue()
+        return ListShares(self.queue).go()
 
     def answer_share(self, share_id, answer):
         """See .interfaces.IMetaQueue."""
-        return AnswerShare(self.queue, share_id, answer).queue()
+        return AnswerShare(self.queue, share_id, answer).go()
 
     def create_share(self, node_id, share_to, name, access_level,
                      marker, path):
         """See .interfaces.IMetaQueue."""
         return CreateShare(self.queue, node_id, share_to, name,
-                           access_level, marker, path).queue()
+                           access_level, marker, path).go()
 
     def delete_share(self, share_id):
         """See .interfaces.IMetaQueue."""
-        return DeleteShare(self.queue, share_id).queue()
+        return DeleteShare(self.queue, share_id).go()
 
     def create_udf(self, path, name, marker):
         """See .interfaces.IMetaQueue."""
-        return CreateUDF(self.queue, path, name, marker).queue()
+        return CreateUDF(self.queue, path, name, marker).go()
 
     def list_volumes(self):
         """See .interfaces.IMetaQueue."""
-        return ListVolumes(self.queue).queue()
+        return ListVolumes(self.queue).go()
 
     def delete_volume(self, volume_id, path):
         """See .interfaces.IMetaQueue."""
-        return DeleteVolume(self.queue, volume_id, path).queue()
+        return DeleteVolume(self.queue, volume_id, path).go()
 
     def change_public_access(self, share_id, node_id, is_public):
         """See .interfaces.IMetaQueue."""
         return ChangePublicAccess(self.queue, share_id,
-                                  node_id, is_public).queue()
+                                  node_id, is_public).go()
 
     def get_public_files(self):
         """See .interfaces.IMetaQueue."""
-        return GetPublicFiles(self.queue).queue()
+        return GetPublicFiles(self.queue).go()
 
     def download(self, share_id, node_id, server_hash, path, fileobj_factory):
         """See .interfaces.IContentQueue.download."""
         return Download(self.queue, share_id, node_id, server_hash,
-                        path, fileobj_factory).queue()
+                        path, fileobj_factory).go()
 
     def upload(self, share_id, node_id, previous_hash, hash, crc32,
                size, path, fileobj_factory, tempfile_factory=None):
         """See .interfaces.IContentQueue."""
         return Upload(self.queue, share_id, node_id, previous_hash,
                       hash, crc32, size, path,
-                      fileobj_factory, tempfile_factory).queue()
+                      fileobj_factory, tempfile_factory).go()
 
     def _cancel_op(self, share_id, node_id, cmdclass):
         """Generalized form of cancel_upload and cancel_download."""
@@ -1056,11 +1035,11 @@
 
     def get_delta(self, volume_id, generation):
         """See .interfaces.IMetaQueue."""
-        return GetDelta(self.queue, volume_id, generation).queue()
+        return GetDelta(self.queue, volume_id, generation).go()
 
     def rescan_from_scratch(self, volume_id):
         """See .interfaces.IMetaQueue."""
-        return GetDeltaFromScratch(self.queue, volume_id).queue()
+        return GetDeltaFromScratch(self.queue, volume_id).go()
 
     def handle_SYS_ROOT_RECEIVED(self, root_id, mdid):
         """Demark the root node_id."""
@@ -1076,7 +1055,7 @@
         [x for x in protocol_errors._error_mapping.values()
          if x is not protocol_errors.InternalError] +
         [protocol_errors.RequestCancelledError,
-         twisted_errors.ConnectionDone, RequestCleanedUp]
+         twisted_errors.ConnectionDone, twisted_errors.ConnectionLost]
     )
 
     retryable_errors = (
@@ -1084,7 +1063,6 @@
         protocol_errors.QuotaExceededError,
         twisted_errors.ConnectionDone,
         twisted_errors.ConnectionLost,
-        ClientNoLongerThere,
     )
 
     logged_attrs = ()
@@ -1092,20 +1070,23 @@
     is_runnable = True
     uniqueness = None
 
-    __slots__ = ('_queue', 'start_done', 'running', 'pathlock_release', 'log',
-                 'markers_resolved_deferred', 'action_queue', 'cancelled')
+    __slots__ = ('_queue', 'running', 'pathlock_release', 'log',
+                 'markers_resolved_deferred', 'action_queue', 'cancelled',
+                 'wait_for_queue', 'wait_for_conditions')
 
     def __init__(self, request_queue):
         """Initialize a command instance."""
         self._queue = request_queue
         self.action_queue = request_queue.action_queue
-        self.start_done = False
         self.running = False
         self.log = None
         self.markers_resolved_deferred = defer.Deferred()
         self.pathlock_release = None
         self.cancelled = False
 
+        self.wait_for_queue = None
+        self.wait_for_conditions = None
+
     def to_dict(self):
         """Dump logged attributes to a dict."""
         return dict((n, getattr(self, n, None)) for n in self.logged_attrs)
@@ -1115,8 +1096,8 @@
         share_id = getattr(self, "share_id", UNKNOWN)
         node_id = getattr(self, "node_id", None) or \
                       getattr(self, "marker", UNKNOWN)
-        return mklog(logger, self.__class__.__name__,
-                     share_id, node_id, **self.to_dict())
+        self.log = mklog(logger, self.__class__.__name__,
+                         share_id, node_id, **self.to_dict())
 
     @defer.inlineCallbacks
     def demark(self):
@@ -1163,58 +1144,17 @@
         """The command ended."""
         self.running = False
         self._queue.unqueue(self)
-        if self.pathlock_release is not None:
-            self.pathlock_release()
-
-    def end_callback(self, arg):
-        """It worked!"""
-        if not self.running:
-            self.log.debug('not running, so no success')
-            return
-
-        self.log.debug('success')
-        self.handle_success(arg)
-        self.finish()
-
-    def end_errback(self, failure):
-        """It failed!"""
-        if self.cancelled:
-            self.log.debug('not errbacking because cancelled')
-            return
-
-        error_message = failure.getErrorMessage()
-        if failure.check(*self.suppressed_error_messages):
-            self.log.warn('failure: %s', error_message)
-        else:
-            self.log.error('failure: %s', error_message)
-            self.log.debug('traceback follows:\n\n' + failure.getTraceback())
-        self.cleanup()
-
-        if failure.check(*self.retryable_errors):
-            if self._queue.active:
-                reactor.callLater(0.1, self.run)
-            else:
-                self.log.debug('not retrying because queue not active')
-        else:
-            self.handle_failure(failure)
-            self.finish()
-
-    def pre_queue_setup(self):
-        """Set up before the command gets really queued."""
-        self.log = self.make_logger()
-        self.demark()
-
-    def queue(self):
-        """Queue the command."""
-        self.pre_queue_setup()
-        self.log.debug('queueing')
-        self._queue.queue(self)
+
+    def _should_be_queued(self):
+        """Return True if the command should be queued."""
+        return True
 
     def cleanup(self):
         """Do whatever is needed to clean up from a failure.
 
         For example, stop producers and others that aren't cleaned up
-        appropriately on their own.
+        appropriately on their own.  Note that this may be called more
+        than once.
         """
         self.log.debug('cleanup')
 
@@ -1222,61 +1162,100 @@
         """Do the specialized pre-run setup."""
         return defer.succeed(None)
 
+    def resume(self):
+        """Unlock the command because the queue is back alive."""
+        if self.wait_for_queue is not None:
+            self.wait_for_queue.callback(True)
+            self.wait_for_queue = None
+
+    def check_conditions(self):
+        """If conditions are ok, run the command again."""
+        if self.is_runnable and self.wait_for_conditions is not None:
+            self.wait_for_conditions.callback(True)
+            self.wait_for_conditions = None
+
     @defer.inlineCallbacks
-    def locked_run(self):
-        """Lock and run."""
+    def go(self):
+        """Execute all the steps for a command."""
+        # create the log and demark
+        self.make_logger()
+        self.demark()
+
+        # queue if should, otherwise all is done
+        if not self._should_be_queued():
+            return
+
+        self.log.debug('queueing')
+        self._queue.queue(self)
+
         self.pathlock_release = yield self._acquire_pathlock()
         if self.cancelled:
-            self.pathlock_release()
+            if self.pathlock_release is not None:
+                self.log.debug('releasing the pathlock because of cancelled')
+                self.pathlock_release()
             return
 
+        try:
+            yield self.run()
+        finally:
+            if self.pathlock_release is not None:
+                self.pathlock_release()
+
+    @defer.inlineCallbacks
+    def run(self):
+        """Run the command."""
         self.running = True
-        if self._queue.active:
-            self.run()
-
-    def resume(self):
-        """Continue running if was running before."""
-        if self.running and self._queue.active:
-            self.run()
-
-    def check_conditions(self):
-        """If conditions are ok, run the command again."""
-        if not self.running and self._queue.active:
-            self.running = True
-            self.run()
-
-    @defer.inlineCallbacks
-    def run(self):
-        """Do the deed."""
-        if not self.is_runnable:
-            self.log.debug('not running because of conditions')
-            self.running = False
-            return
-
-        if not self._queue.active:
-            self.log.debug('not retrying because queue not active')
-            self.running = False
-            return
-
-        try:
-            if self.start_done:
-                self.log.debug('retrying')
-            else:
-                self.log.debug('starting')
-                yield self._start()
-                self.start_done = True
+        self.log.debug('starting')
+        yield self._start()
+
+        while True:
+            if self.cancelled:
+                self.log.debug('cancelled before trying to run')
+                break
+
+            # if queue not active, wait for it and check again
+            if not self._queue.active:
+                self.log.debug('not running because of inactive queue')
+                self.wait_for_queue = defer.Deferred()
+                yield self.wait_for_queue
+                continue
+
+            if not self.is_runnable:
+                self.log.debug('not running because of conditions')
+                self.wait_for_conditions = defer.Deferred()
+                yield self.wait_for_conditions
+                continue
+
+            try:
                 yield self.markers_resolved_deferred
-        except Exception, e:
-            yield self.end_errback(Failure(e))
-        else:
-            yield self._run_command()
-
-    def _run_command(self):
-        """Really execute the command."""
-        self.log.debug('running')
-        d = self._run()
-        d.addCallbacks(self.end_callback, self.end_errback)
-        return d
+                self.log.debug('running')
+                result = yield self._run()
+
+            except Exception, exc:
+                if self.cancelled:
+                    self.log.debug('cancelled while running')
+                    break
+                if exc.__class__ in self.suppressed_error_messages:
+                    self.log.warn('failure: %s', exc)
+                else:
+                    self.log.exception('failure: %s (traceback follows)', exc)
+                self.cleanup()
+
+                if exc.__class__ in self.retryable_errors:
+                    self.log.debug('retrying')
+                    continue
+                else:
+                    self.handle_failure(Failure(exc))
+            else:
+                if self.cancelled:
+                    self.log.debug('cancelled while running')
+                    break
+                self.log.debug('success')
+                self.handle_success(result)
+
+            # finish the command
+            self.finish()
+            return
 
     def cancel(self):
         """Cancel the command."""
@@ -1290,14 +1269,9 @@
 
     def handle_success(self, success):
         """Do anthing that's needed to handle success of the operation."""
-        return success
 
     def handle_failure(self, failure):
-        """Do anthing that's needed to handle failure of the operation.
-
-        Note that cancellation and TRY_AGAIN are already handled.
-        """
-        return failure
+        """Do anthing that's needed to handle failure of the operation."""
 
     def __str__(self, str_attrs=None):
         """Return a str representation of the instance."""
@@ -1342,7 +1316,6 @@
                  new_generation=request.new_generation,
                  volume_id=self.share_id)
         self.action_queue.event_queue.push(self.ok_event_name, **d)
-        return request
 
     def handle_failure(self, failure):
         """It didn't work! Push the event."""
@@ -1410,7 +1383,6 @@
         d = dict(share_id=self.share_id, node_id=self.node_id,
                  new_generation=request.new_generation)
         self.action_queue.event_queue.push('AQ_MOVE_OK', **d)
-        return request
 
     def handle_failure(self, failure):
         """It didn't work! Push the event."""
@@ -1474,7 +1446,6 @@
         d = dict(share_id=self.share_id, parent_id=self.parent_id,
                  node_id=self.node_id, new_generation=request.new_generation)
         self.action_queue.event_queue.push('AQ_UNLINK_OK', **d)
-        return request
 
     def handle_failure(self, failure):
         """It didn't work! Push the event."""
@@ -1816,11 +1787,8 @@
         """Info for uniqueness."""
         return (self.__class__.__name__, self.volume_id)
 
-    def queue(self):
-        """Queue the command only if it should."""
-        # first start part to get the logger
-        self.pre_queue_setup()
-
+    def _should_be_queued(self):
+        """Determine if the command should be queued or other removed."""
         if self.uniqueness in self._queue.hashed_waiting:
             # other GetDelta for same volume! leave the smaller one
             queued_command = self._queue.hashed_waiting[self.uniqueness]
@@ -1833,11 +1801,10 @@
             else:
                 self.log.debug("not queueing self because there's other "
                                "command with less or same gen num")
-                return
+                return False
 
         # no similar command, or removed the previous command (if not running)
-        self.log.debug('queueing')
-        self._queue.queue(self)
+        return True
 
     def handle_success(self, request):
         """It worked! Push the success event."""
@@ -1862,8 +1829,8 @@
 
     def make_logger(self):
         """Create a logger for this object."""
-        return mklog(logger, 'GetDelta', self.volume_id,
-                     None, generation=self.generation)
+        self.log = mklog(logger, 'GetDelta', self.volume_id,
+                         None, generation=self.generation)
 
 
 class GetDeltaFromScratch(ActionQueueCommand):
@@ -1883,18 +1850,15 @@
         """Info for uniqueness."""
         return (self.__class__.__name__, self.volume_id)
 
-    def queue(self):
-        """Queue the command only if it should."""
-        # first start part to get the logger
-        self.pre_queue_setup()
-
+    def _should_be_queued(self):
+        """Determine if the command should be queued."""
         if self.uniqueness in self._queue.hashed_waiting:
             # other GetDeltaFromScratch for same volume! skip self
             m = "GetDeltaFromScratch already queued, not queueing self"
             self.log.debug(m)
-        else:
-            self.log.debug('queueing')
-            self._queue.queue(self)
+            return False
+
+        return True
 
     def handle_success(self, request):
         """It worked! Push the success event."""
@@ -1914,7 +1878,7 @@
 
     def make_logger(self):
         """Create a logger for this object."""
-        return mklog(logger, 'GetDeltaFromScratch', self.volume_id, None)
+        self.log = mklog(logger, 'GetDeltaFromScratch', self.volume_id, None)
 
 
 
@@ -2081,11 +2045,8 @@
         """Info for uniqueness."""
         return (self.__class__.__name__, self.share_id, self.node_id)
 
-    def queue(self):
+    def _should_be_queued(self):
         """Queue but keeping uniqueness."""
-        # first start part to get the logger
-        self.pre_queue_setup()
-
         for uniq in [(Upload.__name__, self.share_id, self.node_id),
                      (Download.__name__, self.share_id, self.node_id)]:
             if uniq in self._queue.hashed_waiting:
@@ -2093,9 +2054,7 @@
                 self._queue.waiting.remove(previous_command)
                 m = "removing previous command because uniqueness: %s"
                 logger.debug(m, previous_command)
-
-        self.log.debug('queueing')
-        self._queue.queue(self)
+        return True
 
     def _acquire_pathlock(self):
         """Acquire pathlock."""
@@ -2116,9 +2075,6 @@
 
     def _run(self):
         """Do the actual running."""
-        if self.cancelled:
-            return defer.fail(RequestCleanedUp('CANCELLED'))
-
         # start or reset the file object, and get a new decompressor
         if self.fileobj is None:
             try:
@@ -2153,8 +2109,6 @@
         self.download_req = req
         downloading[self.share_id, self.node_id]['req'] = req
         d = req.deferred
-        d.addBoth(lambda x: defer.fail(RequestCleanedUp('CANCELLED'))
-                  if self.cancelled else x)
         d.addCallback(passit(
                 lambda _: downloading.pop((self.share_id, self.node_id))))
         return d
@@ -2183,8 +2137,7 @@
         self._queue.transfers_semaphore.release()
         self.log.debug('semaphore released')
 
-        if failure.check(protocol_errors.RequestCancelledError,
-                         RequestCleanedUp):
+        if failure.check(protocol_errors.RequestCancelledError):
             self.action_queue.event_queue.push('AQ_DOWNLOAD_CANCELLED',
                                                share_id=self.share_id,
                                                node_id=self.node_id,
@@ -2297,11 +2250,8 @@
             return self.action_queue.have_sufficient_space_for_upload(
                                                     self.share_id, self.size)
 
-    def queue(self):
+    def _should_be_queued(self):
         """Queue but keeping uniqueness."""
-        # first start part to get the logger
-        self.pre_queue_setup()
-
         for uniq in [(Upload.__name__, self.share_id, self.node_id),
                      (Download.__name__, self.share_id, self.node_id)]:
             if uniq in self._queue.hashed_waiting:
@@ -2309,9 +2259,7 @@
                 self._queue.waiting.remove(previous_command)
                 m = "removing previous command because uniqueness: %s"
                 logger.debug(m, previous_command)
-
-        self.log.debug('queueing')
-        self._queue.queue(self)
+        return True
 
     @property
     def uniqueness(self):
@@ -2345,22 +2293,10 @@
         upload = {"hash": self.hash, "req": self}
         self.action_queue.uploading[self.share_id, self.node_id] = upload
 
-        try:
-            yield self.action_queue.zip_queue.zip(self)
-        finally:
-            if self.cancelled:
-                raise RequestCleanedUp('CANCELLED')
+        yield self.action_queue.zip_queue.zip(self)
 
     def _run(self):
         """Do the actual running."""
-        if self.cancelled:
-            return defer.fail(RequestCleanedUp('CANCELLED'))
-
-        # need to check if client is still there, as _start() may delay a lot
-        # the execution of this
-        if self.action_queue.client is None:
-            return defer.fail(ClientNoLongerThere('Detected in _run'))
-
         uploading = {"hash": self.hash, "deflated_size": self.deflated_size,
                      "req": self}
         self.action_queue.uploading[self.share_id, self.node_id] = uploading
@@ -2379,8 +2315,6 @@
             self.crc32, self.size, self.deflated_size, f)
         self.upload_req = req
         d = req.deferred
-        d.addBoth(lambda x: defer.fail(RequestCleanedUp('CANCELLED'))
-                  if self.cancelled else x)
         d.addBoth(passit(lambda _: self.action_queue.uploading.pop(
                                         (self.share_id, self.node_id), None)))
         d.addBoth(passit(lambda _: self.tempfile.close()))
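
Note what disappeared along with RequestCleanedUp and ClientNoLongerThere:
Download and Upload no longer thread cancellation checks through _run() and
the request deferreds. Cancellation is observed centrally at the checkpoints
of the generic run() loop. A toy condensation of those checkpoints
(CancellableCommand is a made-up name; the retry and suppression details of
the real loop are omitted):

    # cancel() just flips the flag; run() notices at the next checkpoint.
    from twisted.internet import defer

    class CancellableCommand(object):
        def __init__(self):
            self.cancelled = False

        def cancel(self):
            self.cancelled = True

        @defer.inlineCallbacks
        def run(self):
            while True:
                if self.cancelled:      # before each attempt
                    break
                try:
                    result = yield self._run()
                except Exception:
                    if self.cancelled:  # after a failed attempt
                        break
                    continue            # (real code: suppress/retry logic)
                if self.cancelled:      # after a successful attempt
                    break
                self.handle_success(result)
                return

        def _run(self):
            return defer.succeed(None)

        def handle_success(self, result):
            pass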