~ubuntu-branches/ubuntu/trusty/swift/trusty-updates

« back to all changes in this revision

Viewing changes to swift/obj/replicator.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Soren Hansen, Chuck Short
  • Date: 2012-09-07 19:02:36 UTC
  • mfrom: (1.2.12)
  • Revision ID: package-import@ubuntu.com-20120907190236-fqrmbzm7v6zivs8d
Tags: 1.7.0-0ubuntu1
[ Soren Hansen ]
* Update debian/watch to account for symbolically named tarballs and
  use newer URL.
* Run unit tests at build time.
* Fix Launchpad URLs in debian/watch.

[ Chuck Short ]
* New upstream release
* debian/control: Add python-mock as a build dep
* debian/rules: Don't fail if the testsuite fails.

Show diffs side-by-side

added added

removed removed

Lines of Context:
166
166
 
167
167
    hashed = 0
168
168
    hashes_file = join(partition_dir, HASH_FILE)
169
 
    with lock_path(partition_dir):
170
 
        modified = False
171
 
        hashes = {}
172
 
        try:
173
 
            with open(hashes_file, 'rb') as fp:
174
 
                hashes = pickle.load(fp)
175
 
        except Exception:
176
 
            do_listdir = True
177
 
        if do_listdir:
178
 
            hashes = dict(((suff, hashes.get(suff, None))
179
 
                       for suff in os.listdir(partition_dir)
180
 
                       if len(suff) == 3 and isdir(join(partition_dir, suff))))
 
169
    modified = False
 
170
    hashes = {}
 
171
    mtime = -1
 
172
    try:
 
173
        with open(hashes_file, 'rb') as fp:
 
174
            hashes = pickle.load(fp)
 
175
        mtime = os.path.getmtime(hashes_file)
 
176
    except Exception:
 
177
        do_listdir = True
 
178
    if do_listdir:
 
179
        for suff in os.listdir(partition_dir):
 
180
            if len(suff) == 3 and isdir(join(partition_dir, suff)):
 
181
                hashes.setdefault(suff, None)
 
182
        modified = True
 
183
    hashes.update((hash_, None) for hash_ in recalculate)
 
184
    for suffix, hash_ in hashes.items():
 
185
        if not hash_:
 
186
            suffix_dir = join(partition_dir, suffix)
 
187
            if isdir(suffix_dir):
 
188
                try:
 
189
                    hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
 
190
                    hashed += 1
 
191
                except OSError:
 
192
                    logging.exception(_('Error hashing suffix'))
 
193
            else:
 
194
                del hashes[suffix]
181
195
            modified = True
182
 
        for hash_ in recalculate:
183
 
            hashes[hash_] = None
184
 
        for suffix, hash_ in hashes.items():
185
 
            if not hash_:
186
 
                suffix_dir = join(partition_dir, suffix)
187
 
                if os.path.exists(suffix_dir):
188
 
                    try:
189
 
                        hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
190
 
                        hashed += 1
191
 
                    except OSError:
192
 
                        logging.exception(_('Error hashing suffix'))
193
 
                        hashes[suffix] = None
194
 
                else:
195
 
                    del hashes[suffix]
196
 
                modified = True
197
 
                sleep()
198
 
        if modified:
199
 
            write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
 
196
    if modified:
 
197
        with lock_path(partition_dir):
 
198
            if not os.path.exists(hashes_file) or \
 
199
                        os.path.getmtime(hashes_file) == mtime:
 
200
                write_pickle(
 
201
                    hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
 
202
                return hashed, hashes
 
203
        return get_hashes(partition_dir, recalculate, do_listdir,
 
204
                          reclaim_age)
 
205
    else:
200
206
        return hashed, hashes
201
207
 
202
208
 
203
 
def tpooled_get_hashes(*args, **kwargs):
 
209
def tpool_reraise(func, *args, **kwargs):
204
210
    """
205
211
    Hack to work around Eventlet's tpool not catching and reraising Timeouts.
206
 
    We return the Timeout, Timeout if it's raised, the caller looks for it
207
 
    and reraises it if found.
208
212
    """
209
 
    try:
210
 
        return get_hashes(*args, **kwargs)
211
 
    except Timeout, err:
212
 
        return err, err
 
213
    def inner():
 
214
        try:
 
215
            return func(*args, **kwargs)
 
216
        except BaseException, err:
 
217
            return err
 
218
    resp = tpool.execute(inner)
 
219
    if isinstance(resp, BaseException):
 
220
        raise resp
 
221
    return resp
213
222
 
214
223
 
215
224
class ObjectReplicator(Daemon):
392
401
        self.logger.increment('partition.update.count.%s' % (job['device'],))
393
402
        begin = time.time()
394
403
        try:
395
 
            hashed, local_hash = tpool.execute(tpooled_get_hashes, job['path'],
 
404
            hashed, local_hash = tpool_reraise(get_hashes, job['path'],
396
405
                    do_listdir=(self.replication_count % 10) == 0,
397
406
                    reclaim_age=self.reclaim_age)
398
 
            # See tpooled_get_hashes "Hack".
399
 
            if isinstance(hashed, BaseException):
400
 
                raise hashed
401
407
            self.suffix_hash += hashed
402
408
            self.logger.update_stats('suffix.hashes', hashed)
403
409
            attempts_left = len(job['nodes'])
428
434
                            local_hash[suffix] != remote_hash.get(suffix, -1)]
429
435
                    if not suffixes:
430
436
                        continue
431
 
                    hashed, recalc_hash = tpool.execute(tpooled_get_hashes,
 
437
                    hashed, recalc_hash = tpool_reraise(get_hashes,
432
438
                        job['path'], recalculate=suffixes,
433
439
                        reclaim_age=self.reclaim_age)
434
 
                    # See tpooled_get_hashes "Hack".
435
 
                    if isinstance(hashed, BaseException):
436
 
                        raise hashed
437
440
                    self.logger.update_stats('suffix.hashes', hashed)
438
441
                    local_hash = recalc_hash
439
442
                    suffixes = [suffix for suffix in local_hash if