66
66
# Regular expression to validate an e-mail address
67
67
EREGEX = "^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$"
69
# progress threshold to emit a download/upload progress event: 64 MiB
70
TRANSFER_PROGRESS_THRESHOLD = 64*1024*1024
71
73
"""Pass the value on for the next deferred, while calling func with it."""
598
600
"""A wrapper around the file-like object used for Uploads.
600
602
It can be used to keep track of the number of bytes that have been
601
written to the store.
603
written to the store and invokes a hook on progress.
605
__slots__ = ('fd', 'data_dict', 'n_bytes_read')
607
__slots__ = ('fd', 'data_dict', 'n_bytes_read', 'progress_hook')
607
def __init__(self, fd, data_dict):
609
def __init__(self, fd, data_dict, progress_hook):
609
611
fd is the file-like object used for uploads. data_dict is the
610
612
entry in the uploading dictionary.
623
626
latter is done directly in the data_dict.
625
628
self.data_dict['n_bytes_written'] = self.n_bytes_read
629
self.progress_hook(self.n_bytes_read)
626
631
data = self.fd.read(size)
627
632
self.n_bytes_read += len(data)
915
921
client, self.client)
917
923
except request_error, failure:
918
925
self.event_queue.push(event_error, error=str(failure))
919
926
except protocol_errors.AuthenticationRequiredError, failure:
920
927
# we need to separate this case from the rest because an
921
928
# AuthenticationRequiredError is a StorageRequestError,
922
929
# and we treat it differently.
923
self.event_queue.push('SYS_UNKNOWN_ERROR')
930
event = 'SYS_UNKNOWN_ERROR'
931
self.event_queue.push(event)
924
932
except protocol_errors.StorageRequestError, failure:
925
self.event_queue.push('SYS_SERVER_ERROR', error=str(failure))
933
event = 'SYS_SERVER_ERROR'
934
self.event_queue.push(event, error=str(failure))
926
935
except Exception, failure:
927
self.event_queue.push('SYS_UNKNOWN_ERROR')
936
event = 'SYS_UNKNOWN_ERROR'
937
self.event_queue.push(event)
929
939
logger.info("The request '%s' finished OK.", req_name)
930
940
if event_ok is not None:
931
941
self.event_queue.push(event_ok)
933
943
if failure is not None:
934
logger.error("The request '%s' failed with the error:\n\n%s\n",
944
logger.info("The request '%s' failed with the error: %s "
945
"and was handled with the event: %s",
946
req_name, failure, event)
936
947
if fire_deferred:
937
948
# it looks like we won't be authenticating, so hook up the
938
949
# for-testing deferred now
2187
2198
__slots__ = ('share_id', 'node_id', 'server_hash', 'fileobj_factory',
2188
2199
'fileobj', 'gunzip', 'marker_maybe', 'cancelled',
2200
'download_req', 'deflated_size')
2190
2201
logged_attrs = ('share_id', 'node_id', 'server_hash', 'fileobj_factory')
2192
2203
def __init__(self, request_queue, share_id, node_id, server_hash,
2237
2248
'command': self}
2238
2249
assert downloading[self.share_id, self.node_id]['command'] is self
2239
2250
offset = downloading[self.share_id, self.node_id]['n_bytes_read']
2251
self.progress_start(offset)
2241
2252
self.action_queue.event_queue.push('AQ_DOWNLOAD_STARTED',
2242
2253
share_id=self.share_id,
2243
2254
node_id=self.node_id,
2312
2323
self.fileobj.write(self.gunzip.decompress(bytes))
2313
2324
self.fileobj.flush() # not strictly necessary but nice to
2314
2325
# see the downloaded size
2326
self.progress_hook(dloading['n_bytes_read'])
2328
def progress_start(self, n_bytes_read_already):
2329
"""Hook to start tracking progress."""
2332
def progress_hook(self, n_bytes_read):
2333
"""Hook to track progress."""
2316
2336
def nacb(self, **kwargs):
2317
2337
"""Set the node attrs in the 'currently downloading' dict."""
2338
self.deflated_size = kwargs['deflated_size']
2318
2339
self.action_queue.downloading[self.share_id,
2319
2340
self.node_id].update(kwargs)
2347
2368
class Download(GetContentMixin, ActionQueueCommand):
2348
2369
"""Get the contents of a file."""
2371
__slots__ = ('n_bytes_read_last')
2375
def progress_start(self, n_bytes_read_already):
2376
"""Start tracking progress.
2378
Consider that n_bytes_read_already bytes have already been read.
2380
self.n_bytes_read_last = n_bytes_read_already
2382
def progress_hook(self, n_bytes_read):
2383
"""Convert downloading progress into an event."""
2384
n_bytes_read_last = self.n_bytes_read_last
2385
self.n_bytes_read_last = n_bytes_read
2386
# produce an event only if there has been a threshold-sized progress
2387
if n_bytes_read - n_bytes_read_last < TRANSFER_PROGRESS_THRESHOLD:
2389
self.action_queue.event_queue.push('AQ_DOWNLOAD_FILE_PROGRESS',
2390
share_id=self.share_id,
2391
node_id=self.node_id,
2392
n_bytes_read=n_bytes_read,
2393
deflated_size=self.deflated_size)
2355
2395
class Upload(ActionQueueCommand):
2356
2396
"""Upload stuff to a file."""
2358
2398
__slots__ = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
2359
2399
'size', 'fileobj_factory', 'tempfile_factory',
2360
2400
'deflated_size', 'tempfile', 'cancelled', 'upload_req',
2401
'marker_maybe', 'n_bytes_written_last')
2363
2403
logged_attrs = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
2364
2404
'size', 'fileobj_factory')
2383
2423
self.marker_maybe = None
2384
2424
if (self.share_id, self.node_id) in self.action_queue.uploading:
2385
2425
self.action_queue.cancel_upload(self.share_id, self.node_id)
2426
self.n_bytes_written_last = None # set by _run
2387
2428
def _is_runnable(self):
2388
2429
"""Returns True if there is sufficient space available to complete
2446
2487
if getattr(self.tempfile, 'name', None) is not None:
2447
2488
self.tempfile = open(self.tempfile.name)
2448
f = UploadProgressWrapper(self.tempfile, uploading)
2489
self.n_bytes_written_last = 0
2490
f = UploadProgressWrapper(self.tempfile, uploading, self.progress_hook)
2449
2491
req = self.action_queue.client.put_content_request(
2450
2492
self.share_id, self.node_id, self.previous_hash, self.hash,
2451
2493
self.crc32, self.size, self.deflated_size, f)
2459
2501
d.addBoth(passit(lambda _: self.tempfile.close()))
2504
def progress_hook(self, n_bytes_written):
2505
"""Convert uploading progress into an event."""
2506
n_bytes_written_last = self.n_bytes_written_last
2507
self.n_bytes_written_last = n_bytes_written
2508
# produce an event only if there has been a threshold-sized progress
2509
if n_bytes_written - n_bytes_written_last < TRANSFER_PROGRESS_THRESHOLD:
2511
self.action_queue.event_queue.push('AQ_UPLOAD_FILE_PROGRESS',
2512
share_id=self.share_id,
2513
node_id=self.node_id,
2514
n_bytes_written=n_bytes_written,
2515
deflated_size=self.deflated_size)
2462
2517
def handle_success(self, _):
2464
2519
It worked! Push the event.