# Copyright (c) 2010-2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Disk File Interface for Swift Object Server"""

from __future__ import with_statement
import cPickle as pickle
import errno
import hashlib
import logging
import os
import time
import traceback
import uuid
from gettext import gettext as _
from os.path import basename, dirname, exists, getmtime, getsize, join
from tempfile import mkstemp
from contextlib import contextmanager

from xattr import getxattr, setxattr
from eventlet import Timeout

from swift.common.constraints import check_mount
from swift.common.utils import mkdirs, normalize_timestamp, \
    storage_directory, hash_path, renamer, fallocate, fsync, \
    fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle
from swift.common.exceptions import DiskFileError, DiskFileNotExist, \
    DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
    PathNotDir
from swift.common.swob import multi_range_iterator


PICKLE_PROTOCOL = 2
ONE_WEEK = 604800
HASH_FILE = 'hashes.pkl'
METADATA_KEY = 'user.swift.metadata'


def read_metadata(fd):
    """
    Helper function to read the pickled metadata from an object file.

    :param fd: file descriptor to load the metadata from
    :returns: dictionary of metadata
    """
    metadata = ''
    key = 0
    try:
        while True:
            metadata += getxattr(fd, '%s%s' % (METADATA_KEY, (key or '')))
            key += 1
    except IOError:
        pass
    return pickle.loads(metadata)
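

# Usage sketch (data_file_path is hypothetical; any fd or file object open
# on an object file works):
#
#     with open(data_file_path, 'rb') as fp:   # path to a .data file
#         metadata = read_metadata(fp)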


def write_metadata(fd, metadata):
    """
    Helper function to write pickled metadata for an object file.

    :param fd: file descriptor to write the metadata
    :param metadata: metadata to write
    """
    metastr = pickle.dumps(metadata, PICKLE_PROTOCOL)
    key = 0
    while metastr:
        setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254])
        metastr = metastr[254:]
        key += 1
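

# Values longer than 254 bytes spill over into numbered follow-on keys
# (user.swift.metadata1, user.swift.metadata2, ...), which is why
# read_metadata() above keeps concatenating keys until getxattr() raises
# IOError.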


def quarantine_renamer(device_path, corrupted_file_path):
    """
    In the case that a file is corrupted, move it to a quarantined
    area to allow replication to fix it.

    :params device_path: The path to the device the corrupted file is on.
    :params corrupted_file_path: The path to the file you want quarantined.

    :returns: path (str) of directory the file was moved to
    :raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
                     exceptions from rename
    """
    from_dir = dirname(corrupted_file_path)
    to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
    invalidate_hash(dirname(from_dir))
    try:
        renamer(from_dir, to_dir)
    except OSError, e:
        if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
            raise
        to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
        renamer(from_dir, to_dir)
    return to_dir
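

# Design note: on EEXIST/ENOTEMPTY the destination gets a random uuid4 hex
# suffix appended, so quarantining the same hash directory twice never
# clobbers an earlier capture.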


def hash_suffix(path, reclaim_age):
    """
    Performs reclamation and returns an md5 of all (remaining) files.

    :param path: full path to the suffix directory
    :param reclaim_age: age in seconds at which to remove tombstones
    :raises PathNotDir: if given path is not a valid directory
    :raises OSError: for non-ENOTDIR errors
    """
    md5 = hashlib.md5()
    try:
        path_contents = sorted(os.listdir(path))
    except OSError, err:
        if err.errno in (errno.ENOTDIR, errno.ENOENT):
            raise PathNotDir()
        raise
    for hsh in path_contents:
        hsh_path = join(path, hsh)
        try:
            files = os.listdir(hsh_path)
        except OSError, err:
            if err.errno == errno.ENOTDIR:
                partition_path = dirname(path)
                objects_path = dirname(partition_path)
                device_path = dirname(objects_path)
                quar_path = quarantine_renamer(device_path, hsh_path)
                logging.exception(
                    _('Quarantined %s to %s because it is not a directory') %
                    (hsh_path, quar_path))
                continue
            raise
        if len(files) == 1:
            if files[0].endswith('.ts'):
                # remove tombstones older than reclaim_age
                ts = files[0].rsplit('.', 1)[0]
                if (time.time() - float(ts)) > reclaim_age:
                    os.unlink(join(hsh_path, files[0]))
                    files.remove(files[0])
        elif files:
            files.sort(reverse=True)
            meta = data = tomb = None
            for filename in list(files):
                if not meta and filename.endswith('.meta'):
                    meta = filename
                if not data and filename.endswith('.data'):
                    data = filename
                if not tomb and filename.endswith('.ts'):
                    tomb = filename
                if (filename < tomb or  # any file older than tomb
                        filename < data or  # any file older than data
                        (filename.endswith('.meta') and
                         filename < meta)):  # old meta
                    os.unlink(join(hsh_path, filename))
                    files.remove(filename)
        if not files:
            os.rmdir(hsh_path)
        for filename in files:
            md5.update(filename)
    try:
        os.rmdir(path)
    except OSError:
        pass
    return md5.hexdigest()
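

# The suffix hash is an md5 over the names of the files that survive
# reclamation; replication compares these per-suffix hashes between nodes
# to decide which suffix directories need to be synced.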


def invalidate_hash(suffix_dir):
    """
    Invalidates the hash for a suffix_dir in the partition's hashes file.

    :param suffix_dir: absolute path to suffix dir whose hash needs
                       invalidating
    """
    suffix = basename(suffix_dir)
    partition_dir = dirname(suffix_dir)
    hashes_file = join(partition_dir, HASH_FILE)
    with lock_path(partition_dir):
        try:
            with open(hashes_file, 'rb') as fp:
                hashes = pickle.load(fp)
            if suffix in hashes and not hashes[suffix]:
                return
        except Exception:
            return
        hashes[suffix] = None
        write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
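

# A value of None marks the suffix's cached hash as stale; get_hashes()
# below recomputes any None entries via hash_suffix().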


def get_hashes(partition_dir, recalculate=None, do_listdir=False,
               reclaim_age=ONE_WEEK):
    """
    Get a list of hashes for the suffix dir.  do_listdir causes it to mistrust
    the hash cache for suffix existence at the (unexpectedly high) cost of a
    listdir.  reclaim_age is just passed on to hash_suffix.

    :param partition_dir: absolute path of partition to get hashes for
    :param recalculate: list of suffixes which should be recalculated when got
    :param do_listdir: force existence check for all hashes in the partition
    :param reclaim_age: age at which to remove tombstones

    :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
    """
    hashed = 0
    hashes_file = join(partition_dir, HASH_FILE)
    modified = False
    force_rewrite = False
    hashes = {}
    mtime = -1

    if recalculate is None:
        recalculate = []

    try:
        with open(hashes_file, 'rb') as fp:
            hashes = pickle.load(fp)
        mtime = getmtime(hashes_file)
    except Exception:
        do_listdir = True
        force_rewrite = True
    if do_listdir:
        for suff in os.listdir(partition_dir):
            if len(suff) == 3:
                hashes.setdefault(suff, None)
        modified = True
    hashes.update((hash_, None) for hash_ in recalculate)
    for suffix, hash_ in hashes.items():
        if not hash_:
            suffix_dir = join(partition_dir, suffix)
            try:
                hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
                hashed += 1
            except PathNotDir:
                del hashes[suffix]
            except OSError:
                logging.exception(_('Error hashing suffix'))
            modified = True
    if modified:
        with lock_path(partition_dir):
            if force_rewrite or not exists(hashes_file) or \
                    getmtime(hashes_file) == mtime:
                write_pickle(
                    hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
                return hashed, hashes
        return get_hashes(partition_dir, recalculate, do_listdir,
                          reclaim_age)
    else:
        return hashed, hashes
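

# Usage sketch (hypothetical partition path): replication uses this to fetch
# per-suffix hashes for a partition, forcing a full listdir when the cache
# is suspect:
#
#     hashed_count, suffix_hashes = get_hashes(
#         '/srv/node/sda/objects/1234', do_listdir=True)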


class DiskWriter(object):
    """
    Encapsulation of the write context for servicing PUT REST API
    requests. Serves as the context manager object for DiskFile's writer()
    method.
    """

    def __init__(self, disk_file, fd, tmppath, threadpool):
        self.disk_file = disk_file
        self.fd = fd
        self.tmppath = tmppath
        self.upload_size = 0
        self.last_sync = 0
        self.threadpool = threadpool

    def write(self, chunk):
        """
        Write a chunk of data into the temporary file.

        :param chunk: the chunk of data to write as a string object
        """

        def _write_entire_chunk(chunk):
            while chunk:
                written = os.write(self.fd, chunk)
                self.upload_size += written
                chunk = chunk[written:]

        self.threadpool.run_in_thread(_write_entire_chunk, chunk)

        # For large files sync every 512MB (by default) written
        diff = self.upload_size - self.last_sync
        if diff >= self.disk_file.bytes_per_sync:
            self.threadpool.force_run_in_thread(fdatasync, self.fd)
            drop_buffer_cache(self.fd, self.last_sync, diff)
            self.last_sync = self.upload_size
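
    # Design note: fdatasync() is pushed through the threadpool so a slow
    # disk cannot block the eventlet hub, and drop_buffer_cache() releases
    # the pages just synced so a large upload does not evict the rest of
    # the page cache.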

    def put(self, metadata, extension='.data'):
        """
        Finalize writing the file on disk, and renames it from the temp file
        to the real location.  This should be called after the data has been
        written to the temp file.

        :param metadata: dictionary of metadata to be written
        :param extension: extension to be used when making the file
        """
        if not self.tmppath:
            raise ValueError("tmppath is unusable.")
        timestamp = normalize_timestamp(metadata['X-Timestamp'])
        metadata['name'] = self.disk_file.name

        def finalize_put():
            # Write the metadata before calling fsync() so that both data and
            # metadata are flushed to disk.
            write_metadata(self.fd, metadata)
            # We call fsync() before calling drop_cache() to lower the amount
            # of redundant work the drop cache code will perform on the pages
            # (now that after fsync the pages will be all clean).
            fsync(self.fd)
            # From the Department of the Redundancy Department, make sure
            # we call drop_cache() after fsync() to avoid redundant work
            # (pages all clean).
            drop_buffer_cache(self.fd, 0, self.upload_size)
            invalidate_hash(dirname(self.disk_file.datadir))
            # After the rename completes, this object will be available for
            # other requests to reference.
            renamer(self.tmppath,
                    join(self.disk_file.datadir, timestamp + extension))

        self.threadpool.force_run_in_thread(finalize_put)
        self.disk_file.metadata = metadata


class DiskFile(object):
    """
    Manage object files on disk.

    :param path: path to devices on the node
    :param device: device name
    :param partition: partition on the device the object lives in
    :param account: account name for the object
    :param container: container name for the object
    :param obj: object name for the object
    :param logger: logger object for writing out log file messages
    :param keep_data_fp: if True, don't close the fp, otherwise close it
    :param disk_chunk_size: size of chunks on file reads
    :param bytes_per_sync: number of bytes between fdatasync calls
    :param iter_hook: called when __iter__ returns a chunk
    :param threadpool: thread pool in which to do blocking operations
    :param obj_dir: top-level directory for objects on the device
    :param mount_check: if True, check that the device is mounted before use
    :param disallowed_metadata_keys: lower-case metadata keys preserved from
                                     the .data file when a .meta file is
                                     applied

    :raises DiskFileCollision: on md5 collision
    :raises DiskFileDeviceUnavailable: if the device is not mounted
    """

    def __init__(self, path, device, partition, account, container, obj,
                 logger, keep_data_fp=False, disk_chunk_size=65536,
                 bytes_per_sync=(512 * 1024 * 1024), iter_hook=None,
                 threadpool=None, obj_dir='objects', mount_check=False,
                 disallowed_metadata_keys=None):
        if mount_check and not check_mount(path, device):
            raise DiskFileDeviceUnavailable()
        self.disk_chunk_size = disk_chunk_size
        self.bytes_per_sync = bytes_per_sync
        self.iter_hook = iter_hook
        self.name = '/' + '/'.join((account, container, obj))
        name_hash = hash_path(account, container, obj)
        self.datadir = join(
            path, device, storage_directory(obj_dir, partition, name_hash))
        self.device_path = join(path, device)
        self.tmpdir = join(path, device, 'tmp')
        self.logger = logger
        self.disallowed_metadata_keys = disallowed_metadata_keys or []
        self.metadata = {}
        self.data_file = None
        self.fp = None
        self.iter_etag = None
        self.started_at_0 = False
        self.read_to_eof = False
        self.quarantined_dir = None
        self.keep_cache = False
        self.suppress_file_closing = False
        self.threadpool = threadpool or ThreadPool(nthreads=0)
        if not exists(self.datadir):
            return
        meta_file = None
        files = sorted(os.listdir(self.datadir), reverse=True)
        for afile in files:
            if afile.endswith('.ts'):
                self.data_file = None
                with open(join(self.datadir, afile)) as mfp:
                    self.metadata = read_metadata(mfp)
                self.metadata['deleted'] = True
                break
            if afile.endswith('.meta') and not meta_file:
                meta_file = join(self.datadir, afile)
            if afile.endswith('.data') and not self.data_file:
                self.data_file = join(self.datadir, afile)
                break
        if not self.data_file:
            return
        self.fp = open(self.data_file, 'rb')
        self.metadata = read_metadata(self.fp)
        if not keep_data_fp:
            self.close(verify_file=False)
        if meta_file:
            with open(meta_file) as mfp:
                for key in self.metadata.keys():
                    if key.lower() not in self.disallowed_metadata_keys:
                        del self.metadata[key]
                self.metadata.update(read_metadata(mfp))
        if 'name' in self.metadata:
            if self.metadata['name'] != self.name:
                self.logger.error(
                    _('Client path %(client)s does not match '
                      'path stored in object metadata %(meta)s'),
                    {'client': self.name, 'meta': self.metadata['name']})
                raise DiskFileCollision('Client path does not match path '
                                        'stored in object metadata')

    def __iter__(self):
        """Returns an iterator over the data file."""
        try:
            dropped_cache = 0
            read = 0
            self.started_at_0 = False
            self.read_to_eof = False
            if self.fp.tell() == 0:
                self.started_at_0 = True
                self.iter_etag = hashlib.md5()
            while True:
                chunk = self.threadpool.run_in_thread(
                    self.fp.read, self.disk_chunk_size)
                if chunk:
                    if self.iter_etag:
                        self.iter_etag.update(chunk)
                    read += len(chunk)
                    if read - dropped_cache > (1024 * 1024):
                        self._drop_cache(self.fp.fileno(), dropped_cache,
                                         read - dropped_cache)
                        dropped_cache = read
                    yield chunk
                    if self.iter_hook:
                        self.iter_hook()
                else:
                    self.read_to_eof = True
                    self._drop_cache(self.fp.fileno(), dropped_cache,
                                     read - dropped_cache)
                    break
        finally:
            if not self.suppress_file_closing:
                self.close()

    def app_iter_range(self, start, stop):
        """Returns an iterator over the data file for range (start, stop)"""
        if start or start == 0:
            self.fp.seek(start)
        if stop is not None:
            length = stop - start
        else:
            length = None
        try:
            for chunk in self:
                if length is not None:
                    length -= len(chunk)
                    if length < 0:
                        # Chop off the extra:
                        yield chunk[:length]
                        break
                yield chunk
        finally:
            if not self.suppress_file_closing:
                self.close()

    def app_iter_ranges(self, ranges, content_type, boundary, size):
        """Returns an iterator over the data file for a set of ranges"""
        if not ranges:
            yield ''
        else:
            try:
                self.suppress_file_closing = True
                for chunk in multi_range_iterator(
                        ranges, content_type, boundary, size,
                        self.app_iter_range):
                    yield chunk
            finally:
                self.suppress_file_closing = False
                self.close()
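
    # Note: suppress_file_closing stops app_iter_range() from closing the
    # file after each individual range, so a single fp can serve the whole
    # multi-range response; the finally block restores the flag and closes
    # the file once.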

    def _handle_close_quarantine(self):
        """Check if file needs to be quarantined"""
        try:
            self.get_data_file_size()
        except DiskFileNotExist:
            return
        except DiskFileError:
            self.quarantine()
            return

        if self.iter_etag and self.started_at_0 and self.read_to_eof and \
                'ETag' in self.metadata and \
                self.iter_etag.hexdigest() != self.metadata.get('ETag'):
            self.quarantine()

    def close(self, verify_file=True):
        """
        Close the file. Will handle quarantining file if necessary.

        :param verify_file: Defaults to True. If false, will not check
                            file to see if it needs quarantining.
        """
        if self.fp:
            try:
                if verify_file:
                    self._handle_close_quarantine()
            except (Exception, Timeout), e:
                self.logger.error(_(
                    'ERROR DiskFile %(data_file)s in '
                    '%(data_dir)s close failure: %(exc)s : %(stack)s'),
                    {'exc': e, 'stack': ''.join(traceback.format_stack()),
                     'data_file': self.data_file, 'data_dir': self.datadir})
            finally:
                self.fp.close()
                self.fp = None

    def is_deleted(self):
        """
        Check if the file is deleted.

        :returns: True if the file doesn't exist or has been flagged as
                  deleted.
        """
        return not self.data_file or 'deleted' in self.metadata

    def is_expired(self):
        """
        Check if the file is expired.

        :returns: True if the file has an X-Delete-At in the past
        """
        return ('X-Delete-At' in self.metadata and
                int(self.metadata['X-Delete-At']) <= time.time())

    @contextmanager
    def writer(self, size=None):
        """
        Context manager to write a file. We create a temporary file first, and
        then return a DiskWriter object to encapsulate the state.

        :param size: optional initial size of file to explicitly allocate on
                     disk
        :raises DiskFileNoSpace: if a size is specified and allocation fails
        """
        if not exists(self.tmpdir):
            mkdirs(self.tmpdir)
        fd, tmppath = mkstemp(dir=self.tmpdir)
        try:
            if size is not None and size > 0:
                try:
                    fallocate(fd, size)
                except OSError:
                    raise DiskFileNoSpace()
            yield DiskWriter(self, fd, tmppath, self.threadpool)
        finally:
            try:
                os.close(fd)
            except OSError:
                pass
            try:
                os.unlink(tmppath)
            except OSError:
                pass
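
    # Usage sketch (hypothetical names; mirrors the object server's PUT
    # path, where `body` is the request body string and `logger` a logging
    # adapter):
    #
    #     df = DiskFile('/srv/node', 'sda', '0', 'acct', 'cont', 'obj',
    #                   logger)
    #     with df.writer(size=len(body)) as writer:
    #         writer.write(body)
    #         writer.put({'X-Timestamp': normalize_timestamp(time.time()),
    #                     'Content-Length': str(len(body)),
    #                     'ETag': hashlib.md5(body).hexdigest()})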

    def put_metadata(self, metadata, tombstone=False):
        """
        Short hand for putting metadata to .meta and .ts files.

        :param metadata: dictionary of metadata to be written
        :param tombstone: whether or not we are writing a tombstone
        """
        extension = '.ts' if tombstone else '.meta'
        with self.writer() as writer:
            writer.put(metadata, extension=extension)
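
    # A DELETE, for example, is recorded as a tombstone (sketch, reusing the
    # hypothetical `df` from the writer() example above):
    #
    #     df.put_metadata({'X-Timestamp': normalize_timestamp(time.time())},
    #                     tombstone=True)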

    def unlinkold(self, timestamp):
        """
        Remove any older versions of the object file. Any file that has an
        older timestamp than timestamp will be deleted.

        :param timestamp: timestamp to compare with each file
        """
        timestamp = normalize_timestamp(timestamp)

        def _unlinkold():
            for fname in os.listdir(self.datadir):
                if fname < timestamp:
                    try:
                        os.unlink(join(self.datadir, fname))
                    except OSError, err:  # pragma: no cover
                        if err.errno != errno.ENOENT:
                            raise

        self.threadpool.run_in_thread(_unlinkold)

    def _drop_cache(self, fd, offset, length):
        """Drop buffer cache for the fd range unless keep_cache is set."""
        if not self.keep_cache:
            drop_buffer_cache(fd, offset, length)

    def quarantine(self):
        """
        In the case that a file is corrupted, move it to a quarantined
        area to allow replication to fix it.

        :returns: if quarantine is successful, path to quarantined
                  directory otherwise None
        """
        if not (self.is_deleted() or self.quarantined_dir):
            self.quarantined_dir = self.threadpool.run_in_thread(
                quarantine_renamer, self.device_path, self.data_file)
            self.logger.increment('quarantines')
            return self.quarantined_dir

    def get_data_file_size(self):
        """
        Returns the os.path.getsize for the file.  Raises an exception if this
        file does not match the Content-Length stored in the metadata. Or if
        self.data_file does not exist.

        :returns: file size as an int
        :raises DiskFileError: on file size mismatch.
        :raises DiskFileNotExist: on file not existing (including deleted)
        """
        try:
            file_size = 0
            if self.data_file:
                file_size = self.threadpool.run_in_thread(
                    getsize, self.data_file)
                if 'Content-Length' in self.metadata:
                    metadata_size = int(self.metadata['Content-Length'])
                    if file_size != metadata_size:
                        raise DiskFileError(
                            'Content-Length of %s does not match file size '
                            'of %s' % (metadata_size, file_size))
                return file_size
        except OSError, err:
            if err.errno != errno.ENOENT:
                raise
        raise DiskFileNotExist('Data File does not exist.')