~ubuntu-branches/ubuntu/precise/swift/precise-updates

« back to all changes in this revision

Viewing changes to swift/common/db_replicator.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2012-03-09 13:26:07 UTC
  • mto: This revision was merged to the branch mainline in revision 33.
  • Revision ID: package-import@ubuntu.com-20120309132607-vq3donlbftjdspbx
Tags: upstream-1.4.7
ImportĀ upstreamĀ versionĀ 1.4.7

Show diffs side-by-side

added added

removed removed

Lines of Context:
112
112
        swift_dir = conf.get('swift_dir', '/etc/swift')
113
113
        self.ring = ring.Ring(os.path.join(swift_dir, self.ring_file))
114
114
        self.per_diff = int(conf.get('per_diff', 1000))
115
 
        self.run_pause = int(conf.get('run_pause', 30))
 
115
        self.max_diffs = int(conf.get('max_diffs') or 100)
 
116
        self.interval = int(conf.get('interval') or
 
117
                            conf.get('run_pause') or 30)
116
118
        self.vm_test_mode = conf.get(
117
119
            'vm_test_mode', 'no').lower() in ('yes', 'true', 'on', '1')
118
120
        self.node_timeout = int(conf.get('node_timeout', 10))
125
127
        self.stats = {'attempted': 0, 'success': 0, 'failure': 0, 'ts_repl': 0,
126
128
                      'no_change': 0, 'hashmatch': 0, 'rsync': 0, 'diff': 0,
127
129
                      'remove': 0, 'empty': 0, 'remote_merge': 0,
128
 
                      'start': time.time()}
 
130
                      'start': time.time(), 'diff_capped': 0}
129
131
 
130
132
    def _report_stats(self):
131
133
        """Report the current stats to the logs."""
141
143
            % self.stats)
142
144
        self.logger.info(' '.join(['%s:%s' % item for item in
143
145
             self.stats.items() if item[0] in
144
 
             ('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl', 'empty')]))
 
146
             ('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl', 'empty',
 
147
              'diff_capped')]))
145
148
 
146
149
    def _rsync_file(self, db_file, remote_file, whole_file=True):
147
150
        """
215
218
        self.logger.debug(_('Syncing chunks with %s'), http.host)
216
219
        sync_table = broker.get_syncs()
217
220
        objects = broker.get_items_since(point, self.per_diff)
218
 
        while len(objects):
 
221
        diffs = 0
 
222
        while len(objects) and diffs < self.max_diffs:
 
223
            diffs += 1
219
224
            with Timeout(self.node_timeout):
220
225
                response = http.replicate('merge_items', objects, local_id)
221
226
            if not response or response.status >= 300 or response.status < 200:
226
231
                return False
227
232
            point = objects[-1]['ROWID']
228
233
            objects = broker.get_items_since(point, self.per_diff)
229
 
        with Timeout(self.node_timeout):
230
 
            response = http.replicate('merge_syncs', sync_table)
231
 
        if response and response.status >= 200 and response.status < 300:
232
 
            broker.merge_syncs([{'remote_id': remote_id,
233
 
                    'sync_point': point}], incoming=False)
234
 
            return True
 
234
        if objects:
 
235
            self.logger.debug(_('Synchronization for %s has fallen more than '
 
236
                '%s rows behind; moving on and will try again next pass.') %
 
237
                (broker.db_file, self.max_diffs * self.per_diff))
 
238
            self.stats['diff_capped'] += 1
 
239
        else:
 
240
            with Timeout(self.node_timeout):
 
241
                response = http.replicate('merge_syncs', sync_table)
 
242
            if response and response.status >= 200 and response.status < 300:
 
243
                broker.merge_syncs([{'remote_id': remote_id,
 
244
                        'sync_point': point}], incoming=False)
 
245
                return True
235
246
        return False
236
247
 
237
248
    def _in_sync(self, rinfo, info, broker, local_sync):
360
371
        responses = []
361
372
        nodes = self.ring.get_part_nodes(int(partition))
362
373
        shouldbehere = bool([n for n in nodes if n['id'] == node_id])
363
 
        repl_nodes = [n for n in nodes if n['id'] != node_id]
 
374
        # See Footnote [1] for an explanation of the repl_nodes assignment.
 
375
        i = 0
 
376
        while i < len(nodes) and nodes[i]['id'] != node_id:
 
377
            i += 1
 
378
        repl_nodes = nodes[i + 1:] + nodes[:i]
364
379
        more_nodes = self.ring.get_more_nodes(int(partition))
365
380
        for node in repl_nodes:
366
381
            success = False
439
454
        """
440
455
        Replicate dbs under the given root in an infinite loop.
441
456
        """
 
457
        sleep(random.random() * self.interval)
442
458
        while True:
 
459
            begin = time.time()
443
460
            try:
444
461
                self.run_once()
445
462
            except (Exception, Timeout):
446
463
                self.logger.exception(_('ERROR trying to replicate'))
447
 
            sleep(self.run_pause)
 
464
            elapsed = time.time() - begin
 
465
            if elapsed < self.interval:
 
466
                sleep(self.interval - elapsed)
448
467
 
449
468
 
450
469
class ReplicatorRpc(object):
565
584
        new_broker.newid(args[0])
566
585
        renamer(old_filename, db_file)
567
586
        return HTTPNoContent()
 
587
 
 
588
# Footnote [1]:
 
589
#   This orders the nodes so that, given nodes a b c, a will contact b then c,
 
590
# b will contact c then a, and c will contact a then b -- in other words, each
 
591
# node will always contact the next node in the list first.
 
592
#   This helps in the case where databases are all way out of sync, so each
 
593
# node is likely to be sending to a different node than it's receiving from,
 
594
# rather than two nodes talking to each other, starving out the third.
 
595
#   If the third didn't even have a copy and the first two nodes were way out
 
596
# of sync, such starvation would mean the third node wouldn't get any copy
 
597
# until the first two nodes finally got in sync, which could take a while.
 
598
#   This new ordering ensures such starvation doesn't occur, making the data
 
599
# more durable.