286
286
self.logger.increment('failures')
288
288
stop_at = time() + self.container_time
289
next_sync_point = None
289
290
while time() < stop_at and sync_point2 < sync_point1:
290
291
rows = broker.get_items_since(sync_point2, 1)
296
297
key = hash_path(info['account'], info['container'],
297
298
row['name'], raw_digest=True)
298
299
# This node will only initially sync out one third of the
299
# objects (if 3 replicas, 1/4 if 4, etc.). This section
300
# will attempt to sync previously skipped rows in case the
301
# other nodes didn't succeed.
302
if unpack_from('>I', key)[0] % \
303
len(nodes) != ordinal:
304
if not self.container_sync_row(row, sync_to, sync_key,
300
# objects (if 3 replicas, 1/4 if 4, etc.) and will skip
301
# problematic rows as needed in case of faults.
302
# This section will attempt to sync previously skipped
303
# rows in case the previous attempts by any of the nodes
305
if not self.container_sync_row(row, sync_to, sync_key,
307
if not next_sync_point:
308
next_sync_point = sync_point2
307
309
sync_point2 = row['ROWID']
308
310
broker.set_x_container_sync_points(None, sync_point2)
312
broker.set_x_container_sync_points(None, next_sync_point)
309
313
while time() < stop_at:
310
314
rows = broker.get_items_since(sync_point1, 1)
317
321
# objects (if 3 replicas, 1/4 if 4, etc.). It'll come back
318
322
# around to the section above and attempt to sync
319
323
# previously skipped rows in case the other nodes didn't
324
# succeed or in case it failed to do so the first time.
321
325
if unpack_from('>I', key)[0] % \
322
326
len(nodes) == ordinal:
323
if not self.container_sync_row(row, sync_to, sync_key,
327
self.container_sync_row(row, sync_to, sync_key,
326
329
sync_point1 = row['ROWID']
327
330
broker.set_x_container_sync_points(sync_point1, None)
328
331
self.container_syncs += 1
420
423
'sync_to': sync_to})
421
424
elif err.http_status == HTTP_NOT_FOUND:
422
425
self.logger.info(
423
_('Not found %(sync_from)r => %(sync_to)r'),
426
_('Not found %(sync_from)r => %(sync_to)r \
427
- object %(obj_name)r'),
424
428
{'sync_from': '%s/%s' %
425
429
(quote(info['account']), quote(info['container'])),
430
'sync_to': sync_to, 'obj_name': row['name']})
428
432
self.logger.exception(
429
433
_('ERROR Syncing %(db_file)s %(row)s'),