# Copyright 2008-2015 Canonical
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# For further info, check http://launchpad.net/filesync-server

"""Provides a layer to handle all the database objects from twisted.

This layer is the main interface to the RPC DAL.
"""

import calendar
import logging
import posixpath as pypath
import uuid
import weakref
import zlib

import twisted.internet.error
import twisted.web.error

from twisted.internet import defer
from twisted.python.failure import Failure
from twisted.web import http

from s3lib.s3lib import ProducerStopped
from s3lib.producers import S3Producer, NullConsumer

from backends.filesync.data import errors as dataerrors
from config import config
from ubuntuone import txutils
from ubuntuone.storage.server import errors, upload
from ubuntuone.storageprotocol import protocol_pb2

ZERO_LENGTH_CONTENT_KEY = ""
class NetworkRetry(txutils.NetworkRetry):
50
"""NetworkRetry subclass that raise RetryLimitReached."""
52
def __init__(self, *args, **kwargs):
53
super(NetworkRetry, self).__init__(*args, **kwargs)
54
self.logger = logging.getLogger('storage.server')
56
def _handle_retried_exc(self, exc, retry_count, func, args, kwargs):
57
"""Handle a exception inside the retry loop. (do nothing by default)"""
58
self.logger.warning("Retrying: %s with: %s, %s - failed with: "
59
"%s('%s') - retry_count: %d", func, args, kwargs,
60
exc.__class__.__name__, exc, retry_count)
62
def _handle_exc(self, exc):
63
"""Handle a exception raised by the last retry.
65
This subclass returns a RetryLimitReached failure.
67
msg = ("Maximum retries (%i) reached. Please try again.\n"
68
"Original exception: %s: %s" % (self.retries,
69
exc.__class__.__name__,
71
return defer.fail(Failure(dataerrors.RetryLimitReached(msg),
72
dataerrors.RetryLimitReached,
76
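

# A retrier wraps a callable and re-invokes it until it succeeds or the
# retry budget runs out. A minimal sketch of the call pattern used below
# in Node.get_content (the exception type and counts here are placeholder
# values, not configuration):
#
#   retrier = NetworkRetry(catch=twisted.internet.error.TimeoutError,
#                          retries=3, retry_wait=0.1)
#   d = retrier(some_callable, arg1, arg2)  # deferred with the result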


class MultipartRetry(NetworkRetry):
    """NetworkRetry subclass that handles S3 errors."""

    def _handle_retried_exc(self, exc, retry_count, func, args, kwargs):
        """Handle an exception inside the retry loop.

        If it's a 404 error, bail out.

        This method is always called from a try/except.
        """
        if isinstance(exc, twisted.web.error.Error):
            # propagate 404 errors
            if exc.status == '404':
                raise exc
        super(MultipartRetry, self)._handle_retried_exc(
            exc, retry_count, func, args, kwargs)


class FalseProducer(object):
    """Not really a producer, just delivers all the data when asked to.

    It has all the methods to comply with the Push or Pull Producer
    Interface, but the only one really implemented is resumeProducing:
    it sends all the data at once.
    """

    def __init__(self, data):
        self.data = data
        self.deferred = defer.Deferred()
        self.consumer = None

    def resumeProducing(self):
        """Resume producing, just send all the data."""
        if self.consumer:
            self.consumer.write(self.data)
        self.deferred.callback(True)

    def stopProducing(self):
        """Stop producing."""

    def pauseProducing(self):
        """Pause producing."""


class Node(object):
    """StorageObject proxy."""

    s3_retries = config.api_server.s3_retries
    s3_retry_wait = config.api_server.s3_retry_wait

    def __init__(self, manager, node):
        """Create a Node.

        @param manager: the ContentManager which created this object
        @param node: a dao.StorageNode
        """
        self.manager = manager
        self.id = node['id']
        self.volume_id = node['volume_id']
        self.path = node['path']
        self.name = node['name']
        self.parent_id = node['parent_id']
        self.is_file = node['is_file']
        self.content_hash = node['content_hash']
        self.size = node['size'] or 0
        self.crc32 = node['crc32'] or 0
        self.deflated_size = node['deflated_size'] or 0
        self.is_live = node['is_live']
        self.generation = node['generation']
        self.is_public = node['is_public']
        last_modif = node['last_modified']

        # special cases for no content
        if node['storage_key'] is None:
            self.has_content = False
            self.storage_key = ZERO_LENGTH_CONTENT_KEY
        else:
            self.has_content = node['has_content']
            self.storage_key = node['storage_key']

        self.last_modified = calendar.timegm(last_modif.timetuple())
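        # calendar.timegm() reads the timetuple as UTC when converting to
        # epoch seconds; time.mktime() would apply the server's local zone
        # instead (this assumes, as the rest of the code does, that the DB
        # stores UTC timestamps)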
        self.logger = logging.getLogger('storage.server')

    @defer.inlineCallbacks
    def get_content(self, start=None, end=None, previous_hash=None, user=None):
        """Get the content for this node.

        @param start: the start offset
        @param end: the end offset
        @param previous_hash: not used for FileNode.
        @param user: the user doing the request, useful for logging.
        """
        if not self.is_file:
            raise TypeError("Content can be retrieved only on Files.")
        storage_key = self.storage_key
        size = self.deflated_size
        if storage_key == ZERO_LENGTH_CONTENT_KEY:
            # we send the compressed empty string
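            # (a valid, non-empty zlib stream even for zero payload bytes,
            # so the client's decompress path works unchanged)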
            producer = FalseProducer(zlib.compress(""))
            defer.returnValue(producer)

        headers = {}
        if start is not None or end is not None:
            headers = {'Range': 'bytes=%d-%d' % (
                start if start is not None else 0,
                end if end is not None else size)}
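        # e.g. start=0, end=1023 gives 'bytes=0-1023'; HTTP ranges are
        # inclusive at both ends, so that asks for the first 1024 bytes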

        retrier = NetworkRetry(catch=twisted.internet.error.TimeoutError,
                               retries=self.s3_retries,
                               retry_wait=self.s3_retry_wait)
        producer = yield retrier(self._get_producer,
                                 config.api_server.s3_bucket,
                                 str(storage_key), headers, user)
        defer.returnValue(producer)

    def _get_from_s3(self, bucket, key, headers, streaming):
        """Inner function to handle s3.get."""
        s3 = self.manager.factory.s3()
        return s3.get(bucket, key, headers=headers, streaming=streaming)

    @defer.inlineCallbacks
    def _get_producer(self, bucket, storage_key,
                      headers, user, streaming=True):
        """Return the content Producer."""
        try:
            get_response = self._get_from_s3(bucket, str(storage_key),
                                             headers, streaming)
            producer = yield get_response.deferred
        except twisted.web.error.Error, exc:
            # if fallback bucket is configured, and the file was
            # not found, try that bucket.
            if config.api_server.s3_fallback_bucket and exc.status == '404':
                get_response = self._get_from_s3(
                    config.api_server.s3_fallback_bucket,
                    str(storage_key), headers, streaming)
                try:
                    producer = yield get_response.deferred
                except twisted.web.error.Error, exc:
                    yield self._handle_s3_errors(Failure(exc), user)
            else:
                yield self._handle_s3_errors(Failure(exc), user)

        producer.deferred.addErrback(self._handle_s3_errors, user)
        defer.returnValue(producer)

    def _context_msg(self, user):
        """Return a string with the context."""
        if user.protocols:
            session_ids = ','.join([str(p.session_id)
                                    for p in user.protocols])
        else:
            session_ids = 'No sessions?'
        context = dict(user_id=user.id,
                       username=user.username.replace('%', '%%'),
                       session_ids=session_ids, node_id=str(self.id))
        return ("%(session_ids)s - %(username)s (%(user_id)s) - "
                "node_id=%(node_id)s" % context)

    def _handle_s3_errors(self, failure, user):
        """Transform S3 errors into something more appropriate.

        S3 errors include all the errors in twisted.web.error, and also
        specifically twisted.internet.error.TimeoutError.

        Always return a failure or raise an exception.
        """
        if not failure.check(twisted.web.error.Error,
                             twisted.internet.error.TimeoutError):
            # not an S3 error, leave it alone
            return failure

        ctx = self._context_msg(user)
        if failure.check(twisted.internet.error.TimeoutError):
            self.logger.warning("%s - s3 threw TimeoutError while sending data"
                                " (returning TRY_AGAIN)", ctx)
            raise errors.S3DownloadError(
                failure.value, "TimeoutError while downloading data.")

        status = failure.value.status
        if status == str(http.REQUESTED_RANGE_NOT_SATISFIABLE):
            self.logger.warning("%s - s3 returned a status code of 416.", ctx)
            raise errors.NotAvailable(failure.getErrorMessage())
        if status == str(http.NOT_FOUND):
            self.logger.warning("%s - s3 returned a status code of 404.", ctx)
            raise errors.NotAvailable(failure.getErrorMessage())

        self.logger.warning("%s - s3 threw %s while sending data "
                            "(returning TRY_AGAIN) with response: %r",
                            ctx, status, failure.value.response)
        msg = "Web error (status %s) while downloading data."
        raise errors.S3DownloadError(
            failure.value, msg % failure.value.status)


class DBUploadJob(object):
    """A proxy for Upload model objects."""

    def __init__(self, user, volume_id, node_id, uploadjob_id, uploaded_bytes,
                 multipart_id, multipart_key, chunk_count, inflated_size,
                 crc32, hash_context, magic_hash_context, decompress_context,
                 when_last_active):
        self.__dict__ = locals()
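        # the locals() trick above copies every constructor argument (plus
        # a harmless 'self' entry) into the instance dict in one shot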

    @classmethod
    def get(cls, user, volume_id, node_id, uploadjob_id, hash_value, crc32,
            inflated_size, deflated_size):
        """Get a multipart upload job."""
        data = dict(user=user, volume_id=volume_id, node_id=node_id)
        kwargs = dict(user_id=user.id, volume_id=volume_id, node_id=node_id,
                      uploadjob_id=uploadjob_id,
                      hash_value=hash_value, crc32=crc32,
                      inflated_size=inflated_size, deflated_size=deflated_size)
        d = user.rpc_dal.call('get_uploadjob', **kwargs)
        d.addCallback(lambda r: r.update(data) or r)
        d.addCallback(lambda r: cls(**r))
        return d

    @classmethod
    def make(cls, user, volume_id, node_id, previous_hash,
             hash_value, crc32, inflated_size, deflated_size, multipart_key):
        """Make an upload job."""
        data = dict(user=user, volume_id=volume_id,
                    node_id=node_id, multipart_key=multipart_key)
        kwargs = dict(user_id=user.id, volume_id=volume_id, node_id=node_id,
                      previous_hash=previous_hash,
                      hash_value=hash_value, crc32=crc32,
                      inflated_size=inflated_size,
                      deflated_size=deflated_size, multipart_key=multipart_key)
        d = user.rpc_dal.call('make_uploadjob', **kwargs)
        d.addCallback(lambda r: r.update(data) or r)
        d.addCallback(lambda r: cls(**r))
        return d

    def set_multipart_id(self, multipart_id):
        """Set the multipart id for the upload job."""
        self.multipart_id = multipart_id
        return self.user.rpc_dal.call('set_uploadjob_multipart_id',
                                      user_id=self.user.id,
                                      uploadjob_id=self.uploadjob_id,
                                      multipart_id=multipart_id)

    def add_part(self, chunk_size, inflated_size, crc32,
                 hash_context, magic_hash_context, decompress_context):
        """Add a part to an upload job."""
        kwargs = dict(user_id=self.user.id, uploadjob_id=self.uploadjob_id,
                      chunk_size=chunk_size, inflated_size=inflated_size,
                      crc32=crc32, hash_context=hash_context,
                      magic_hash_context=magic_hash_context,
                      decompress_context=decompress_context)
        return self.user.rpc_dal.call('add_part_to_uploadjob', **kwargs)

    def delete(self):
        """Delete an upload job."""
        return self.user.rpc_dal.call('delete_uploadjob', user_id=self.user.id,
                                      uploadjob_id=self.uploadjob_id)

    @defer.inlineCallbacks
    def touch(self):
        """Touch an upload job."""
        r = yield self.user.rpc_dal.call('touch_uploadjob',
                                         user_id=self.user.id,
                                         uploadjob_id=self.uploadjob_id)
        self.when_last_active = r['when_last_active']


class BaseUploadJob(object):
    """Main interface for Uploads."""

    _inflated_size_hint_mismatch = "Inflated size does not match hint."
    _deflated_size_hint_mismatch = "Deflated size does not match hint."
    _content_hash_hint_mismatch = "Content hash does not match hint."
    _magic_hash_hint_mismatch = "Magic hash does not match hint."
    _crc32_hint_mismatch = "Crc32 does not match hint."

    def __init__(self, user, file_node, previous_hash, hash_hint, crc32_hint,
                 inflated_size_hint, deflated_size_hint, session_id,
                 blob_exists, magic_hash):
        node_hash = file_node.content_hash
        if (node_hash or previous_hash) and \
                node_hash != previous_hash and node_hash != hash_hint:
            raise errors.ConflictError("Previous hash does not match.")

        self.user = user
        self.session_id = session_id
        self.magic_hash = magic_hash
        self.producer = None
        self.factory = None
        self.deferred = defer.Deferred()
        self._initial_data = True
        # these are used to track keys uploaded into s3
        self._storage_key = None
        self.canceling = False
        self.logger = logging.getLogger('storage.server')

        self.original_file_hash = node_hash
        self.hash_hint = hash_hint
        self.crc32_hint = crc32_hint
        self.inflated_size_hint = inflated_size_hint
        self.deflated_size_hint = deflated_size_hint
        self.file_node = file_node
        self.blob_exists = blob_exists

    @property
    def buffers_size(self):
        """The total size of the buffer in this upload."""
        raise NotImplementedError("subclass responsibility.")

    @property
    def upload_id(self):
        """Return the upload_id for this upload job."""
        raise NotImplementedError("subclass responsibility.")

    @property
    def offset(self):
        """The offset of this upload."""
        raise NotImplementedError("subclass responsibility.")

    @property
    def inflated_size(self):
        """The inflated size of this upload."""
        return self.producer.inflated_size

    @property
    def crc32(self):
        """The crc32 of this upload."""
        return self.producer.crc32

    @property
    def hash_object(self):
        """The hash_object of this upload."""
        return self.producer.hash_object

    @property
    def magic_hash_object(self):
        """The magic_hash_object of this upload."""
        return self.producer.magic_hash_object

    @defer.inlineCallbacks
    def connect(self):
        """Setup the producer and consumer (S3 connection)."""
        self.s3 = self.user.manager.factory.s3()
        if self.blob_exists:
            # we have a storage object like this already;
            # wrap the S3Producer to only hash and discard the bytes
            self.producer = upload.ProxyHashingProducer(self.producer)
            self.factory = NullConsumer()
            self.factory.registerProducer(self.producer)
            self.producer.resumeProducing()
            self.deferred.callback(None)
        else:
            # we need to upload this content, get ready for it
            d = self._start_receiving()
            d.addErrback(self._handle_connection_done)
            d.addErrback(self._handle_s3_errors)
            yield d

            # at this point, self.factory is set... if it's not, it's ok to
            # fail with an AttributeError
            self.factory.deferred.addErrback(self._handle_connection_done)
            self.factory.deferred.addErrback(self._handle_s3_errors)
            self.factory.deferred.chainDeferred(self.deferred)

    def _start_receiving(self):
        """Prepare the upload job to start receiving streaming bytes."""
        # generate a new storage_key for this upload
        self._storage_key = uuid.uuid4()
        # replace self.producer with a hashing proxy
        self.producer = upload.ProxyHashingProducer(self.producer)
        self.factory = self.s3.put(
            config.api_server.s3_bucket,
            str(self._storage_key), self.producer,
            headers={'Content-Length': str(self.deflated_size_hint)})
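        # (Content-Length is the deflated size the client promised to send;
        # a plain S3 PUT needs the total length known up front)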
        return self.factory.connect_deferred

    def add_data(self, data):
        """Add data to this upload."""
        # add_data is called by the server with the bytes that arrive in a
        # packet. This is at most MAX_MESSAGE_SIZE bytes (2**16, 65k at the
        # moment).
        # zlib has a theoretical limit of compression of 1032:1, so this
        # means that at most we will get 1032*2**16 ~= 64MB, meaning that
        # the memory usage for this has a maximum.
        # """the theoretical limit for the zlib format (as opposed to its
        # implementation in the currently available sources) is 1032:1."""
        # http://zlib.net/zlib_tech.html
        self.producer.dataReceived(data)
        if (not self.blob_exists and
                self.buffers_size >= config.api_server.upload_buffer_max_size):
            raise errors.BufferLimit("Buffer limit reached.")

    def registerProducer(self, producer):
        """Register a producer; this is the client connection."""
        # by default do nothing, only MultipartUploadJob cares about this.

    def unregisterProducer(self):
        """Unregister the producer."""
        # by default do nothing, only MultipartUploadJob cares about this.

    def cancel(self):
        """Cancel this upload job."""
        return self._stop_producer_and_factory()

    @defer.inlineCallbacks
    def _stop_producer_and_factory(self):
        """Cancel this upload job.

        - Unregister producer.
        - Stop the producer if not yet stopped.
        - Cancel the factory if one exists.
        """
        self.unregisterProducer()
        self.canceling = True
        if self.producer is not None:
            # upload already started
            try:
                self.producer.stopProducing()
            except ProducerStopped:
                # don't care if it was stopped in the middle, we're
                # canceling anyway
                pass
        if self.factory is not None:
            yield self.factory.cancel()

    def stop(self):
        """Stop the upload and cleanup."""
        return self._stop_producer_and_factory()

    def _handle_connection_done(self, failure):
        """Process error states encountered by producers and consumers."""
        if failure.check(twisted.internet.error.ConnectionDone):
            # if we're on the canceling pathway, we expect this
            if self.canceling:
                return
            raise errors.UploadCanceled("Connection closed prematurely.")
        return failure

    def _handle_s3_errors(self, failure):
        """Handle s3lib twisted.web.error.Error and TimeoutError."""
        def context_msg():
            """Return a str with the context for this upload."""
            session_ids = ''
            if self.user.protocols:
                session_ids = ','.join([str(p.session_id)
                                        for p in self.user.protocols])
            upload_context = dict(
                user_id=self.user.id,
                username=self.user.username.replace('%', '%%'),
                session_ids=session_ids or 'No sessions?',
                volume_id=self.file_node.volume_id,
                node_id=self.file_node.id,
                bytes_received=(self.producer.bytes_received
                                if self.producer else 0),
                bytes_sent=(self.producer.bytes_sent
                            if self.producer else 0))
            context_msg = (
                '%(session_ids)s - %(username)s (%(user_id)s) - ' +
                'node: %(volume_id)s::%(node_id)s - ' +
                'recv: %(bytes_received)s - sent: %(bytes_sent)s - ')
            return context_msg % upload_context

        if failure.check(twisted.web.error.Error):
            self.logger.warning(context_msg() + "s3 threw %s while receiving "
                                "data (returning TRY_AGAIN) with response: %r",
                                failure.value.status, failure.value.response)
            raise errors.S3UploadError(failure.value,
                                       "Web error (status %s) while uploading"
                                       " data." % (failure.value.status,))
        elif failure.check(twisted.internet.error.TimeoutError,
                           twisted.internet.error.ConnectionLost):
            self.logger.warning(context_msg() + "S3 connection threw a "
                                "TimeoutError while receiving data (returning "
                                "TRY_AGAIN)")
            raise errors.S3UploadError(failure.value)
        return failure

    def flush_decompressor(self):
        """Flush the decompressor and handle the data."""
        # by default the producer does the hashing
        self.producer.flush_decompressor()

    def commit(self, put_result):
        """Simple commit; override for more detailed behaviour."""
        return self._commit(put_result)

    @defer.inlineCallbacks
    def _commit(self, put_result):
        """Make this upload the current content for the node."""
        if self.producer is not None:
            assert self.producer.finished, "producer hasn't finished"
        self.flush_decompressor()

        if self.deflated_size != self.deflated_size_hint:
            raise errors.UploadCorrupt(self._deflated_size_hint_mismatch)
        if self.inflated_size != self.inflated_size_hint:
            raise errors.UploadCorrupt(self._inflated_size_hint_mismatch)

        # get the magic hash value here, don't log it, don't save it
        magic_hash = self.magic_hash_object.content_hash()
        magic_hash_value = magic_hash._magic_hash
        # magic hash should match the one sent by the client
        if self.magic_hash is not None and magic_hash_value != self.magic_hash:
            raise errors.UploadCorrupt(self._magic_hash_hint_mismatch)

        hash = self.hash_object.content_hash()
        if hash != self.hash_hint:
            raise errors.UploadCorrupt(self._content_hash_hint_mismatch)

        if self.crc32 != self.crc32_hint:
            raise errors.UploadCorrupt(self._crc32_hint_mismatch)

        storage_key = self._storage_key
        if storage_key is None:
            storage_key = self.file_node.storage_key
        if storage_key is None and self.inflated_size == 0:
            storage_key = ZERO_LENGTH_CONTENT_KEY

        new_gen = yield self._commit_content(storage_key, magic_hash_value)
        defer.returnValue(new_gen)

    @defer.inlineCallbacks
    def _commit_content(self, storage_key, magic_hash):
        """Commit the content in the DAL."""
        kwargs = dict(user_id=self.user.id, node_id=self.file_node.id,
                      volume_id=self.file_node.volume_id,
                      original_hash=self.original_file_hash,
                      hash_hint=self.hash_hint, crc32_hint=self.crc32_hint,
                      inflated_size_hint=self.inflated_size_hint,
                      deflated_size_hint=self.deflated_size_hint,
                      storage_key=storage_key, magic_hash=magic_hash,
                      session_id=self.session_id)
        try:
            r = yield self.user.rpc_dal.call('make_content', **kwargs)
        except dataerrors.ContentMissing:
            raise errors.TryAgain("Content missing on commit content.")
        except dataerrors.HashMismatch:
            raise errors.ConflictError("The File changed while uploading.")
        defer.returnValue(r['generation'])


class UploadJob(BaseUploadJob):
    """A non-resumable upload job."""

    def __init__(self, user, file_node, previous_hash, hash_hint, crc32_hint,
                 inflated_size_hint, deflated_size_hint,
                 session_id, blob_exists, magic_hash):
        super(UploadJob, self).__init__(user, file_node, previous_hash,
                                        hash_hint, crc32_hint,
                                        inflated_size_hint, deflated_size_hint,
                                        session_id, blob_exists,
                                        magic_hash)
        self.deflated_size = 0
        self.producer = S3Producer(self.deflated_size_hint)

    @property
    def upload_id(self):
        """Return the upload_id for this upload job."""
        return ''

    @property
    def offset(self):
        """The offset of this upload."""
        return 0

    @property
    def buffers_size(self):
        """The size of the producer buffer in this upload."""
        return self.producer.buffer_size

    def add_data(self, data):
        super(UploadJob, self).add_data(data)
        self.deflated_size += len(data)


class MagicUploadJob(BaseUploadJob):
    """The magic upload job.

    Its initial offset is the size itself (no data should be added), and
    all that is required for the upload is known at the beginning. The
    only real action here is the commit.
    """

    def __init__(self, user, file_node, previous_hash, hash_hint, crc32_hint,
                 inflated_size_hint, deflated_size_hint,
                 storage_key, magic_hash, session_id, blob_exists):
        super(MagicUploadJob, self).__init__(user, file_node, previous_hash,
                                             hash_hint, crc32_hint,
                                             inflated_size_hint,
                                             deflated_size_hint,
                                             session_id, blob_exists,
                                             magic_hash)
        self.storage_key = storage_key
        self.deferred.callback(None)
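        # (the deferred fires immediately: there are no bytes to transfer,
        # since get_upload_job() below found a reusable blob via
        # 'get_reusable_content' and handed us its storage_key)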

    @property
    def upload_id(self):
        """Return the upload_id for this upload job."""
        return ''

    @property
    def offset(self):
        """The initial offset is the whole file."""
        return self.deflated_size_hint

    @property
    def buffers_size(self):
        """The size of the producer buffer in this upload."""
        return 0

    def add_data(self, data):
        """No data should be added!"""
        raise RuntimeError("No data should be added to the MagicUploadJob!")

    def commit(self, put_result):
        """Make this upload the current content for the node."""
        return self._commit_content(self.storage_key, self.magic_hash)


class MultipartUploadJob(BaseUploadJob):
    """A multipart/resumable upload job."""

    def __init__(self, user, file_node, previous_hash, hash_hint, crc32_hint,
                 inflated_size_hint, deflated_size_hint, uploadjob, session_id,
                 blob_exists, magic_hash):
        super(MultipartUploadJob, self).__init__(
            user, file_node, previous_hash, hash_hint, crc32_hint,
            inflated_size_hint, deflated_size_hint, session_id, blob_exists,
            magic_hash)
        self.mp_upload = None
        self.uploadjob = uploadjob

    @property
    def buffers_size(self):
        """The size of the producer buffer in this upload."""
        size = 0
        if self.factory:
            size = self.factory.buffer_size
        return size

    @property
    def upload_id(self):
        """Return the upload_id for this upload job."""
        return self.multipart_key_name

    @property
    def offset(self):
        """Return the offset."""
        return self.uploadjob.uploaded_bytes

    @property
    def deflated_size(self):
        """Return the deflated_size."""
        return self.factory.deflated_size

    @property
    def inflated_size(self):
        """Return the inflated_size."""
        return self.factory.inflated_size

    @property
    def crc32(self):
        """Return the crc32."""
        return self.factory.crc32

    @property
    def hash_object(self):
        """Return the hash_object."""
        return self.factory.hash_object

    @property
    def magic_hash_object(self):
        """Return the magic_hash_object."""
        return self.factory.magic_hash_object

    @property
    def multipart_id(self):
        """Return the multipart_id."""
        return self.uploadjob.multipart_id

    @property
    def multipart_key_name(self):
        """Return the multipart_key_name."""
        return self.uploadjob.multipart_key

    @property
    def chunk_count(self):
        """Return the chunk_count."""
        return self.uploadjob.chunk_count

    @defer.inlineCallbacks
    def load_s3_multipart_upload(self):
        """Fetch or create a multipart upload in S3."""
        self.s3 = self.user.manager.factory.s3()
        if self.multipart_id and self.multipart_key_name:
            try:
                self.mp_upload = yield self.s3.get_multipart_upload(
                    config.api_server.s3_bucket, self.multipart_key_name,
                    self.multipart_id)
            except twisted.web.error.Error, e:
                # log a warning, just to be sure everything is working
                # in this case
                upload_context = dict(
                    user_id=self.user.id,
                    username=self.user.username.replace('%', '%%'),
                    volume_id=self.uploadjob.volume_id,
                    node_id=str(self.uploadjob.node_id))
                context_msg = (
                    '%(username)s - (%(user_id)s) - '
                    'node: %(volume_id)s::%(node_id)s - ') % upload_context
                self.logger.warning("%s - Multipart upload doesn't exist "
                                    "(Got %r from S3)", context_msg, e.status)
                # if we get 404, this means that the upload was cancelled or
                # completed, but isn't there any more.
                if e.status == '404':
                    # create a new one and reset self.multipart_id
                    self.uploadjob.multipart_id = None
                    self.mp_upload = yield self.s3.create_multipart_upload(
                        config.api_server.s3_bucket, self.multipart_key_name)
                else:
                    raise
        else:
            self.mp_upload = yield self.s3.create_multipart_upload(
                config.api_server.s3_bucket, self.multipart_key_name)

    @defer.inlineCallbacks
    def part_done_callback(self, chunk_count, chunk_size, inflated_size, crc32,
                           hash_context, magic_hash_context,
                           decompress_context):
        """Process the info for each uploaded part."""
        yield self.uploadjob.add_part(chunk_size, inflated_size, crc32,
                                      hash_context, magic_hash_context,
                                      decompress_context)

    @defer.inlineCallbacks
    def _start_receiving(self):
        """Prepare the upload job to start receiving streaming bytes."""
        # get/create the mp_upload info from/in S3
        yield self.load_s3_multipart_upload()
        # persist the multipart upload id
        if self.multipart_id is None:
            yield self.uploadjob.set_multipart_id(self.mp_upload.id)
        # generate a new storage_key for this upload
        self._storage_key = self.multipart_key_name
        self.factory = upload.MultipartUploadFactory(
            self.mp_upload, config.api_server.storage_chunk_size,
            total_size=self.deflated_size_hint,
            inflated_size=self.uploadjob.inflated_size,
            crc32=self.uploadjob.crc32,
            chunk_count=self.chunk_count,
            hash_context=self.uploadjob.hash_context,
            magic_hash_context=self.uploadjob.magic_hash_context,
            decompress_context=self.uploadjob.decompress_context,
            part_done_cb=self.part_done_callback)
        # the factory is also the producer
        self.producer = self.factory
        self.factory.startFactory()
        yield self.factory.connect_deferred
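        # note: one object plays both roles here; MultipartUploadFactory is
        # registered as the producer and carries the hashing/decompress
        # state (hash_context etc.), which is what lets an interrupted
        # upload resume where it left off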

    def flush_decompressor(self):
        """Flush the decompressor object and handle pending bytes."""
        self.factory.flush_decompressor()

    @defer.inlineCallbacks
    def _complete_upload(self):
        """Complete the multipart upload."""
        try:
            # complete the S3 multipart upload
            yield self._retry(self.mp_upload.complete)
        except twisted.web.error.Error, e:
            # if we get 404, this means that the upload was cancelled or
            # completed, but isn't there any more.
            if e.status == '404' and self.canceling:
                raise dataerrors.DoesNotExist("The multipart upload "
                                              "does not exist.")
            # propagate the error
            raise

    @defer.inlineCallbacks
    def commit(self, put_result):
        """Make this upload the current content for the node."""
        # unregister the client transport
        self.unregisterProducer()
        # if this is a real multipart upload to S3, call complete;
        # this can be None if we already have the content blob and we are
        # just hashing
        if self.mp_upload is not None:
            try:
                yield self._complete_upload()
            except twisted.web.error.Error, e:
                # if we get 404, this means that the upload was cancelled or
                # completed, but isn't there any more.
                if e.status == '404' and self.canceling:
                    raise dataerrors.DoesNotExist("The multipart upload "
                                                  "does not exist.")
                # propagate the error
                raise

        try:
            new_gen = yield super(MultipartUploadJob, self).commit(put_result)
        except Exception as ex1:
            try:
                yield self.uploadjob.delete()
            except Exception as ex2:
                self.logger.warning("%s: while deleting uploadjob after "
                                    "an error, %s", ex2.__class__.__name__,
                                    ex2)
            raise ex1
        else:
            try:
                yield self.uploadjob.delete()
            except Exception as delete_exc:
                self.logger.warning("%s: while deleting uploadjob after "
                                    "commit, %s",
                                    delete_exc.__class__.__name__, delete_exc)
        defer.returnValue(new_gen)

    @defer.inlineCallbacks
    def delete(self):
        """Cancel and clean up after the current upload job."""
        try:
            yield self.uploadjob.delete()
        except dataerrors.DoesNotExist:
            pass

    @defer.inlineCallbacks
    def cancel(self):
        """Cancel this upload."""
        # unregister the client transport
        yield super(MultipartUploadJob, self).cancel()

        # delete the upload_job
        d = self.delete()
        d.addErrback(self._handle_connection_done)
        d.addErrback(self._handle_s3_errors)
        yield d

        # abort/cancel the mp upload in s3
        if self.mp_upload is not None:
            try:
                yield self._retry(self.mp_upload.cancel)
            except twisted.web.error.Error, e:
                # if the upload isn't in S3, ignore the error
                if e.status != '404':
                    raise

    def _retry(self, func, *args, **kwargs):
        """Retry func using MultipartRetry."""
        s3_retries = config.api_server.s3_retries
        s3_retry_wait = config.api_server.s3_retry_wait
        retrier = MultipartRetry(
            # catch web errors and both flavours of timeout
            catch=(twisted.internet.error.TimeoutError,
                   twisted.internet.error.TCPTimedOutError,
                   twisted.web.error.Error),
            retries=s3_retries, retry_wait=s3_retry_wait)
        return retrier(func, *args, **kwargs)

    def add_data(self, data):
        """Add data to this upload."""
        # override default add_data to never raise BufferLimit
        self.producer.dataReceived(data)

    def registerProducer(self, producer):
        """Register a producer; this is the client connection."""
        self.producer.registerProducer(producer)

    def unregisterProducer(self):
        """Unregister the producer."""
        if self.producer:
            self.producer.unregisterProducer()


class User(object):
    """A proxy for model.User objects."""

    def __init__(self, manager, user_id,
                 root_volume_id, username, visible_name):
        self.manager = manager
        self.id = user_id
        self.root_volume_id = root_volume_id
        self.username = username
        self.visible_name = visible_name
        self.protocols = []
        self.rpc_dal = self.manager.rpcdal_client

    def register_protocol(self, protocol):
        """Register protocol as a connection authenticated for this user.

        @param protocol: the Server protocol.
        """
        self.protocols.append(protocol)

    def unregister_protocol(self, protocol, cleanup=None):
        """Unregister protocol.

        @param protocol: the Server protocol.
        """
        self.protocols.remove(protocol)

    def broadcast(self, message, filter=lambda _: True):
        """Send message to all connections from this user."""
        for protocol in self.protocols:
            if not filter(protocol):
                continue
            new_message = protocol_pb2.Message()
            new_message.CopyFrom(message)
            new_message.id = protocol.get_new_request_id()
            protocol.sendMessage(new_message)
            protocol.log.trace_message("NOTIFICATION:", new_message)
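            # each connection gets its own copy of the message, stamped
            # with a request id valid for that particular protocol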
1001
@defer.inlineCallbacks
1003
"""Get the root node for this user."""
1004
r = yield self.rpc_dal.call('get_root', user_id=self.id)
1005
defer.returnValue((r['root_id'], r['generation']))

    @defer.inlineCallbacks
    def get_free_bytes(self, share_id=None):
        """Return the free space for the given share or the user volume.

        @param share_id: if provided, the id of an accepted share to the user
        """
        if share_id:
            try:
                share = yield self.rpc_dal.call(
                    'get_share', user_id=self.id, share_id=share_id)
                owner_id = share['shared_by_id']
            except dataerrors.DoesNotExist:
                # There is currently a bug in the client which
                # will allow volume_id to be passed to this method. And it
                # will default to the free_bytes of the user. However, this
                # method should not accept a volume_id, and share_id should
                # not exist, so fall back to the user's own quota.
                owner_id = self.id
        else:
            owner_id = self.id
        r = yield self.rpc_dal.call('get_user_quota', user_id=owner_id)
        defer.returnValue(r['free_bytes'])

    @defer.inlineCallbacks
    def get_storage_byte_quota(self):
        """Returns purchased and available space for the user."""
        r = yield self.rpc_dal.call('get_user_quota', user_id=self.id)
        defer.returnValue((r['max_storage_bytes'], r['used_storage_bytes']))

    @defer.inlineCallbacks
    def get_node(self, volume_id, node_id, content_hash):
        """Get a content.Node for this node_id.

        @param volume_id: None for the root volume, or uuid of udf or share id
        @param node_id: a uuid object or string representing the id of the
            node.
        @param content_hash: the current content hash of the node.
        """
        node = yield self.rpc_dal.call('get_node', user_id=self.id,
                                       volume_id=volume_id, node_id=node_id)
        if content_hash and content_hash != node['content_hash']:
            msg = "Node is not available due to hash mismatch."
            raise errors.NotAvailable(msg)

        if node['is_file'] and node['crc32'] is None:
            msg = "Node does not exist since it has no content."
            raise dataerrors.DoesNotExist(msg)

        defer.returnValue(Node(self.manager, node))

    @defer.inlineCallbacks
    def move(self, volume_id, node_id, new_parent_id,
             new_name, session_id=None):
        """Move a node.

        Returns a list of modified nodes.

        @param volume_id: the id of the udf or share, None for root.
        @param node_id: the id of the node to move.
        @param new_parent_id: the node id of the new parent.
        @param new_name: the new name for node_id.
        """
        args = dict(user_id=self.id, volume_id=volume_id, node_id=node_id,
                    new_name=new_name, new_parent_id=new_parent_id,
                    session_id=session_id)
        r = yield self.rpc_dal.call('move', **args)
        defer.returnValue((r['generation'], r['mimetype']))

    @defer.inlineCallbacks
    def make_dir(self, volume_id, parent_id, name, session_id=None):
        """Create a directory.

        @param volume_id: None for the root volume, or uuid of udf or share id
        @param parent_id: the id of the parent node.
        @param name: the name for the directory.
        """
        args = dict(user_id=self.id, volume_id=volume_id, parent_id=parent_id,
                    name=name, session_id=session_id)
        r = yield self.rpc_dal.call('make_dir', **args)
        defer.returnValue((r['node_id'], r['generation'], r['mimetype']))

    @defer.inlineCallbacks
    def make_file(self, volume_id, parent_id, name, session_id=None):
        """Create a file.

        @param volume_id: None for the root volume, or uuid of udf or share id
        @param parent_id: the id of the parent node.
        @param name: the name for the file.
        """
        args = dict(user_id=self.id, volume_id=volume_id, parent_id=parent_id,
                    name=name, session_id=session_id)
        r = yield self.rpc_dal.call('make_file', **args)
        defer.returnValue((r['node_id'], r['generation'], r['mimetype']))

    @defer.inlineCallbacks
    def create_udf(self, path, name, session_id=None):
        """Create an UDF.

        @param path: the directory of where the UDF is
        @param name: the name of the UDF
        @param session_id: id of the session where the event was generated
        """
        fullpath = pypath.join(path, name)
        r = yield self.rpc_dal.call('create_udf', user_id=self.id,
                                    path=fullpath, session_id=session_id)
        defer.returnValue((r['udf_id'], r['udf_root_id'], r['udf_path']))

    @defer.inlineCallbacks
    def delete_volume(self, volume_id, session_id=None):
        """Delete a volume.

        @param volume_id: the id of the share or udf.
        @param session_id: id of the session where the event was generated.
        """
        yield self.rpc_dal.call('delete_volume', user_id=self.id,
                                volume_id=volume_id, session_id=session_id)

    @defer.inlineCallbacks
    def list_volumes(self):
        """List all the volumes the user is involved in.

        This includes the real Root, the UDFs, and the shares that were
        shared to her and that she already accepted.
        """
        r = yield self.rpc_dal.call('list_volumes', user_id=self.id)
        root_info = r['root']
        shares = r['shares']
        udfs = r['udfs']
        free_bytes = r['free_bytes']
        defer.returnValue((root_info, shares, udfs, free_bytes))

    @defer.inlineCallbacks
    def list_shares(self):
        """List all the shares the user is involved in.

        This only returns the "from me" shares, and the "to me" shares that
        were still not accepted.
        """
        r = yield self.rpc_dal.call('list_shares', user_id=self.id)
        defer.returnValue((r['shared_by'], r['shared_to']))

    @defer.inlineCallbacks
    def create_share(self, node_id, shared_to_username, name, access_level):
        """Create a share.

        @param node_id: the id of the node that will be root of the share.
        @param shared_to_username: the username of the receiving user.
        @param name: the name of the share.
        @param access_level: the permissions on the share.
        """
        readonly = access_level == "View"
        r = yield self.rpc_dal.call('create_share', user_id=self.id,
                                    node_id=node_id, share_name=name,
                                    to_username=shared_to_username,
                                    readonly=readonly)
        defer.returnValue(r['share_id'])

    @defer.inlineCallbacks
    def delete_share(self, share_id):
        """Delete a share.

        @param share_id: the share id.
        """
        yield self.rpc_dal.call('delete_share',
                                user_id=self.id, share_id=share_id)

    @defer.inlineCallbacks
    def share_accepted(self, share_id, answer):
        """Accept (or not) the share.

        @param share_id: the share id.
        @param answer: if it was accepted ("Yes") or not ("No").
        """
        if answer == "Yes":
            call = 'accept_share'
        elif answer == "No":
            call = 'decline_share'
        else:
            raise ValueError("Received invalid answer: %r" % answer)
        yield self.rpc_dal.call(call, user_id=self.id, share_id=share_id)

    @defer.inlineCallbacks
    def unlink_node(self, volume_id, node_id, session_id=None):
        """Unlink a node.

        @param volume_id: the id of the volume of the node.
        @param node_id: the id of the node.
        """
        r = yield self.rpc_dal.call('unlink_node', user_id=self.id,
                                    volume_id=volume_id, node_id=node_id,
                                    session_id=session_id)
        defer.returnValue((r['generation'], r['kind'],
                           r['name'], r['mimetype']))

    @defer.inlineCallbacks
    def get_upload_job(self, vol_id, node_id, previous_hash, hash_value, crc32,
                       inflated_size, deflated_size, session_id=None,
                       magic_hash=None, upload_id=None):
        """Create an upload reservation for a node.

        @param vol_id: the volume id this node belongs to.
        @param node_id: the node to upload to.
        @param previous_hash: the current hash of the node.
        @param hash_value: the hash of the new content.
        @param crc32: the crc32 of the new content.
        @param inflated_size: the uncompressed size of the new content.
        @param deflated_size: the compressed size of the new content.
        """
        if previous_hash == "":
            previous_hash = None

        # reuse the content if we can
        r = yield self.rpc_dal.call('get_reusable_content', user_id=self.id,
                                    hash_value=hash_value,
                                    magic_hash=magic_hash)
        blob_exists, storage_key = r['blob_exists'], r['storage_key']

        if storage_key is not None:
            upload_job = yield self._get_magic_upload_job(
                vol_id, node_id, previous_hash, hash_value,
                crc32, inflated_size, deflated_size,
                storage_key, magic_hash, session_id,
                blob_exists)
            defer.returnValue(upload_job)

        # only use multipart upload if the content does not exist and we're
        # above the minimum file size (if configured)
        multipart_threshold = config.api_server.multipart_threshold
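        # e.g. with a (hypothetical) multipart_threshold of 100 * 1024 * 1024,
        # any new blob whose deflated size is 100 MiB or more goes through
        # the resumable multipart path; a threshold of 0 disables it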
        if (not blob_exists and
                multipart_threshold > 0 and
                deflated_size >= multipart_threshold):
            # this is a multipart upload
            multipart_key = uuid.uuid4()
            upload_job = yield self._get_multipart_upload_job(
                vol_id, node_id, previous_hash, hash_value, crc32,
                inflated_size, deflated_size, multipart_key,
                session_id, upload_id, blob_exists, magic_hash)
        else:
            upload_job = yield self._get_upload_job(
                vol_id, node_id,
                previous_hash, hash_value, crc32, inflated_size,
                deflated_size, session_id, blob_exists, magic_hash)
        defer.returnValue(upload_job)

    @defer.inlineCallbacks
    def _get_upload_job(self, vol_id, node_id, previous_hash, hash_value,
                        crc32, inflated_size, deflated_size,
                        session_id, blob_exists, magic_hash):
        """Create an upload reservation for a node.

        @param vol_id: the volume id this node belongs to.
        @param node_id: the node to upload to.
        @param previous_hash: the current hash of the node.
        @param hash_value: the hash of the new content.
        @param crc32: the crc32 of the new content.
        @param inflated_size: the uncompressed size of the new content.
        @param deflated_size: the compressed size of the new content.
        """
        node = yield self.rpc_dal.call('get_node', user_id=self.id,
                                       volume_id=vol_id, node_id=node_id)
        if not node["is_file"]:
            raise dataerrors.NoPermission("Can only put content on files.")
        file_node = Node(self.manager, node)
        uj = UploadJob(self, file_node, previous_hash, hash_value,
                       crc32, inflated_size, deflated_size,
                       session_id, blob_exists, magic_hash)
        defer.returnValue(uj)

    @defer.inlineCallbacks
    def _get_magic_upload_job(self, vol_id, node_id, previous_hash, hash_value,
                              crc32, inflated_size, deflated_size, storage_key,
                              magic_hash, session_id, blob_exists):
        """Create a magic upload reservation for a node.

        @param vol_id: the volume id this node belongs to.
        @param node_id: the node to upload to.
        @param previous_hash: the current hash of the node.
        @param hash_value: the hash of the new content.
        @param crc32: the crc32 of the new content.
        @param inflated_size: the uncompressed size of the new content.
        @param deflated_size: the compressed size of the new content.
        @param storage_key: the content's storage key
        @param magic_hash: the magic_hash from client
        """
        node = yield self.rpc_dal.call('get_node', user_id=self.id,
                                       volume_id=vol_id, node_id=node_id)
        if not node["is_file"]:
            raise dataerrors.NoPermission("Can only put content on files.")
        file_node = Node(self.manager, node)
        uj = MagicUploadJob(self, file_node, previous_hash, hash_value,
                            crc32, inflated_size, deflated_size,
                            storage_key, magic_hash, session_id, blob_exists)
        defer.returnValue(uj)

    @defer.inlineCallbacks
    def _get_multipart_upload_job(self, vol_id, node_id, previous_hash,
                                  hash_value, crc32, inflated_size,
                                  deflated_size, multipart_key,
                                  session_id, upload_id, blob_exists,
                                  magic_hash):
        """Create a multipart upload reservation for a node.

        @param vol_id: the volume id this node belongs to.
        @param node_id: the node to upload to.
        @param previous_hash: the current hash of the node.
        @param hash_value: the hash of the new content.
        @param crc32: the crc32 of the new content.
        @param inflated_size: the uncompressed size of the new content.
        @param deflated_size: the compressed size of the new content.
        @param multipart_key: the key name of the upload.
        @param upload_id: the upload_id sent by the client.
        """
        node = yield self.rpc_dal.call('get_node', user_id=self.id,
                                       volume_id=vol_id, node_id=node_id)
        if not node["is_file"]:
            raise dataerrors.NoPermission("Can only put content on files.")
        file_node = Node(self.manager, node)

        upload = None
        if upload_id:
            # check if there is already a job.
            try:
                uploadid = uuid.UUID(upload_id)
            except ValueError:
                # invalid upload_id, just ignore it and create a new upload.
                upload = None
            else:
                try:
                    upload = yield DBUploadJob.get(self, vol_id, node_id,
                                                   uploadid, hash_value, crc32,
                                                   inflated_size,
                                                   deflated_size)
                except dataerrors.DoesNotExist:
                    # there is no uploadjob with the specified id
                    upload = None

        if upload is None:
            # no uploadjob found, create a new one.
            try:
                upload = yield DBUploadJob.make(self, vol_id, node_id,
                                                previous_hash, hash_value,
                                                crc32, inflated_size,
                                                deflated_size, multipart_key)
            except dataerrors.HashMismatch:
                raise errors.ConflictError("Previous hash does not match.")
        else:
            # update the when_last_active value.
            yield upload.touch()

        uj = MultipartUploadJob(self, file_node, previous_hash, hash_value,
                                crc32, inflated_size, deflated_size,
                                upload, session_id, blob_exists, magic_hash)
        defer.returnValue(uj)

    @defer.inlineCallbacks
    def get_delta(self, volume_id, from_generation, limit=None):
        """Get the delta from generation for volume_id."""
        r = yield self.rpc_dal.call('get_delta', user_id=self.id,
                                    volume_id=volume_id, limit=limit,
                                    from_generation=from_generation)
        nodes = [Node(self.manager, n) for n in r['nodes']]
        defer.returnValue((nodes, r['vol_generation'], r['free_bytes']))

    @defer.inlineCallbacks
    def get_from_scratch(self, volume_id, start_from_path=None, limit=None,
                         max_generation=None):
        """Get the list of live nodes in volume_id."""
        r = yield self.rpc_dal.call('get_from_scratch', user_id=self.id,
                                    volume_id=volume_id,
                                    start_from_path=start_from_path,
                                    limit=limit, max_generation=max_generation)
        nodes = [Node(self.manager, n) for n in r['nodes']]
        defer.returnValue((nodes, r['vol_generation'], r['free_bytes']))

    @defer.inlineCallbacks
    def get_volume_id(self, node_id):
        """Get the (client) volume_id (UDF id or root) of this node_id.

        @param node_id: a uuid object or string representing the id of the
            node.
        """
        r = yield self.rpc_dal.call('get_volume_id', user_id=self.id,
                                    node_id=node_id)
        defer.returnValue(r['volume_id'])


class ContentManager(object):
    """Manages Users."""

    def __init__(self, factory):
        """Create a ContentManager."""
        self.factory = factory
        self.users = weakref.WeakValueDictionary()
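        # a WeakValueDictionary: cached User objects disappear automatically
        # once nothing else (e.g. a connected protocol) references them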

    @defer.inlineCallbacks
    def get_user_by_id(self, user_id, session_id=None, required=False):
        """Return a user by id and session id, if it's connected.

        If it's not cached and required, it's retrieved from the DB.
        """
        user = self.users.get(user_id, None)
        if user is None and required:
            r = yield self.rpcdal_client.call('get_user_data', user_id=user_id,
                                              session_id=session_id)
            # Another task may have already updated the cache, so check again
            user = self.users.get(user_id, None)
            if user is None:
                user = User(self, user_id, r['root_volume_id'],
                            r['username'], r['visible_name'])
                self.users[user_id] = user
        defer.returnValue(user)