~ubuntu-branches/ubuntu/oneiric/ubuntuone-client/oneiric

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/action_queue.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2010-06-08 17:31:18 UTC
  • mto: This revision was merged to the branch mainline in revision 31.
  • Revision ID: james.westby@ubuntu.com-20100608173118-o8s897ll11rtne99
Tags: upstream-1.3.0
Import upstream version 1.3.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
66
66
# Regular expression to validate an e-mail address.
# Raw string form: regex patterns should be raw literals so backslashes
# are not double-escaped (value is byte-identical to the previous form).
EREGEX = r"^.+\@(\[?)[a-zA-Z0-9\-\.]+\.([a-zA-Z]{2,3}|[0-9]{1,3})(\]?)$"
68
68
 
 
69
# Progress threshold to emit a download/upload progress event: 64 MiB.
# NOTE(review): the original comment said "64Kb" but the value below is
# 64*1024*1024 (64 MiB). The value is kept unchanged to preserve runtime
# behavior — confirm with the event consumers which figure was intended.
TRANSFER_PROGRESS_THRESHOLD = 64*1024*1024
69
71
 
70
72
def passit(func):
71
73
    """Pass the value on for the next deferred, while calling func with it."""
598
600
    """A wrapper around the file-like object used for Uploads.
599
601
 
600
602
    It can be used to keep track of the number of bytes that have been
601
 
    written to the store.
 
603
    written to the store and invokes a hook on progress.
602
604
 
603
605
    """
604
606
 
605
 
    __slots__ = ('fd', 'data_dict', 'n_bytes_read')
 
607
    __slots__ = ('fd', 'data_dict', 'n_bytes_read', 'progress_hook')
606
608
 
607
 
    def __init__(self, fd, data_dict):
 
609
    def __init__(self, fd, data_dict, progress_hook):
608
610
        """
609
611
        fd is the file-like object used for uploads. data_dict is the
610
612
        entry in the uploading dictionary.
612
614
        self.fd = fd
613
615
        self.data_dict = data_dict
614
616
        self.n_bytes_read = 0
 
617
        self.progress_hook = progress_hook
615
618
 
616
619
    def read(self, size=None):
617
620
        """
623
626
        latter is done directly in the data_dict.
624
627
        """
625
628
        self.data_dict['n_bytes_written'] = self.n_bytes_read
 
629
        self.progress_hook(self.n_bytes_read)
 
630
        
626
631
        data = self.fd.read(size)
627
632
        self.n_bytes_read += len(data)
628
633
        return data
901
906
        client = self.client
902
907
        req_name = request.__name__
903
908
        failure = None
 
909
        event = None
904
910
        result = None
905
911
        try:
906
912
            try:
915
921
                                   client, self.client)
916
922
                    return
917
923
        except request_error, failure:
 
924
            event = event_error
918
925
            self.event_queue.push(event_error, error=str(failure))
919
926
        except protocol_errors.AuthenticationRequiredError, failure:
920
927
            # we need to separate this case from the rest because an
921
928
            # AuthenticationRequiredError is an StorageRequestError,
922
929
            # and we treat it differently.
923
 
            self.event_queue.push('SYS_UNKNOWN_ERROR')
 
930
            event = 'SYS_UNKNOWN_ERROR'
 
931
            self.event_queue.push(event)
924
932
        except protocol_errors.StorageRequestError, failure:
925
 
            self.event_queue.push('SYS_SERVER_ERROR', error=str(failure))
 
933
            event = 'SYS_SERVER_ERROR'
 
934
            self.event_queue.push(event, error=str(failure))
926
935
        except Exception, failure:
927
 
            self.event_queue.push('SYS_UNKNOWN_ERROR')
 
936
            event = 'SYS_UNKNOWN_ERROR'
 
937
            self.event_queue.push(event)
928
938
        else:
929
939
            logger.info("The request '%s' finished OK.", req_name)
930
940
            if event_ok is not None:
931
941
                self.event_queue.push(event_ok)
932
942
 
933
943
        if failure is not None:
934
 
            logger.error("The request '%s' failed with the error:\n\n%s\n",
935
 
                         req_name, failure)
 
944
            logger.info("The request '%s' failed with the error: %s "
 
945
                        "and was handled with the event: %s",
 
946
                         req_name, failure, event)
936
947
            if fire_deferred:
937
948
                # it looks like we won't be authenticating, so hook up the
938
949
                # for-testing deferred now
2186
2197
 
2187
2198
    __slots__ = ('share_id', 'node_id', 'server_hash', 'fileobj_factory',
2188
2199
                 'fileobj', 'gunzip', 'marker_maybe', 'cancelled',
2189
 
                 'download_req')
 
2200
                 'download_req', 'deflated_size')
2190
2201
    logged_attrs = ('share_id', 'node_id', 'server_hash', 'fileobj_factory')
2191
2202
 
2192
2203
    def __init__(self, request_queue, share_id, node_id, server_hash,
2237
2248
                                                        'command': self}
2238
2249
        assert downloading[self.share_id, self.node_id]['command'] is self
2239
2250
        offset = downloading[self.share_id, self.node_id]['n_bytes_read']
2240
 
 
 
2251
        self.progress_start(offset)
2241
2252
        self.action_queue.event_queue.push('AQ_DOWNLOAD_STARTED',
2242
2253
                                           share_id=self.share_id,
2243
2254
                                           node_id=self.node_id,
2312
2323
        self.fileobj.write(self.gunzip.decompress(bytes))
2313
2324
        self.fileobj.flush()     # not strictly necessary but nice to
2314
2325
                                 # see the downloaded size
2315
 
 
 
2326
        self.progress_hook(dloading['n_bytes_read'])
 
2327
 
 
2328
    def progress_start(self, n_bytes_read_already):
 
2329
        """Hook to start tracking progress."""
 
2330
        pass
 
2331
 
 
2332
    def progress_hook(self, n_bytes_read):
 
2333
        """Hook to track progress."""
 
2334
        pass
 
2335
        
2316
2336
    def nacb(self, **kwargs):
2317
2337
        """Set the node attrs in the 'currently downloading' dict."""
 
2338
        self.deflated_size = kwargs['deflated_size']
2318
2339
        self.action_queue.downloading[self.share_id,
2319
2340
                                      self.node_id].update(kwargs)
2320
2341
 
2347
2368
class Download(GetContentMixin, ActionQueueCommand):
    """Get the contents of a file."""

    # Fixed: the original was ('n_bytes_read_last') — a plain string, not
    # a tuple (missing trailing comma). CPython happens to accept a bare
    # string as a single slot name, but the one-element tuple is the
    # intended, robust form.
    __slots__ = ('n_bytes_read_last',)

    is_dir = False

    def progress_start(self, n_bytes_read_already):
        """Start tracking progress.

        Consider that n_bytes_read_already have been already read.
        """
        self.n_bytes_read_last = n_bytes_read_already

    def progress_hook(self, n_bytes_read):
        """Convert downloading progress into an event."""
        n_bytes_read_last = self.n_bytes_read_last
        self.n_bytes_read_last = n_bytes_read
        # produce an event only if there has been a threshold-sized progress
        if n_bytes_read - n_bytes_read_last < TRANSFER_PROGRESS_THRESHOLD:
            return
        self.action_queue.event_queue.push('AQ_DOWNLOAD_FILE_PROGRESS',
                                      share_id=self.share_id,
                                      node_id=self.node_id,
                                      n_bytes_read=n_bytes_read,
                                      deflated_size=self.deflated_size)
2354
2394
 
2355
2395
class Upload(ActionQueueCommand):
2356
2396
    """Upload stuff to a file."""
2358
2398
    __slots__ = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
2359
2399
                 'size', 'fileobj_factory', 'tempfile_factory',
2360
2400
                 'deflated_size', 'tempfile', 'cancelled', 'upload_req',
2361
 
                 'marker_maybe')
 
2401
                 'marker_maybe', 'n_bytes_written_last')
2362
2402
 
2363
2403
    logged_attrs = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
2364
2404
                    'size', 'fileobj_factory')
2383
2423
        self.marker_maybe = None
2384
2424
        if (self.share_id, self.node_id) in self.action_queue.uploading:
2385
2425
            self.action_queue.cancel_upload(self.share_id, self.node_id)
 
2426
        self.n_bytes_written_last = None # set by _run
2386
2427
 
2387
2428
    def _is_runnable(self):
2388
2429
        """Returns True if there is sufficient space available to complete
2445
2486
 
2446
2487
        if getattr(self.tempfile, 'name', None) is not None:
2447
2488
            self.tempfile = open(self.tempfile.name)
2448
 
        f = UploadProgressWrapper(self.tempfile, uploading)
 
2489
        self.n_bytes_written_last = 0
 
2490
        f = UploadProgressWrapper(self.tempfile, uploading, self.progress_hook)
2449
2491
        req = self.action_queue.client.put_content_request(
2450
2492
            self.share_id, self.node_id, self.previous_hash, self.hash,
2451
2493
            self.crc32, self.size, self.deflated_size, f)
2459
2501
        d.addBoth(passit(lambda _: self.tempfile.close()))
2460
2502
        return d
2461
2503
 
 
2504
    def progress_hook(self, n_bytes_written):
 
2505
        """Convert uploading progress into an event."""
 
2506
        n_bytes_written_last = self.n_bytes_written_last
 
2507
        self.n_bytes_written_last = n_bytes_written
 
2508
        # produce an event only if there has been a threshold-sized progress
 
2509
        if n_bytes_written - n_bytes_written_last < TRANSFER_PROGRESS_THRESHOLD:
 
2510
            return
 
2511
        self.action_queue.event_queue.push('AQ_UPLOAD_FILE_PROGRESS',
 
2512
                                      share_id=self.share_id,
 
2513
                                      node_id=self.node_id,
 
2514
                                      n_bytes_written=n_bytes_written,
 
2515
                                      deflated_size=self.deflated_size)
 
2516
 
2462
2517
    def handle_success(self, _):
2463
2518
        """
2464
2519
        It worked! Push the event.