~nataliabidart/ubuntuone-client/split-oauth

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/action_queue.py

  • Committer: Tarmac
  • Author(s): Samuele Pedroni (Canonical Services Ltd.)
  • Date: 2010-05-18 14:05:52 UTC
  • mfrom: (511.1.17 transfer-progress-events)
  • Revision ID: dobey@wayofthemonkey.com-20100518140552-d3l55ob7yfdz0ekw
Addressing the demands of Bug #570747, this change introduces the internal events AQ_DOWNLOAD_FILE_PROGRESS and AQ_UPLOAD_FILE_PROGRESS, which are then converted into the D-Bus signals DownloadFileProgress and UploadFileProgress.

Show diffs side-by-side

added added

removed removed

Lines of Context:
66
66
# Regular expression to validate an e-mail address
67
67
EREGEX = "^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$"
68
68
 
 
69
# progress threshold to emit a download/upload progress event: 64 MiB
 
70
TRANSFER_PROGRESS_THRESHOLD = 64*1024*1024
69
71
 
70
72
def passit(func):
71
73
    """Pass the value on for the next deferred, while calling func with it."""
598
600
    """A wrapper around the file-like object used for Uploads.
599
601
 
600
602
    It can be used to keep track of the number of bytes that have been
601
 
    written to the store.
 
603
    written to the store and invokes a hook on progress.
602
604
 
603
605
    """
604
606
 
605
 
    __slots__ = ('fd', 'data_dict', 'n_bytes_read')
 
607
    __slots__ = ('fd', 'data_dict', 'n_bytes_read', 'progress_hook')
606
608
 
607
 
    def __init__(self, fd, data_dict):
 
609
    def __init__(self, fd, data_dict, progress_hook):
608
610
        """
609
611
        fd is the file-like object used for uploads. data_dict is the
610
612
        entry in the uploading dictionary.
612
614
        self.fd = fd
613
615
        self.data_dict = data_dict
614
616
        self.n_bytes_read = 0
 
617
        self.progress_hook = progress_hook
615
618
 
616
619
    def read(self, size=None):
617
620
        """
623
626
        latter is done directly in the data_dict.
624
627
        """
625
628
        self.data_dict['n_bytes_written'] = self.n_bytes_read
 
629
        self.progress_hook(self.n_bytes_read)
 
630
        
626
631
        data = self.fd.read(size)
627
632
        self.n_bytes_read += len(data)
628
633
        return data
2192
2197
 
2193
2198
    __slots__ = ('share_id', 'node_id', 'server_hash', 'fileobj_factory',
2194
2199
                 'fileobj', 'gunzip', 'marker_maybe', 'cancelled',
2195
 
                 'download_req')
 
2200
                 'download_req', 'deflated_size')
2196
2201
    logged_attrs = ('share_id', 'node_id', 'server_hash', 'fileobj_factory')
2197
2202
 
2198
2203
    def __init__(self, request_queue, share_id, node_id, server_hash,
2243
2248
                                                        'command': self}
2244
2249
        assert downloading[self.share_id, self.node_id]['command'] is self
2245
2250
        offset = downloading[self.share_id, self.node_id]['n_bytes_read']
2246
 
 
 
2251
        self.progress_start(offset)
2247
2252
        self.action_queue.event_queue.push('AQ_DOWNLOAD_STARTED',
2248
2253
                                           share_id=self.share_id,
2249
2254
                                           node_id=self.node_id,
2318
2323
        self.fileobj.write(self.gunzip.decompress(bytes))
2319
2324
        self.fileobj.flush()     # not strictly necessary but nice to
2320
2325
                                 # see the downloaded size
2321
 
 
 
2326
        self.progress_hook(dloading['n_bytes_read'])
 
2327
 
 
2328
    def progress_start(self, n_bytes_read_already):
 
2329
        """Hook to start tracking progress."""
 
2330
        pass
 
2331
 
 
2332
    def progress_hook(self, n_bytes_read):
 
2333
        """Hook to track progress."""
 
2334
        pass
 
2335
        
2322
2336
    def nacb(self, **kwargs):
2323
2337
        """Set the node attrs in the 'currently downloading' dict."""
 
2338
        self.deflated_size = kwargs['deflated_size']
2324
2339
        self.action_queue.downloading[self.share_id,
2325
2340
                                      self.node_id].update(kwargs)
2326
2341
 
2353
2368
class Download(GetContentMixin, ActionQueueCommand):
2354
2369
    """Get the contents of a file."""
2355
2370
 
2356
 
    __slots__ = ()
 
2371
    __slots__ = ('n_bytes_read_last')
2357
2372
 
2358
2373
    is_dir = False
2359
2374
 
 
2375
    def progress_start(self, n_bytes_read_already):
 
2376
        """Start tracking progress.
 
2377
 
 
2378
        Consider that n_bytes_read_already have been already read.
 
2379
        """
 
2380
        self.n_bytes_read_last = n_bytes_read_already
 
2381
 
 
2382
    def progress_hook(self, n_bytes_read):
 
2383
        """Convert downloading progress into an event."""
 
2384
        n_bytes_read_last = self.n_bytes_read_last
 
2385
        self.n_bytes_read_last = n_bytes_read
 
2386
        # produce an event only if there has been a threshold-sized progress
 
2387
        if n_bytes_read - n_bytes_read_last < TRANSFER_PROGRESS_THRESHOLD:
 
2388
            return
 
2389
        self.action_queue.event_queue.push('AQ_DOWNLOAD_FILE_PROGRESS',
 
2390
                                      share_id=self.share_id,
 
2391
                                      node_id=self.node_id,
 
2392
                                      n_bytes_read=n_bytes_read,
 
2393
                                      deflated_size=self.deflated_size)
2360
2394
 
2361
2395
class Upload(ActionQueueCommand):
2362
2396
    """Upload stuff to a file."""
2364
2398
    __slots__ = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
2365
2399
                 'size', 'fileobj_factory', 'tempfile_factory',
2366
2400
                 'deflated_size', 'tempfile', 'cancelled', 'upload_req',
2367
 
                 'marker_maybe')
 
2401
                 'marker_maybe', 'n_bytes_written_last')
2368
2402
 
2369
2403
    logged_attrs = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
2370
2404
                    'size', 'fileobj_factory')
2389
2423
        self.marker_maybe = None
2390
2424
        if (self.share_id, self.node_id) in self.action_queue.uploading:
2391
2425
            self.action_queue.cancel_upload(self.share_id, self.node_id)
 
2426
        self.n_bytes_written_last = None # set by _run
2392
2427
 
2393
2428
    def _is_runnable(self):
2394
2429
        """Returns True if there is sufficient space available to complete
2451
2486
 
2452
2487
        if getattr(self.tempfile, 'name', None) is not None:
2453
2488
            self.tempfile = open(self.tempfile.name)
2454
 
        f = UploadProgressWrapper(self.tempfile, uploading)
 
2489
        self.n_bytes_written_last = 0
 
2490
        f = UploadProgressWrapper(self.tempfile, uploading, self.progress_hook)
2455
2491
        req = self.action_queue.client.put_content_request(
2456
2492
            self.share_id, self.node_id, self.previous_hash, self.hash,
2457
2493
            self.crc32, self.size, self.deflated_size, f)
2465
2501
        d.addBoth(passit(lambda _: self.tempfile.close()))
2466
2502
        return d
2467
2503
 
 
2504
    def progress_hook(self, n_bytes_written):
 
2505
        """Convert uploading progress into an event."""
 
2506
        n_bytes_written_last = self.n_bytes_written_last
 
2507
        self.n_bytes_written_last = n_bytes_written
 
2508
        # produce an event only if there has been a threshold-sized progress
 
2509
        if n_bytes_written - n_bytes_written_last < TRANSFER_PROGRESS_THRESHOLD:
 
2510
            return
 
2511
        self.action_queue.event_queue.push('AQ_UPLOAD_FILE_PROGRESS',
 
2512
                                      share_id=self.share_id,
 
2513
                                      node_id=self.node_id,
 
2514
                                      n_bytes_written=n_bytes_written,
 
2515
                                      deflated_size=self.deflated_size)
 
2516
 
2468
2517
    def handle_success(self, _):
2469
2518
        """
2470
2519
        It worked! Push the event.