~ubuntu-branches/ubuntu/precise/ubuntuone-client/precise-201201132228

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/action_queue.py

  • Committer: Package Import Robot
  • Author(s): Rodney Dawes
  • Date: 2011-12-21 15:46:25 UTC
  • mfrom: (1.1.56)
  • Revision ID: package-import@ubuntu.com-20111221154625-ujvunri4frsecj2k
Tags: 2.99.0-0ubuntu1
* New upstream release.
  - Verify timestamp to avoid invalid auth failures (LP: #692597)
  - Files in new UDFs not uploaded due to filtering (LP: #869920)
* debian/patches:
  - Remove upstreamed patches

Show diffs side-by-side

added added

removed removed

Lines of Context:
51
51
from twisted.python.failure import Failure, DefaultException
52
52
 
53
53
from oauth import oauth
 
54
from ubuntu_sso.utils import timestamp_checker
54
55
from ubuntuone import clientdefs
55
56
from ubuntuone.platform import platform, remove_file
56
57
from ubuntuone.storageprotocol import protocol_pb2, content_hash
57
58
from ubuntuone.storageprotocol import errors as protocol_errors
58
59
from ubuntuone.storageprotocol.client import (
59
 
    ThrottlingStorageClient, ThrottlingStorageClientFactory
 
60
    ThrottlingStorageClient,
 
61
    ThrottlingStorageClientFactory,
60
62
)
61
63
from ubuntuone.storageprotocol.context import get_ssl_context
62
64
from ubuntuone.syncdaemon.interfaces import IActionQueue, IMarker
63
65
from ubuntuone.syncdaemon.logger import mklog, TRACE
 
66
from ubuntuone.syncdaemon.volume_manager import ACCESS_LEVEL_RW
64
67
from ubuntuone.syncdaemon import config
65
68
 
66
69
logger = logging.getLogger("ubuntuone.SyncDaemon.ActionQueue")
424
427
        the most of the file reading.
425
428
        """
426
429
        filename = getattr(fileobj, 'name', '<?>')
 
430
        tempfile = None
427
431
        failed = False
428
432
 
429
433
        try:
433
437
            upload.log.debug('compressing: %r', filename)
434
438
            # we need to compress the file completely to figure out its
435
439
            # compressed size. So streaming is out :(
436
 
            upload.tempfile = f = NamedTemporaryFile()
 
440
            tempfile = NamedTemporaryFile()
437
441
            zipper = zlib.compressobj()
438
442
            magic_hasher = content_hash.magic_hash_factory()
439
443
            while not upload.cancelled:
440
444
                data = fileobj.read(4096)
441
445
                if not data:
442
 
                    f.write(zipper.flush())
 
446
                    tempfile.write(zipper.flush())
443
447
                    # no flush/sync because we don't need this to persist
444
448
                    # on disk; if the machine goes down, we'll lose it
445
449
                    # anyway (being in /tmp and all)
446
450
                    break
447
 
                f.write(zipper.compress(data))
 
451
                tempfile.write(zipper.compress(data))
448
452
                magic_hasher.update(data)
449
 
            upload.deflated_size = f.tell()
 
453
            # ensure that the contents are phisically in the file, some
 
454
            # operating systems will not ensure this, even in the same process
 
455
            tempfile.flush()
 
456
            upload.deflated_size = tempfile.tell()
450
457
 
451
458
            upload.magic_hash = magic_hasher.content_hash()
452
459
        except Exception, e:  # pylint: disable-msg=W0703
453
460
            failed = True
 
461
            if tempfile is not None:
 
462
                tempfile.close()
 
463
                remove_file(tempfile.name)
454
464
            reactor.callFromThread(deferred.errback, e)
455
465
        finally:
456
466
            # avoid triggering the deferred if already failed!
457
467
            if not failed:
 
468
                upload.tempfile = tempfile
458
469
                reactor.callFromThread(deferred.callback, True)
459
470
 
460
471
    @defer.inlineCallbacks
817
828
        ssl_context = get_ssl_context(self.disable_ssl_verify)
818
829
        if self.use_ssl:
819
830
            self.connector = reactor.connectSSL(host, port, factory=self,
820
 
                                                contextFactory=ssl_context,
821
 
                                                timeout=self.connection_timeout)
 
831
                                            contextFactory=ssl_context,
 
832
                                            timeout=self.connection_timeout)
822
833
        else:
823
834
            self.connector = reactor.connectTCP(host, port, self,
824
 
                                                timeout=self.connection_timeout)
 
835
                                            timeout=self.connection_timeout)
825
836
 
826
837
    def connect(self):
827
838
        """Start the circus going."""
967
978
            request=self.client.protocol_version,
968
979
            request_error=protocol_errors.UnsupportedVersionError,
969
980
            event_error='SYS_PROTOCOL_VERSION_ERROR',
970
 
            event_ok='SYS_PROTOCOL_VERSION_OK'
971
 
        )
 
981
            event_ok='SYS_PROTOCOL_VERSION_OK')
972
982
        return check_version_d
973
983
 
974
984
    @defer.inlineCallbacks
990
1000
            request_error=StandardError,
991
1001
            event_error='SYS_SET_CAPABILITIES_ERROR',
992
1002
            event_ok=None,
993
 
            args=('query_caps', caps, error_msg)
994
 
        )
 
1003
            args=('query_caps', caps, error_msg))
995
1004
        req = yield query_caps_d
996
1005
 
997
1006
        # req can be None if set capabilities failed, error is handled by
1005
1014
            request_error=StandardError,
1006
1015
            event_error='SYS_SET_CAPABILITIES_ERROR',
1007
1016
            event_ok='SYS_SET_CAPABILITIES_OK',
1008
 
            args=('set_caps', caps, error_msg)
1009
 
        )
 
1017
            args=('set_caps', caps, error_msg))
1010
1018
        yield set_caps_d
1011
1019
 
1012
1020
    @defer.inlineCallbacks
1019
1027
            request_error=protocol_errors.AuthenticationFailedError,
1020
1028
            event_error='SYS_AUTH_ERROR', event_ok='SYS_AUTH_OK',
1021
1029
            # XXX: handle self.token is None or self.consumer is None?
1022
 
            args=(self.consumer, self.token, metadata)
1023
 
        )
 
1030
            args=(self.consumer, self.token, metadata))
1024
1031
        req = yield authenticate_d
1025
1032
 
1026
1033
        # req can be None if the auth failed, but it's handled by
1166
1173
        [x for x in protocol_errors._error_mapping.values()
1167
1174
         if x is not protocol_errors.InternalError] +
1168
1175
        [protocol_errors.RequestCancelledError,
1169
 
         twisted_errors.ConnectionDone, twisted_errors.ConnectionLost]
1170
 
    )
 
1176
         twisted_errors.ConnectionDone, twisted_errors.ConnectionLost])
1171
1177
 
1172
1178
    retryable_errors = (
1173
1179
        protocol_errors.TryAgainError,
1442
1448
        self.parent_id = parent_id
1443
1449
        # Unicode boundary! the name is Unicode in protocol and server, but
1444
1450
        # here we use bytes for paths
1445
 
        self.name = name.decode("utf8")
 
1451
        self.name = name.decode("utf-8")
1446
1452
        self.marker = marker
1447
1453
        self.path = path
1448
1454
 
1506
1512
        self.new_parent_id = new_parent_id
1507
1513
        # Unicode boundary! the name is Unicode in protocol and server, but
1508
1514
        # here we use bytes for paths
1509
 
        self.new_name = new_name.decode("utf8")
 
1515
        self.new_name = new_name.decode("utf-8")
1510
1516
        self.path_from = path_from
1511
1517
        self.path_to = path_to
1512
1518
 
1735
1741
        if share_to and re.match(EREGEX, share_to):
1736
1742
            self.use_http = True
1737
1743
 
1738
 
    def _create_share_http(self, node_id, user, name, read_only, deferred):
 
1744
    def _create_share_http(self, node_id, user, name, read_only):
1739
1745
        """Create a share using the HTTP Web API method."""
1740
1746
 
1741
1747
        url = "https://one.ubuntu.com/files/api/offer_share/"
1742
1748
        method = oauth.OAuthSignatureMethod_PLAINTEXT()
 
1749
        timestamp = timestamp_checker.get_faithful_time()
 
1750
        parameters = {"oauth_timestamp": timestamp}
1743
1751
        request = oauth.OAuthRequest.from_consumer_and_token(
1744
1752
            http_url=url,
1745
1753
            http_method="POST",
 
1754
            parameters=parameters,
1746
1755
            oauth_consumer=self.action_queue.consumer,
1747
1756
            token=self.action_queue.token)
1748
1757
        request.sign_request(method, self.action_queue.consumer,
1754
1763
        pdata = urlencode(data)
1755
1764
        headers = request.to_header()
1756
1765
        req = Request(url, pdata, headers)
1757
 
        try:
1758
 
            urlopen(req)
1759
 
        except HTTPError, e:
1760
 
            reactor.callFromThread(deferred.errback, Failure(e))
1761
 
 
1762
 
        reactor.callFromThread(deferred.callback, None)
 
1766
        urlopen(req)
1763
1767
 
1764
1768
    def _run(self):
1765
1769
        """Do the actual running."""
1766
1770
        if self.use_http:
1767
1771
            # External user, do the HTTP REST method
1768
 
            deferred = defer.Deferred()
1769
 
            d = threads.deferToThread(self._create_share_http,
1770
 
                                      self.node_id, self.share_to,
1771
 
                                      self.name, self.access_level != 'Modify',
1772
 
                                      deferred)
1773
 
            d.addErrback(deferred.errback)
1774
 
            return deferred
 
1772
            return threads.deferToThread(self._create_share_http,
 
1773
                                         self.node_id, self.share_to,
 
1774
                                         self.name,
 
1775
                                         self.access_level != ACCESS_LEVEL_RW)
1775
1776
        else:
1776
1777
            return self.action_queue.client.create_share(self.node_id,
1777
1778
                                                         self.share_to,
1976
1977
                    self.log.debug(m, queued_command)
1977
1978
                    self._queue.remove(queued_command)
1978
1979
            else:
1979
 
                self.log.debug("not queueing self because there's other "
1980
 
                               "command with less or same gen num")
1981
 
                return False
 
1980
                if not queued_command.running:
 
1981
                    self.log.debug("not queueing self because there's other "
 
1982
                                   "(not running) command with less or "
 
1983
                                   "same gen num")
 
1984
                    return False
1982
1985
 
1983
1986
        # no similar command, or removed the previous command (if not running)
1984
1987
        return True
2085
2088
 
2086
2089
        url = "https://one.ubuntu.com/files/api/set_public/%s" % (node_key,)
2087
2090
        method = oauth.OAuthSignatureMethod_PLAINTEXT()
 
2091
        timestamp = timestamp_checker.get_faithful_time()
 
2092
        parameters = {"oauth_timestamp": timestamp}
2088
2093
        request = oauth.OAuthRequest.from_consumer_and_token(
2089
2094
            http_url=url,
2090
2095
            http_method="POST",
 
2096
            parameters=parameters,
2091
2097
            oauth_consumer=self.action_queue.consumer,
2092
2098
            token=self.action_queue.token)
2093
2099
        request.sign_request(method, self.action_queue.consumer,
2137
2143
        """Get public files list using the HTTP Web API method."""
2138
2144
 
2139
2145
        method = oauth.OAuthSignatureMethod_PLAINTEXT()
 
2146
        timestamp = timestamp_checker.get_faithful_time()
 
2147
        parameters = {"oauth_timestamp": timestamp}
2140
2148
        request = oauth.OAuthRequest.from_consumer_and_token(
2141
2149
            http_url=self._url,
2142
2150
            http_method="GET",
 
2151
            parameters=parameters,
2143
2152
            oauth_consumer=self.action_queue.consumer,
2144
2153
            token=self.action_queue.token)
2145
2154
        request.sign_request(method, self.action_queue.consumer,
2377
2386
 
2378
2387
    __slots__ = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
2379
2388
                 'size', 'fileobj_factory', 'magic_hash',
2380
 
                 'deflated_size', 'tempfile', 'upload_req', 'tx_semaphore',
2381
 
                 'n_bytes_written_last', 'path', 'n_bytes_written', 'upload_id')
 
2389
                 'deflated_size', 'tempfile', 'upload_req',
 
2390
                 'tx_semaphore', 'n_bytes_written_last', 'path',
 
2391
                 'n_bytes_written', 'upload_id')
2382
2392
 
2383
2393
    logged_attrs = ActionQueueCommand.logged_attrs + (
2384
2394
                    'share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
2460
2470
    def cleanup(self):
2461
2471
        """Cleanup: stop the producer."""
2462
2472
        self.log.debug('cleanup')
2463
 
        if self.upload_req is not None and self.upload_req.producer is not None:
 
2473
        if self.upload_req is not None and \
 
2474
        self.upload_req.producer is not None:
2464
2475
            self.log.debug('stopping the producer')
2465
2476
            self.upload_req.producer.stopProducing()
2466
2477