540
522
class UploadProgressWrapper(object):
541
523
"""A wrapper around the file-like object used for Uploads.
543
It can be used to keep track of the number of bytes that have been
544
written to the store and invokes a hook on progress.
525
It automatically adjusts the transfer variables in the command.
546
fd is the file-like object used for uploads. data_dict is the
547
entry in the uploading dictionary.
527
fd is the file-like object used for uploads.
550
__slots__ = ('fd', 'data_dict', 'n_bytes_read', 'progress_hook')
530
__slots__ = ('fd', 'command')
552
def __init__(self, fd, data_dict, progress_hook):
532
def __init__(self, fd, command):
554
self.data_dict = data_dict
555
self.n_bytes_read = 0
556
self.progress_hook = progress_hook
534
self.command = command
558
536
def read(self, size=None):
559
537
"""Read at most size bytes from the file-like object.
561
Keep track of the number of bytes that have been read, and the
562
number of bytes that have been written (assumed to be equal to
563
the number of bytes read on the previous call to read). The
564
latter is done directly in the data_dict.
539
Keep track of the number of bytes that have been read.
566
self.data_dict['n_bytes_written'] = self.n_bytes_read
567
self.progress_hook(self.n_bytes_read)
569
541
data = self.fd.read(size)
570
self.n_bytes_read += len(data)
542
self.command.n_bytes_written += len(data)
543
self.command.progress_hook()
573
546
def __getattr__(self, attr):
948
917
def make_file(self, share_id, parent_id, name, marker, path):
949
918
"""See .interfaces.IMetaQueue."""
950
919
return MakeFile(self.queue, share_id, parent_id,
951
name, marker, path).queue()
920
name, marker, path).go()
953
922
def make_dir(self, share_id, parent_id, name, marker, path):
954
923
"""See .interfaces.IMetaQueue."""
955
924
return MakeDir(self.queue, share_id, parent_id,
956
name, marker, path).queue()
925
name, marker, path).go()
958
927
def move(self, share_id, node_id, old_parent_id, new_parent_id,
959
928
new_name, path_from, path_to):
960
929
"""See .interfaces.IMetaQueue."""
961
930
return Move(self.queue, share_id, node_id, old_parent_id,
962
new_parent_id, new_name, path_from, path_to).queue()
931
new_parent_id, new_name, path_from, path_to).go()
964
933
def unlink(self, share_id, parent_id, node_id, path):
965
934
"""See .interfaces.IMetaQueue."""
966
return Unlink(self.queue, share_id, parent_id, node_id, path).queue()
935
return Unlink(self.queue, share_id, parent_id, node_id, path).go()
968
937
def inquire_free_space(self, share_id):
969
938
"""See .interfaces.IMetaQueue."""
970
return FreeSpaceInquiry(self.queue, share_id).queue()
939
return FreeSpaceInquiry(self.queue, share_id).go()
972
941
def inquire_account_info(self):
973
942
"""See .interfaces.IMetaQueue."""
974
return AccountInquiry(self.queue).queue()
943
return AccountInquiry(self.queue).go()
976
945
def list_shares(self):
977
946
"""See .interfaces.IMetaQueue."""
978
return ListShares(self.queue).queue()
947
return ListShares(self.queue).go()
980
949
def answer_share(self, share_id, answer):
981
950
"""See .interfaces.IMetaQueue."""
982
return AnswerShare(self.queue, share_id, answer).queue()
951
return AnswerShare(self.queue, share_id, answer).go()
984
953
def create_share(self, node_id, share_to, name, access_level,
986
955
"""See .interfaces.IMetaQueue."""
987
956
return CreateShare(self.queue, node_id, share_to, name,
988
access_level, marker, path).queue()
957
access_level, marker, path).go()
990
959
def delete_share(self, share_id):
991
960
"""See .interfaces.IMetaQueue."""
992
return DeleteShare(self.queue, share_id).queue()
961
return DeleteShare(self.queue, share_id).go()
994
963
def create_udf(self, path, name, marker):
995
964
"""See .interfaces.IMetaQueue."""
996
return CreateUDF(self.queue, path, name, marker).queue()
965
return CreateUDF(self.queue, path, name, marker).go()
998
967
def list_volumes(self):
999
968
"""See .interfaces.IMetaQueue."""
1000
return ListVolumes(self.queue).queue()
969
return ListVolumes(self.queue).go()
1002
971
def delete_volume(self, volume_id, path):
1003
972
"""See .interfaces.IMetaQueue."""
1004
return DeleteVolume(self.queue, volume_id, path).queue()
973
return DeleteVolume(self.queue, volume_id, path).go()
1006
975
def change_public_access(self, share_id, node_id, is_public):
1007
976
"""See .interfaces.IMetaQueue."""
1008
977
return ChangePublicAccess(self.queue, share_id,
1009
node_id, is_public).queue()
978
node_id, is_public).go()
1011
980
def get_public_files(self):
1012
981
"""See .interfaces.IMetaQueue."""
1013
return GetPublicFiles(self.queue).queue()
982
return GetPublicFiles(self.queue).go()
1015
984
def download(self, share_id, node_id, server_hash, path, fileobj_factory):
1016
985
"""See .interfaces.IContentQueue.download."""
1017
986
return Download(self.queue, share_id, node_id, server_hash,
1018
path, fileobj_factory).queue()
987
path, fileobj_factory).go()
1020
989
def upload(self, share_id, node_id, previous_hash, hash, crc32,
1021
990
size, path, fileobj_factory, tempfile_factory=None):
1022
991
"""See .interfaces.IContentQueue."""
1023
992
return Upload(self.queue, share_id, node_id, previous_hash,
1024
993
hash, crc32, size, path,
1025
fileobj_factory, tempfile_factory).queue()
994
fileobj_factory, tempfile_factory).go()
1027
996
def _cancel_op(self, share_id, node_id, cmdclass):
1028
997
"""Generalized form of cancel_upload and cancel_download."""
1089
1057
is_runnable = True
1090
1058
uniqueness = None
1092
__slots__ = ('_queue', 'start_done', 'running', 'pathlock_release', 'log',
1093
'markers_resolved_deferred', 'action_queue', 'cancelled')
1060
__slots__ = ('_queue', 'running', 'pathlock_release', 'log',
1061
'markers_resolved_deferred', 'action_queue', 'cancelled',
1062
'wait_for_queue', 'wait_for_conditions')
1095
1064
def __init__(self, request_queue):
1096
1065
"""Initialize a command instance."""
1097
1066
self._queue = request_queue
1098
1067
self.action_queue = request_queue.action_queue
1099
self.start_done = False
1100
1068
self.running = False
1101
1069
self.log = None
1102
1070
self.markers_resolved_deferred = defer.Deferred()
1103
1071
self.pathlock_release = None
1104
1072
self.cancelled = False
1074
self.wait_for_queue = None
1075
self.wait_for_conditions = None
1106
1077
def to_dict(self):
1107
1078
"""Dump logged attributes to a dict."""
1108
1079
return dict((n, getattr(self, n, None)) for n in self.logged_attrs)
1160
1131
"""The command ended."""
1161
1132
self.running = False
1162
1133
self._queue.unqueue(self)
1163
if self.pathlock_release is not None:
1164
self.pathlock_release()
1166
def end_callback(self, arg):
1168
if not self.running:
1169
self.log.debug('not running, so no success')
1172
self.log.debug('success')
1173
self.handle_success(arg)
1176
def end_errback(self, failure):
1179
self.log.debug('not errbacking because cancelled')
1182
error_message = failure.getErrorMessage()
1183
if failure.check(*self.suppressed_error_messages):
1184
self.log.warn('failure: %s', error_message)
1186
self.log.error('failure: %s', error_message)
1187
self.log.debug('traceback follows:\n\n' + failure.getTraceback())
1190
if failure.check(*self.retryable_errors):
1191
if self._queue.active:
1192
reactor.callLater(0.1, self.run)
1194
self.log.debug('not retrying because queue not active')
1196
self.handle_failure(failure)
1199
def pre_queue_setup(self):
1200
"""Set up before the command gets really queued."""
1201
self.log = self.make_logger()
1205
"""Queue the command."""
1206
self.pre_queue_setup()
1207
self.log.debug('queueing')
1208
self._queue.queue(self)
1135
def _should_be_queued(self):
1136
"""Return True if the command should be queued."""
1210
1139
def cleanup(self):
1211
1140
"""Do whatever is needed to clean up from a failure.
1213
1142
For example, stop producers and others that aren't cleaned up
1214
appropriately on their own.
1143
appropriately on their own. Note that this may be called more
1216
1146
self.log.debug('cleanup')
1219
1149
"""Do the specialized pre-run setup."""
1220
1150
return defer.succeed(None)
1222
@defer.inlineCallbacks
1223
def locked_run(self):
1225
self.pathlock_release = yield self._acquire_pathlock()
1227
self.pathlock_release()
1231
if self._queue.active:
1234
1152
def resume(self):
1235
"""Continue running if was running before."""
1236
if self.running and self._queue.active:
1153
"""Unlock the command because the queue is back alive."""
1154
if self.wait_for_queue is not None:
1155
self.wait_for_queue.callback(True)
1156
self.wait_for_queue = None
1239
1158
def check_conditions(self):
1240
1159
"""If conditions are ok, run the command again."""
1241
if not self.running and self._queue.active:
1160
if self.is_runnable and self.wait_for_conditions is not None:
1161
self.wait_for_conditions.callback(True)
1162
self.wait_for_conditions = None
1164
@defer.inlineCallbacks
1166
"""Execute all the steps for a command."""
1170
# queue if should, otherwise all is done
1171
if not self._should_be_queued():
1174
self.log.debug('queueing')
1175
self._queue.queue(self)
1177
# set up basic marker failure handler and demark
1179
self.log.debug("failing because marker failed: %s", failure)
1180
self.cancelled = True
1182
self.handle_failure(failure)
1184
self.markers_resolved_deferred.addErrback(f)
1187
self.pathlock_release = yield self._acquire_pathlock()
1189
if self.pathlock_release is not None:
1190
self.log.debug('releasing the pathlock because of cancelled')
1191
self.pathlock_release()
1197
if self.pathlock_release is not None:
1198
self.pathlock_release()
1245
1200
@defer.inlineCallbacks
1248
if not self.is_runnable:
1249
self.log.debug('not running because of conditions')
1250
self.running = False
1255
self.log.debug('retrying')
1202
"""Run the command."""
1203
self.log.debug('starting')
1208
yield self.markers_resolved_deferred
1209
self.log.debug('cancelled before trying to run')
1212
# if queue not active, wait for it and check again
1213
if not self._queue.active:
1214
self.log.debug('not running because of inactive queue')
1215
self.wait_for_queue = defer.Deferred()
1216
yield self.wait_for_queue
1219
if not self.is_runnable:
1220
self.log.debug('not running because of conditions')
1221
self.wait_for_conditions = defer.Deferred()
1222
yield self.wait_for_conditions
1226
yield self.markers_resolved_deferred
1227
self.log.debug('running')
1229
result = yield self._run()
1231
except Exception, exc:
1233
self.log.debug('cancelled while running')
1235
if exc.__class__ in self.suppressed_error_messages:
1236
self.log.warn('failure: %s', exc)
1238
self.log.exception('failure: %s (traceback follows)', exc)
1241
if exc.__class__ in self.retryable_errors:
1242
self.log.debug('retrying')
1245
self.handle_failure(Failure(exc))
1257
self.log.debug('starting')
1259
self.start_done = True
1260
yield self.markers_resolved_deferred
1261
except Exception, e:
1262
yield self.end_errback(Failure(e))
1264
yield self._run_command()
1248
self.log.debug('cancelled while running')
1250
self.log.debug('success')
1251
self.handle_success(result)
1266
def _run_command(self):
1267
"""Really execute the command."""
1268
self.log.debug('running')
1270
d.addCallbacks(self.end_callback, self.end_errback)
1253
# finish the command
1273
1257
def cancel(self):
1274
1258
"""Cancel the command."""
2182
2139
node_id=self.node_id,
2183
2140
server_hash=self.server_hash)
2185
def reset_fileobj(self):
2186
"""Rewind and empty the file.
2188
Useful for getting it ready to try again if necessary.
2190
self.log.debug('reset fileobj')
2191
if self.fileobj is not None:
2192
self.fileobj.seek(0, 0)
2193
self.fileobj.truncate(0)
2195
def cb(self, bytes):
2142
def downloaded_cb(self, bytes):
2196
2143
"""A streaming decompressor."""
2197
dloading = self.action_queue.downloading[self.share_id,
2199
dloading['n_bytes_read'] += len(bytes)
2144
self.n_bytes_read += len(bytes)
2200
2145
self.fileobj.write(self.gunzip.decompress(bytes))
2201
2146
self.fileobj.flush() # not strictly necessary but nice to
2202
2147
# see the downloaded size
2203
self.progress_hook(dloading['n_bytes_read'])
2205
def progress_start(self, n_bytes_read_already):
2206
"""Start tracking progress.
2208
Consider that n_bytes_read_already have been already read.
2210
self.n_bytes_read_last = n_bytes_read_already
2212
def progress_hook(self, n_bytes_read):
2213
"""Convert downloading progress into an event."""
2214
n_bytes_read_last = self.n_bytes_read_last
2215
self.n_bytes_read_last = n_bytes_read
2216
# produce an event only if there has been a threshold-sized progress
2217
if n_bytes_read - n_bytes_read_last < TRANSFER_PROGRESS_THRESHOLD:
2219
self.action_queue.event_queue.push('AQ_DOWNLOAD_FILE_PROGRESS',
2220
share_id=self.share_id,
2221
node_id=self.node_id,
2222
n_bytes_read=n_bytes_read,
2223
deflated_size=self.deflated_size)
2225
def nacb(self, **kwargs):
2226
"""Set the node attrs in the 'currently downloading' dict."""
2148
self.progress_hook()
2150
def progress_hook(self):
2151
"""Send event if accumulated enough progress."""
2152
read_since_last = self.n_bytes_read - self.n_bytes_read_last
2153
if read_since_last >= TRANSFER_PROGRESS_THRESHOLD:
2154
event_data = dict(share_id=self.share_id, node_id=self.node_id,
2155
n_bytes_read=self.n_bytes_read,
2156
deflated_size=self.deflated_size)
2157
self.action_queue.event_queue.push('AQ_DOWNLOAD_FILE_PROGRESS',
2159
self.n_bytes_read_last = self.n_bytes_read
2161
def node_attr_cb(self, **kwargs):
2162
"""Update command information with node attributes."""
2227
2163
self.deflated_size = kwargs['deflated_size']
2228
self.action_queue.downloading[self.share_id,
2229
self.node_id].update(kwargs)
2231
2165
def sync(self):
2232
2166
"""Flush the buffers and sync them to disk if possible."""
2330
2260
@defer.inlineCallbacks
2331
2261
def _start(self):
2332
2262
"""Do the specialized pre-run setup."""
2333
yield self._queue.transfers_semaphore.acquire()
2263
self.tx_semaphore = yield self._queue.transfers_semaphore.acquire()
2334
2264
self.log.debug('semaphore acquired')
2336
upload = {"hash": self.hash, "req": self}
2337
self.action_queue.uploading[self.share_id, self.node_id] = upload
2266
yield self.action_queue.zip_queue.zip(self)
2340
yield self.action_queue.zip_queue.zip(self)
2343
raise RequestCleanedUp('CANCELLED')
2269
"""Release the semaphore if already acquired."""
2270
if self.tx_semaphore is not None:
2271
self.tx_semaphore = self.tx_semaphore.release()
2272
self.log.debug('semaphore released')
2273
super(Upload, self).finish()
2345
2275
def _run(self):
2346
2276
"""Do the actual running."""
2348
return defer.fail(RequestCleanedUp('CANCELLED'))
2350
# need to check if client is still there, as _start() may delay a lot
2351
# the execution of this
2352
if self.action_queue.client is None:
2353
return defer.fail(ClientNoLongerThere('Detected in _run'))
2355
uploading = {"hash": self.hash, "deflated_size": self.deflated_size,
2357
self.action_queue.uploading[self.share_id, self.node_id] = uploading
2359
2277
self.action_queue.event_queue.push('AQ_UPLOAD_STARTED',
2360
2278
share_id=self.share_id,
2361
2279
node_id=self.node_id,
2364
2282
if getattr(self.tempfile, 'name', None) is not None:
2365
2283
self.tempfile = open(self.tempfile.name)
2366
2284
self.n_bytes_written_last = 0
2367
f = UploadProgressWrapper(self.tempfile, uploading, self.progress_hook)
2285
f = UploadProgressWrapper(self.tempfile, self)
2368
2286
req = self.action_queue.client.put_content_request(
2369
2287
self.share_id, self.node_id, self.previous_hash, self.hash,
2370
2288
self.crc32, self.size, self.deflated_size, f)
2371
2289
self.upload_req = req
2372
2290
d = req.deferred
2373
d.addBoth(lambda x: defer.fail(RequestCleanedUp('CANCELLED'))
2374
if self.cancelled else x)
2375
d.addBoth(passit(lambda _: self.action_queue.uploading.pop(
2376
(self.share_id, self.node_id), None)))
2377
2291
d.addBoth(passit(lambda _: self.tempfile.close()))
2380
def progress_hook(self, n_bytes_written):
2381
"""Convert uploading progress into an event."""
2382
n_bytes_written_last = self.n_bytes_written_last
2383
self.n_bytes_written_last = n_bytes_written
2384
# produce an event only if there has been a threshold-sized progress
2385
if n_bytes_written - n_bytes_written_last < TRANSFER_PROGRESS_THRESHOLD:
2387
self.action_queue.event_queue.push('AQ_UPLOAD_FILE_PROGRESS',
2388
share_id=self.share_id,
2389
node_id=self.node_id,
2390
n_bytes_written=n_bytes_written,
2391
deflated_size=self.deflated_size)
2294
def progress_hook(self):
2295
"""Send event if accumulated enough progress."""
2296
written_since_last = self.n_bytes_written - self.n_bytes_written_last
2297
if written_since_last >= TRANSFER_PROGRESS_THRESHOLD:
2298
event_data = dict(share_id=self.share_id, node_id=self.node_id,
2299
n_bytes_written=self.n_bytes_written,
2300
deflated_size=self.deflated_size)
2301
self.action_queue.event_queue.push('AQ_UPLOAD_FILE_PROGRESS',
2303
self.n_bytes_written_last = self.n_bytes_written
2393
2305
def handle_success(self, request):
2394
2306
"""It worked! Push the event."""
2395
self._queue.transfers_semaphore.release()
2396
self.log.debug('semaphore released')
2398
2307
# remove the temporary file if have one
2399
2308
if getattr(self.tempfile, 'name', None) is not None:
2400
2309
os.unlink(self.tempfile.name)