~hadware/magicicada-server/trusty-support

« back to all changes in this revision

Viewing changes to src/server/content.py

  • Committer: Facundo Batista
  • Date: 2015-08-05 13:10:02 UTC
  • Revision ID: facundo@taniquetil.com.ar-20150805131002-he7b7k704d8o7js6
First released version.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2008-2015 Canonical
 
2
#
 
3
# This program is free software: you can redistribute it and/or modify
 
4
# it under the terms of the GNU Affero General Public License as
 
5
# published by the Free Software Foundation, either version 3 of the
 
6
# License, or (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU Affero General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU Affero General Public License
 
14
# along with this program. If not, see <http://www.gnu.org/licenses/>.
 
15
#
 
16
# For further info, check  http://launchpad.net/filesync-server
 
17
 
 
18
"""Provides a layer to handle all the database objects from twisted.
 
19
 
 
20
This layer is the main interface to the RPC DAL.
 
21
"""
 
22
 
 
23
import calendar
 
24
import logging
 
25
import posixpath as pypath
 
26
import sys
 
27
import uuid
 
28
import weakref
 
29
import zlib
 
30
 
 
31
import twisted.internet.error
 
32
import twisted.web.error
 
33
 
 
34
from twisted.internet import defer
 
35
from twisted.python.failure import Failure
 
36
from twisted.web import http
 
37
 
 
38
from s3lib.s3lib import ProducerStopped
 
39
from s3lib.producers import S3Producer, NullConsumer
 
40
from backends.filesync.data import errors as dataerrors
 
41
from config import config
 
42
from ubuntuone import txutils
 
43
from ubuntuone.storage.server import errors, upload
 
44
from ubuntuone.storageprotocol import protocol_pb2
 
45
 
 
46
ZERO_LENGTH_CONTENT_KEY = ""
 
47
 
 
48
 
 
49
class NetworkRetry(txutils.NetworkRetry):
 
50
    """NetworkRetry subclass that raise RetryLimitReached."""
 
51
 
 
52
    def __init__(self, *args, **kwargs):
 
53
        super(NetworkRetry, self).__init__(*args, **kwargs)
 
54
        self.logger = logging.getLogger('storage.server')
 
55
 
 
56
    def _handle_retried_exc(self, exc, retry_count, func, args, kwargs):
 
57
        """Handle a exception inside the retry loop. (do nothing by default)"""
 
58
        self.logger.warning("Retrying: %s with: %s, %s - failed with: "
 
59
                            "%s('%s') - retry_count: %d", func, args, kwargs,
 
60
                            exc.__class__.__name__, exc, retry_count)
 
61
 
 
62
    def _handle_exc(self, exc):
 
63
        """Handle a exception raised by the last retry.
 
64
 
 
65
        This subclass returns a RetryLimitReached failure.
 
66
        """
 
67
        msg = ("Maximum retries (%i) reached. Please try again.\n"
 
68
               "Original exception: %s: %s" % (self.retries,
 
69
                                               exc.__class__.__name__,
 
70
                                               str(exc)))
 
71
        return defer.fail(Failure(dataerrors.RetryLimitReached(msg),
 
72
                                  dataerrors.RetryLimitReached,
 
73
                                  sys.exc_info()[2]))
 
74
 
 
75
 
 
76
class MultipartRetry(NetworkRetry):
 
77
    """NetworkRetry subclass that handle S3 errors."""
 
78
 
 
79
    def _handle_retried_exc(self, exc, retry_count, func, args, kwargs):
 
80
        """Handle a exception inside the retry loop.
 
81
 
 
82
        if it's a 404 error, bail out.
 
83
 
 
84
        This method is always called from a try/except.
 
85
        """
 
86
        if isinstance(exc, twisted.web.error.Error):
 
87
            # propagate 404 errors
 
88
            if exc.status == '404':
 
89
                raise
 
90
        super(MultipartRetry, self)._handle_retried_exc(
 
91
            exc, retry_count, func, args, kwargs)
 
92
 
 
93
 
 
94
class FalseProducer(object):
 
95
    """Not really a producer, just deliver all the data when asked to.
 
96
 
 
97
    It has all the methods to comply the Push or Pull Producer Interface,
 
98
    but the only one implemented is resumeProducing: sends all the data.
 
99
    """
 
100
 
 
101
    def __init__(self, data):
 
102
        self.data = data
 
103
        self.deferred = defer.Deferred()
 
104
        self.consumer = None
 
105
 
 
106
    def resumeProducing(self):
 
107
        """Resume producing, just send all the data."""
 
108
        if self.consumer:
 
109
            self.consumer.write(self.data)
 
110
        self.deferred.callback(True)
 
111
 
 
112
    def stopProducing(self):
 
113
        """Stop producing."""
 
114
 
 
115
    def pauseProducing(self):
 
116
        """Pause producing."""
 
117
 
 
118
 
 
119
class Node(object):
 
120
    """StorageObject proxy."""
 
121
 
 
122
    s3_retries = config.api_server.s3_retries
 
123
    s3_retry_wait = config.api_server.s3_retry_wait
 
124
 
 
125
    def __init__(self, manager, node):
 
126
        """Create a Node.
 
127
 
 
128
        @param manager: the ContentManager which created this object
 
129
        @param node: a dao.StorageNode
 
130
        """
 
131
        self.manager = manager
 
132
        self.id = node['id']
 
133
        self.volume_id = node['volume_id']
 
134
        self.path = node['path']
 
135
        self.name = node['name']
 
136
        self.parent_id = node['parent_id']
 
137
        self.is_file = node['is_file']
 
138
        self.content_hash = node['content_hash']
 
139
        self.size = node['size'] or 0
 
140
        self.crc32 = node['crc32'] or 0
 
141
        self.deflated_size = node['deflated_size'] or 0
 
142
        self.is_live = node['is_live']
 
143
        self.generation = node['generation']
 
144
        self.is_public = node['is_public']
 
145
        last_modif = node['last_modified']
 
146
 
 
147
        # special cases for no content
 
148
        if node['storage_key'] is None:
 
149
            self.has_content = False
 
150
            self.storage_key = ZERO_LENGTH_CONTENT_KEY
 
151
        else:
 
152
            self.has_content = node['has_content']
 
153
            self.storage_key = node['storage_key']
 
154
 
 
155
        self.last_modified = calendar.timegm(last_modif.timetuple())
 
156
        self.node = node
 
157
        self.logger = logging.getLogger('storage.server')
 
158
 
 
159
    @defer.inlineCallbacks
 
160
    def get_content(self, start=None, end=None, previous_hash=None, user=None):
 
161
        """Get the content for this node.
 
162
 
 
163
        @param start: the start offset
 
164
        @param end: the end offset
 
165
        @param previous_hash: not used for FileNode.
 
166
        @param user: the user doing the request, useful for logging.
 
167
        """
 
168
        if not self.is_file:
 
169
            raise TypeError("Content can be retrieved only on Files.")
 
170
        storage_key = self.storage_key
 
171
        size = self.deflated_size
 
172
        if storage_key == ZERO_LENGTH_CONTENT_KEY:
 
173
            # we send the compressed empty string
 
174
            producer = FalseProducer(zlib.compress(""))
 
175
            defer.returnValue(producer)
 
176
            return
 
177
        if start is not None or end is not None:
 
178
            headers = {'Range': 'bytes=%d-%d' % (
 
179
                start if start is not None else 0,
 
180
                end if end is not None else size)}
 
181
        else:
 
182
            headers = None
 
183
 
 
184
        retrier = NetworkRetry(catch=twisted.internet.error.TimeoutError,
 
185
                               retries=self.s3_retries,
 
186
                               retry_wait=self.s3_retry_wait)
 
187
        producer = yield retrier(self._get_producer,
 
188
                                 config.api_server.s3_bucket,
 
189
                                 str(storage_key), headers, user)
 
190
        defer.returnValue(producer)
 
191
 
 
192
    def _get_from_s3(self, bucket, key, headers, streaming):
 
193
        """Inner function to handle s3.get."""
 
194
        s3 = self.manager.factory.s3()
 
195
        return s3.get(bucket, key, headers=headers, streaming=streaming)
 
196
 
 
197
    @defer.inlineCallbacks
 
198
    def _get_producer(self, bucket, storage_key,
 
199
                      headers, user, streaming=True):
 
200
        """Return the content Producer."""
 
201
        try:
 
202
            get_response = self._get_from_s3(bucket, str(storage_key),
 
203
                                             headers, streaming)
 
204
            producer = yield get_response.deferred
 
205
        except twisted.web.error.Error, exc:
 
206
            # if fallback bucket is configured, and the file was
 
207
            # not found, try that bucket.
 
208
            if config.api_server.s3_fallback_bucket and exc.status == '404':
 
209
                get_response = self._get_from_s3(
 
210
                    config.api_server.s3_fallback_bucket,
 
211
                    str(storage_key), headers, streaming)
 
212
                try:
 
213
                    producer = yield get_response.deferred
 
214
                except twisted.web.error.Error, exc:
 
215
                    yield self._handle_s3_errors(Failure(exc), user)
 
216
            else:
 
217
                yield self._handle_s3_errors(Failure(exc), user)
 
218
 
 
219
        producer.deferred.addErrback(self._handle_s3_errors, user)
 
220
        defer.returnValue(producer)
 
221
 
 
222
    def _context_msg(self, user):
 
223
        """Return a string with the context."""
 
224
        if user.protocols:
 
225
            session_ids = ','.join([str(p.session_id) for p in user.protocols])
 
226
        else:
 
227
            session_ids = 'No sessions?'
 
228
        context = dict(user_id=user.id,
 
229
                       username=user.username.replace('%', '%%'),
 
230
                       session_ids=session_ids, node_id=str(self.id))
 
231
        return ("%(session_ids)s - %(username)s (%(user_id)s) - "
 
232
                "node_id=%(node_id)s" % context)
 
233
 
 
234
    def _handle_s3_errors(self, failure, user):
 
235
        """Transform s3 errors in something more appropiate.
 
236
 
 
237
        s3 errors include all the errors in twisted.web.error, and also
 
238
        specifically twisted.internet.error.TimeoutError.
 
239
 
 
240
        Always return a failure or raise an exception.
 
241
        """
 
242
        if not failure.check(twisted.web.error.Error,
 
243
                             twisted.internet.error.TimeoutError):
 
244
            # not an s3 error
 
245
            return failure
 
246
 
 
247
        ctx = self._context_msg(user)
 
248
        if failure.check(twisted.internet.error.TimeoutError):
 
249
            self.logger.warning("%s - s3 threw TimeoutError while sending data"
 
250
                                " (returning TRY_AGAIN)", ctx)
 
251
            raise errors.S3DownloadError(
 
252
                failure.value, "TimeoutError while downloading data.")
 
253
 
 
254
        # ok, a web error
 
255
        status = failure.value.status
 
256
        if status == str(http.REQUESTED_RANGE_NOT_SATISFIABLE):
 
257
            self.logger.warning("%s - s3 returned a status code of 416.", ctx)
 
258
            raise errors.NotAvailable(failure.getErrorMessage())
 
259
        if status == str(http.NOT_FOUND):
 
260
            self.logger.warning("%s - s3 returned a status code of 404.", ctx)
 
261
            raise errors.NotAvailable(failure.getErrorMessage())
 
262
        else:
 
263
            self.logger.warning("%s - s3 threw %s while sending data "
 
264
                                "(returning TRY_AGAIN) with response: %r",
 
265
                                ctx, status, failure.value.response)
 
266
            msg = "Web error (status %s) while downloading data."
 
267
            raise errors.S3DownloadError(
 
268
                failure.value, msg % failure.value.status)
 
269
 
 
270
 
 
271
class DBUploadJob(object):
 
272
    """A proxy for Upload model objects."""
 
273
 
 
274
    def __init__(self, user, volume_id, node_id, uploadjob_id, uploaded_bytes,
 
275
                 multipart_id, multipart_key, chunk_count, inflated_size,
 
276
                 crc32, hash_context, magic_hash_context, decompress_context,
 
277
                 when_last_active):
 
278
        self.__dict__ = locals()
 
279
 
 
280
    @classmethod
 
281
    def get(cls, user, volume_id, node_id, uploadjob_id, hash_value, crc32,
 
282
            inflated_size, deflated_size):
 
283
        """Get a multipart upload job."""
 
284
        data = dict(user=user, volume_id=volume_id,
 
285
                    node_id=node_id)
 
286
        kwargs = dict(user_id=user.id, volume_id=volume_id, node_id=node_id,
 
287
                      uploadjob_id=uploadjob_id,
 
288
                      hash_value=hash_value, crc32=crc32,
 
289
                      inflated_size=inflated_size, deflated_size=deflated_size)
 
290
        d = user.rpc_dal.call('get_uploadjob', **kwargs)
 
291
        d.addCallback(lambda r: r.update(data) or r)
 
292
        d.addCallback(lambda r: cls(**r))
 
293
        return d
 
294
 
 
295
    @classmethod
 
296
    def make(cls, user, volume_id, node_id, previous_hash,
 
297
             hash_value, crc32, inflated_size, deflated_size, multipart_key):
 
298
        """Make an upload job."""
 
299
        data = dict(user=user, volume_id=volume_id,
 
300
                    node_id=node_id, multipart_key=multipart_key)
 
301
        kwargs = dict(user_id=user.id, volume_id=volume_id, node_id=node_id,
 
302
                      previous_hash=previous_hash,
 
303
                      hash_value=hash_value, crc32=crc32,
 
304
                      inflated_size=inflated_size,
 
305
                      deflated_size=deflated_size, multipart_key=multipart_key)
 
306
        d = user.rpc_dal.call('make_uploadjob', **kwargs)
 
307
        d.addCallback(lambda r: r.update(data) or r)
 
308
        d.addCallback(lambda r: cls(**r))
 
309
        return d
 
310
 
 
311
    def set_multipart_id(self, multipart_id):
 
312
        """Set the multipart id for the upload job."""
 
313
        self.multipart_id = multipart_id
 
314
        return self.user.rpc_dal.call('set_uploadjob_multipart_id',
 
315
                                      user_id=self.user.id,
 
316
                                      uploadjob_id=self.uploadjob_id,
 
317
                                      multipart_id=multipart_id)
 
318
 
 
319
    def add_part(self, chunk_size, inflated_size, crc32,
 
320
                 hash_context, magic_hash_context, decompress_context):
 
321
        """Add a part to an upload job."""
 
322
        kwargs = dict(user_id=self.user.id, uploadjob_id=self.uploadjob_id,
 
323
                      chunk_size=chunk_size, inflated_size=inflated_size,
 
324
                      crc32=crc32, hash_context=hash_context,
 
325
                      magic_hash_context=magic_hash_context,
 
326
                      decompress_context=decompress_context)
 
327
        return self.user.rpc_dal.call('add_part_to_uploadjob', **kwargs)
 
328
 
 
329
    def delete(self):
 
330
        """Delete an upload job."""
 
331
        return self.user.rpc_dal.call('delete_uploadjob', user_id=self.user.id,
 
332
                                      uploadjob_id=self.uploadjob_id)
 
333
 
 
334
    @defer.inlineCallbacks
 
335
    def touch(self):
 
336
        """Touch an upload job."""
 
337
        r = yield self.user.rpc_dal.call('touch_uploadjob',
 
338
                                         user_id=self.user.id,
 
339
                                         uploadjob_id=self.uploadjob_id)
 
340
        self.when_last_active = r['when_last_active']
 
341
 
 
342
 
 
343
class BaseUploadJob(object):
 
344
    """Main interface for Uploads."""
 
345
 
 
346
    _inflated_size_hint_mismatch = "Inflated size does not match hint."
 
347
    _deflated_size_hint_mismatch = "Deflated size does not match hint."
 
348
    _content_hash_hint_mismatch = "Content hash does not match hint."
 
349
    _magic_hash_hint_mismatch = "Magic hash does not match hint."
 
350
    _crc32_hint_mismatch = "Crc32 does not match hint."
 
351
 
 
352
    def __init__(self, user, file_node, previous_hash, hash_hint, crc32_hint,
 
353
                 inflated_size_hint, deflated_size_hint, session_id,
 
354
                 blob_exists, magic_hash):
 
355
        node_hash = file_node.content_hash
 
356
        if (node_hash or previous_hash) and \
 
357
                node_hash != previous_hash and node_hash != hash_hint:
 
358
            raise errors.ConflictError("Previous hash does not match.")
 
359
 
 
360
        self.user = user
 
361
        self.session_id = session_id
 
362
        self.magic_hash = magic_hash
 
363
        self.producer = None
 
364
        self.factory = None
 
365
        self.deferred = defer.Deferred()
 
366
        self.s3 = None
 
367
        self._initial_data = True
 
368
        # this are used to track keys uploaded into s3
 
369
        self._storage_key = None
 
370
        self.canceling = False
 
371
        self.logger = logging.getLogger('storage.server')
 
372
 
 
373
        self.original_file_hash = node_hash
 
374
        self.hash_hint = hash_hint
 
375
        self.crc32_hint = crc32_hint
 
376
        self.inflated_size_hint = inflated_size_hint
 
377
        self.deflated_size_hint = deflated_size_hint
 
378
        self.file_node = file_node
 
379
        self.blob_exists = blob_exists
 
380
 
 
381
    @property
 
382
    def buffers_size(self):
 
383
        """The total size of the buffer in this upload."""
 
384
        raise NotImplementedError("subclass responsability.")
 
385
 
 
386
    @property
 
387
    def upload_id(self):
 
388
        """Return the upload_id for this upload job."""
 
389
        raise NotImplementedError("subclass responsability.")
 
390
 
 
391
    @property
 
392
    def offset(self):
 
393
        """The offset of this upload."""
 
394
        raise NotImplementedError("subclass responsability.")
 
395
 
 
396
    @property
 
397
    def inflated_size(self):
 
398
        """The inflated size of this upload."""
 
399
        return self.producer.inflated_size
 
400
 
 
401
    @property
 
402
    def crc32(self):
 
403
        """The crc32 of this upload."""
 
404
        return self.producer.crc32
 
405
 
 
406
    @property
 
407
    def hash_object(self):
 
408
        """The hash_object of this upload."""
 
409
        return self.producer.hash_object
 
410
 
 
411
    @property
 
412
    def magic_hash_object(self):
 
413
        """The magic_hash_object of this upload."""
 
414
        return self.producer.magic_hash_object
 
415
 
 
416
    @defer.inlineCallbacks
 
417
    def connect(self):
 
418
        """Setup the producer and consumer (S3 connection)."""
 
419
        if self.s3 is None:
 
420
            self.s3 = self.user.manager.factory.s3()
 
421
        if self.blob_exists:
 
422
            # we have a storage object like this already
 
423
            # wrap the S3Producer to only hash and discard the bytes
 
424
            self.producer = upload.ProxyHashingProducer(self.producer)
 
425
            self.factory = NullConsumer()
 
426
            self.factory.registerProducer(self.producer)
 
427
            self.producer.resumeProducing()
 
428
            self.deferred.callback(None)
 
429
        else:
 
430
            # we need to upload this content, get ready for it
 
431
            d = self._start_receiving()
 
432
            d.addErrback(self._handle_connection_done)
 
433
            d.addErrback(self._handle_s3_errors)
 
434
            yield d
 
435
            # at this point, self.factory is set...if it's not it's ok to
 
436
            # blowup
 
437
            self.factory.deferred.addErrback(self._handle_connection_done)
 
438
            self.factory.deferred.addErrback(self._handle_s3_errors)
 
439
            self.factory.deferred.chainDeferred(self.deferred)
 
440
 
 
441
    def _start_receiving(self):
 
442
        """Prepare the upload job to start receiving streaming bytes."""
 
443
        # generate a new storage_key for this upload
 
444
        self._storage_key = uuid.uuid4()
 
445
        # replace self.producer with a hashing proxy
 
446
        self.producer = upload.ProxyHashingProducer(self.producer)
 
447
        self.factory = self.s3.put(
 
448
            config.api_server.s3_bucket,
 
449
            str(self._storage_key), self.producer,
 
450
            headers={'Content-Length': str(self.deflated_size_hint)},
 
451
            streaming=True)
 
452
        return self.factory.connect_deferred
 
453
 
 
454
    def add_data(self, data):
 
455
        """Add data to this upload."""
 
456
        # add data is called by the server with the bytes that arrive in a
 
457
        # packet. This is at most MAX_MESSAGE_SIZE bytes (2**16, 65k at the
 
458
        # moment).
 
459
        # zlib has a theoretical limit of compression of 1032:1, so this
 
460
        # means that at most we will get a 1032*2**16 ~= 64MB, meaning that
 
461
        # the memory usage for this has a maximum.
 
462
        # """the theoretical limit for the zlib format (as opposed to its
 
463
        # implementation in the currently available sources) is 1032:1."""
 
464
        # http://zlib.net/zlib_tech.html
 
465
        self.producer.dataReceived(data)
 
466
        if (not self.blob_exists and
 
467
                self.buffers_size >= config.api_server.upload_buffer_max_size):
 
468
            raise errors.BufferLimit("Buffer limit reached.")
 
469
 
 
470
    def registerProducer(self, producer):
 
471
        """Register a producer, this is the client connection."""
 
472
        # by default do nothing, only MultipartUploadJob care about this.
 
473
        pass
 
474
 
 
475
    def unregisterProducer(self):
 
476
        """Unregister the producer."""
 
477
        # by default do nothing, only MultipartUploadJob care about this.
 
478
        pass
 
479
 
 
480
    def cancel(self):
 
481
        """Cancel this upload job."""
 
482
        return self._stop_producer_and_factory()
 
483
 
 
484
    @defer.inlineCallbacks
 
485
    def _stop_producer_and_factory(self):
 
486
        """Cancel this upload job.
 
487
 
 
488
        - Unregister producer.
 
489
        - Stop the producer if not yet stopped.
 
490
        - Cancel the factory if one exists.
 
491
        """
 
492
        self.unregisterProducer()
 
493
        self.canceling = True
 
494
        if self.producer is not None:
 
495
            # upload already started
 
496
            try:
 
497
                self.producer.stopProducing()
 
498
            except ProducerStopped:
 
499
                # dont't care if stopped it in the middle, we're
 
500
                # canceling!
 
501
                pass
 
502
        if self.factory is not None:
 
503
            yield self.factory.cancel()
 
504
 
 
505
    def stop(self):
 
506
        """Stop the upload and cleanup."""
 
507
        return self._stop_producer_and_factory()
 
508
 
 
509
    def _handle_connection_done(self, failure):
 
510
        """Process error states encountered by producers and consumers """
 
511
        if failure.check(twisted.internet.error.ConnectionDone):
 
512
            # if we're on the canceling pathway, we expect this
 
513
            if self.canceling:
 
514
                return
 
515
            raise errors.UploadCanceled("Connection closed prematurely.")
 
516
        return failure
 
517
 
 
518
    def _handle_s3_errors(self, failure):
 
519
        """Handle s3lib twisted.web.error.Error and TimeoutError."""
 
520
 
 
521
        def context_msg():
 
522
            """Return a str with the context for this upload."""
 
523
            session_ids = ''
 
524
            if self.user.protocols:
 
525
                session_ids = ','.join([str(p.session_id)
 
526
                                        for p in self.user.protocols])
 
527
            upload_context = dict(
 
528
                user_id=self.user.id,
 
529
                username=self.user.username.replace('%', '%%'),
 
530
                session_ids=session_ids or 'No sessions?',
 
531
                volume_id=self.file_node.volume_id,
 
532
                node_id=self.file_node.id,
 
533
                bytes_received=(self.producer.bytes_received
 
534
                                if self.producer else 0),
 
535
                bytes_sent=(self.producer.bytes_sent
 
536
                            if self.producer else 0))
 
537
            context_msg = (
 
538
                '%(session_ids)s - %(username)s (%(user_id)s) - ' +
 
539
                'node: %(volume_id)s::%(node_id)s - ' +
 
540
                'recv: %(bytes_received)s - sent: %(bytes_sent)s - ')
 
541
            return context_msg % upload_context
 
542
        if failure.check(twisted.web.error.Error):
 
543
            self.logger.warning(context_msg() + "s3 threw %s while receiving "
 
544
                                "data (returning TRY_AGAIN) with response: %r",
 
545
                                failure.value.status, failure.value.response)
 
546
            raise errors.S3UploadError(failure.value,
 
547
                                       "Web error (status %s) while uploading"
 
548
                                       " data." % (failure.value.status,))
 
549
        elif failure.check(twisted.internet.error.TimeoutError,
 
550
                           twisted.internet.error.ConnectionLost):
 
551
            self.logger.warning(context_msg() + "S3 connection threw a "
 
552
                                "TimeoutError while receiving data (returning "
 
553
                                "TRY_AGAIN) ")
 
554
            raise errors.S3UploadError(failure.value)
 
555
 
 
556
        return failure
 
557
 
 
558
    def flush_decompressor(self):
 
559
        """Flush the decompresor and handle the data."""
 
560
        # by default the producer do the hashing
 
561
        self.producer.flush_decompressor()
 
562
 
 
563
    def commit(self, put_result):
 
564
        """Simple commit, overwrite for more detailed behaviour."""
 
565
        return self._commit(put_result)
 
566
 
 
567
    @defer.inlineCallbacks
 
568
    def _commit(self, put_result):
 
569
        """Make this upload the current content for the node."""
 
570
        if self.producer is not None:
 
571
            assert self.producer.finished, "producer hasn't finished"
 
572
        self.flush_decompressor()
 
573
        # size matches hint
 
574
        if self.deflated_size != self.deflated_size_hint:
 
575
            raise errors.UploadCorrupt(self._deflated_size_hint_mismatch)
 
576
        if self.inflated_size != self.inflated_size_hint:
 
577
            raise errors.UploadCorrupt(self._inflated_size_hint_mismatch)
 
578
 
 
579
        # get the magic hash value here, don't log it, don't save it
 
580
        magic_hash = self.magic_hash_object.content_hash()
 
581
        magic_hash_value = magic_hash._magic_hash
 
582
        # magic hash should match the one sent by the client
 
583
        if self.magic_hash is not None and magic_hash_value != self.magic_hash:
 
584
            raise errors.UploadCorrupt(self._magic_hash_hint_mismatch)
 
585
 
 
586
        # hash matches hint
 
587
        hash = self.hash_object.content_hash()
 
588
        if hash != self.hash_hint:
 
589
            raise errors.UploadCorrupt(self._content_hash_hint_mismatch)
 
590
 
 
591
        # crc matches hint
 
592
        if self.crc32 != self.crc32_hint:
 
593
            raise errors.UploadCorrupt(self._crc32_hint_mismatch)
 
594
 
 
595
        storage_key = self._storage_key
 
596
        if storage_key is None:
 
597
            storage_key = self.file_node.storage_key
 
598
        if storage_key is None and self.inflated_size == 0:
 
599
            storage_key = ZERO_LENGTH_CONTENT_KEY
 
600
 
 
601
        new_gen = yield self._commit_content(storage_key, magic_hash_value)
 
602
        defer.returnValue(new_gen)
 
603
 
 
604
    @defer.inlineCallbacks
 
605
    def _commit_content(self, storage_key, magic_hash):
 
606
        """Commit the content in the DAL."""
 
607
        kwargs = dict(user_id=self.user.id, node_id=self.file_node.id,
 
608
                      volume_id=self.file_node.volume_id,
 
609
                      original_hash=self.original_file_hash,
 
610
                      hash_hint=self.hash_hint, crc32_hint=self.crc32_hint,
 
611
                      inflated_size_hint=self.inflated_size_hint,
 
612
                      deflated_size_hint=self.deflated_size_hint,
 
613
                      storage_key=storage_key, magic_hash=magic_hash,
 
614
                      session_id=self.session_id)
 
615
        try:
 
616
            r = yield self.user.rpc_dal.call('make_content', **kwargs)
 
617
        except dataerrors.ContentMissing:
 
618
            raise errors.TryAgain("Content missing on commit content.")
 
619
        except dataerrors.HashMismatch:
 
620
            raise errors.ConflictError("The File changed while uploading.")
 
621
        defer.returnValue(r['generation'])
 
622
 
 
623
 
 
624
class UploadJob(BaseUploadJob):
 
625
    """A non-resumable upload job."""
 
626
 
 
627
    def __init__(self, user, file_node, previous_hash, hash_hint, crc32_hint,
 
628
                 inflated_size_hint, deflated_size_hint,
 
629
                 session_id, blob_exists, magic_hash):
 
630
        super(UploadJob, self).__init__(user, file_node, previous_hash,
 
631
                                        hash_hint, crc32_hint,
 
632
                                        inflated_size_hint, deflated_size_hint,
 
633
                                        session_id, blob_exists,
 
634
                                        magic_hash)
 
635
        self.deflated_size = 0
 
636
        self.producer = S3Producer(self.deflated_size_hint)
 
637
 
 
638
    @property
 
639
    def upload_id(self):
 
640
        """Return the upload_id for this upload job."""
 
641
        return ''
 
642
 
 
643
    @property
 
644
    def offset(self):
 
645
        return 0
 
646
 
 
647
    @property
 
648
    def buffers_size(self):
 
649
        """The size of the producer buffer in this upload."""
 
650
        if self.producer:
 
651
            return self.producer.buffer_size
 
652
        else:
 
653
            return 0
 
654
 
 
655
    def add_data(self, data):
 
656
        super(UploadJob, self).add_data(data)
 
657
        self.deflated_size += len(data)
 
658
 
 
659
 
 
660
class MagicUploadJob(BaseUploadJob):
 
661
    """The magic upload job.
 
662
 
 
663
    Its initial offset is the size itself (no data should be added), all
 
664
    that is required for the upload is known at the beginning.  The only
 
665
    real action here is the commit.
 
666
    """
 
667
 
 
668
    def __init__(self, user, file_node, previous_hash, hash_hint, crc32_hint,
 
669
                 inflated_size_hint, deflated_size_hint,
 
670
                 storage_key, magic_hash, session_id, blob_exists):
 
671
        super(MagicUploadJob, self).__init__(user, file_node, previous_hash,
 
672
                                             hash_hint, crc32_hint,
 
673
                                             inflated_size_hint,
 
674
                                             deflated_size_hint,
 
675
                                             session_id, blob_exists,
 
676
                                             magic_hash)
 
677
        self.storage_key = storage_key
 
678
        # all already done!
 
679
        self.deferred.callback(None)
 
680
 
 
681
    @property
 
682
    def upload_id(self):
 
683
        """Return the upload_id for this upload job."""
 
684
        return ''
 
685
 
 
686
    @property
 
687
    def offset(self):
 
688
        """The initial offset is all the file."""
 
689
        return self.deflated_size_hint
 
690
 
 
691
    @property
 
692
    def buffers_size(self):
 
693
        """The size of the producer buffer in this upload."""
 
694
        return 0
 
695
 
 
696
    def add_data(self, data):
 
697
        """No data should be added!"""
 
698
        raise RuntimeError("No data should be added to the MagicUploadJob!")
 
699
 
 
700
    def connect(self):
 
701
        """Nothing to do."""
 
702
 
 
703
    def commit(self, put_result):
 
704
        """Make this upload the current content for the node."""
 
705
        return self._commit_content(self.storage_key, self.magic_hash)
 
706
 
 
707
 
 
708
class MultipartUploadJob(BaseUploadJob):
 
709
    """A multipart/resumable upload job."""
 
710
 
 
711
    def __init__(self, user, file_node, previous_hash, hash_hint, crc32_hint,
 
712
                 inflated_size_hint, deflated_size_hint, uploadjob, session_id,
 
713
                 blob_exists, magic_hash):
 
714
        super(MultipartUploadJob, self).__init__(
 
715
            user, file_node, previous_hash, hash_hint, crc32_hint,
 
716
            inflated_size_hint, deflated_size_hint, session_id, blob_exists,
 
717
            magic_hash)
 
718
        self.mp_upload = None
 
719
        self.uploadjob = uploadjob
 
720
 
 
721
    @property
 
722
    def buffers_size(self):
 
723
        """The size of the producer buffer in this upload."""
 
724
        size = 0
 
725
        if self.factory:
 
726
            size = self.factory.buffer_size
 
727
        return size
 
728
 
 
729
    @property
 
730
    def upload_id(self):
 
731
        """Return the upload_id for this upload job."""
 
732
        return self.multipart_key_name
 
733
 
 
734
    @property
 
735
    def offset(self):
 
736
        """Return the offset."""
 
737
        return self.uploadjob.uploaded_bytes
 
738
 
 
739
    @property
 
740
    def deflated_size(self):
 
741
        """Return the deflated_size."""
 
742
        return self.factory.deflated_size
 
743
 
 
744
    @property
 
745
    def inflated_size(self):
 
746
        """Return the inflated_size."""
 
747
        return self.factory.inflated_size
 
748
 
 
749
    @property
 
750
    def crc32(self):
 
751
        """Return the crc32."""
 
752
        return self.factory.crc32
 
753
 
 
754
    @property
 
755
    def hash_object(self):
 
756
        """Return the hash_object."""
 
757
        return self.factory.hash_object
 
758
 
 
759
    @property
 
760
    def magic_hash_object(self):
 
761
        """Return the magic_hash_object."""
 
762
        return self.factory.magic_hash_object
 
763
 
 
764
    @property
 
765
    def multipart_id(self):
 
766
        """Return the multipart_id."""
 
767
        return self.uploadjob.multipart_id
 
768
 
 
769
    @property
 
770
    def multipart_key_name(self):
 
771
        """Return the multipart_key_name."""
 
772
        return self.uploadjob.multipart_key
 
773
 
 
774
    @property
 
775
    def chunk_count(self):
 
776
        """Return the chunk_count."""
 
777
        return self.uploadjob.chunk_count
 
778
 
 
779
    @defer.inlineCallbacks
 
780
    def load_s3_multipart_upload(self):
 
781
        """Fetch or create a multipart upload in S3."""
 
782
        if self.s3 is None:
 
783
            self.s3 = self.user.manager.factory.s3()
 
784
        if self.multipart_id and self.multipart_key_name:
 
785
            try:
 
786
                self.mp_upload = yield self.s3.get_multipart_upload(
 
787
                    config.api_server.s3_bucket, self.multipart_key_name,
 
788
                    self.multipart_id)
 
789
            except twisted.web.error.Error, e:
 
790
                # log in warning just to be sure everything is working in
 
791
                # production
 
792
                upload_context = dict(
 
793
                    user_id=self.user.id,
 
794
                    username=self.user.username.replace('%', '%%'),
 
795
                    volume_id=self.uploadjob.volume_id,
 
796
                    node_id=str(self.uploadjob.node_id))
 
797
                context_msg = (
 
798
                    '%(username)s - (%(user_id)s) - '
 
799
                    'node: %(volume_id)s::%(node_id)s - ') % upload_context
 
800
                self.logger.warning("%s - Multipart upload doesn't exist "
 
801
                                    "(Got %r from S3)", context_msg, e.status)
 
802
                # if we get 404, this means that the upload was cancelled or
 
803
                # completed, but isn't there any more.
 
804
                if e.status == '404':
 
805
                    # create a new one and reset self.multipart_id
 
806
                    self.uploadjob.multipart_id = None
 
807
                    self.mp_upload = yield self.s3.create_multipart_upload(
 
808
                        config.api_server.s3_bucket, self.multipart_key_name)
 
809
                else:
 
810
                    raise
 
811
        else:
 
812
            self.mp_upload = yield self.s3.create_multipart_upload(
 
813
                config.api_server.s3_bucket, self.multipart_key_name)
 
814
 
 
815
    @defer.inlineCallbacks
 
816
    def part_done_callback(self, chunk_count, chunk_size, inflated_size, crc32,
 
817
                           hash_context, magic_hash_context,
 
818
                           decompress_context):
 
819
        """Process the info for each uploaded part"""
 
820
        yield self.uploadjob.add_part(chunk_size, inflated_size, crc32,
 
821
                                      hash_context, magic_hash_context,
 
822
                                      decompress_context)
 
823
 
 
824
    @defer.inlineCallbacks
 
825
    def _start_receiving(self):
 
826
        """Prepare the upload job to start receiving streaming bytes."""
 
827
        # get/create the mp_upload info from/in S3
 
828
        yield self.load_s3_multipart_upload()
 
829
        # persist the multipart upload id
 
830
        if self.multipart_id is None:
 
831
            yield self.uploadjob.set_multipart_id(self.mp_upload.id)
 
832
        # generate a new storage_key for this upload
 
833
        self._storage_key = self.multipart_key_name
 
834
        self.factory = upload.MultipartUploadFactory(
 
835
            self.mp_upload, config.api_server.storage_chunk_size,
 
836
            total_size=self.deflated_size_hint,
 
837
            offset=self.offset,
 
838
            inflated_size=self.uploadjob.inflated_size,
 
839
            crc32=self.uploadjob.crc32,
 
840
            chunk_count=self.chunk_count,
 
841
            hash_context=self.uploadjob.hash_context,
 
842
            magic_hash_context=self.uploadjob.magic_hash_context,
 
843
            decompress_context=self.uploadjob.decompress_context,
 
844
            part_done_cb=self.part_done_callback)
 
845
        # the factory is also the producer
 
846
        self.producer = self.factory
 
847
        self.factory.startFactory()
 
848
        yield self.factory.connect_deferred
 
849
 
 
850
    def flush_decompressor(self):
 
851
        """Flush the decompressor object and handle pending bytes."""
 
852
        self.factory.flush_decompressor()
 
853
 
 
854
    @defer.inlineCallbacks
 
855
    def _complete_upload(self):
 
856
        """Complete the multipart upload."""
 
857
        try:
 
858
            # complete the S3 multipart upload
 
859
            yield self._retry(self.mp_upload.complete)
 
860
        except twisted.web.error.Error, e:
 
861
            # if we get 404, this means that the upload was cancelled or
 
862
            # completed, but isn't there any more.
 
863
            if e.status == '404' and self.canceling:
 
864
                raise dataerrors.DoesNotExist("The multipart upload "
 
865
                                              "doesn't exists")
 
866
            # propagate the error
 
867
            raise
 
868
 
 
869
    @defer.inlineCallbacks
 
870
    def commit(self, put_result):
 
871
        """Make this upload the current content for the node."""
 
872
        # unregister the client transport
 
873
        self.unregisterProducer()
 
874
        # if this is a real multipart upload to S3, call complete
 
875
        # this can be none if we already have the content blob and we are just
 
876
        # checking the hash
 
877
        if self.mp_upload is not None:
 
878
            try:
 
879
                yield self._complete_upload()
 
880
            except twisted.web.error.Error, e:
 
881
                # if we get 404, this means that the upload was cancelled or
 
882
                # completed, but isn't there any more.
 
883
                if e.status == '404' and self.canceling:
 
884
                    raise dataerrors.DoesNotExist("The multipart upload "
 
885
                                                  "doesn't exists")
 
886
                # propagate the error
 
887
                raise
 
888
 
 
889
        try:
 
890
            new_gen = yield super(MultipartUploadJob, self).commit(put_result)
 
891
        except Exception as ex1:
 
892
            try:
 
893
                yield self.delete()
 
894
            except Exception as ex2:
 
895
                self.logger.warning("%s: while deleting uploadjob after "
 
896
                                    "an error, %s", ex2.__class__.__name__,
 
897
                                    ex2)
 
898
            raise ex1
 
899
        else:
 
900
            try:
 
901
                yield self.delete()
 
902
            except Exception as delete_exc:
 
903
                self.logger.warning("%s: while deleting uploadjob after "
 
904
                                    "commit, %s",
 
905
                                    delete_exc.__class__.__name__, delete_exc)
 
906
        defer.returnValue(new_gen)
 
907
 
 
908
    @defer.inlineCallbacks
 
909
    def delete(self):
 
910
        """ Cancel and clean up after the current upload job."""
 
911
        try:
 
912
            yield self.uploadjob.delete()
 
913
        except dataerrors.DoesNotExist:
 
914
            pass
 
915
 
 
916
    @defer.inlineCallbacks
 
917
    def cancel(self):
 
918
        """Cancel this upload."""
 
919
        # unregister the client transport
 
920
        yield super(MultipartUploadJob, self).cancel()
 
921
 
 
922
        # delete the upload_job
 
923
        d = self.delete()
 
924
        d.addErrback(self._handle_connection_done)
 
925
        yield d
 
926
 
 
927
        # abort/cancel the mp upload in s3
 
928
        if self.mp_upload:
 
929
            try:
 
930
                yield self._retry(self.mp_upload.cancel)
 
931
            except twisted.web.error.Error, e:
 
932
                # check if the upload isn't in S3 ignore the error
 
933
                if e.status != '404':
 
934
                    raise
 
935
 
 
936
    def _retry(self, func, *args, **kwargs):
 
937
        """Retry func using MultipartRetry."""
 
938
        s3_retries = config.api_server.s3_retries
 
939
        s3_retry_wait = config.api_server.s3_retry_wait
 
940
        retrier = MultipartRetry(
 
941
            # catch timeout, tcp timeout.
 
942
            catch=(twisted.internet.error.TimeoutError,
 
943
                   twisted.internet.error.TCPTimedOutError,
 
944
                   twisted.web.error.Error),
 
945
            retries=s3_retries, retry_wait=s3_retry_wait)
 
946
        return retrier(func, *args, **kwargs)
 
947
 
 
948
    def add_data(self, data):
 
949
        """Add data to this upload."""
 
950
        # override default add_data to never raise BufferLimit
 
951
        self.producer.dataReceived(data)
 
952
 
 
953
    def registerProducer(self, producer):
 
954
        """Register a producer, this is the client connection."""
 
955
        self.producer.registerProducer(producer)
 
956
 
 
957
    def unregisterProducer(self):
 
958
        """Unregister the producer."""
 
959
        if self.producer:
 
960
            self.producer.unregisterProducer()
 
961
 
 
962
 
 
963
class User(object):
 
964
    """A proxy for model.User objects."""
 
965
 
 
966
    def __init__(self, manager, user_id,
 
967
                 root_volume_id, username, visible_name):
 
968
        self.manager = manager
 
969
        self.id = user_id
 
970
        self.root_volume_id = root_volume_id
 
971
        self.username = username
 
972
        self.visible_name = visible_name
 
973
        self.protocols = []
 
974
        self.rpc_dal = self.manager.rpcdal_client
 
975
 
 
976
    def register_protocol(self, protocol):
 
977
        """Register protocol as a connection authenticated for this user.
 
978
 
 
979
        @param protocol: the Server protocol.
 
980
        """
 
981
        self.protocols.append(protocol)
 
982
 
 
983
    def unregister_protocol(self, protocol, cleanup=None):
 
984
        """Unregister protocol.
 
985
 
 
986
        @param protocol: the Server protocol.
 
987
        """
 
988
        self.protocols.remove(protocol)
 
989
 
 
990
    def broadcast(self, message, filter=lambda _: True):
 
991
        """Send message to all connections from this user."""
 
992
        for protocol in self.protocols:
 
993
            if not filter(protocol):
 
994
                continue
 
995
            new_message = protocol_pb2.Message()
 
996
            new_message.CopyFrom(message)
 
997
            new_message.id = protocol.get_new_request_id()
 
998
            protocol.sendMessage(new_message)
 
999
            protocol.log.trace_message("NOTIFICATION:", new_message)
 
1000
 
 
1001
    @defer.inlineCallbacks
 
1002
    def get_root(self):
 
1003
        """Get the root node for this user."""
 
1004
        r = yield self.rpc_dal.call('get_root', user_id=self.id)
 
1005
        defer.returnValue((r['root_id'], r['generation']))
 
1006
 
 
1007
    @defer.inlineCallbacks
 
1008
    def get_free_bytes(self, share_id=None):
 
1009
        """Returns free space for the given share or the user volume.
 
1010
 
 
1011
        @param share_id: if provided, the id of an accepted share to the user
 
1012
        """
 
1013
        if share_id:
 
1014
            try:
 
1015
                share = yield self.rpc_dal.call(
 
1016
                    'get_share', user_id=self.id, share_id=share_id)
 
1017
                owner_id = share['shared_by_id']
 
1018
            except dataerrors.DoesNotExist:
 
1019
                # There is currently a bug in the client which
 
1020
                # will allow volume_id to be passed to this method. And it
 
1021
                # will default to the free_bytes of the user. However, this
 
1022
                # method should not accept a volume_id and share_id should
 
1023
                # always be valid
 
1024
                owner_id = self.id
 
1025
        else:
 
1026
            owner_id = self.id
 
1027
        r = yield self.rpc_dal.call('get_user_quota', user_id=owner_id)
 
1028
        defer.returnValue(r['free_bytes'])
 
1029
 
 
1030
    @defer.inlineCallbacks
 
1031
    def get_storage_byte_quota(self):
 
1032
        """Returns purchased and available space for the user."""
 
1033
        r = yield self.rpc_dal.call('get_user_quota', user_id=self.id)
 
1034
        defer.returnValue((r['max_storage_bytes'], r['used_storage_bytes']))
 
1035
 
 
1036
    @defer.inlineCallbacks
 
1037
    def get_node(self, volume_id, node_id, content_hash):
 
1038
        """Get a content.Node for this node_id.
 
1039
 
 
1040
        @param: volume_id: None for the root volume, or uuid of udf or share id
 
1041
        @param node_id: an uuid object or string representing the id of the
 
1042
            we are looking for
 
1043
        @param content_hash: The current content hash of the node.
 
1044
        """
 
1045
        node = yield self.rpc_dal.call('get_node', user_id=self.id,
 
1046
                                       volume_id=volume_id, node_id=node_id)
 
1047
        if content_hash and content_hash != node['content_hash']:
 
1048
            msg = "Node is not available due to hash mismatch."
 
1049
            raise errors.NotAvailable(msg)
 
1050
 
 
1051
        if node['is_file'] and node['crc32'] is None:
 
1052
            msg = "Node does not exist since it has no content."
 
1053
            raise dataerrors.DoesNotExist(msg)
 
1054
 
 
1055
        defer.returnValue(Node(self.manager, node))
 
1056
 
 
1057
    @defer.inlineCallbacks
 
1058
    def move(self, volume_id, node_id, new_parent_id,
 
1059
             new_name, session_id=None):
 
1060
        """Move a node.
 
1061
 
 
1062
        Returns a list of modified nodes.
 
1063
 
 
1064
        @param volume_id: the id of the udf or share, None for root.
 
1065
        @param node_id: the id of the node to move.
 
1066
        @param new_parent_id: the node id of the new parent.
 
1067
        @param new_name: the new name for node_id.
 
1068
        """
 
1069
        args = dict(user_id=self.id, volume_id=volume_id, node_id=node_id,
 
1070
                    new_name=new_name, new_parent_id=new_parent_id,
 
1071
                    session_id=session_id)
 
1072
        r = yield self.rpc_dal.call('move', **args)
 
1073
        defer.returnValue((r['generation'], r['mimetype']))
 
1074
 
 
1075
    @defer.inlineCallbacks
 
1076
    def make_dir(self, volume_id, parent_id, name, session_id=None):
 
1077
        """Create a directory.
 
1078
 
 
1079
        @param: volume_id: None for the root volume, or uuid of udf or share id
 
1080
        @param parent: the parent content.Node.
 
1081
        @param name: the name for the directory.
 
1082
        """
 
1083
        args = dict(user_id=self.id, volume_id=volume_id, parent_id=parent_id,
 
1084
                    name=name, session_id=session_id)
 
1085
        r = yield self.rpc_dal.call('make_dir', **args)
 
1086
        defer.returnValue((r['node_id'], r['generation'], r['mimetype']))
 
1087
 
 
1088
    @defer.inlineCallbacks
 
1089
    def make_file(self, volume_id, parent_id, name,
 
1090
                  session_id=None):
 
1091
        """Create a file.
 
1092
 
 
1093
        @param: volume_id: None for the root volume, or uuid of udf or share id
 
1094
        @param parent: the parent content.Node.
 
1095
        @param name: the name for the file.
 
1096
        """
 
1097
        args = dict(user_id=self.id, volume_id=volume_id, parent_id=parent_id,
 
1098
                    name=name, session_id=session_id)
 
1099
        r = yield self.rpc_dal.call('make_file', **args)
 
1100
        defer.returnValue((r['node_id'], r['generation'], r['mimetype']))
 
1101
 
 
1102
    @defer.inlineCallbacks
 
1103
    def create_udf(self, path, name, session_id=None):
 
1104
        """Creates an UDF.
 
1105
 
 
1106
        @param path: the directory of where the UDF is
 
1107
        @param name: the name of the UDF
 
1108
        @param session_id: id of the session where the event was generated
 
1109
        """
 
1110
        fullpath = pypath.join(path, name)
 
1111
        r = yield self.rpc_dal.call('create_udf', user_id=self.id,
 
1112
                                    path=fullpath, session_id=session_id)
 
1113
        defer.returnValue((r['udf_id'], r['udf_root_id'], r['udf_path']))
 
1114
 
 
1115
    @defer.inlineCallbacks
 
1116
    def delete_volume(self, volume_id, session_id=None):
 
1117
        """Deletes a volume.
 
1118
 
 
1119
        @param volume_id: the id of the share or udf.
 
1120
        @param session_id: id of the session where the event was generated.
 
1121
        """
 
1122
        yield self.rpc_dal.call('delete_volume', user_id=self.id,
 
1123
                                volume_id=volume_id, session_id=session_id)
 
1124
 
 
1125
    @defer.inlineCallbacks
 
1126
    def list_volumes(self):
 
1127
        """List all the volumes the user is involved.
 
1128
 
 
1129
        This includes the real Root, the UDFs, and the shares that were shared.
 
1130
        to her and she already accepted.
 
1131
        """
 
1132
        r = yield self.rpc_dal.call('list_volumes', user_id=self.id)
 
1133
        root_info = r['root']
 
1134
        shares = r['shares']
 
1135
        udfs = r['udfs']
 
1136
        free_bytes = r['free_bytes']
 
1137
        defer.returnValue((root_info, shares, udfs, free_bytes))
 
1138
 
 
1139
    @defer.inlineCallbacks
 
1140
    def list_shares(self):
 
1141
        """List all the shares the user is involved.
 
1142
 
 
1143
        This only returns the "from me" shares, and the "to me" shares that I
 
1144
        still didn't accept.
 
1145
        """
 
1146
        r = yield self.rpc_dal.call('list_shares', user_id=self.id,
 
1147
                                    accepted=False)
 
1148
        defer.returnValue((r['shared_by'], r['shared_to']))
 
1149
 
 
1150
    @defer.inlineCallbacks
 
1151
    def create_share(self, node_id, shared_to_username, name, access_level):
 
1152
        """Creates a share.
 
1153
 
 
1154
        @param node_id: the id of the node that will be root of the share.
 
1155
        @param shared_to_username: the username of the receiving user.
 
1156
        @param name: the name of the share.
 
1157
        @param access_level: the permissions on the share.
 
1158
        """
 
1159
        readonly = access_level == "View"
 
1160
        r = yield self.rpc_dal.call('create_share', user_id=self.id,
 
1161
                                    node_id=node_id, share_name=name,
 
1162
                                    to_username=shared_to_username,
 
1163
                                    readonly=readonly)
 
1164
        defer.returnValue(r['share_id'])
 
1165
 
 
1166
    @defer.inlineCallbacks
 
1167
    def delete_share(self, share_id):
 
1168
        """Deletes a share.
 
1169
 
 
1170
        @param share_id: the share id.
 
1171
        """
 
1172
        yield self.rpc_dal.call('delete_share',
 
1173
                                user_id=self.id, share_id=share_id)
 
1174
 
 
1175
    @defer.inlineCallbacks
 
1176
    def share_accepted(self, share_id, answer):
 
1177
        """Accepts (or not) the share.
 
1178
 
 
1179
        @param share_id: the share id.
 
1180
        @param answer: if it was accepted ("Yes") or not ("No").
 
1181
        """
 
1182
        if answer == "Yes":
 
1183
            call = 'accept_share'
 
1184
        elif answer == "No":
 
1185
            call = 'decline_share'
 
1186
        else:
 
1187
            raise ValueError("Received invalid answer: %r" % answer)
 
1188
        yield self.rpc_dal.call(call, user_id=self.id, share_id=share_id)
 
1189
 
 
1190
    @defer.inlineCallbacks
 
1191
    def unlink_node(self, volume_id, node_id, session_id=None):
 
1192
        """Unlink a node.
 
1193
 
 
1194
        @param volume_id: the id of the volume of the node.
 
1195
        @param node_id: the id of the node.
 
1196
        """
 
1197
        r = yield self.rpc_dal.call('unlink_node', user_id=self.id,
 
1198
                                    volume_id=volume_id, node_id=node_id,
 
1199
                                    session_id=session_id)
 
1200
        defer.returnValue((r['generation'], r['kind'],
 
1201
                           r['name'], r['mimetype']))
 
1202
 
 
1203
    @defer.inlineCallbacks
 
1204
    def get_upload_job(self, vol_id, node_id, previous_hash, hash_value, crc32,
 
1205
                       inflated_size, deflated_size, session_id=None,
 
1206
                       magic_hash=None, upload_id=None):
 
1207
        """Create an upload reservation for a node.
 
1208
 
 
1209
        @param vol_id: the volume id this node belongs to.
 
1210
        @param node_id: the node to upload to.
 
1211
        @param previous_hash: the current hash of the node.
 
1212
        @param hash_value: the hash of the new content.
 
1213
        @param crc32: the crc32 of the new content.
 
1214
        @param size: the uncompressed size of the new content.
 
1215
        @param deflated_size: the compressed size of the new content.
 
1216
        """
 
1217
        if previous_hash == "":
 
1218
            previous_hash = None
 
1219
 
 
1220
        # reuse the content if we can
 
1221
        r = yield self.rpc_dal.call('get_reusable_content', user_id=self.id,
 
1222
                                    hash_value=hash_value,
 
1223
                                    magic_hash=magic_hash)
 
1224
        blob_exists, storage_key = r['blob_exists'], r['storage_key']
 
1225
 
 
1226
        if storage_key is not None:
 
1227
            upload_job = yield self._get_magic_upload_job(
 
1228
                vol_id, node_id, previous_hash, hash_value,
 
1229
                crc32, inflated_size, deflated_size,
 
1230
                storage_key, magic_hash, session_id,
 
1231
                blob_exists)
 
1232
            defer.returnValue(upload_job)
 
1233
 
 
1234
        # only use multipart upload if the content does not exist and we're
 
1235
        # above the minimum file size (if configured)
 
1236
        multipart_threshold = config.api_server.multipart_threshold
 
1237
        if (not blob_exists and
 
1238
                multipart_threshold > 0 and
 
1239
                deflated_size >= multipart_threshold):
 
1240
            # this is a multipart upload
 
1241
            multipart_key = uuid.uuid4()
 
1242
            upload_job = yield self._get_multipart_upload_job(
 
1243
                vol_id, node_id, previous_hash, hash_value, crc32,
 
1244
                inflated_size, deflated_size, multipart_key,
 
1245
                session_id, upload_id, blob_exists, magic_hash)
 
1246
        else:
 
1247
            upload_job = yield self._get_upload_job(
 
1248
                vol_id, node_id,
 
1249
                previous_hash, hash_value, crc32, inflated_size,
 
1250
                deflated_size, session_id, blob_exists, magic_hash)
 
1251
        defer.returnValue(upload_job)
 
1252
 
 
1253
    @defer.inlineCallbacks
 
1254
    def _get_upload_job(self, vol_id, node_id, previous_hash, hash_value,
 
1255
                        crc32, inflated_size, deflated_size,
 
1256
                        session_id, blob_exists, magic_hash):
 
1257
        """Create an upload reservation for a node.
 
1258
 
 
1259
        @param vol_id: the volume id this node belongs to.
 
1260
        @param node_id: the node to upload to.
 
1261
        @param previous_hash: the current hash of the node.
 
1262
        @param hash_value: the hash of the new content.
 
1263
        @param crc32: the crc32 of the new content.
 
1264
        @param size: the uncompressed size of the new content.
 
1265
        @param deflated_size: the compressed size of the new content.
 
1266
        """
 
1267
        node = yield self.rpc_dal.call('get_node', user_id=self.id,
 
1268
                                       volume_id=vol_id, node_id=node_id)
 
1269
        if not node["is_file"]:
 
1270
            raise dataerrors.NoPermission("Can only put content on files.")
 
1271
        file_node = Node(self.manager, node)
 
1272
        uj = UploadJob(self, file_node, previous_hash, hash_value,
 
1273
                       crc32, inflated_size, deflated_size,
 
1274
                       session_id, blob_exists, magic_hash)
 
1275
        defer.returnValue(uj)
 
1276
 
 
1277
    @defer.inlineCallbacks
 
1278
    def _get_magic_upload_job(self, vol_id, node_id, previous_hash, hash_value,
 
1279
                              crc32, inflated_size, deflated_size, storage_key,
 
1280
                              magic_hash, session_id, blob_exists):
 
1281
        """Create a magic upload reservation for a node.
 
1282
 
 
1283
        @param vol_id: the volume id this node belongs to.
 
1284
        @param node_id: the node to upload to.
 
1285
        @param previous_hash: the current hash of the node.
 
1286
        @param hash_value: the hash of the new content.
 
1287
        @param crc32: the crc32 of the new content.
 
1288
        @param size: the uncompressed size of the new content.
 
1289
        @param deflated_size: the compressed size of the new content.
 
1290
        @param storage_key: the content's storage key
 
1291
        @param magic_hash: the magic_hash from client
 
1292
        """
 
1293
        node = yield self.rpc_dal.call('get_node', user_id=self.id,
 
1294
                                       volume_id=vol_id, node_id=node_id)
 
1295
        if not node["is_file"]:
 
1296
            raise dataerrors.NoPermission("Can only put content on files.")
 
1297
        file_node = Node(self.manager, node)
 
1298
        uj = MagicUploadJob(self, file_node, previous_hash, hash_value,
 
1299
                            crc32, inflated_size, deflated_size,
 
1300
                            storage_key, magic_hash, session_id, blob_exists)
 
1301
        defer.returnValue(uj)
 
1302
 
 
1303
    @defer.inlineCallbacks
 
1304
    def _get_multipart_upload_job(self, vol_id, node_id, previous_hash,
 
1305
                                  hash_value, crc32, inflated_size,
 
1306
                                  deflated_size, multipart_key,
 
1307
                                  session_id, upload_id, blob_exists,
 
1308
                                  magic_hash):
 
1309
        """Create an multipart upload reservation for a node.
 
1310
 
 
1311
        @param vol_id: the volume id this node belongs to.
 
1312
        @param node_id: the node to upload to.
 
1313
        @param previous_hash: the current hash of the node.
 
1314
        @param hash_value: the hash of the new content.
 
1315
        @param crc32: the crc32 of the new content.
 
1316
        @param size: the uncompressed size of the new content.
 
1317
        @param deflated_size: the compressed size of the new content.
 
1318
        @param multipart_key: the key name of the upload.
 
1319
        @param upload_id: the upload_id sent by the client.
 
1320
        """
 
1321
        node = yield self.rpc_dal.call('get_node', user_id=self.id,
 
1322
                                       volume_id=vol_id, node_id=node_id)
 
1323
        if not node["is_file"]:
 
1324
            raise dataerrors.NoPermission("Can only put content on files.")
 
1325
        file_node = Node(self.manager, node)
 
1326
 
 
1327
        upload = None
 
1328
        if upload_id:
 
1329
            # check if there is already a job.
 
1330
            try:
 
1331
                uploadid = uuid.UUID(upload_id)
 
1332
            except ValueError:
 
1333
                # invalid upload_id, just ignore it a create a new upload.
 
1334
                upload = None
 
1335
            else:
 
1336
                try:
 
1337
                    upload = yield DBUploadJob.get(self, vol_id, node_id,
 
1338
                                                   uploadid, hash_value, crc32,
 
1339
                                                   inflated_size,
 
1340
                                                   deflated_size)
 
1341
                except dataerrors.DoesNotExist:
 
1342
                    # there is no uploadjob with the specified id
 
1343
                    upload = None
 
1344
 
 
1345
        if upload is None:
 
1346
            # no uploadjob found, create a new one.
 
1347
            try:
 
1348
                upload = yield DBUploadJob.make(self, vol_id, node_id,
 
1349
                                                previous_hash, hash_value,
 
1350
                                                crc32, inflated_size,
 
1351
                                                deflated_size, multipart_key)
 
1352
            except dataerrors.HashMismatch:
 
1353
                raise errors.ConflictError("Previous hash does not match.")
 
1354
        else:
 
1355
            # update the when_last_active value.
 
1356
            yield upload.touch()
 
1357
 
 
1358
        uj = MultipartUploadJob(self, file_node, previous_hash, hash_value,
 
1359
                                crc32, inflated_size, deflated_size,
 
1360
                                upload, session_id, blob_exists, magic_hash)
 
1361
        defer.returnValue(uj)
 
1362
 
 
1363
    @defer.inlineCallbacks
 
1364
    def get_delta(self, volume_id, from_generation, limit=None):
 
1365
        """Get the delta form generation for volume_id."""
 
1366
        r = yield self.rpc_dal.call('get_delta', user_id=self.id,
 
1367
                                    volume_id=volume_id, limit=limit,
 
1368
                                    from_generation=from_generation)
 
1369
        nodes = [Node(self.manager, n) for n in r['nodes']]
 
1370
        defer.returnValue((nodes, r['vol_generation'], r['free_bytes']))
 
1371
 
 
1372
    @defer.inlineCallbacks
 
1373
    def get_from_scratch(self, volume_id, start_from_path=None, limit=None,
 
1374
                         max_generation=None):
 
1375
        """Get the list of live nodes in volume_id."""
 
1376
        r = yield self.rpc_dal.call('get_from_scratch', user_id=self.id,
 
1377
                                    volume_id=volume_id,
 
1378
                                    start_from_path=start_from_path,
 
1379
                                    limit=limit, max_generation=max_generation)
 
1380
        nodes = [Node(self.manager, n) for n in r['nodes']]
 
1381
        defer.returnValue((nodes, r['vol_generation'], r['free_bytes']))
 
1382
 
 
1383
    @defer.inlineCallbacks
 
1384
    def get_volume_id(self, node_id):
 
1385
        """Get the (client) volume_id (UDF id or root) of this node_id.
 
1386
 
 
1387
        @param node_id: an uuid object or string representing the id of the
 
1388
            we are looking for
 
1389
        """
 
1390
        r = yield self.rpc_dal.call('get_volume_id', user_id=self.id,
 
1391
                                    node_id=node_id)
 
1392
        defer.returnValue(r['volume_id'])
 
1393
 
 
1394
 
 
1395
class ContentManager(object):
 
1396
    """Manages Users."""
 
1397
 
 
1398
    def __init__(self, factory):
 
1399
        """Create a ContentManager."""
 
1400
        self.factory = factory
 
1401
        self.users = weakref.WeakValueDictionary()
 
1402
 
 
1403
    @defer.inlineCallbacks
 
1404
    def get_user_by_id(self, user_id, session_id=None, required=False):
 
1405
        """Return a user by id and session id if its connected.
 
1406
 
 
1407
        If it's not cached and required, it's retrieved from the DB.
 
1408
        """
 
1409
        user = self.users.get(user_id, None)
 
1410
        if user is None and required:
 
1411
            r = yield self.rpcdal_client.call('get_user_data', user_id=user_id,
 
1412
                                              session_id=session_id)
 
1413
            # Another task may have already updated the cache, so check again
 
1414
            user = self.users.get(user_id, None)
 
1415
            if user is None:
 
1416
                user = User(self, user_id, r['root_volume_id'],
 
1417
                            r['username'], r['visible_name'])
 
1418
                self.users[user_id] = user
 
1419
        defer.returnValue(user)