168
168
hashes_file = join(partition_dir, HASH_FILE)
169
with lock_path(partition_dir):
173
with open(hashes_file, 'rb') as fp:
174
hashes = pickle.load(fp)
178
hashes = dict(((suff, hashes.get(suff, None))
179
for suff in os.listdir(partition_dir)
180
if len(suff) == 3 and isdir(join(partition_dir, suff))))
173
with open(hashes_file, 'rb') as fp:
174
hashes = pickle.load(fp)
175
mtime = os.path.getmtime(hashes_file)
179
for suff in os.listdir(partition_dir):
180
if len(suff) == 3 and isdir(join(partition_dir, suff)):
181
hashes.setdefault(suff, None)
183
hashes.update((hash_, None) for hash_ in recalculate)
184
for suffix, hash_ in hashes.items():
186
suffix_dir = join(partition_dir, suffix)
187
if isdir(suffix_dir):
189
hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
192
logging.exception(_('Error hashing suffix'))
182
for hash_ in recalculate:
184
for suffix, hash_ in hashes.items():
186
suffix_dir = join(partition_dir, suffix)
187
if os.path.exists(suffix_dir):
189
hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
192
logging.exception(_('Error hashing suffix'))
193
hashes[suffix] = None
199
write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
197
with lock_path(partition_dir):
198
if not os.path.exists(hashes_file) or \
199
os.path.getmtime(hashes_file) == mtime:
201
hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
202
return hashed, hashes
203
return get_hashes(partition_dir, recalculate, do_listdir,
200
206
return hashed, hashes
203
def tpooled_get_hashes(*args, **kwargs):
209
def tpool_reraise(func, *args, **kwargs):
205
211
Hack to work around Eventlet's tpool not catching and reraising Timeouts.
206
We return the Timeout, Timeout if it's raised, the caller looks for it
207
and reraises it if found.
210
return get_hashes(*args, **kwargs)
215
return func(*args, **kwargs)
216
except BaseException, err:
218
resp = tpool.execute(inner)
219
if isinstance(resp, BaseException):
215
224
class ObjectReplicator(Daemon):
392
401
self.logger.increment('partition.update.count.%s' % (job['device'],))
393
402
begin = time.time()
395
hashed, local_hash = tpool.execute(tpooled_get_hashes, job['path'],
404
hashed, local_hash = tpool_reraise(get_hashes, job['path'],
396
405
do_listdir=(self.replication_count % 10) == 0,
397
406
reclaim_age=self.reclaim_age)
398
# See tpooled_get_hashes "Hack".
399
if isinstance(hashed, BaseException):
401
407
self.suffix_hash += hashed
402
408
self.logger.update_stats('suffix.hashes', hashed)
403
409
attempts_left = len(job['nodes'])
428
434
local_hash[suffix] != remote_hash.get(suffix, -1)]
431
hashed, recalc_hash = tpool.execute(tpooled_get_hashes,
437
hashed, recalc_hash = tpool_reraise(get_hashes,
432
438
job['path'], recalculate=suffixes,
433
439
reclaim_age=self.reclaim_age)
434
# See tpooled_get_hashes "Hack".
435
if isinstance(hashed, BaseException):
437
440
self.logger.update_stats('suffix.hashes', hashed)
438
441
local_hash = recalc_hash
439
442
suffixes = [suffix for suffix in local_hash if