from __future__ import with_statement
import cPickle as pickle
import errno
import os
import time
import traceback
from collections import defaultdict
from datetime import datetime
from gettext import gettext as _
from hashlib import md5
from tempfile import mkstemp
from urllib import unquote
from contextlib import contextmanager

from xattr import getxattr, setxattr
from eventlet import sleep, Timeout

from swift.common.utils import mkdirs, normalize_timestamp, public, \
    storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \
    split_path, drop_buffer_cache, get_logger, write_pickle, \
    config_true_value, validate_device_partition, timing_stats, \
    ThreadPool, replication
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_object_creation, check_mount, \
    check_float, check_utf8
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
    DiskFileNotExist, DiskFileCollision, DiskFileNoSpace, \
    DiskFileDeviceUnavailable
from swift.obj.base import invalidate_hash, \
    quarantine_renamer, get_hashes
from swift.common.http import is_success
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
    HTTPInternalServerError, HTTPNoContent, HTTPNotFound, HTTPNotModified, \
    HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \
    HTTPClientDisconnect, HTTPMethodNotAllowed, Request, Response, UTC, \
    HTTPInsufficientStorage, HTTPForbidden, HTTPException, HeaderKeyDict, \
    HTTPConflict, multi_range_iterator


DATADIR = 'objects'
ASYNCDIR = 'async_pending'
PICKLE_PROTOCOL = 2
METADATA_KEY = 'user.swift.metadata'
MAX_OBJECT_NAME_LENGTH = 1024
# keep these lower-case
DISALLOWED_HEADERS = set('content-length content-type deleted etag'.split())


def read_metadata(fd):
    """
    Helper function to read the pickled metadata from an object file.

    :param fd: file descriptor to load the metadata from

    :returns: dictionary of metadata
    """
    metadata = ''
    key = 0
    try:
        while True:
            # The metadata is spread across a sequence of xattr keys; keep
            # reading until the next key in the sequence is missing.
            metadata += getxattr(fd, '%s%s' % (METADATA_KEY, (key or '')))
            key += 1
    except IOError:
        pass
    return pickle.loads(metadata)


def write_metadata(fd, metadata):
    """
    Helper function to write pickled metadata for an object file.

    :param fd: file descriptor to write the metadata
    :param metadata: metadata to write
    """
    metastr = pickle.dumps(metadata, PICKLE_PROTOCOL)
    key = 0
    while metastr:
        # xattr values are size-limited, so the pickle is stored across a
        # sequence of keys in 254-byte slices.
        setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254])
        metastr = metastr[254:]
        key += 1
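

# A minimal sketch of how the two helpers above round-trip metadata through
# xattrs; the temporary file and the sample values are illustrative only:
#
#     fd, path = mkstemp()
#     try:
#         write_metadata(fd, {'X-Timestamp': '1363640084.07378',
#                             'Content-Length': '0'})
#         assert read_metadata(fd)['Content-Length'] == '0'
#     finally:
#         os.close(fd)
#         os.unlink(path)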


class DiskWriter(object):
    """
    Encapsulation of the write context for servicing PUT REST API
    requests. Serves as the context manager object for DiskFile's writer()
    method.
    """

    def __init__(self, disk_file, fd, tmppath, threadpool):
        self.disk_file = disk_file
        self.fd = fd
        self.tmppath = tmppath
        self.upload_size = 0
        self.last_sync = 0
        self.threadpool = threadpool

    def write(self, chunk):
        """
        Write a chunk of data into the temporary file.

        :param chunk: the chunk of data to write as a string object
        """

        def _write_entire_chunk(chunk):
            # os.write() may consume fewer bytes than requested, so loop
            # until the whole chunk has been written.
            while chunk:
                written = os.write(self.fd, chunk)
                self.upload_size += written
                chunk = chunk[written:]

        self.threadpool.run_in_thread(_write_entire_chunk, chunk)

        # For large files sync every 512MB (by default) written
        diff = self.upload_size - self.last_sync
        if diff >= self.disk_file.bytes_per_sync:
            self.threadpool.force_run_in_thread(fdatasync, self.fd)
            drop_buffer_cache(self.fd, self.last_sync, diff)
            self.last_sync = self.upload_size

    def put(self, metadata, extension='.data'):
        """
        Finalize writing the file on disk, and renames it from the temp
        file to the real location. This should be called after the data
        has been written to the temp file.

        :param metadata: dictionary of metadata to be written
        :param extension: extension to be used when making the file
        """
        assert self.tmppath is not None
        timestamp = normalize_timestamp(metadata['X-Timestamp'])
        metadata['name'] = self.disk_file.name

        def finalize_put():
            # Write the metadata before calling fsync() so that both data
            # and metadata are flushed to disk.
            write_metadata(self.fd, metadata)
            # We call fsync() before calling drop_cache() to lower the
            # amount of redundant work the drop cache code will perform on
            # the pages (now that after fsync the pages will be all clean).
            fsync(self.fd)
            # From the Department of the Redundancy Department, make sure
            # we call drop_cache() after fsync() to avoid redundant work
            # (pages all clean).
            drop_buffer_cache(self.fd, 0, self.upload_size)
            invalidate_hash(os.path.dirname(self.disk_file.datadir))
            # After the rename completes, this object will be available for
            # other requests to reference.
            renamer(self.tmppath,
                    os.path.join(self.disk_file.datadir,
                                 timestamp + extension))

        self.threadpool.force_run_in_thread(finalize_put)
        self.disk_file.metadata = metadata
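

# A minimal usage sketch, assuming a DiskFile instance (see the class below);
# this mirrors how the PUT path drives DiskWriter and is illustrative only:
#
#     body = 'some object body'
#     with disk_file.writer(size=len(body)) as writer:
#         writer.write(body)
#         writer.put({'X-Timestamp': normalize_timestamp(time.time()),
#                     'Content-Length': str(len(body)),
#                     'ETag': md5(body).hexdigest(),
#                     'Content-Type': 'application/octet-stream'})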


class DiskFile(object):
    """
    Manage object files on disk.

    :param path: path to devices on the node
    :param device: device name
    :param partition: partition on the device the object lives in
    :param account: account name for the object
    :param container: container name for the object
    :param obj: object name for the object
    :param keep_data_fp: if True, don't close the fp, otherwise close it
    :param disk_chunk_size: size of chunks on file reads
    :param bytes_per_sync: number of bytes between fdatasync calls
    :param iter_hook: called when __iter__ returns a chunk
    :param threadpool: thread pool in which to do blocking operations

    :raises DiskFileCollision: on md5 collision
    """

    def __init__(self, path, device, partition, account, container, obj,
                 logger, keep_data_fp=False, disk_chunk_size=65536,
                 bytes_per_sync=(512 * 1024 * 1024), iter_hook=None,
                 threadpool=None):
        self.disk_chunk_size = disk_chunk_size
        self.bytes_per_sync = bytes_per_sync
        self.iter_hook = iter_hook
        self.name = '/' + '/'.join((account, container, obj))
        name_hash = hash_path(account, container, obj)
        self.datadir = os.path.join(
            path, device, storage_directory(DATADIR, partition, name_hash))
        self.device_path = os.path.join(path, device)
        self.tmpdir = os.path.join(path, device, 'tmp')
        self.logger = logger
        self.metadata = {}
        self.meta_file = None
        self.data_file = None
        self.fp = None
        self.iter_etag = None
        self.started_at_0 = False
        self.read_to_eof = False
        self.quarantined_dir = None
        self.keep_cache = False
        self.suppress_file_closing = False
        self.threadpool = threadpool or ThreadPool(nthreads=0)
        if not os.path.exists(self.datadir):
            return
        files = sorted(os.listdir(self.datadir), reverse=True)
        for afile in files:
            if afile.endswith('.ts'):
                self.data_file = self.meta_file = None
                self.metadata = {'deleted': True}
                break
            if afile.endswith('.meta') and not self.meta_file:
                self.meta_file = os.path.join(self.datadir, afile)
            if afile.endswith('.data') and not self.data_file:
                self.data_file = os.path.join(self.datadir, afile)
                break
        if not self.data_file:
            return
        self.fp = open(self.data_file, 'rb')
        self.metadata = read_metadata(self.fp)
        if not keep_data_fp:
            self.close(verify_file=False)
        if self.meta_file:
            with open(self.meta_file) as mfp:
                for key in self.metadata.keys():
                    if key.lower() not in DISALLOWED_HEADERS:
                        del self.metadata[key]
                self.metadata.update(read_metadata(mfp))
        if 'name' in self.metadata:
            if self.metadata['name'] != self.name:
                self.logger.error(_('Client path %(client)s does not match '
                                    'path stored in object metadata %(meta)s'),
                                  {'client': self.name,
                                   'meta': self.metadata['name']})
                raise DiskFileCollision('Client path does not match path '
                                        'stored in object metadata')

    def __iter__(self):
        """Returns an iterator over the data file."""
        try:
            dropped_cache = 0
            read = 0
            self.started_at_0 = False
            self.read_to_eof = False
            if self.fp.tell() == 0:
                self.started_at_0 = True
                self.iter_etag = md5()
            while True:
                chunk = self.threadpool.run_in_thread(
                    self.fp.read, self.disk_chunk_size)
                if chunk:
                    if self.iter_etag:
                        self.iter_etag.update(chunk)
                    read += len(chunk)
                    if read - dropped_cache > (1024 * 1024):
                        self._drop_cache(self.fp.fileno(), dropped_cache,
                                         read - dropped_cache)
                        dropped_cache = read
                    yield chunk
                    if self.iter_hook:
                        self.iter_hook()
                else:
                    self.read_to_eof = True
                    self._drop_cache(self.fp.fileno(), dropped_cache,
                                     read - dropped_cache)
                    break
        finally:
            if not self.suppress_file_closing:
                self.close()

    def app_iter_range(self, start, stop):
        """Returns an iterator over the data file for range (start, stop)"""
        if start or start == 0:
            self.fp.seek(start)
        if stop is not None:
            length = stop - start
        else:
            length = None
        for chunk in self:
            if length is not None:
                length -= len(chunk)
                if length < 0:
                    # Chop off the extra:
                    yield chunk[:length]
                    break
            yield chunk

    def app_iter_ranges(self, ranges, content_type, boundary, size):
        """Returns an iterator over the data file for a set of ranges"""
        if not ranges:
            yield ''
        else:
            try:
                self.suppress_file_closing = True
                for chunk in multi_range_iterator(
                        ranges, content_type, boundary, size,
                        self.app_iter_range):
                    yield chunk
            finally:
                self.suppress_file_closing = False
                self.close()
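
    # A minimal sketch of a ranged read, assuming a DiskFile opened with
    # keep_data_fp=True so self.fp is still open; values are illustrative:
    #
    #     first_kb = ''.join(disk_file.app_iter_range(0, 1024))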

    def _handle_close_quarantine(self):
        """Check if file needs to be quarantined"""
        try:
            self.get_data_file_size()
        except DiskFileError:
            self.quarantine()
            return
        except DiskFileNotExist:
            return

        if self.iter_etag and self.started_at_0 and self.read_to_eof and \
                'ETag' in self.metadata and \
                self.iter_etag.hexdigest() != self.metadata.get('ETag'):
            self.quarantine()
    def close(self, verify_file=True):
        """
        Close the file. Will handle quarantining file if necessary.

        :param verify_file: Defaults to True. If false, will not check
                            file to see if it needs quarantining.
        """
        if self.fp:
            try:
                if verify_file:
                    self._handle_close_quarantine()
            except (Exception, Timeout), e:
                self.logger.error(_(
                    'ERROR DiskFile %(data_file)s in '
                    '%(data_dir)s close failure: %(exc)s : %(stack)s'),
                    {'exc': e, 'stack': ''.join(traceback.format_stack()),
                     'data_file': self.data_file, 'data_dir': self.datadir})
            finally:
                self.fp.close()
                self.fp = None

    def is_deleted(self):
        """
        Check if the file is deleted.

        :returns: True if the file doesn't exist or has been flagged as
                  deleted
        """
        return not self.data_file or 'deleted' in self.metadata

    def is_expired(self):
        """
        Check if the file is expired.

        :returns: True if the file has an X-Delete-At in the past
        """
        return ('X-Delete-At' in self.metadata and
                int(self.metadata['X-Delete-At']) <= time.time())

    @contextmanager
    def writer(self, size=None):
        """
        Context manager to write a file. We create a temporary file first,
        and then return a DiskWriter object to encapsulate the state.

        :param size: optional initial size of file to explicitly allocate
                     on disk
        :raises DiskFileNoSpace: if a size is specified and allocation fails
        """
        if not os.path.exists(self.tmpdir):
            mkdirs(self.tmpdir)
        fd, tmppath = mkstemp(dir=self.tmpdir)
        try:
            if size is not None and size > 0:
                try:
                    fallocate(fd, size)
                except OSError:
                    raise DiskFileNoSpace()
            yield DiskWriter(self, fd, tmppath, self.threadpool)
        finally:
            try:
                os.close(fd)
            except OSError:
                pass
            try:
                os.unlink(tmppath)
            except OSError:
                pass

    def put_metadata(self, metadata, tombstone=False):
        """
        Short hand for putting metadata to .meta and .ts files.

        :param metadata: dictionary of metadata to be written
        :param tombstone: whether or not we are writing a tombstone
        """
        extension = '.ts' if tombstone else '.meta'
        with self.writer() as writer:
            writer.put(metadata, extension=extension)
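
    # A minimal sketch of how the DELETE path tombstones an object via the
    # helper above; the timestamp is illustrative:
    #
    #     now = normalize_timestamp(time.time())
    #     disk_file.put_metadata({'X-Timestamp': now}, tombstone=True)
    #     disk_file.unlinkold(now)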

    def unlinkold(self, timestamp):
        """
        Remove any older versions of the object file. Any file that has an
        older timestamp than timestamp will be deleted.

        :param timestamp: timestamp to compare with each file
        """
        timestamp = normalize_timestamp(timestamp)

        def _unlinkold():
            for fname in os.listdir(self.datadir):
                if fname < timestamp:
                    try:
                        os.unlink(os.path.join(self.datadir, fname))
                    except OSError, err:    # pragma: no cover
                        if err.errno != errno.ENOENT:
                            raise

        self.threadpool.run_in_thread(_unlinkold)

    def _drop_cache(self, fd, offset, length):
        """Drop 'length' bytes of buffer cache from 'offset', unless
        keep_cache is set."""
        if not self.keep_cache:
            drop_buffer_cache(fd, offset, length)

    def quarantine(self):
        """
        In the case that a file is corrupted, move it to a quarantined
        area to allow replication to fix it.

        :returns: if quarantine is successful, path to quarantined
                  directory otherwise None
        """
        if not (self.is_deleted() or self.quarantined_dir):
            self.quarantined_dir = self.threadpool.run_in_thread(
                quarantine_renamer, self.device_path, self.data_file)
            self.logger.increment('quarantines')
            return self.quarantined_dir

    def get_data_file_size(self):
        """
        Returns the os.path.getsize for the file. Raises an exception if
        this file does not match the Content-Length stored in the metadata,
        or if self.data_file does not exist.

        :returns: file size as an int
        :raises DiskFileError: on file size mismatch.
        :raises DiskFileNotExist: on file not existing (including deleted)
        """
        try:
            file_size = 0
            if self.data_file:
                file_size = self.threadpool.run_in_thread(
                    os.path.getsize, self.data_file)
                if 'Content-Length' in self.metadata:
                    metadata_size = int(self.metadata['Content-Length'])
                    if file_size != metadata_size:
                        raise DiskFileError(
                            'Content-Length of %s does not match file size '
                            'of %s' % (metadata_size, file_size))
                return file_size
        except OSError, err:
            if err.errno != errno.ENOENT:
                raise
        raise DiskFileNotExist('Data File does not exist.')
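

# A minimal read-path sketch tying DiskFile together; the device root and
# storage names are illustrative, and `logger` is assumed to exist:
#
#     disk_file = DiskFile('/srv/node', 'sda1', '0', 'a', 'c', 'o',
#                          logger, keep_data_fp=True)
#     if not (disk_file.is_deleted() or disk_file.is_expired()):
#         body = ''.join(disk_file)   # iterating also verifies size/ETag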


class ObjectController(object):
    """Implements the WSGI application for the Swift Object Server."""

    @public
    @timing_stats()
    def DELETE(self, request):
        """Handle HTTP DELETE requests for the Swift Object Server."""
        try:
            device, partition, account, container, obj = split_path(
                unquote(request.path), 5, 5, True)
            validate_device_partition(device, partition)
        except ValueError, e:
            return HTTPBadRequest(body=str(e), request=request,
                                  content_type='text/plain')
        if 'x-timestamp' not in request.headers or \
                not check_float(request.headers['x-timestamp']):
            return HTTPBadRequest(body='Missing timestamp', request=request,
                                  content_type='text/plain')
        try:
            disk_file = self._diskfile(device, partition, account, container,
                                       obj)
        except DiskFileDeviceUnavailable:
            return HTTPInsufficientStorage(drive=device, request=request)
        if 'x-if-delete-at' in request.headers and \
                int(request.headers['x-if-delete-at']) != \
                int(disk_file.metadata.get('X-Delete-At') or 0):
            return HTTPPreconditionFailed(
                request=request,
                body='X-If-Delete-At and X-Delete-At do not match')
        old_delete_at = int(disk_file.metadata.get('X-Delete-At') or 0)
        if old_delete_at:
            self.delete_at_update('DELETE', old_delete_at, account,
                                  container, obj, request, device)
        orig_timestamp = disk_file.metadata.get('X-Timestamp', 0)
        req_timestamp = request.headers['X-Timestamp']
        if disk_file.is_deleted() or disk_file.is_expired():
            response_class = HTTPNotFound
        else:
            if orig_timestamp < req_timestamp:
                response_class = HTTPNoContent
            else:
                response_class = HTTPConflict
        if orig_timestamp < req_timestamp:
            disk_file.put_metadata({'X-Timestamp': req_timestamp},
                                   tombstone=True)
            disk_file.unlinkold(req_timestamp)
            self.container_update(
                'DELETE', account, container, obj, request,
                HeaderKeyDict({'x-timestamp': req_timestamp}),
                device)
        resp = response_class(request=request)
        return resp
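
    # A hedged sketch of the response semantics above, assuming a running
    # ObjectController `app` (the path and wiring are illustrative):
    #
    #     req = Request.blank('/sda1/0/a/c/o',
    #                         environ={'REQUEST_METHOD': 'DELETE'},
    #                         headers={'X-Timestamp':
    #                                  normalize_timestamp(time.time())})
    #     resp = req.get_response(app)
    #     # 204 if the request timestamp is newer than the on-disk object,
    #     # 409 if it is older, 404 if already deleted or expired.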

    @public
    @replication
    @timing_stats(sample_rate=0.1)
    def REPLICATE(self, request):
        """
        Handle REPLICATE requests for the Swift Object Server. This is used
        by the object replicator to get hashes for directories.
        """
        try:
            device, partition, suffix = split_path(
                unquote(request.path), 2, 3, True)
            validate_device_partition(device, partition)
        except ValueError, e:
            return HTTPBadRequest(body=str(e), request=request,
                                  content_type='text/plain')
        if self.mount_check and not check_mount(self.devices, device):
            return HTTPInsufficientStorage(drive=device, request=request)
        path = os.path.join(self.devices, device, DATADIR, partition)