~ubuntu-branches/ubuntu/trusty/swift/trusty-updates

« back to all changes in this revision

Viewing changes to swift/obj/diskfile.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, James Page, Chuck Short
  • Date: 2013-08-13 10:37:13 UTC
  • mfrom: (1.2.21)
  • Revision ID: package-import@ubuntu.com-20130813103713-1ctbx4zifyljs2aq
Tags: 1.9.1-0ubuntu1
[ James Page ]
* d/control: Update VCS fields for new branch locations.

[ Chuck Short ]
* New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2010-2013 OpenStack, LLC.
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License");
 
4
# you may not use this file except in compliance with the License.
 
5
# You may obtain a copy of the License at
 
6
#
 
7
#    http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
12
# implied.
 
13
# See the License for the specific language governing permissions and
 
14
# limitations under the License.
 
15
 
 
16
""" Disk File Interface for Swift Object Server"""
 
17
 
 
18
from __future__ import with_statement
 
19
import cPickle as pickle
 
20
import errno
 
21
import os
 
22
import time
 
23
import uuid
 
24
import hashlib
 
25
import logging
 
26
import traceback
 
27
from gettext import gettext as _
 
28
from os.path import basename, dirname, exists, getmtime, getsize, join
 
29
from tempfile import mkstemp
 
30
from contextlib import contextmanager
 
31
 
 
32
from xattr import getxattr, setxattr
 
33
from eventlet import Timeout
 
34
 
 
35
from swift.common.constraints import check_mount
 
36
from swift.common.utils import mkdirs, normalize_timestamp, \
 
37
    storage_directory, hash_path, renamer, fallocate, fsync, \
 
38
    fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle
 
39
from swift.common.exceptions import DiskFileError, DiskFileNotExist, \
 
40
    DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
 
41
    PathNotDir
 
42
from swift.common.swob import multi_range_iterator
 
43
 
 
44
 
 
45
PICKLE_PROTOCOL = 2
 
46
ONE_WEEK = 604800
 
47
HASH_FILE = 'hashes.pkl'
 
48
METADATA_KEY = 'user.swift.metadata'
 
49
 
 
50
 
 
51
def read_metadata(fd):
 
52
    """
 
53
    Helper function to read the pickled metadata from an object file.
 
54
 
 
55
    :param fd: file descriptor to load the metadata from
 
56
 
 
57
    :returns: dictionary of metadata
 
58
    """
 
59
    metadata = ''
 
60
    key = 0
 
61
    try:
 
62
        while True:
 
63
            metadata += getxattr(fd, '%s%s' % (METADATA_KEY, (key or '')))
 
64
            key += 1
 
65
    except IOError:
 
66
        pass
 
67
    return pickle.loads(metadata)
 
68
 
 
69
 
 
70
def write_metadata(fd, metadata):
 
71
    """
 
72
    Helper function to write pickled metadata for an object file.
 
73
 
 
74
    :param fd: file descriptor to write the metadata
 
75
    :param metadata: metadata to write
 
76
    """
 
77
    metastr = pickle.dumps(metadata, PICKLE_PROTOCOL)
 
78
    key = 0
 
79
    while metastr:
 
80
        setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254])
 
81
        metastr = metastr[254:]
 
82
        key += 1
 
83
 
 
84
 
 
85
def quarantine_renamer(device_path, corrupted_file_path):
 
86
    """
 
87
    In the case that a file is corrupted, move it to a quarantined
 
88
    area to allow replication to fix it.
 
89
 
 
90
    :params device_path: The path to the device the corrupted file is on.
 
91
    :params corrupted_file_path: The path to the file you want quarantined.
 
92
 
 
93
    :returns: path (str) of directory the file was moved to
 
94
    :raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
 
95
                     exceptions from rename
 
96
    """
 
97
    from_dir = dirname(corrupted_file_path)
 
98
    to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
 
99
    invalidate_hash(dirname(from_dir))
 
100
    try:
 
101
        renamer(from_dir, to_dir)
 
102
    except OSError, e:
 
103
        if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
 
104
            raise
 
105
        to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
 
106
        renamer(from_dir, to_dir)
 
107
    return to_dir
 
108
 
 
109
 
 
110
def hash_suffix(path, reclaim_age):
 
111
    """
 
112
    Performs reclamation and returns an md5 of all (remaining) files.
 
113
 
 
114
    :param reclaim_age: age in seconds at which to remove tombstones
 
115
    :raises PathNotDir: if given path is not a valid directory
 
116
    :raises OSError: for non-ENOTDIR errors
 
117
    """
 
118
    md5 = hashlib.md5()
 
119
    try:
 
120
        path_contents = sorted(os.listdir(path))
 
121
    except OSError, err:
 
122
        if err.errno in (errno.ENOTDIR, errno.ENOENT):
 
123
            raise PathNotDir()
 
124
        raise
 
125
    for hsh in path_contents:
 
126
        hsh_path = join(path, hsh)
 
127
        try:
 
128
            files = os.listdir(hsh_path)
 
129
        except OSError, err:
 
130
            if err.errno == errno.ENOTDIR:
 
131
                partition_path = dirname(path)
 
132
                objects_path = dirname(partition_path)
 
133
                device_path = dirname(objects_path)
 
134
                quar_path = quarantine_renamer(device_path, hsh_path)
 
135
                logging.exception(
 
136
                    _('Quarantined %s to %s because it is not a directory') %
 
137
                    (hsh_path, quar_path))
 
138
                continue
 
139
            raise
 
140
        if len(files) == 1:
 
141
            if files[0].endswith('.ts'):
 
142
                # remove tombstones older than reclaim_age
 
143
                ts = files[0].rsplit('.', 1)[0]
 
144
                if (time.time() - float(ts)) > reclaim_age:
 
145
                    os.unlink(join(hsh_path, files[0]))
 
146
                    files.remove(files[0])
 
147
        elif files:
 
148
            files.sort(reverse=True)
 
149
            meta = data = tomb = None
 
150
            for filename in list(files):
 
151
                if not meta and filename.endswith('.meta'):
 
152
                    meta = filename
 
153
                if not data and filename.endswith('.data'):
 
154
                    data = filename
 
155
                if not tomb and filename.endswith('.ts'):
 
156
                    tomb = filename
 
157
                if (filename < tomb or       # any file older than tomb
 
158
                    filename < data or       # any file older than data
 
159
                    (filename.endswith('.meta') and
 
160
                     filename < meta)):      # old meta
 
161
                    os.unlink(join(hsh_path, filename))
 
162
                    files.remove(filename)
 
163
        if not files:
 
164
            os.rmdir(hsh_path)
 
165
        for filename in files:
 
166
            md5.update(filename)
 
167
    try:
 
168
        os.rmdir(path)
 
169
    except OSError:
 
170
        pass
 
171
    return md5.hexdigest()
 
172
 
 
173
 
 
174
def invalidate_hash(suffix_dir):
 
175
    """
 
176
    Invalidates the hash for a suffix_dir in the partition's hashes file.
 
177
 
 
178
    :param suffix_dir: absolute path to suffix dir whose hash needs
 
179
                       invalidating
 
180
    """
 
181
 
 
182
    suffix = basename(suffix_dir)
 
183
    partition_dir = dirname(suffix_dir)
 
184
    hashes_file = join(partition_dir, HASH_FILE)
 
185
    with lock_path(partition_dir):
 
186
        try:
 
187
            with open(hashes_file, 'rb') as fp:
 
188
                hashes = pickle.load(fp)
 
189
            if suffix in hashes and not hashes[suffix]:
 
190
                return
 
191
        except Exception:
 
192
            return
 
193
        hashes[suffix] = None
 
194
        write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
 
195
 
 
196
 
 
197
def get_hashes(partition_dir, recalculate=None, do_listdir=False,
 
198
               reclaim_age=ONE_WEEK):
 
199
    """
 
200
    Get a list of hashes for the suffix dir.  do_listdir causes it to mistrust
 
201
    the hash cache for suffix existence at the (unexpectedly high) cost of a
 
202
    listdir.  reclaim_age is just passed on to hash_suffix.
 
203
 
 
204
    :param partition_dir: absolute path of partition to get hashes for
 
205
    :param recalculate: list of suffixes which should be recalculated when got
 
206
    :param do_listdir: force existence check for all hashes in the partition
 
207
    :param reclaim_age: age at which to remove tombstones
 
208
 
 
209
    :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
 
210
    """
 
211
 
 
212
    hashed = 0
 
213
    hashes_file = join(partition_dir, HASH_FILE)
 
214
    modified = False
 
215
    force_rewrite = False
 
216
    hashes = {}
 
217
    mtime = -1
 
218
 
 
219
    if recalculate is None:
 
220
        recalculate = []
 
221
 
 
222
    try:
 
223
        with open(hashes_file, 'rb') as fp:
 
224
            hashes = pickle.load(fp)
 
225
        mtime = getmtime(hashes_file)
 
226
    except Exception:
 
227
        do_listdir = True
 
228
        force_rewrite = True
 
229
    if do_listdir:
 
230
        for suff in os.listdir(partition_dir):
 
231
            if len(suff) == 3:
 
232
                hashes.setdefault(suff, None)
 
233
        modified = True
 
234
    hashes.update((hash_, None) for hash_ in recalculate)
 
235
    for suffix, hash_ in hashes.items():
 
236
        if not hash_:
 
237
            suffix_dir = join(partition_dir, suffix)
 
238
            try:
 
239
                hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
 
240
                hashed += 1
 
241
            except PathNotDir:
 
242
                del hashes[suffix]
 
243
            except OSError:
 
244
                logging.exception(_('Error hashing suffix'))
 
245
            modified = True
 
246
    if modified:
 
247
        with lock_path(partition_dir):
 
248
            if force_rewrite or not exists(hashes_file) or \
 
249
                    getmtime(hashes_file) == mtime:
 
250
                write_pickle(
 
251
                    hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
 
252
                return hashed, hashes
 
253
        return get_hashes(partition_dir, recalculate, do_listdir,
 
254
                          reclaim_age)
 
255
    else:
 
256
        return hashed, hashes
 
257
 
 
258
 
 
259
class DiskWriter(object):
 
260
    """
 
261
    Encapsulation of the write context for servicing PUT REST API
 
262
    requests. Serves as the context manager object for DiskFile's writer()
 
263
    method.
 
264
    """
 
265
    def __init__(self, disk_file, fd, tmppath, threadpool):
 
266
        self.disk_file = disk_file
 
267
        self.fd = fd
 
268
        self.tmppath = tmppath
 
269
        self.upload_size = 0
 
270
        self.last_sync = 0
 
271
        self.threadpool = threadpool
 
272
 
 
273
    def write(self, chunk):
 
274
        """
 
275
        Write a chunk of data into the temporary file.
 
276
 
 
277
        :param chunk: the chunk of data to write as a string object
 
278
        """
 
279
 
 
280
        def _write_entire_chunk(chunk):
 
281
            while chunk:
 
282
                written = os.write(self.fd, chunk)
 
283
                self.upload_size += written
 
284
                chunk = chunk[written:]
 
285
 
 
286
        self.threadpool.run_in_thread(_write_entire_chunk, chunk)
 
287
 
 
288
        # For large files sync every 512MB (by default) written
 
289
        diff = self.upload_size - self.last_sync
 
290
        if diff >= self.disk_file.bytes_per_sync:
 
291
            self.threadpool.force_run_in_thread(fdatasync, self.fd)
 
292
            drop_buffer_cache(self.fd, self.last_sync, diff)
 
293
            self.last_sync = self.upload_size
 
294
 
 
295
    def put(self, metadata, extension='.data'):
 
296
        """
 
297
        Finalize writing the file on disk, and renames it from the temp file
 
298
        to the real location.  This should be called after the data has been
 
299
        written to the temp file.
 
300
 
 
301
        :param metadata: dictionary of metadata to be written
 
302
        :param extension: extension to be used when making the file
 
303
        """
 
304
        if not self.tmppath:
 
305
            raise ValueError("tmppath is unusable.")
 
306
        timestamp = normalize_timestamp(metadata['X-Timestamp'])
 
307
        metadata['name'] = self.disk_file.name
 
308
 
 
309
        def finalize_put():
 
310
            # Write the metadata before calling fsync() so that both data and
 
311
            # metadata are flushed to disk.
 
312
            write_metadata(self.fd, metadata)
 
313
            # We call fsync() before calling drop_cache() to lower the amount
 
314
            # of redundant work the drop cache code will perform on the pages
 
315
            # (now that after fsync the pages will be all clean).
 
316
            fsync(self.fd)
 
317
            # From the Department of the Redundancy Department, make sure
 
318
            # we call drop_cache() after fsync() to avoid redundant work
 
319
            # (pages all clean).
 
320
            drop_buffer_cache(self.fd, 0, self.upload_size)
 
321
            invalidate_hash(dirname(self.disk_file.datadir))
 
322
            # After the rename completes, this object will be available for
 
323
            # other requests to reference.
 
324
            renamer(self.tmppath, join(self.disk_file.datadir,
 
325
                                       timestamp + extension))
 
326
 
 
327
        self.threadpool.force_run_in_thread(finalize_put)
 
328
        self.disk_file.metadata = metadata
 
329
 
 
330
 
 
331
class DiskFile(object):
 
332
    """
 
333
    Manage object files on disk.
 
334
 
 
335
    :param path: path to devices on the node
 
336
    :param device: device name
 
337
    :param partition: partition on the device the object lives in
 
338
    :param account: account name for the object
 
339
    :param container: container name for the object
 
340
    :param obj: object name for the object
 
341
    :param keep_data_fp: if True, don't close the fp, otherwise close it
 
342
    :param disk_chunk_size: size of chunks on file reads
 
343
    :param bytes_per_sync: number of bytes between fdatasync calls
 
344
    :param iter_hook: called when __iter__ returns a chunk
 
345
    :param threadpool: thread pool in which to do blocking operations
 
346
 
 
347
    :raises DiskFileCollision: on md5 collision
 
348
    """
 
349
 
 
350
    def __init__(self, path, device, partition, account, container, obj,
 
351
                 logger, keep_data_fp=False, disk_chunk_size=65536,
 
352
                 bytes_per_sync=(512 * 1024 * 1024), iter_hook=None,
 
353
                 threadpool=None, obj_dir='objects', mount_check=False,
 
354
                 disallowed_metadata_keys=None):
 
355
        if mount_check and not check_mount(path, device):
 
356
            raise DiskFileDeviceUnavailable()
 
357
        self.disk_chunk_size = disk_chunk_size
 
358
        self.bytes_per_sync = bytes_per_sync
 
359
        self.iter_hook = iter_hook
 
360
        self.name = '/' + '/'.join((account, container, obj))
 
361
        name_hash = hash_path(account, container, obj)
 
362
        self.datadir = join(
 
363
            path, device, storage_directory(obj_dir, partition, name_hash))
 
364
        self.device_path = join(path, device)
 
365
        self.tmpdir = join(path, device, 'tmp')
 
366
        self.logger = logger
 
367
        self.disallowed_metadata_keys = disallowed_metadata_keys or []
 
368
        self.metadata = {}
 
369
        self.data_file = None
 
370
        self.fp = None
 
371
        self.iter_etag = None
 
372
        self.started_at_0 = False
 
373
        self.read_to_eof = False
 
374
        self.quarantined_dir = None
 
375
        self.keep_cache = False
 
376
        self.suppress_file_closing = False
 
377
        self.threadpool = threadpool or ThreadPool(nthreads=0)
 
378
        if not exists(self.datadir):
 
379
            return
 
380
        files = sorted(os.listdir(self.datadir), reverse=True)
 
381
        meta_file = None
 
382
        for afile in files:
 
383
            if afile.endswith('.ts'):
 
384
                self.data_file = None
 
385
                with open(join(self.datadir, afile)) as mfp:
 
386
                    self.metadata = read_metadata(mfp)
 
387
                self.metadata['deleted'] = True
 
388
                break
 
389
            if afile.endswith('.meta') and not meta_file:
 
390
                meta_file = join(self.datadir, afile)
 
391
            if afile.endswith('.data') and not self.data_file:
 
392
                self.data_file = join(self.datadir, afile)
 
393
                break
 
394
        if not self.data_file:
 
395
            return
 
396
        self.fp = open(self.data_file, 'rb')
 
397
        self.metadata = read_metadata(self.fp)
 
398
        if not keep_data_fp:
 
399
            self.close(verify_file=False)
 
400
        if meta_file:
 
401
            with open(meta_file) as mfp:
 
402
                for key in self.metadata.keys():
 
403
                    if key.lower() not in self.disallowed_metadata_keys:
 
404
                        del self.metadata[key]
 
405
                self.metadata.update(read_metadata(mfp))
 
406
        if 'name' in self.metadata:
 
407
            if self.metadata['name'] != self.name:
 
408
                self.logger.error(_('Client path %(client)s does not match '
 
409
                                    'path stored in object metadata %(meta)s'),
 
410
                                  {'client': self.name,
 
411
                                   'meta': self.metadata['name']})
 
412
                raise DiskFileCollision('Client path does not match path '
 
413
                                        'stored in object metadata')
 
414
 
 
415
    def __iter__(self):
 
416
        """Returns an iterator over the data file."""
 
417
        try:
 
418
            dropped_cache = 0
 
419
            read = 0
 
420
            self.started_at_0 = False
 
421
            self.read_to_eof = False
 
422
            if self.fp.tell() == 0:
 
423
                self.started_at_0 = True
 
424
                self.iter_etag = hashlib.md5()
 
425
            while True:
 
426
                chunk = self.threadpool.run_in_thread(
 
427
                    self.fp.read, self.disk_chunk_size)
 
428
                if chunk:
 
429
                    if self.iter_etag:
 
430
                        self.iter_etag.update(chunk)
 
431
                    read += len(chunk)
 
432
                    if read - dropped_cache > (1024 * 1024):
 
433
                        self._drop_cache(self.fp.fileno(), dropped_cache,
 
434
                                         read - dropped_cache)
 
435
                        dropped_cache = read
 
436
                    yield chunk
 
437
                    if self.iter_hook:
 
438
                        self.iter_hook()
 
439
                else:
 
440
                    self.read_to_eof = True
 
441
                    self._drop_cache(self.fp.fileno(), dropped_cache,
 
442
                                     read - dropped_cache)
 
443
                    break
 
444
        finally:
 
445
            if not self.suppress_file_closing:
 
446
                self.close()
 
447
 
 
448
    def app_iter_range(self, start, stop):
 
449
        """Returns an iterator over the data file for range (start, stop)"""
 
450
        if start or start == 0:
 
451
            self.fp.seek(start)
 
452
        if stop is not None:
 
453
            length = stop - start
 
454
        else:
 
455
            length = None
 
456
        try:
 
457
            for chunk in self:
 
458
                if length is not None:
 
459
                    length -= len(chunk)
 
460
                    if length < 0:
 
461
                        # Chop off the extra:
 
462
                        yield chunk[:length]
 
463
                        break
 
464
                yield chunk
 
465
        finally:
 
466
            if not self.suppress_file_closing:
 
467
                self.close()
 
468
 
 
469
    def app_iter_ranges(self, ranges, content_type, boundary, size):
 
470
        """Returns an iterator over the data file for a set of ranges"""
 
471
        if not ranges:
 
472
            yield ''
 
473
        else:
 
474
            try:
 
475
                self.suppress_file_closing = True
 
476
                for chunk in multi_range_iterator(
 
477
                        ranges, content_type, boundary, size,
 
478
                        self.app_iter_range):
 
479
                    yield chunk
 
480
            finally:
 
481
                self.suppress_file_closing = False
 
482
                self.close()
 
483
 
 
484
    def _handle_close_quarantine(self):
 
485
        """Check if file needs to be quarantined"""
 
486
        try:
 
487
            self.get_data_file_size()
 
488
        except DiskFileNotExist:
 
489
            return
 
490
        except DiskFileError:
 
491
            self.quarantine()
 
492
            return
 
493
 
 
494
        if self.iter_etag and self.started_at_0 and self.read_to_eof and \
 
495
                'ETag' in self.metadata and \
 
496
                self.iter_etag.hexdigest() != self.metadata.get('ETag'):
 
497
            self.quarantine()
 
498
 
 
499
    def close(self, verify_file=True):
 
500
        """
 
501
        Close the file. Will handle quarantining file if necessary.
 
502
 
 
503
        :param verify_file: Defaults to True. If false, will not check
 
504
                            file to see if it needs quarantining.
 
505
        """
 
506
        if self.fp:
 
507
            try:
 
508
                if verify_file:
 
509
                    self._handle_close_quarantine()
 
510
            except (Exception, Timeout), e:
 
511
                self.logger.error(_(
 
512
                    'ERROR DiskFile %(data_file)s in '
 
513
                    '%(data_dir)s close failure: %(exc)s : %(stack)'),
 
514
                    {'exc': e, 'stack': ''.join(traceback.format_stack()),
 
515
                     'data_file': self.data_file, 'data_dir': self.datadir})
 
516
            finally:
 
517
                self.fp.close()
 
518
                self.fp = None
 
519
 
 
520
    def is_deleted(self):
 
521
        """
 
522
        Check if the file is deleted.
 
523
 
 
524
        :returns: True if the file doesn't exist or has been flagged as
 
525
                  deleted.
 
526
        """
 
527
        return not self.data_file or 'deleted' in self.metadata
 
528
 
 
529
    def is_expired(self):
 
530
        """
 
531
        Check if the file is expired.
 
532
 
 
533
        :returns: True if the file has an X-Delete-At in the past
 
534
        """
 
535
        return ('X-Delete-At' in self.metadata and
 
536
                int(self.metadata['X-Delete-At']) <= time.time())
 
537
 
 
538
    @contextmanager
 
539
    def writer(self, size=None):
 
540
        """
 
541
        Context manager to write a file. We create a temporary file first, and
 
542
        then return a DiskWriter object to encapsulate the state.
 
543
 
 
544
        :param size: optional initial size of file to explicitly allocate on
 
545
                     disk
 
546
        :raises DiskFileNoSpace: if a size is specified and allocation fails
 
547
        """
 
548
        if not exists(self.tmpdir):
 
549
            mkdirs(self.tmpdir)
 
550
        fd, tmppath = mkstemp(dir=self.tmpdir)
 
551
        try:
 
552
            if size is not None and size > 0:
 
553
                try:
 
554
                    fallocate(fd, size)
 
555
                except OSError:
 
556
                    raise DiskFileNoSpace()
 
557
            yield DiskWriter(self, fd, tmppath, self.threadpool)
 
558
        finally:
 
559
            try:
 
560
                os.close(fd)
 
561
            except OSError:
 
562
                pass
 
563
            try:
 
564
                os.unlink(tmppath)
 
565
            except OSError:
 
566
                pass
 
567
 
 
568
    def put_metadata(self, metadata, tombstone=False):
 
569
        """
 
570
        Short hand for putting metadata to .meta and .ts files.
 
571
 
 
572
        :param metadata: dictionary of metadata to be written
 
573
        :param tombstone: whether or not we are writing a tombstone
 
574
        """
 
575
        extension = '.ts' if tombstone else '.meta'
 
576
        with self.writer() as writer:
 
577
            writer.put(metadata, extension=extension)
 
578
 
 
579
    def unlinkold(self, timestamp):
 
580
        """
 
581
        Remove any older versions of the object file.  Any file that has an
 
582
        older timestamp than timestamp will be deleted.
 
583
 
 
584
        :param timestamp: timestamp to compare with each file
 
585
        """
 
586
        timestamp = normalize_timestamp(timestamp)
 
587
 
 
588
        def _unlinkold():
 
589
            for fname in os.listdir(self.datadir):
 
590
                if fname < timestamp:
 
591
                    try:
 
592
                        os.unlink(join(self.datadir, fname))
 
593
                    except OSError, err:    # pragma: no cover
 
594
                        if err.errno != errno.ENOENT:
 
595
                            raise
 
596
        self.threadpool.run_in_thread(_unlinkold)
 
597
 
 
598
    def _drop_cache(self, fd, offset, length):
 
599
        """Method for no-oping buffer cache drop method."""
 
600
        if not self.keep_cache:
 
601
            drop_buffer_cache(fd, offset, length)
 
602
 
 
603
    def quarantine(self):
 
604
        """
 
605
        In the case that a file is corrupted, move it to a quarantined
 
606
        area to allow replication to fix it.
 
607
 
 
608
        :returns: if quarantine is successful, path to quarantined
 
609
                  directory otherwise None
 
610
        """
 
611
        if not (self.is_deleted() or self.quarantined_dir):
 
612
            self.quarantined_dir = self.threadpool.run_in_thread(
 
613
                quarantine_renamer, self.device_path, self.data_file)
 
614
            self.logger.increment('quarantines')
 
615
            return self.quarantined_dir
 
616
 
 
617
    def get_data_file_size(self):
 
618
        """
 
619
        Returns the os.path.getsize for the file.  Raises an exception if this
 
620
        file does not match the Content-Length stored in the metadata. Or if
 
621
        self.data_file does not exist.
 
622
 
 
623
        :returns: file size as an int
 
624
        :raises DiskFileError: on file size mismatch.
 
625
        :raises DiskFileNotExist: on file not existing (including deleted)
 
626
        """
 
627
        try:
 
628
            file_size = 0
 
629
            if self.data_file:
 
630
                file_size = self.threadpool.run_in_thread(
 
631
                    getsize, self.data_file)
 
632
                if 'Content-Length' in self.metadata:
 
633
                    metadata_size = int(self.metadata['Content-Length'])
 
634
                    if file_size != metadata_size:
 
635
                        raise DiskFileError(
 
636
                            'Content-Length of %s does not match file size '
 
637
                            'of %s' % (metadata_size, file_size))
 
638
                return file_size
 
639
        except OSError, err:
 
640
            if err.errno != errno.ENOENT:
 
641
                raise
 
642
        raise DiskFileNotExist('Data File does not exist.')