335
335
:raises RingValidationError: problem was found with the ring.
338
# "len" showed up in profiling, so it's just computed once.
339
dev_len = len(self.devs)
338
340
if sum(d['parts'] for d in self._iter_devs()) != \
339
341
self.parts * self.replicas:
340
342
raise exceptions.RingValidationError(
345
347
# dev_usage[dev_id] will equal the number of partitions assigned to
347
dev_usage = array('I', (0 for _junk in xrange(len(self.devs))))
349
dev_usage = array('I', (0 for _junk in xrange(dev_len)))
348
350
for part2dev in self._replica2part2dev:
349
351
for dev_id in part2dev:
350
352
dev_usage[dev_id] += 1
352
354
for part in xrange(self.parts):
353
355
for replica in xrange(self.replicas):
354
356
dev_id = self._replica2part2dev[replica][part]
355
if dev_id >= len(self.devs) or not self.devs[dev_id]:
357
if dev_id >= dev_len or not self.devs[dev_id]:
356
358
raise exceptions.RingValidationError(
357
359
"Partition %d, replica %d was not allocated "
478
480
elapsed_hours = int(time() - self._last_part_moves_epoch) / 3600
479
481
for part in xrange(self.parts):
480
self._last_part_moves[part] = \
481
min(self._last_part_moves[part] + elapsed_hours, 0xff)
482
# The "min(self._last_part_moves[part] + elapsed_hours, 0xff)"
483
# which was here showed up in profiling, so it got inlined.
484
last_plus_elapsed = self._last_part_moves[part] + elapsed_hours
485
if last_plus_elapsed < 0xff:
486
self._last_part_moves[part] = last_plus_elapsed
488
self._last_part_moves[part] = 0xff
482
489
self._last_part_moves_epoch = int(time())
484
491
def _gather_reassign_parts(self):
487
494
gathering from removed devices, insufficiently-far-apart replicas, and
488
495
overweight drives.
497
# inline memoization of tiers_for_dev() results (profiling reveals it
490
501
# First we gather partitions from removed devices. Since removed
491
502
# devices usually indicate device failures, we have no choice but to
492
503
# reassign these partitions. However, we mark them as moved so later
514
525
# First, add up the count of replicas at each tier for each
516
replicas_at_tier = defaultdict(lambda: 0)
527
# replicas_at_tier was a "lambda: 0" defaultdict, but profiling
528
# revealed the lambda invocation as a significant cost.
529
replicas_at_tier = {}
517
530
for replica in xrange(self.replicas):
518
531
dev = self.devs[self._replica2part2dev[replica][part]]
519
for tier in tiers_for_dev(dev):
520
replicas_at_tier[tier] += 1
532
if dev['id'] not in tfd:
533
tfd[dev['id']] = tiers_for_dev(dev)
534
for tier in tfd[dev['id']]:
535
if tier not in replicas_at_tier:
536
replicas_at_tier[tier] = 1
538
replicas_at_tier[tier] += 1
522
540
# Now, look for partitions not yet spread out enough and not
523
541
# recently moved.
524
542
for replica in xrange(self.replicas):
525
543
dev = self.devs[self._replica2part2dev[replica][part]]
526
544
removed_replica = False
527
for tier in tiers_for_dev(dev):
528
if (replicas_at_tier[tier] > max_allowed_replicas[tier] and
545
if dev['id'] not in tfd:
546
tfd[dev['id']] = tiers_for_dev(dev)
547
for tier in tfd[dev['id']]:
549
if tier in replicas_at_tier:
550
rep_at_tier = replicas_at_tier[tier]
551
if (rep_at_tier > max_allowed_replicas[tier] and
529
552
self._last_part_moves[part] >=
530
553
self.min_part_hours):
531
554
self._last_part_moves[part] = 0
535
558
removed_replica = True
537
560
if removed_replica:
538
for tier in tiers_for_dev(dev):
561
if dev['id'] not in tfd:
562
tfd[dev['id']] = tiers_for_dev(dev)
563
for tier in tfd[dev['id']]:
539
564
replicas_at_tier[tier] -= 1
541
566
# Last, we gather partitions from devices that are "overweight" because