~didrocks/ubuntuone-client/use_result_var

Viewing changes to ubuntuone/syncdaemon/action_queue.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2011-02-11 16:18:11 UTC
  • mto: This revision was merged to the branch mainline in revision 67.
  • Revision ID: james.westby@ubuntu.com-20110211161811-n18dj9lde7dxqjzr
Tags: upstream-1.5.4
Import upstream version 1.5.4

@@ -70,7 +70,7 @@
 TRANSFER_PROGRESS_THRESHOLD = 64*1024*1024
 
 # no more than this uploads are executed simultaneously
-SIMULT_TRANSFERS = 10
+SIMULT_TRANSFERS = 3
 
 def passit(func):
     """Pass the value on for the next deferred, while calling func with it."""
@@ -84,23 +84,6 @@
     return wrapper
 
 
-class ClientNoLongerThere(Exception):
-    """AQ Client is no longer there.
-
-    This is used for Commands that have work to do after starting and before
-    actually using the client, and in this time frame the client can
-    disappear (connection lost, etc.).
-    """
-
-
-class UploadCompressionCancelled(Exception):
-    """Compression of a file for upload cancelled."""
-
-
-class RequestCleanedUp(Exception):
-    """The request was cancelled by ActionQueue.cleanup()."""
-
-
 class PathLockingTree(object):
     """Tree that stores deferreds in the nodes."""
 
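For context, the body of passit is collapsed in the hunk above; a minimal sketch consistent with its docstring would be:

    def passit(func):
        """Pass the value on for the next deferred, while calling func with it."""
        def wrapper(value):
            # call func for its side effect only...
            func(value)
            # ...and hand the same value on to the next callback in the chain
            return value
        return wrapper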
@@ -369,7 +352,8 @@
 
         try:
             if upload.cancelled:
-                raise UploadCompressionCancelled("Cancelled")
+                # avoid compression if command already cancelled
+                return
             upload.log.debug('compressing: %r', filename)
             # we need to compress the file completely to figure out its
             # compressed size. So streaming is out :(
@@ -387,8 +371,6 @@
                     # anyway (being in /tmp and all)
                     break
                 f.write(zipper.compress(data))
-            if upload.cancelled:
-                raise UploadCompressionCancelled("Cancelled")
             upload.deflated_size = f.tell()
             # close the compressed file (thus, if you actually want to stream
             # it out, it must have a name so it can be reopnened)
@@ -406,22 +388,24 @@
 
         yield self.acquire()
         try:
-            fileobj = upload.fileobj_factory()
-        except StandardError, e:
-            # presumably the user deleted the file before we got to upload it
-            upload.log.warn("Unable to build fileobj (%s: '%s') so cancelling "
-                            "the upload.", type(e), e)
-            upload.cancel()
-            raise UploadCompressionCancelled("Cancelled because of "
-                                             "fileobj_factory failure.")
+            try:
+                fileobj = upload.fileobj_factory()
+            except StandardError, e:
+                # maybe the user deleted the file before we got to upload it
+                upload.log.warn("Unable to build fileobj (%s: '%s') so "
+                                "cancelling the upload.", type(e), e)
+                upload.cancel()
+                return
 
-        try:
             reactor.callInThread(self._compress, deferred, upload, fileobj)
         finally:
             self.release()
 
         # let's wait _compress to finish
-        yield deferred
+        try:
+            yield deferred
+        finally:
+            fileobj.close()
 
 
 class RequestQueue(object):
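The reworked block above trades exception-based cancellation for a plain return, and guarantees the source file object is closed once compression ends, whatever the outcome. The shape of the pattern, as a minimal sketch (with `compress` standing in for ZipQueue._compress, which is expected to fire the deferred when the threaded job finishes):

    from twisted.internet import defer, reactor

    @defer.inlineCallbacks
    def zip_then_close(fileobj, compress):
        # schedule the compression job in a worker thread
        d = defer.Deferred()
        reactor.callInThread(compress, d, fileobj)
        try:
            yield d
        finally:
            # close the source file whether compression succeeded or not
            fileobj.close()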
@@ -457,8 +441,6 @@
         if first_added:
             self.action_queue.event_queue.push('SYS_QUEUE_WAITING')
 
-        command.locked_run()
-
     def unqueue(self, command):
         """Unqueue a command."""
         self.waiting.remove(command)
@@ -540,34 +522,25 @@
 class UploadProgressWrapper(object):
     """A wrapper around the file-like object used for Uploads.
 
-    It can be used to keep track of the number of bytes that have been
-    written to the store and invokes a hook on progress.
+    It adjusts automatically the transfer variables in the command.
 
-    fd is the file-like object used for uploads. data_dict is the
-    entry in the uploading dictionary.
+    fd is the file-like object used for uploads.
     """
 
-    __slots__ = ('fd', 'data_dict', 'n_bytes_read', 'progress_hook')
+    __slots__ = ('fd', 'command')
 
-    def __init__(self, fd, data_dict, progress_hook):
+    def __init__(self, fd, command):
         self.fd = fd
-        self.data_dict = data_dict
-        self.n_bytes_read = 0
-        self.progress_hook = progress_hook
+        self.command = command
 
     def read(self, size=None):
         """Read at most size bytes from the file-like object.
 
-        Keep track of the number of bytes that have been read, and the
-        number of bytes that have been written (assumed to be equal to
-        the number of bytes read on the previews call to read). The
-        latter is done directly in the data_dict.
+        Keep track of the number of bytes that have been read.
         """
-        self.data_dict['n_bytes_written'] = self.n_bytes_read
-        self.progress_hook(self.n_bytes_read)
-
         data = self.fd.read(size)
-        self.n_bytes_read += len(data)
+        self.command.n_bytes_written += len(data)
+        self.command.progress_hook()
        return data
 
     def __getattr__(self, attr):
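Reassembled from the added lines, the new wrapper accounts progress directly on the command instead of a shared dict. The `__getattr__` body is collapsed in this diff, so its delegation below is an assumption:

    class UploadProgressWrapper(object):
        """A wrapper around the file-like object used for Uploads."""

        __slots__ = ('fd', 'command')

        def __init__(self, fd, command):
            self.fd = fd
            self.command = command

        def read(self, size=None):
            """Read at most size bytes, accounting them in the command."""
            data = self.fd.read(size)
            self.command.n_bytes_written += len(data)
            self.command.progress_hook()
            return data

        def __getattr__(self, attr):
            # assumed: anything else is delegated to the wrapped file object
            return getattr(self.fd, attr)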
@@ -615,10 +588,6 @@
         self.zip_queue = ZipQueue()
 
         self.estimated_free_space = {}
-
-        self.uploading = {}
-        self.downloading = {}
-
         event_queue.subscribe(self)
 
     def check_conditions(self):
@@ -948,81 +917,81 @@
     def make_file(self, share_id, parent_id, name, marker, path):
         """See .interfaces.IMetaQueue."""
         return MakeFile(self.queue, share_id, parent_id,
-                        name, marker, path).queue()
+                        name, marker, path).go()
 
     def make_dir(self, share_id, parent_id, name, marker, path):
         """See .interfaces.IMetaQueue."""
         return MakeDir(self.queue, share_id, parent_id,
-                       name, marker, path).queue()
+                       name, marker, path).go()
 
     def move(self, share_id, node_id, old_parent_id, new_parent_id,
              new_name, path_from, path_to):
         """See .interfaces.IMetaQueue."""
         return Move(self.queue, share_id, node_id, old_parent_id,
-                    new_parent_id, new_name, path_from, path_to).queue()
+                    new_parent_id, new_name, path_from, path_to).go()
 
     def unlink(self, share_id, parent_id, node_id, path):
         """See .interfaces.IMetaQueue."""
-        return Unlink(self.queue, share_id, parent_id, node_id, path).queue()
+        return Unlink(self.queue, share_id, parent_id, node_id, path).go()
 
     def inquire_free_space(self, share_id):
         """See .interfaces.IMetaQueue."""
-        return FreeSpaceInquiry(self.queue, share_id).queue()
+        return FreeSpaceInquiry(self.queue, share_id).go()
 
     def inquire_account_info(self):
         """See .interfaces.IMetaQueue."""
-        return AccountInquiry(self.queue).queue()
+        return AccountInquiry(self.queue).go()
 
     def list_shares(self):
         """See .interfaces.IMetaQueue."""
-        return ListShares(self.queue).queue()
+        return ListShares(self.queue).go()
 
     def answer_share(self, share_id, answer):
         """See .interfaces.IMetaQueue."""
-        return AnswerShare(self.queue, share_id, answer).queue()
+        return AnswerShare(self.queue, share_id, answer).go()
 
     def create_share(self, node_id, share_to, name, access_level,
                      marker, path):
         """See .interfaces.IMetaQueue."""
         return CreateShare(self.queue, node_id, share_to, name,
-                           access_level, marker, path).queue()
+                           access_level, marker, path).go()
 
     def delete_share(self, share_id):
         """See .interfaces.IMetaQueue."""
-        return DeleteShare(self.queue, share_id).queue()
+        return DeleteShare(self.queue, share_id).go()
 
     def create_udf(self, path, name, marker):
         """See .interfaces.IMetaQueue."""
-        return CreateUDF(self.queue, path, name, marker).queue()
+        return CreateUDF(self.queue, path, name, marker).go()
 
     def list_volumes(self):
         """See .interfaces.IMetaQueue."""
-        return ListVolumes(self.queue).queue()
+        return ListVolumes(self.queue).go()
 
     def delete_volume(self, volume_id, path):
         """See .interfaces.IMetaQueue."""
-        return DeleteVolume(self.queue, volume_id, path).queue()
+        return DeleteVolume(self.queue, volume_id, path).go()
 
     def change_public_access(self, share_id, node_id, is_public):
         """See .interfaces.IMetaQueue."""
         return ChangePublicAccess(self.queue, share_id,
-                                  node_id, is_public).queue()
+                                  node_id, is_public).go()
 
     def get_public_files(self):
         """See .interfaces.IMetaQueue."""
-        return GetPublicFiles(self.queue).queue()
+        return GetPublicFiles(self.queue).go()
 
     def download(self, share_id, node_id, server_hash, path, fileobj_factory):
         """See .interfaces.IContentQueue.download."""
         return Download(self.queue, share_id, node_id, server_hash,
-                        path, fileobj_factory).queue()
+                        path, fileobj_factory).go()
 
     def upload(self, share_id, node_id, previous_hash, hash, crc32,
                size, path, fileobj_factory, tempfile_factory=None):
         """See .interfaces.IContentQueue."""
         return Upload(self.queue, share_id, node_id, previous_hash,
                       hash, crc32, size, path,
-                      fileobj_factory, tempfile_factory).queue()
+                      fileobj_factory, tempfile_factory).go()
 
     def _cancel_op(self, share_id, node_id, cmdclass):
         """Generalized form of cancel_upload and cancel_download."""
@@ -1053,11 +1022,11 @@
 
     def get_delta(self, volume_id, generation):
         """See .interfaces.IMetaQueue."""
-        return GetDelta(self.queue, volume_id, generation).queue()
+        return GetDelta(self.queue, volume_id, generation).go()
 
     def rescan_from_scratch(self, volume_id):
         """See .interfaces.IMetaQueue."""
-        return GetDeltaFromScratch(self.queue, volume_id).queue()
+        return GetDeltaFromScratch(self.queue, volume_id).go()
 
     def handle_SYS_ROOT_RECEIVED(self, root_id, mdid):
         """Demark the root node_id."""
@@ -1073,7 +1042,7 @@
         [x for x in protocol_errors._error_mapping.values()
          if x is not protocol_errors.InternalError] +
         [protocol_errors.RequestCancelledError,
-         twisted_errors.ConnectionDone, RequestCleanedUp]
+         twisted_errors.ConnectionDone, twisted_errors.ConnectionLost]
     )
 
     retryable_errors = (
@@ -1081,7 +1050,6 @@
         protocol_errors.QuotaExceededError,
         twisted_errors.ConnectionDone,
         twisted_errors.ConnectionLost,
-        ClientNoLongerThere,
     )
 
     logged_attrs = ()
@@ -1089,20 +1057,23 @@
     is_runnable = True
     uniqueness = None
 
-    __slots__ = ('_queue', 'start_done', 'running', 'pathlock_release', 'log',
-                 'markers_resolved_deferred', 'action_queue', 'cancelled')
+    __slots__ = ('_queue', 'running', 'pathlock_release', 'log',
+                 'markers_resolved_deferred', 'action_queue', 'cancelled',
+                 'wait_for_queue', 'wait_for_conditions')
 
     def __init__(self, request_queue):
         """Initialize a command instance."""
         self._queue = request_queue
         self.action_queue = request_queue.action_queue
-        self.start_done = False
         self.running = False
         self.log = None
         self.markers_resolved_deferred = defer.Deferred()
         self.pathlock_release = None
         self.cancelled = False
 
+        self.wait_for_queue = None
+        self.wait_for_conditions = None
+
     def to_dict(self):
         """Dump logged attributes to a dict."""
         return dict((n, getattr(self, n, None)) for n in self.logged_attrs)
1112
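The two new attributes implement a park-and-wake pattern: run() parks on a fresh Deferred when the queue is inactive or conditions fail, and resume()/check_conditions() fire it exactly once. A minimal model of that pattern (the names here are illustrative, not from the diff):

    from twisted.internet import defer

    class Gate(object):
        """Park-and-wake: one waiter at a time, woken exactly once."""

        def __init__(self):
            self.waiter = None

        def wait(self):
            # the waiting side does: yield gate.wait()
            self.waiter = defer.Deferred()
            return self.waiter

        def wake(self):
            # clear the reference so a waiter is only ever woken once
            if self.waiter is not None:
                d, self.waiter = self.waiter, None
                d.callback(True)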
@@ -1112,8 +1083,8 @@
         share_id = getattr(self, "share_id", UNKNOWN)
         node_id = getattr(self, "node_id", None) or \
                       getattr(self, "marker", UNKNOWN)
-        return mklog(logger, self.__class__.__name__,
-                     share_id, node_id, **self.to_dict())
+        self.log = mklog(logger, self.__class__.__name__,
+                         share_id, node_id, **self.to_dict())
 
     @defer.inlineCallbacks
     def demark(self):
@@ -1149,7 +1120,7 @@
                 # as the attr changed (been demarked), need to reput itself
                 # in the hashed_waiting, if was there before and not cancelled
                 if old_uniqueness in self._queue.hashed_waiting:
-                    if not getattr(self, 'cancelled', False):
+                    if not self.cancelled:
                         del self._queue.hashed_waiting[old_uniqueness]
                         self._queue.hashed_waiting[self.uniqueness] = self
         else:
@@ -1160,58 +1131,17 @@
         """The command ended."""
         self.running = False
         self._queue.unqueue(self)
-        if self.pathlock_release is not None:
-            self.pathlock_release()
-
-    def end_callback(self, arg):
-        """It worked!"""
-        if not self.running:
-            self.log.debug('not running, so no success')
-            return
-
-        self.log.debug('success')
-        self.handle_success(arg)
-        self.finish()
-
-    def end_errback(self, failure):
-        """It failed!"""
-        if self.cancelled:
-            self.log.debug('not errbacking because cancelled')
-            return
-
-        error_message = failure.getErrorMessage()
-        if failure.check(*self.suppressed_error_messages):
-            self.log.warn('failure: %s', error_message)
-        else:
-            self.log.error('failure: %s', error_message)
-            self.log.debug('traceback follows:\n\n' + failure.getTraceback())
-        self.cleanup()
-
-        if failure.check(*self.retryable_errors):
-            if self._queue.active:
-                reactor.callLater(0.1, self.run)
-            else:
-                self.log.debug('not retrying because queue not active')
-        else:
-            self.handle_failure(failure)
-            self.finish()
-
-    def pre_queue_setup(self):
-        """Set up before the command gets really queued."""
-        self.log = self.make_logger()
-        self.demark()
-
-    def queue(self):
-        """Queue the command."""
-        self.pre_queue_setup()
-        self.log.debug('queueing')
-        self._queue.queue(self)
+
+    def _should_be_queued(self):
+        """Return True if the command should be queued."""
+        return True
 
     def cleanup(self):
         """Do whatever is needed to clean up from a failure.
 
         For example, stop producers and others that aren't cleaned up
-        appropriately on their own.
+        appropriately on their own.  Note that this may be called more
+        than once.
         """
         self.log.debug('cleanup')
 
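The new `_should_be_queued()` hook replaces the per-subclass queue() overrides that appear later in this diff: the base implementation queues unconditionally, and commands such as GetDelta, Download and Upload override it to deduplicate against `hashed_waiting`. A condensed, hypothetical override showing the shape:

    class DedupCommand(ActionQueueCommand):
        """Hypothetical subclass: skip queueing if equivalent work waits."""

        def _should_be_queued(self):
            # uniqueness identifies "the same work" (class plus volume/node ids)
            if self.uniqueness in self._queue.hashed_waiting:
                return False
            return True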
@@ -1219,56 +1149,110 @@
         """Do the specialized pre-run setup."""
         return defer.succeed(None)
 
-    @defer.inlineCallbacks
-    def locked_run(self):
-        """Lock and run."""
-        self.pathlock_release = yield self._acquire_pathlock()
-        if self.cancelled:
-            self.pathlock_release()
-            return
-
-        self.running = True
-        if self._queue.active:
-            self.run()
-
     def resume(self):
-        """Continue running if was running before."""
-        if self.running and self._queue.active:
-            self.run()
+        """Unlock the command because the queue is back alive."""
+        if self.wait_for_queue is not None:
+            self.wait_for_queue.callback(True)
+            self.wait_for_queue = None
 
     def check_conditions(self):
         """If conditions are ok, run the command again."""
-        if not self.running and self._queue.active:
-            self.running = True
-            self.run()
+        if self.is_runnable and self.wait_for_conditions is not None:
+            self.wait_for_conditions.callback(True)
+            self.wait_for_conditions = None
+
+    @defer.inlineCallbacks
+    def go(self):
+        """Execute all the steps for a command."""
+        # create the log
+        self.make_logger()
+
+        # queue if should, otherwise all is done
+        if not self._should_be_queued():
+            return
+
+        self.log.debug('queueing')
+        self._queue.queue(self)
+
+        # set up basic marker failure handler and demark
+        def f(failure):
+            self.log.debug("failing because marker failed: %s", failure)
+            self.cancelled = True
+            self.cleanup()
+            self.handle_failure(failure)
+            self.finish()
+        self.markers_resolved_deferred.addErrback(f)
+        self.demark()
+
+        self.pathlock_release = yield self._acquire_pathlock()
+        if self.cancelled:
+            if self.pathlock_release is not None:
+                self.log.debug('releasing the pathlock because of cancelled')
+                self.pathlock_release()
+            return
+
+        try:
+            yield self.run()
+        finally:
+            if self.pathlock_release is not None:
+                self.pathlock_release()
 
     @defer.inlineCallbacks
     def run(self):
-        """Do the deed."""
-        if not self.is_runnable:
-            self.log.debug('not running because of conditions')
-            self.running = False
-            return
-
-        try:
-            if self.start_done:
-                self.log.debug('retrying')
+        """Run the command."""
+        self.log.debug('starting')
+        yield self._start()
+
+        while True:
+            if self.cancelled:
+                yield self.markers_resolved_deferred
+                self.log.debug('cancelled before trying to run')
+                break
+
+            # if queue not active, wait for it and check again
+            if not self._queue.active:
+                self.log.debug('not running because of inactive queue')
+                self.wait_for_queue = defer.Deferred()
+                yield self.wait_for_queue
+                continue
+
+            if not self.is_runnable:
+                self.log.debug('not running because of conditions')
+                self.wait_for_conditions = defer.Deferred()
+                yield self.wait_for_conditions
+                continue
+
+            try:
+                yield self.markers_resolved_deferred
+                self.log.debug('running')
+                self.running = True
+                result = yield self._run()
+
+            except Exception, exc:
+                if self.cancelled:
+                    self.log.debug('cancelled while running')
+                    break
+                if exc.__class__ in self.suppressed_error_messages:
+                    self.log.warn('failure: %s', exc)
+                else:
+                    self.log.exception('failure: %s (traceback follows)', exc)
+                self.cleanup()
+
+                if exc.__class__ in self.retryable_errors:
+                    self.log.debug('retrying')
+                    continue
+                else:
+                    self.handle_failure(Failure(exc))
             else:
-                self.log.debug('starting')
-                yield self._start()
-                self.start_done = True
-                yield self.markers_resolved_deferred
-        except Exception, e:
-            yield self.end_errback(Failure(e))
-        else:
-            yield self._run_command()
+                if self.cancelled:
+                    self.log.debug('cancelled while running')
+                    break
+                self.log.debug('success')
+                self.handle_success(result)
 
-    def _run_command(self):
-        """Really execute the command."""
-        self.log.debug('running')
-        d = self._run()
-        d.addCallbacks(self.end_callback, self.end_errback)
-        return d
+            # finish the command
+            self.finish()
+            return
 
     def cancel(self):
         """Cancel the command."""
 
1283
1267
    def handle_success(self, success):
1284
1268
        """Do anthing that's needed to handle success of the operation."""
1285
 
        return success
1286
1269
 
1287
1270
    def handle_failure(self, failure):
1288
 
        """Do anthing that's needed to handle failure of the operation.
1289
 
 
1290
 
        Note that cancellation and TRY_AGAIN are already handled.
1291
 
        """
1292
 
        return failure
 
1271
        """Do anthing that's needed to handle failure of the operation."""
1293
1272
 
1294
1273
    def __str__(self, str_attrs=None):
1295
1274
        """Return a str representation of the instance."""
@@ -1334,7 +1313,6 @@
                  new_generation=request.new_generation,
                  volume_id=self.share_id)
         self.action_queue.event_queue.push(self.ok_event_name, **d)
-        return request
 
     def handle_failure(self, failure):
         """It didn't work! Push the event."""
@@ -1402,7 +1380,6 @@
         d = dict(share_id=self.share_id, node_id=self.node_id,
                  new_generation=request.new_generation)
         self.action_queue.event_queue.push('AQ_MOVE_OK', **d)
-        return request
 
     def handle_failure(self, failure):
         """It didn't work! Push the event."""
@@ -1466,7 +1443,6 @@
         d = dict(share_id=self.share_id, parent_id=self.parent_id,
                  node_id=self.node_id, new_generation=request.new_generation)
         self.action_queue.event_queue.push('AQ_UNLINK_OK', **d)
-        return request
 
     def handle_failure(self, failure):
         """It didn't work! Push the event."""
@@ -1808,11 +1784,8 @@
         """Info for uniqueness."""
         return (self.__class__.__name__, self.volume_id)
 
-    def queue(self):
-        """Queue the command only if it should."""
-        # first start part to get the logger
-        self.pre_queue_setup()
-
+    def _should_be_queued(self):
+        """Determine if the command should be queued or other removed."""
         if self.uniqueness in self._queue.hashed_waiting:
             # other GetDelta for same volume! leave the smaller one
             queued_command = self._queue.hashed_waiting[self.uniqueness]
@@ -1825,11 +1798,10 @@
             else:
                 self.log.debug("not queueing self because there's other "
                                "command with less or same gen num")
-                return
+                return False
 
         # no similar command, or removed the previous command (if not running)
-        self.log.debug('queueing')
-        self._queue.queue(self)
+        return True
 
     def handle_success(self, request):
         """It worked! Push the success event."""
@@ -1854,8 +1826,8 @@
 
     def make_logger(self):
         """Create a logger for this object."""
-        return mklog(logger, 'GetDelta', self.volume_id,
-                     None, generation=self.generation)
+        self.log = mklog(logger, 'GetDelta', self.volume_id,
+                         None, generation=self.generation)
 
 
 class GetDeltaFromScratch(ActionQueueCommand):
@@ -1875,18 +1847,15 @@
         """Info for uniqueness."""
         return (self.__class__.__name__, self.volume_id)
 
-    def queue(self):
-        """Queue the command only if it should."""
-        # first start part to get the logger
-        self.pre_queue_setup()
-
+    def _should_be_queued(self):
+        """Determine if the command should be queued."""
         if self.uniqueness in self._queue.hashed_waiting:
             # other GetDeltaFromScratch for same volume! skip self
             m = "GetDeltaFromScratch already queued, not queueing self"
             self.log.debug(m)
-        else:
-            self.log.debug('queueing')
-            self._queue.queue(self)
+            return False
+
+        return True
 
     def handle_success(self, request):
         """It worked! Push the success event."""
@@ -1906,7 +1875,7 @@
 
     def make_logger(self):
         """Create a logger for this object."""
-        return mklog(logger, 'GetDeltaFromScratch', self.volume_id, None)
+        self.log = mklog(logger, 'GetDeltaFromScratch', self.volume_id, None)
 
 
 
@@ -2050,8 +2019,8 @@
     """Get the contents of a file."""
 
     __slots__ = ('share_id', 'node_id', 'server_hash', 'fileobj_factory',
-                 'fileobj', 'gunzip', 'path', 'download_req',
-                 'deflated_size', 'n_bytes_read_last')
+                 'fileobj', 'gunzip', 'path', 'download_req', 'tx_semaphore',
+                 'deflated_size', 'n_bytes_read_last', 'n_bytes_read')
     logged_attrs = ('share_id', 'node_id', 'server_hash', 'fileobj_factory')
     possible_markers = 'node_id',
 
@@ -2063,21 +2032,22 @@
         self.server_hash = server_hash
         self.fileobj_factory = fileobj_factory
         self.fileobj = None
-        self.gunzip = zlib.decompressobj()
+        self.gunzip = None
+        self.action_queue.cancel_download(self.share_id, self.node_id)
+        self.path = path
         self.download_req = None
-        self.action_queue.cancel_download(self.share_id, self.node_id)
-        self.path = path
+        self.n_bytes_read = 0
+        self.n_bytes_read_last = 0
+        self.deflated_size = None
+        self.tx_semaphore = None
 
     @property
     def uniqueness(self):
         """Info for uniqueness."""
         return (self.__class__.__name__, self.share_id, self.node_id)
 
-    def queue(self):
+    def _should_be_queued(self):
         """Queue but keeping uniqueness."""
-        # first start part to get the logger
-        self.pre_queue_setup()
-
         for uniq in [(Upload.__name__, self.share_id, self.node_id),
                      (Download.__name__, self.share_id, self.node_id)]:
             if uniq in self._queue.hashed_waiting:
@@ -2085,9 +2055,7 @@
                 self._queue.waiting.remove(previous_command)
                 m = "removing previous command because uniqueness: %s"
                 logger.debug(m, previous_command)
-
-        self.log.debug('queueing')
-        self._queue.queue(self)
+        return True
 
     def _acquire_pathlock(self):
         """Acquire pathlock."""
@@ -2103,13 +2071,19 @@
     @defer.inlineCallbacks
     def _start(self):
         """Just acquire the transfers semaphore."""
-        yield self._queue.transfers_semaphore.acquire()
+        self.tx_semaphore = yield self._queue.transfers_semaphore.acquire()
        self.log.debug('semaphore acquired')
 
+    def finish(self):
+        """Release the semaphore if already acquired."""
+        if self.tx_semaphore is not None:
+            self.tx_semaphore = self.tx_semaphore.release()
+            self.log.debug('semaphore released')
+        super(Download, self).finish()
+
     def _run(self):
         """Do the actual running."""
-        if self.cancelled:
-            return defer.fail(RequestCleanedUp('CANCELLED'))
+        # start or reset the file object, and get a new decompressor
         if self.fileobj is None:
             try:
                 self.fileobj = self.fileobj_factory()
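A detail worth noting in the new semaphore handling: Twisted's DeferredSemaphore.acquire() returns a Deferred that fires with the semaphore itself, and release() returns None, so the `self.tx_semaphore = self.tx_semaphore.release()` assignment clears the attribute, making finish() safe to call more than once. The pattern in isolation (a sketch):

    from twisted.internet import defer

    sem = defer.DeferredSemaphore(3)   # cf. SIMULT_TRANSFERS above

    @defer.inlineCallbacks
    def transfer(work):
        holder = yield sem.acquire()   # fires with the semaphore itself
        try:
            result = yield work()
        finally:
            holder = holder.release()  # release() returns None: holder cleared
        defer.returnValue(result)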
@@ -2119,13 +2093,13 @@
                                        ' (file went away?)'
                                        ' so aborting the download.')
                 return defer.fail(Failure(msg))
-        downloading = self.action_queue.downloading
-        if (self.share_id, self.node_id) not in downloading:
-            downloading[self.share_id, self.node_id] = {'n_bytes_read': 0,
-                                                        'command': self}
-        assert downloading[self.share_id, self.node_id]['command'] is self
-        offset = downloading[self.share_id, self.node_id]['n_bytes_read']
-        self.progress_start(offset)
+        else:
+            self.fileobj.seek(0, 0)
+            self.fileobj.truncate(0)
+            self.n_bytes_read = 0
+            self.n_bytes_read_last = 0
+        self.gunzip = zlib.decompressobj()
+
         self.action_queue.event_queue.push('AQ_DOWNLOAD_STARTED',
                                            share_id=self.share_id,
                                            node_id=self.node_id,
@@ -2133,21 +2107,13 @@
 
         req = self.action_queue.client.get_content_request(
             self.share_id, self.node_id, self.server_hash,
-            offset=offset,
-            callback=self.cb, node_attr_callback=self.nacb)
+            offset=self.n_bytes_read,
+            callback=self.downloaded_cb, node_attr_callback=self.node_attr_cb)
         self.download_req = req
-        downloading[self.share_id, self.node_id]['req'] = req
-        d = req.deferred
-        d.addBoth(lambda x: defer.fail(RequestCleanedUp('CANCELLED'))
-                  if self.cancelled else x)
-        d.addCallback(passit(
-                lambda _: downloading.pop((self.share_id, self.node_id))))
-        return d
+        return req.deferred
 
     def handle_success(self, _):
         """It worked! Push the event."""
-        self._queue.transfers_semaphore.release()
-        self.log.debug('semaphore released')
         self.sync()
         # send a COMMIT, the Nanny will issue the FINISHED if it's ok
         self.action_queue.event_queue.push('AQ_DOWNLOAD_COMMIT',
@@ -2157,16 +2123,7 @@
 
     def handle_failure(self, failure):
         """It didn't work! Push the event."""
-        self._queue.transfers_semaphore.release()
-        self.log.debug('semaphore released')
-
-        downloading = self.action_queue.downloading
-        if (self.share_id, self.node_id) in downloading and \
-            downloading[self.share_id, self.node_id]['command'] is self:
-            del downloading[self.share_id, self.node_id]
-        self.reset_fileobj()
-        if failure.check(protocol_errors.RequestCancelledError,
-                         RequestCleanedUp):
+        if failure.check(protocol_errors.RequestCancelledError):
             self.action_queue.event_queue.push('AQ_DOWNLOAD_CANCELLED',
                                                share_id=self.share_id,
                                                node_id=self.node_id,
@@ -2182,51 +2139,28 @@
                                                node_id=self.node_id,
                                                server_hash=self.server_hash)
 
-    def reset_fileobj(self):
-        """Rewind and empty the file.
-
-        Usefult for get it ready to try again if necessary.
-        """
-        self.log.debug('reset fileobj')
-        if self.fileobj is not None:
-            self.fileobj.seek(0, 0)
-            self.fileobj.truncate(0)
-
-    def cb(self, bytes):
+    def downloaded_cb(self, bytes):
         """A streaming decompressor."""
-        dloading = self.action_queue.downloading[self.share_id,
-                                                 self.node_id]
-        dloading['n_bytes_read'] += len(bytes)
+        self.n_bytes_read += len(bytes)
         self.fileobj.write(self.gunzip.decompress(bytes))
         self.fileobj.flush()     # not strictly necessary but nice to
                                  # see the downloaded size
-        self.progress_hook(dloading['n_bytes_read'])
-
-    def progress_start(self, n_bytes_read_already):
-        """Start tracking progress.
-
-        Consider that n_bytes_read_already have been already read.
-        """
-        self.n_bytes_read_last = n_bytes_read_already
-
-    def progress_hook(self, n_bytes_read):
-        """Convert downloading progress into an event."""
-        n_bytes_read_last = self.n_bytes_read_last
-        self.n_bytes_read_last = n_bytes_read
-        # produce an event only if there has been a threshold-sized progress
-        if n_bytes_read - n_bytes_read_last < TRANSFER_PROGRESS_THRESHOLD:
-            return
-        self.action_queue.event_queue.push('AQ_DOWNLOAD_FILE_PROGRESS',
-                                      share_id=self.share_id,
-                                      node_id=self.node_id,
-                                      n_bytes_read=n_bytes_read,
-                                      deflated_size=self.deflated_size)
-
-    def nacb(self, **kwargs):
-        """Set the node attrs in the 'currently downloading' dict."""
+        self.progress_hook()
+
+    def progress_hook(self):
+        """Send event if accumulated enough progress."""
+        read_since_last = self.n_bytes_read - self.n_bytes_read_last
+        if read_since_last >= TRANSFER_PROGRESS_THRESHOLD:
+            event_data = dict(share_id=self.share_id, node_id=self.node_id,
+                              n_bytes_read=self.n_bytes_read,
+                              deflated_size=self.deflated_size)
+            self.action_queue.event_queue.push('AQ_DOWNLOAD_FILE_PROGRESS',
+                                               **event_data)
+            self.n_bytes_read_last = self.n_bytes_read
+
+    def node_attr_cb(self, **kwargs):
+        """Update command information with node attributes."""
         self.deflated_size = kwargs['deflated_size']
-        self.action_queue.downloading[self.share_id,
-                                      self.node_id].update(kwargs)
 
     def sync(self):
         """Flush the buffers and sync them to disk if possible."""
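Both transfer directions now share the same accumulate-then-emit logic: bytes are counted on the command itself, and an event fires only once at least TRANSFER_PROGRESS_THRESHOLD (64 MiB, defined at the top of the file) has accumulated since the last event. A synchronous model of just that logic:

    TRANSFER_PROGRESS_THRESHOLD = 64 * 1024 * 1024   # as defined above

    class ProgressCounter(object):
        """Model of the accumulate-then-emit hook (illustrative only)."""

        def __init__(self, emit):
            self.emit = emit             # e.g. pushes AQ_*_FILE_PROGRESS
            self.n_bytes = 0
            self.n_bytes_last = 0

        def add(self, nbytes):
            self.n_bytes += nbytes
            if self.n_bytes - self.n_bytes_last >= TRANSFER_PROGRESS_THRESHOLD:
                self.emit(self.n_bytes)
                self.n_bytes_last = self.n_bytes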
@@ -2246,8 +2180,8 @@
 
     __slots__ = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
                  'size', 'fileobj_factory', 'tempfile_factory',
-                 'deflated_size', 'tempfile', 'upload_req',
-                 'n_bytes_written_last', 'path')
+                 'deflated_size', 'tempfile', 'upload_req', 'tx_semaphore',
+                 'n_bytes_written_last', 'path', 'n_bytes_written')
 
     logged_attrs = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
                     'size', 'fileobj_factory')
@@ -2266,13 +2200,14 @@
         self.size = size
         self.fileobj_factory = fileobj_factory
         self.tempfile_factory = tempfile_factory
-        self.deflated_size = None
         self.tempfile = None
+        self.action_queue.cancel_upload(self.share_id, self.node_id)
+        self.path = path
         self.upload_req = None
-        if (self.share_id, self.node_id) in self.action_queue.uploading:
-            self.action_queue.cancel_upload(self.share_id, self.node_id)
-        self.n_bytes_written_last = None # set by _run
-        self.path = path
+        self.n_bytes_written_last = 0
+        self.n_bytes_written = 0
+        self.deflated_size = None
+        self.tx_semaphore = None
 
     @property
     def is_runnable(self):
@@ -2288,11 +2223,8 @@
             return self.action_queue.have_sufficient_space_for_upload(
                                                     self.share_id, self.size)
 
-    def queue(self):
+    def _should_be_queued(self):
         """Queue but keeping uniqueness."""
-        # first start part to get the logger
-        self.pre_queue_setup()
-
         for uniq in [(Upload.__name__, self.share_id, self.node_id),
                      (Download.__name__, self.share_id, self.node_id)]:
             if uniq in self._queue.hashed_waiting:
@@ -2300,9 +2232,7 @@
                 self._queue.waiting.remove(previous_command)
                 m = "removing previous command because uniqueness: %s"
                 logger.debug(m, previous_command)
-
-        self.log.debug('queueing')
-        self._queue.queue(self)
+        return True
 
     @property
     def uniqueness(self):
@@ -2330,32 +2260,20 @@
     @defer.inlineCallbacks
     def _start(self):
         """Do the specialized pre-run setup."""
-        yield self._queue.transfers_semaphore.acquire()
+        self.tx_semaphore = yield self._queue.transfers_semaphore.acquire()
         self.log.debug('semaphore acquired')
 
-        upload = {"hash": self.hash, "req": self}
-        self.action_queue.uploading[self.share_id, self.node_id] = upload
+        yield self.action_queue.zip_queue.zip(self)
 
-        try:
-            yield self.action_queue.zip_queue.zip(self)
-        finally:
-            if self.cancelled:
-                raise RequestCleanedUp('CANCELLED')
+    def finish(self):
+        """Release the semaphore if already acquired."""
+        if self.tx_semaphore is not None:
+            self.tx_semaphore = self.tx_semaphore.release()
+            self.log.debug('semaphore released')
+        super(Upload, self).finish()
 
     def _run(self):
         """Do the actual running."""
-        if self.cancelled:
-            return defer.fail(RequestCleanedUp('CANCELLED'))
-
-        # need to check if client is still there, as _start() may delay a lot
-        # the execution of this
-        if self.action_queue.client is None:
-            return defer.fail(ClientNoLongerThere('Detected in _run'))
-
-        uploading = {"hash": self.hash, "deflated_size": self.deflated_size,
-                     "req": self}
-        self.action_queue.uploading[self.share_id, self.node_id] = uploading
-
         self.action_queue.event_queue.push('AQ_UPLOAD_STARTED',
                                            share_id=self.share_id,
                                            node_id=self.node_id,
@@ -2364,37 +2282,28 @@
         if getattr(self.tempfile, 'name', None) is not None:
             self.tempfile = open(self.tempfile.name)
         self.n_bytes_written_last = 0
-        f = UploadProgressWrapper(self.tempfile, uploading, self.progress_hook)
+        f = UploadProgressWrapper(self.tempfile, self)
         req = self.action_queue.client.put_content_request(
             self.share_id, self.node_id, self.previous_hash, self.hash,
             self.crc32, self.size, self.deflated_size, f)
         self.upload_req = req
         d = req.deferred
-        d.addBoth(lambda x: defer.fail(RequestCleanedUp('CANCELLED'))
-                  if self.cancelled else x)
-        d.addBoth(passit(lambda _: self.action_queue.uploading.pop(
-                                        (self.share_id, self.node_id), None)))
         d.addBoth(passit(lambda _: self.tempfile.close()))
         return d
 
-    def progress_hook(self, n_bytes_written):
-        """Convert uploading progress into an event."""
-        n_bytes_written_last = self.n_bytes_written_last
-        self.n_bytes_written_last = n_bytes_written
-        # produce an event only if there has been a threshold-sized progress
-        if n_bytes_written - n_bytes_written_last < TRANSFER_PROGRESS_THRESHOLD:
-            return
-        self.action_queue.event_queue.push('AQ_UPLOAD_FILE_PROGRESS',
-                                      share_id=self.share_id,
-                                      node_id=self.node_id,
-                                      n_bytes_written=n_bytes_written,
-                                      deflated_size=self.deflated_size)
+    def progress_hook(self):
+        """Send event if accumulated enough progress."""
+        written_since_last = self.n_bytes_written - self.n_bytes_written_last
+        if  written_since_last >= TRANSFER_PROGRESS_THRESHOLD:
+            event_data = dict(share_id=self.share_id, node_id=self.node_id,
+                              n_bytes_written=self.n_bytes_written,
+                              deflated_size=self.deflated_size)
+            self.action_queue.event_queue.push('AQ_UPLOAD_FILE_PROGRESS',
+                                               **event_data)
+            self.n_bytes_written_last = self.n_bytes_written
 
     def handle_success(self, request):
         """It worked! Push the event."""
-        self._queue.transfers_semaphore.release()
-        self.log.debug('semaphore released')
-
         # remove the temporary file if have one
         if getattr(self.tempfile, 'name', None) is not None:
             os.unlink(self.tempfile.name)
@@ -2406,8 +2315,5 @@
 
     def handle_failure(self, failure):
         """It didn't work! Push the event."""
-        self._queue.transfers_semaphore.release()
-        self.log.debug('semaphore released')
-
         if getattr(self.tempfile, 'name', None) is not None:
             os.unlink(self.tempfile.name)