~ubuntu-branches/ubuntu/trusty/swift/trusty-updates

« back to all changes in this revision

Viewing changes to swift/obj/server.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, James Page, Chuck Short
  • Date: 2013-08-13 10:37:13 UTC
  • mfrom: (1.2.21)
  • Revision ID: package-import@ubuntu.com-20130813103713-1ctbx4zifyljs2aq
Tags: 1.9.1-0ubuntu1
[ James Page ]
* d/control: Update VCS fields for new branch locations.

[ Chuck Short ]
* New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
 
18
18
from __future__ import with_statement
19
19
import cPickle as pickle
20
 
import errno
21
20
import os
22
21
import time
23
22
import traceback
24
23
from collections import defaultdict
25
24
from datetime import datetime
 
25
from gettext import gettext as _
26
26
from hashlib import md5
27
 
from tempfile import mkstemp
28
27
from urllib import unquote
29
 
from contextlib import contextmanager
30
28
 
31
 
from xattr import getxattr, setxattr
32
29
from eventlet import sleep, Timeout
33
30
 
34
31
from swift.common.utils import mkdirs, normalize_timestamp, public, \
35
 
    storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \
36
 
    split_path, drop_buffer_cache, get_logger, write_pickle, \
 
32
    hash_path, split_path, get_logger, write_pickle, \
37
33
    config_true_value, validate_device_partition, timing_stats, \
38
 
    ThreadPool
 
34
    ThreadPool, replication
39
35
from swift.common.bufferedhttp import http_connect
40
36
from swift.common.constraints import check_object_creation, check_mount, \
41
37
    check_float, check_utf8
42
38
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
43
 
    DiskFileNotExist, DiskFileCollision, DiskFileNoSpace
44
 
from swift.obj.base import invalidate_hash, \
45
 
    quarantine_renamer, get_hashes
 
39
    DiskFileNotExist, DiskFileCollision, DiskFileNoSpace, \
 
40
    DiskFileDeviceUnavailable
46
41
from swift.common.http import is_success
47
42
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
48
43
    HTTPInternalServerError, HTTPNoContent, HTTPNotFound, HTTPNotModified, \
49
44
    HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \
50
45
    HTTPClientDisconnect, HTTPMethodNotAllowed, Request, Response, UTC, \
51
 
    HTTPInsufficientStorage, HTTPForbidden, multi_range_iterator, \
52
 
    HeaderKeyDict
 
46
    HTTPInsufficientStorage, HTTPForbidden, HTTPException, HeaderKeyDict, \
 
47
    HTTPConflict
 
48
from swift.obj.diskfile import DiskFile, get_hashes
53
49
 
54
50
 
55
51
DATADIR = 'objects'
56
52
ASYNCDIR = 'async_pending'
57
 
PICKLE_PROTOCOL = 2
58
 
METADATA_KEY = 'user.swift.metadata'
59
53
MAX_OBJECT_NAME_LENGTH = 1024
60
54
# keep these lower-case
61
55
DISALLOWED_HEADERS = set('content-length content-type deleted etag'.split())
62
56
 
63
57
 
64
 
def read_metadata(fd):
65
 
    """
66
 
    Helper function to read the pickled metadata from an object file.
67
 
 
68
 
    :param fd: file descriptor to load the metadata from
69
 
 
70
 
    :returns: dictionary of metadata
71
 
    """
72
 
    metadata = ''
73
 
    key = 0
74
 
    try:
75
 
        while True:
76
 
            metadata += getxattr(fd, '%s%s' % (METADATA_KEY, (key or '')))
77
 
            key += 1
78
 
    except IOError:
79
 
        pass
80
 
    return pickle.loads(metadata)
81
 
 
82
 
 
83
 
def write_metadata(fd, metadata):
84
 
    """
85
 
    Helper function to write pickled metadata for an object file.
86
 
 
87
 
    :param fd: file descriptor to write the metadata
88
 
    :param metadata: metadata to write
89
 
    """
90
 
    metastr = pickle.dumps(metadata, PICKLE_PROTOCOL)
91
 
    key = 0
92
 
    while metastr:
93
 
        setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254])
94
 
        metastr = metastr[254:]
95
 
        key += 1
96
 
 
97
 
 
98
 
class DiskWriter(object):
99
 
    """
100
 
    Encapsulation of the write context for servicing PUT REST API
101
 
    requests. Serves as the context manager object for DiskFile's writer()
102
 
    method.
103
 
    """
104
 
    def __init__(self, disk_file, fd, tmppath, threadpool):
105
 
        self.disk_file = disk_file
106
 
        self.fd = fd
107
 
        self.tmppath = tmppath
108
 
        self.upload_size = 0
109
 
        self.last_sync = 0
110
 
        self.threadpool = threadpool
111
 
 
112
 
    def write(self, chunk):
113
 
        """
114
 
        Write a chunk of data into the temporary file.
115
 
 
116
 
        :param chunk: the chunk of data to write as a string object
117
 
        """
118
 
 
119
 
        def _write_entire_chunk(chunk):
120
 
            while chunk:
121
 
                written = os.write(self.fd, chunk)
122
 
                self.upload_size += written
123
 
                chunk = chunk[written:]
124
 
 
125
 
        self.threadpool.run_in_thread(_write_entire_chunk, chunk)
126
 
 
127
 
        # For large files sync every 512MB (by default) written
128
 
        diff = self.upload_size - self.last_sync
129
 
        if diff >= self.disk_file.bytes_per_sync:
130
 
            self.threadpool.force_run_in_thread(fdatasync, self.fd)
131
 
            drop_buffer_cache(self.fd, self.last_sync, diff)
132
 
            self.last_sync = self.upload_size
133
 
 
134
 
    def put(self, metadata, extension='.data'):
135
 
        """
136
 
        Finalize writing the file on disk, and renames it from the temp file
137
 
        to the real location.  This should be called after the data has been
138
 
        written to the temp file.
139
 
 
140
 
        :param metadata: dictionary of metadata to be written
141
 
        :param extension: extension to be used when making the file
142
 
        """
143
 
        assert self.tmppath is not None
144
 
        timestamp = normalize_timestamp(metadata['X-Timestamp'])
145
 
        metadata['name'] = self.disk_file.name
146
 
 
147
 
        def finalize_put():
148
 
            # Write the metadata before calling fsync() so that both data and
149
 
            # metadata are flushed to disk.
150
 
            write_metadata(self.fd, metadata)
151
 
            # We call fsync() before calling drop_cache() to lower the amount
152
 
            # of redundant work the drop cache code will perform on the pages
153
 
            # (now that after fsync the pages will be all clean).
154
 
            fsync(self.fd)
155
 
            # From the Department of the Redundancy Department, make sure
156
 
            # we call drop_cache() after fsync() to avoid redundant work
157
 
            # (pages all clean).
158
 
            drop_buffer_cache(self.fd, 0, self.upload_size)
159
 
            invalidate_hash(os.path.dirname(self.disk_file.datadir))
160
 
            # After the rename completes, this object will be available for
161
 
            # other requests to reference.
162
 
            renamer(self.tmppath,
163
 
                    os.path.join(self.disk_file.datadir,
164
 
                                 timestamp + extension))
165
 
 
166
 
        self.threadpool.force_run_in_thread(finalize_put)
167
 
        self.disk_file.metadata = metadata
168
 
 
169
 
 
170
 
class DiskFile(object):
171
 
    """
172
 
    Manage object files on disk.
173
 
 
174
 
    :param path: path to devices on the node
175
 
    :param device: device name
176
 
    :param partition: partition on the device the object lives in
177
 
    :param account: account name for the object
178
 
    :param container: container name for the object
179
 
    :param obj: object name for the object
180
 
    :param keep_data_fp: if True, don't close the fp, otherwise close it
181
 
    :param disk_chunk_size: size of chunks on file reads
182
 
    :param bytes_per_sync: number of bytes between fdatasync calls
183
 
    :param iter_hook: called when __iter__ returns a chunk
184
 
    :param threadpool: thread pool in which to do blocking operations
185
 
 
186
 
    :raises DiskFileCollision: on md5 collision
187
 
    """
188
 
 
189
 
    def __init__(self, path, device, partition, account, container, obj,
190
 
                 logger, keep_data_fp=False, disk_chunk_size=65536,
191
 
                 bytes_per_sync=(512 * 1024 * 1024), iter_hook=None,
192
 
                 threadpool=None):
193
 
        self.disk_chunk_size = disk_chunk_size
194
 
        self.bytes_per_sync = bytes_per_sync
195
 
        self.iter_hook = iter_hook
196
 
        self.name = '/' + '/'.join((account, container, obj))
197
 
        name_hash = hash_path(account, container, obj)
198
 
        self.datadir = os.path.join(
199
 
            path, device, storage_directory(DATADIR, partition, name_hash))
200
 
        self.device_path = os.path.join(path, device)
201
 
        self.tmpdir = os.path.join(path, device, 'tmp')
202
 
        self.logger = logger
203
 
        self.metadata = {}
204
 
        self.meta_file = None
205
 
        self.data_file = None
206
 
        self.fp = None
207
 
        self.iter_etag = None
208
 
        self.started_at_0 = False
209
 
        self.read_to_eof = False
210
 
        self.quarantined_dir = None
211
 
        self.keep_cache = False
212
 
        self.suppress_file_closing = False
213
 
        self.threadpool = threadpool or ThreadPool(nthreads=0)
214
 
        if not os.path.exists(self.datadir):
215
 
            return
216
 
        files = sorted(os.listdir(self.datadir), reverse=True)
217
 
        for afile in files:
218
 
            if afile.endswith('.ts'):
219
 
                self.data_file = self.meta_file = None
220
 
                self.metadata = {'deleted': True}
221
 
                return
222
 
            if afile.endswith('.meta') and not self.meta_file:
223
 
                self.meta_file = os.path.join(self.datadir, afile)
224
 
            if afile.endswith('.data') and not self.data_file:
225
 
                self.data_file = os.path.join(self.datadir, afile)
226
 
                break
227
 
        if not self.data_file:
228
 
            return
229
 
        self.fp = open(self.data_file, 'rb')
230
 
        self.metadata = read_metadata(self.fp)
231
 
        if not keep_data_fp:
232
 
            self.close(verify_file=False)
233
 
        if self.meta_file:
234
 
            with open(self.meta_file) as mfp:
235
 
                for key in self.metadata.keys():
236
 
                    if key.lower() not in DISALLOWED_HEADERS:
237
 
                        del self.metadata[key]
238
 
                self.metadata.update(read_metadata(mfp))
239
 
        if 'name' in self.metadata:
240
 
            if self.metadata['name'] != self.name:
241
 
                self.logger.error(_('Client path %(client)s does not match '
242
 
                                    'path stored in object metadata %(meta)s'),
243
 
                                  {'client': self.name,
244
 
                                   'meta': self.metadata['name']})
245
 
                raise DiskFileCollision('Client path does not match path '
246
 
                                        'stored in object metadata')
247
 
 
248
 
    def __iter__(self):
249
 
        """Returns an iterator over the data file."""
250
 
        try:
251
 
            dropped_cache = 0
252
 
            read = 0
253
 
            self.started_at_0 = False
254
 
            self.read_to_eof = False
255
 
            if self.fp.tell() == 0:
256
 
                self.started_at_0 = True
257
 
                self.iter_etag = md5()
258
 
            while True:
259
 
                chunk = self.threadpool.run_in_thread(
260
 
                    self.fp.read, self.disk_chunk_size)
261
 
                if chunk:
262
 
                    if self.iter_etag:
263
 
                        self.iter_etag.update(chunk)
264
 
                    read += len(chunk)
265
 
                    if read - dropped_cache > (1024 * 1024):
266
 
                        self._drop_cache(self.fp.fileno(), dropped_cache,
267
 
                                         read - dropped_cache)
268
 
                        dropped_cache = read
269
 
                    yield chunk
270
 
                    if self.iter_hook:
271
 
                        self.iter_hook()
272
 
                else:
273
 
                    self.read_to_eof = True
274
 
                    self._drop_cache(self.fp.fileno(), dropped_cache,
275
 
                                     read - dropped_cache)
276
 
                    break
277
 
        finally:
278
 
            if not self.suppress_file_closing:
279
 
                self.close()
280
 
 
281
 
    def app_iter_range(self, start, stop):
282
 
        """Returns an iterator over the data file for range (start, stop)"""
283
 
        if start or start == 0:
284
 
            self.fp.seek(start)
285
 
        if stop is not None:
286
 
            length = stop - start
287
 
        else:
288
 
            length = None
289
 
        for chunk in self:
290
 
            if length is not None:
291
 
                length -= len(chunk)
292
 
                if length < 0:
293
 
                    # Chop off the extra:
294
 
                    yield chunk[:length]
295
 
                    break
296
 
            yield chunk
297
 
 
298
 
    def app_iter_ranges(self, ranges, content_type, boundary, size):
299
 
        """Returns an iterator over the data file for a set of ranges"""
300
 
        if not ranges:
301
 
            yield ''
302
 
        else:
303
 
            try:
304
 
                self.suppress_file_closing = True
305
 
                for chunk in multi_range_iterator(
306
 
                        ranges, content_type, boundary, size,
307
 
                        self.app_iter_range):
308
 
                    yield chunk
309
 
            finally:
310
 
                self.suppress_file_closing = False
311
 
                self.close()
312
 
 
313
 
    def _handle_close_quarantine(self):
314
 
        """Check if file needs to be quarantined"""
315
 
        try:
316
 
            self.get_data_file_size()
317
 
        except DiskFileError:
318
 
            self.quarantine()
319
 
            return
320
 
        except DiskFileNotExist:
321
 
            return
322
 
 
323
 
        if self.iter_etag and self.started_at_0 and self.read_to_eof and \
324
 
                'ETag' in self.metadata and \
325
 
                self.iter_etag.hexdigest() != self.metadata.get('ETag'):
326
 
            self.quarantine()
327
 
 
328
 
    def close(self, verify_file=True):
329
 
        """
330
 
        Close the file. Will handle quarantining file if necessary.
331
 
 
332
 
        :param verify_file: Defaults to True. If false, will not check
333
 
                            file to see if it needs quarantining.
334
 
        """
335
 
        if self.fp:
336
 
            try:
337
 
                if verify_file:
338
 
                    self._handle_close_quarantine()
339
 
            except (Exception, Timeout), e:
340
 
                self.logger.error(_(
341
 
                    'ERROR DiskFile %(data_file)s in '
342
 
                    '%(data_dir)s close failure: %(exc)s : %(stack)'),
343
 
                    {'exc': e, 'stack': ''.join(traceback.format_stack()),
344
 
                     'data_file': self.data_file, 'data_dir': self.datadir})
345
 
            finally:
346
 
                self.fp.close()
347
 
                self.fp = None
348
 
 
349
 
    def is_deleted(self):
350
 
        """
351
 
        Check if the file is deleted.
352
 
 
353
 
        :returns: True if the file doesn't exist or has been flagged as
354
 
                  deleted.
355
 
        """
356
 
        return not self.data_file or 'deleted' in self.metadata
357
 
 
358
 
    def is_expired(self):
359
 
        """
360
 
        Check if the file is expired.
361
 
 
362
 
        :returns: True if the file has an X-Delete-At in the past
363
 
        """
364
 
        return ('X-Delete-At' in self.metadata and
365
 
                int(self.metadata['X-Delete-At']) <= time.time())
366
 
 
367
 
    @contextmanager
368
 
    def writer(self, size=None):
369
 
        """
370
 
        Context manager to write a file. We create a temporary file first, and
371
 
        then return a DiskWriter object to encapsulate the state.
372
 
 
373
 
        :param size: optional initial size of file to explicitly allocate on
374
 
                     disk
375
 
        :raises DiskFileNoSpace: if a size is specified and allocation fails
376
 
        """
377
 
        if not os.path.exists(self.tmpdir):
378
 
            mkdirs(self.tmpdir)
379
 
        fd, tmppath = mkstemp(dir=self.tmpdir)
380
 
        try:
381
 
            if size is not None and size > 0:
382
 
                try:
383
 
                    fallocate(fd, size)
384
 
                except OSError:
385
 
                    raise DiskFileNoSpace()
386
 
            yield DiskWriter(self, fd, tmppath, self.threadpool)
387
 
        finally:
388
 
            try:
389
 
                os.close(fd)
390
 
            except OSError:
391
 
                pass
392
 
            try:
393
 
                os.unlink(tmppath)
394
 
            except OSError:
395
 
                pass
396
 
 
397
 
    def put_metadata(self, metadata, tombstone=False):
398
 
        """
399
 
        Short hand for putting metadata to .meta and .ts files.
400
 
 
401
 
        :param metadata: dictionary of metadata to be written
402
 
        :param tombstone: whether or not we are writing a tombstone
403
 
        """
404
 
        extension = '.ts' if tombstone else '.meta'
405
 
        with self.writer() as writer:
406
 
            writer.put(metadata, extension=extension)
407
 
 
408
 
    def unlinkold(self, timestamp):
409
 
        """
410
 
        Remove any older versions of the object file.  Any file that has an
411
 
        older timestamp than timestamp will be deleted.
412
 
 
413
 
        :param timestamp: timestamp to compare with each file
414
 
        """
415
 
        timestamp = normalize_timestamp(timestamp)
416
 
 
417
 
        def _unlinkold():
418
 
            for fname in os.listdir(self.datadir):
419
 
                if fname < timestamp:
420
 
                    try:
421
 
                        os.unlink(os.path.join(self.datadir, fname))
422
 
                    except OSError, err:    # pragma: no cover
423
 
                        if err.errno != errno.ENOENT:
424
 
                            raise
425
 
        self.threadpool.run_in_thread(_unlinkold)
426
 
 
427
 
    def _drop_cache(self, fd, offset, length):
428
 
        """Method for no-oping buffer cache drop method."""
429
 
        if not self.keep_cache:
430
 
            drop_buffer_cache(fd, offset, length)
431
 
 
432
 
    def quarantine(self):
433
 
        """
434
 
        In the case that a file is corrupted, move it to a quarantined
435
 
        area to allow replication to fix it.
436
 
 
437
 
        :returns: if quarantine is successful, path to quarantined
438
 
                  directory otherwise None
439
 
        """
440
 
        if not (self.is_deleted() or self.quarantined_dir):
441
 
            self.quarantined_dir = self.threadpool.run_in_thread(
442
 
                quarantine_renamer, self.device_path, self.data_file)
443
 
            self.logger.increment('quarantines')
444
 
            return self.quarantined_dir
445
 
 
446
 
    def get_data_file_size(self):
447
 
        """
448
 
        Returns the os.path.getsize for the file.  Raises an exception if this
449
 
        file does not match the Content-Length stored in the metadata. Or if
450
 
        self.data_file does not exist.
451
 
 
452
 
        :returns: file size as an int
453
 
        :raises DiskFileError: on file size mismatch.
454
 
        :raises DiskFileNotExist: on file not existing (including deleted)
455
 
        """
456
 
        try:
457
 
            file_size = 0
458
 
            if self.data_file:
459
 
                file_size = self.threadpool.run_in_thread(
460
 
                    os.path.getsize, self.data_file)
461
 
                if 'Content-Length' in self.metadata:
462
 
                    metadata_size = int(self.metadata['Content-Length'])
463
 
                    if file_size != metadata_size:
464
 
                        raise DiskFileError(
465
 
                            'Content-Length of %s does not match file size '
466
 
                            'of %s' % (metadata_size, file_size))
467
 
                return file_size
468
 
        except OSError, err:
469
 
            if err.errno != errno.ENOENT:
470
 
                raise
471
 
        raise DiskFileNotExist('Data File does not exist.')
472
 
 
473
 
 
474
58
class ObjectController(object):
475
59
    """Implements the WSGI application for the Swift Object Server."""
476
60
 
496
80
        self.slow = int(conf.get('slow', 0))
497
81
        self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
498
82
        replication_server = conf.get('replication_server', None)
499
 
        if replication_server is None:
500
 
            allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE',
501
 
                               'POST']
502
 
        else:
 
83
        if replication_server is not None:
503
84
            replication_server = config_true_value(replication_server)
504
 
            if replication_server:
505
 
                allowed_methods = ['REPLICATE']
506
 
            else:
507
 
                allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
508
85
        self.replication_server = replication_server
509
 
        self.allowed_methods = allowed_methods
510
86
        self.threads_per_disk = int(conf.get('threads_per_disk', '0'))
511
87
        self.threadpools = defaultdict(
512
88
            lambda: ThreadPool(nthreads=self.threads_per_disk))
528
104
            int(conf.get('expiring_objects_container_divisor') or 86400)
529
105
 
530
106
    def _diskfile(self, device, partition, account, container, obj, **kwargs):
531
 
        """ Utility method for instantiating a DiskFile. """
 
107
        """Utility method for instantiating a DiskFile."""
 
108
        kwargs.setdefault('mount_check', self.mount_check)
532
109
        kwargs.setdefault('bytes_per_sync', self.bytes_per_sync)
533
110
        kwargs.setdefault('disk_chunk_size', self.disk_chunk_size)
534
111
        kwargs.setdefault('logger', self.logger)
535
112
        kwargs.setdefault('threadpool', self.threadpools[device])
 
113
        kwargs.setdefault('obj_dir', DATADIR)
 
114
        kwargs.setdefault('disallowed_metadata_keys', DISALLOWED_HEADERS)
536
115
        return DiskFile(self.devices, device, partition, account,
537
116
                        container, obj, **kwargs)
538
117
 
698
277
                '%s-%s/%s/%s' % (delete_at, account, container, obj),
699
278
                host, partition, contdevice, headers_out, objdevice)
700
279
 
 
280
    def _parse_path(self, request, minsegs=5, maxsegs=5):
 
281
        """
 
282
        Utility function to split and validate the request path.
 
283
 
 
284
        :returns: result of split_path if everything's okay
 
285
        :raises: HTTPBadRequest if something's not okay
 
286
        """
 
287
        try:
 
288
            segs = split_path(unquote(request.path), minsegs, maxsegs, True)
 
289
            validate_device_partition(segs[0], segs[1])
 
290
            return segs
 
291
        except ValueError as err:
 
292
            raise HTTPBadRequest(body=str(err), request=request,
 
293
                                 content_type='text/plain')
 
294
 
701
295
    @public
702
296
    @timing_stats()
703
297
    def POST(self, request):
704
298
        """Handle HTTP POST requests for the Swift Object Server."""
705
 
        try:
706
 
            device, partition, account, container, obj = \
707
 
                split_path(unquote(request.path), 5, 5, True)
708
 
            validate_device_partition(device, partition)
709
 
        except ValueError, err:
710
 
            return HTTPBadRequest(body=str(err), request=request,
711
 
                                  content_type='text/plain')
 
299
        device, partition, account, container, obj = self._parse_path(request)
 
300
 
712
301
        if 'x-timestamp' not in request.headers or \
713
302
                not check_float(request.headers['x-timestamp']):
714
303
            return HTTPBadRequest(body='Missing timestamp', request=request,
717
306
        if new_delete_at and new_delete_at < time.time():
718
307
            return HTTPBadRequest(body='X-Delete-At in past', request=request,
719
308
                                  content_type='text/plain')
720
 
        if self.mount_check and not check_mount(self.devices, device):
 
309
        try:
 
310
            disk_file = self._diskfile(device, partition, account, container,
 
311
                                       obj)
 
312
        except DiskFileDeviceUnavailable:
721
313
            return HTTPInsufficientStorage(drive=device, request=request)
722
 
        disk_file = self._diskfile(device, partition, account, container, obj)
723
314
        if disk_file.is_deleted() or disk_file.is_expired():
724
315
            return HTTPNotFound(request=request)
725
316
        try:
727
318
        except (DiskFileError, DiskFileNotExist):
728
319
            disk_file.quarantine()
729
320
            return HTTPNotFound(request=request)
 
321
        orig_timestamp = disk_file.metadata.get('X-Timestamp', '0')
 
322
        if orig_timestamp >= request.headers['x-timestamp']:
 
323
            return HTTPConflict(request=request)
730
324
        metadata = {'X-Timestamp': request.headers['x-timestamp']}
731
325
        metadata.update(val for val in request.headers.iteritems()
732
326
                        if val[0].startswith('X-Object-Meta-'))
749
343
    @timing_stats()
750
344
    def PUT(self, request):
751
345
        """Handle HTTP PUT requests for the Swift Object Server."""
752
 
        try:
753
 
            device, partition, account, container, obj = \
754
 
                split_path(unquote(request.path), 5, 5, True)
755
 
            validate_device_partition(device, partition)
756
 
        except ValueError, err:
757
 
            return HTTPBadRequest(body=str(err), request=request,
758
 
                                  content_type='text/plain')
 
346
        device, partition, account, container, obj = self._parse_path(request)
 
347
 
759
348
        if 'x-timestamp' not in request.headers or \
760
349
                not check_float(request.headers['x-timestamp']):
761
350
            return HTTPBadRequest(body='Missing timestamp', request=request,
772
361
        except ValueError as e:
773
362
            return HTTPBadRequest(body=str(e), request=request,
774
363
                                  content_type='text/plain')
775
 
        if self.mount_check and not check_mount(self.devices, device):
 
364
        try:
 
365
            disk_file = self._diskfile(device, partition, account, container,
 
366
                                       obj)
 
367
        except DiskFileDeviceUnavailable:
776
368
            return HTTPInsufficientStorage(drive=device, request=request)
777
 
        disk_file = self._diskfile(device, partition, account, container, obj)
778
369
        old_delete_at = int(disk_file.metadata.get('X-Delete-At') or 0)
779
370
        orig_timestamp = disk_file.metadata.get('X-Timestamp')
 
371
        if orig_timestamp and orig_timestamp >= request.headers['x-timestamp']:
 
372
            return HTTPConflict(request=request)
780
373
        upload_expiration = time.time() + self.max_upload_time
781
374
        etag = md5()
782
375
        elapsed_time = 0
846
439
    @timing_stats()
847
440
    def GET(self, request):
848
441
        """Handle HTTP GET requests for the Swift Object Server."""
 
442
        device, partition, account, container, obj = self._parse_path(request)
 
443
 
849
444
        try:
850
 
            device, partition, account, container, obj = \
851
 
                split_path(unquote(request.path), 5, 5, True)
852
 
            validate_device_partition(device, partition)
853
 
        except ValueError, err:
854
 
            return HTTPBadRequest(body=str(err), request=request,
855
 
                                  content_type='text/plain')
856
 
        if self.mount_check and not check_mount(self.devices, device):
 
445
            disk_file = self._diskfile(device, partition, account, container,
 
446
                                       obj, keep_data_fp=True, iter_hook=sleep)
 
447
        except DiskFileDeviceUnavailable:
857
448
            return HTTPInsufficientStorage(drive=device, request=request)
858
 
        disk_file = self._diskfile(device, partition, account, container, obj,
859
 
                                   keep_data_fp=True, iter_hook=sleep)
860
449
        if disk_file.is_deleted() or disk_file.is_expired():
861
450
            if request.headers.get('if-match') == '*':
862
451
                return HTTPPreconditionFailed(request=request)
924
513
    @timing_stats(sample_rate=0.8)
925
514
    def HEAD(self, request):
926
515
        """Handle HTTP HEAD requests for the Swift Object Server."""
 
516
        device, partition, account, container, obj = self._parse_path(request)
 
517
 
927
518
        try:
928
 
            device, partition, account, container, obj = \
929
 
                split_path(unquote(request.path), 5, 5, True)
930
 
            validate_device_partition(device, partition)
931
 
        except ValueError, err:
932
 
            resp = HTTPBadRequest(request=request)
933
 
            resp.content_type = 'text/plain'
934
 
            resp.body = str(err)
935
 
            return resp
936
 
        if self.mount_check and not check_mount(self.devices, device):
 
519
            disk_file = self._diskfile(device, partition, account, container,
 
520
                                       obj)
 
521
        except DiskFileDeviceUnavailable:
937
522
            return HTTPInsufficientStorage(drive=device, request=request)
938
 
        disk_file = self._diskfile(device, partition, account, container, obj)
939
523
        if disk_file.is_deleted() or disk_file.is_expired():
940
524
            return HTTPNotFound(request=request)
941
525
        try:
974
558
                not check_float(request.headers['x-timestamp']):
975
559
            return HTTPBadRequest(body='Missing timestamp', request=request,
976
560
                                  content_type='text/plain')
977
 
        if self.mount_check and not check_mount(self.devices, device):
 
561
        try:
 
562
            disk_file = self._diskfile(device, partition, account, container,
 
563
                                       obj)
 
564
        except DiskFileDeviceUnavailable:
978
565
            return HTTPInsufficientStorage(drive=device, request=request)
979
 
        response_class = HTTPNoContent
980
 
        disk_file = self._diskfile(device, partition, account, container, obj)
981
566
        if 'x-if-delete-at' in request.headers and \
982
567
                int(request.headers['x-if-delete-at']) != \
983
568
                int(disk_file.metadata.get('X-Delete-At') or 0):
984
569
            return HTTPPreconditionFailed(
985
570
                request=request,
986
571
                body='X-If-Delete-At and X-Delete-At do not match')
987
 
        orig_timestamp = disk_file.metadata.get('X-Timestamp')
988
 
        if disk_file.is_deleted() or disk_file.is_expired():
989
 
            response_class = HTTPNotFound
990
 
        metadata = {
991
 
            'X-Timestamp': request.headers['X-Timestamp'], 'deleted': True,
992
 
        }
993
572
        old_delete_at = int(disk_file.metadata.get('X-Delete-At') or 0)
994
573
        if old_delete_at:
995
574
            self.delete_at_update('DELETE', old_delete_at, account,
996
575
                                  container, obj, request, device)
997
 
        disk_file.put_metadata(metadata, tombstone=True)
998
 
        disk_file.unlinkold(metadata['X-Timestamp'])
999
 
        if not orig_timestamp or \
1000
 
                orig_timestamp < request.headers['x-timestamp']:
 
576
        orig_timestamp = disk_file.metadata.get('X-Timestamp', 0)
 
577
        req_timestamp = request.headers['X-Timestamp']
 
578
        if disk_file.is_deleted() or disk_file.is_expired():
 
579
            response_class = HTTPNotFound
 
580
        else:
 
581
            if orig_timestamp < req_timestamp:
 
582
                response_class = HTTPNoContent
 
583
            else:
 
584
                response_class = HTTPConflict
 
585
        if orig_timestamp < req_timestamp:
 
586
            disk_file.put_metadata({'X-Timestamp': req_timestamp},
 
587
                                   tombstone=True)
 
588
            disk_file.unlinkold(req_timestamp)
1001
589
            self.container_update(
1002
590
                'DELETE', account, container, obj, request,
1003
 
                HeaderKeyDict({'x-timestamp': metadata['X-Timestamp']}),
 
591
                HeaderKeyDict({'x-timestamp': req_timestamp}),
1004
592
                device)
1005
593
        resp = response_class(request=request)
1006
594
        return resp
1007
595
 
1008
596
    @public
 
597
    @replication
1009
598
    @timing_stats(sample_rate=0.1)
1010
599
    def REPLICATE(self, request):
1011
600
        """
1012
601
        Handle REPLICATE requests for the Swift Object Server.  This is used
1013
602
        by the object replicator to get hashes for directories.
1014
603
        """
1015
 
        try:
1016
 
            device, partition, suffix = split_path(
1017
 
                unquote(request.path), 2, 3, True)
1018
 
            validate_device_partition(device, partition)
1019
 
        except ValueError, e:
1020
 
            return HTTPBadRequest(body=str(e), request=request,
1021
 
                                  content_type='text/plain')
 
604
        device, partition, suffix = self._parse_path(request, 2, 3)
 
605
 
1022
606
        if self.mount_check and not check_mount(self.devices, device):
1023
607
            return HTTPInsufficientStorage(drive=device, request=request)
1024
608
        path = os.path.join(self.devices, device, DATADIR, partition)
1043
627
                try:
1044
628
                    method = getattr(self, req.method)
1045
629
                    getattr(method, 'publicly_accessible')
1046
 
                    if req.method not in self.allowed_methods:
 
630
                    replication_method = getattr(method, 'replication', False)
 
631
                    if (self.replication_server is not None and
 
632
                            self.replication_server != replication_method):
1047
633
                        raise AttributeError('Not allowed method.')
1048
634
                except AttributeError:
1049
635
                    res = HTTPMethodNotAllowed()
1051
637
                    res = method(req)
1052
638
            except DiskFileCollision:
1053
639
                res = HTTPForbidden(request=req)
 
640
            except HTTPException as error_response:
 
641
                res = error_response
1054
642
            except (Exception, Timeout):
1055
643
                self.logger.exception(_(
1056
644
                    'ERROR __call__ error with %(method)s'