51
51
from twisted.python.failure import Failure, DefaultException
53
53
from oauth import oauth
54
from ubuntu_sso.utils import timestamp_checker
54
55
from ubuntuone import clientdefs
55
56
from ubuntuone.platform import platform, remove_file
56
57
from ubuntuone.storageprotocol import protocol_pb2, content_hash
57
58
from ubuntuone.storageprotocol import errors as protocol_errors
58
59
from ubuntuone.storageprotocol.client import (
59
ThrottlingStorageClient, ThrottlingStorageClientFactory
60
ThrottlingStorageClient,
61
ThrottlingStorageClientFactory,
61
63
from ubuntuone.storageprotocol.context import get_ssl_context
62
64
from ubuntuone.syncdaemon.interfaces import IActionQueue, IMarker
63
65
from ubuntuone.syncdaemon.logger import mklog, TRACE
66
from ubuntuone.syncdaemon.volume_manager import ACCESS_LEVEL_RW
64
67
from ubuntuone.syncdaemon import config
66
69
logger = logging.getLogger("ubuntuone.SyncDaemon.ActionQueue")
433
437
upload.log.debug('compressing: %r', filename)
434
438
# we need to compress the file completely to figure out its
435
439
# compressed size. So streaming is out :(
436
upload.tempfile = f = NamedTemporaryFile()
440
tempfile = NamedTemporaryFile()
437
441
zipper = zlib.compressobj()
438
442
magic_hasher = content_hash.magic_hash_factory()
439
443
while not upload.cancelled:
440
444
data = fileobj.read(4096)
442
f.write(zipper.flush())
446
tempfile.write(zipper.flush())
443
447
# no flush/sync because we don't need this to persist
444
448
# on disk; if the machine goes down, we'll lose it
445
449
# anyway (being in /tmp and all)
447
f.write(zipper.compress(data))
451
tempfile.write(zipper.compress(data))
448
452
magic_hasher.update(data)
449
upload.deflated_size = f.tell()
453
# ensure that the contents are phisically in the file, some
454
# operating systems will not ensure this, even in the same process
456
upload.deflated_size = tempfile.tell()
451
458
upload.magic_hash = magic_hasher.content_hash()
452
459
except Exception, e: # pylint: disable-msg=W0703
461
if tempfile is not None:
463
remove_file(tempfile.name)
454
464
reactor.callFromThread(deferred.errback, e)
456
466
# avoid triggering the deferred if already failed!
468
upload.tempfile = tempfile
458
469
reactor.callFromThread(deferred.callback, True)
460
471
@defer.inlineCallbacks
817
828
ssl_context = get_ssl_context(self.disable_ssl_verify)
819
830
self.connector = reactor.connectSSL(host, port, factory=self,
820
contextFactory=ssl_context,
821
timeout=self.connection_timeout)
831
contextFactory=ssl_context,
832
timeout=self.connection_timeout)
823
834
self.connector = reactor.connectTCP(host, port, self,
824
timeout=self.connection_timeout)
835
timeout=self.connection_timeout)
826
837
def connect(self):
827
838
"""Start the circus going."""
967
978
request=self.client.protocol_version,
968
979
request_error=protocol_errors.UnsupportedVersionError,
969
980
event_error='SYS_PROTOCOL_VERSION_ERROR',
970
event_ok='SYS_PROTOCOL_VERSION_OK'
981
event_ok='SYS_PROTOCOL_VERSION_OK')
972
982
return check_version_d
974
984
@defer.inlineCallbacks
990
1000
request_error=StandardError,
991
1001
event_error='SYS_SET_CAPABILITIES_ERROR',
993
args=('query_caps', caps, error_msg)
1003
args=('query_caps', caps, error_msg))
995
1004
req = yield query_caps_d
997
1006
# req can be None if set capabilities failed, error is handled by
1005
1014
request_error=StandardError,
1006
1015
event_error='SYS_SET_CAPABILITIES_ERROR',
1007
1016
event_ok='SYS_SET_CAPABILITIES_OK',
1008
args=('set_caps', caps, error_msg)
1017
args=('set_caps', caps, error_msg))
1010
1018
yield set_caps_d
1012
1020
@defer.inlineCallbacks
1019
1027
request_error=protocol_errors.AuthenticationFailedError,
1020
1028
event_error='SYS_AUTH_ERROR', event_ok='SYS_AUTH_OK',
1021
1029
# XXX: handle self.token is None or self.consumer is None?
1022
args=(self.consumer, self.token, metadata)
1030
args=(self.consumer, self.token, metadata))
1024
1031
req = yield authenticate_d
1026
1033
# req can be None if the auth failed, but it's handled by
1166
1173
[x for x in protocol_errors._error_mapping.values()
1167
1174
if x is not protocol_errors.InternalError] +
1168
1175
[protocol_errors.RequestCancelledError,
1169
twisted_errors.ConnectionDone, twisted_errors.ConnectionLost]
1176
twisted_errors.ConnectionDone, twisted_errors.ConnectionLost])
1172
1178
retryable_errors = (
1173
1179
protocol_errors.TryAgainError,
1442
1448
self.parent_id = parent_id
1443
1449
# Unicode boundary! the name is Unicode in protocol and server, but
1444
1450
# here we use bytes for paths
1445
self.name = name.decode("utf8")
1451
self.name = name.decode("utf-8")
1446
1452
self.marker = marker
1447
1453
self.path = path
1506
1512
self.new_parent_id = new_parent_id
1507
1513
# Unicode boundary! the name is Unicode in protocol and server, but
1508
1514
# here we use bytes for paths
1509
self.new_name = new_name.decode("utf8")
1515
self.new_name = new_name.decode("utf-8")
1510
1516
self.path_from = path_from
1511
1517
self.path_to = path_to
1735
1741
if share_to and re.match(EREGEX, share_to):
1736
1742
self.use_http = True
1738
def _create_share_http(self, node_id, user, name, read_only, deferred):
1744
def _create_share_http(self, node_id, user, name, read_only):
1739
1745
"""Create a share using the HTTP Web API method."""
1741
1747
url = "https://one.ubuntu.com/files/api/offer_share/"
1742
1748
method = oauth.OAuthSignatureMethod_PLAINTEXT()
1749
timestamp = timestamp_checker.get_faithful_time()
1750
parameters = {"oauth_timestamp": timestamp}
1743
1751
request = oauth.OAuthRequest.from_consumer_and_token(
1745
1753
http_method="POST",
1754
parameters=parameters,
1746
1755
oauth_consumer=self.action_queue.consumer,
1747
1756
token=self.action_queue.token)
1748
1757
request.sign_request(method, self.action_queue.consumer,
1754
1763
pdata = urlencode(data)
1755
1764
headers = request.to_header()
1756
1765
req = Request(url, pdata, headers)
1759
except HTTPError, e:
1760
reactor.callFromThread(deferred.errback, Failure(e))
1762
reactor.callFromThread(deferred.callback, None)
1764
1768
def _run(self):
1765
1769
"""Do the actual running."""
1766
1770
if self.use_http:
1767
1771
# External user, do the HTTP REST method
1768
deferred = defer.Deferred()
1769
d = threads.deferToThread(self._create_share_http,
1770
self.node_id, self.share_to,
1771
self.name, self.access_level != 'Modify',
1773
d.addErrback(deferred.errback)
1772
return threads.deferToThread(self._create_share_http,
1773
self.node_id, self.share_to,
1775
self.access_level != ACCESS_LEVEL_RW)
1776
1777
return self.action_queue.client.create_share(self.node_id,
1976
1977
self.log.debug(m, queued_command)
1977
1978
self._queue.remove(queued_command)
1979
self.log.debug("not queueing self because there's other "
1980
"command with less or same gen num")
1980
if not queued_command.running:
1981
self.log.debug("not queueing self because there's other "
1982
"(not running) command with less or "
1983
1986
# no similar command, or removed the previous command (if not running)
2086
2089
url = "https://one.ubuntu.com/files/api/set_public/%s" % (node_key,)
2087
2090
method = oauth.OAuthSignatureMethod_PLAINTEXT()
2091
timestamp = timestamp_checker.get_faithful_time()
2092
parameters = {"oauth_timestamp": timestamp}
2088
2093
request = oauth.OAuthRequest.from_consumer_and_token(
2090
2095
http_method="POST",
2096
parameters=parameters,
2091
2097
oauth_consumer=self.action_queue.consumer,
2092
2098
token=self.action_queue.token)
2093
2099
request.sign_request(method, self.action_queue.consumer,
2137
2143
"""Get public files list using the HTTP Web API method."""
2139
2145
method = oauth.OAuthSignatureMethod_PLAINTEXT()
2146
timestamp = timestamp_checker.get_faithful_time()
2147
parameters = {"oauth_timestamp": timestamp}
2140
2148
request = oauth.OAuthRequest.from_consumer_and_token(
2141
2149
http_url=self._url,
2142
2150
http_method="GET",
2151
parameters=parameters,
2143
2152
oauth_consumer=self.action_queue.consumer,
2144
2153
token=self.action_queue.token)
2145
2154
request.sign_request(method, self.action_queue.consumer,
2378
2387
__slots__ = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
2379
2388
'size', 'fileobj_factory', 'magic_hash',
2380
'deflated_size', 'tempfile', 'upload_req', 'tx_semaphore',
2381
'n_bytes_written_last', 'path', 'n_bytes_written', 'upload_id')
2389
'deflated_size', 'tempfile', 'upload_req',
2390
'tx_semaphore', 'n_bytes_written_last', 'path',
2391
'n_bytes_written', 'upload_id')
2383
2393
logged_attrs = ActionQueueCommand.logged_attrs + (
2384
2394
'share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
2460
2470
def cleanup(self):
2461
2471
"""Cleanup: stop the producer."""
2462
2472
self.log.debug('cleanup')
2463
if self.upload_req is not None and self.upload_req.producer is not None:
2473
if self.upload_req is not None and \
2474
self.upload_req.producer is not None:
2464
2475
self.log.debug('stopping the producer')
2465
2476
self.upload_req.producer.stopProducing()