# Copyright 2013 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import random

from oslo.serialization import jsonutils
from oslo.utils import timeutils

from neutron.common import constants
from neutron.common import exceptions
from neutron import context
from neutron.db import external_net_db
from neutron.db import l3_db
from neutron.db import models_v2
from neutron.extensions import l3
from neutron.i18n import _LE, _LI, _LW
from neutron.openstack.common import log
from neutron.openstack.common import loopingcall
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import exceptions as nsx_exc
from neutron.plugins.vmware.common import nsx_utils
from neutron.plugins.vmware import nsxlib
from neutron.plugins.vmware.nsxlib import router as routerlib
from neutron.plugins.vmware.nsxlib import switch as switchlib

# Maximum page size for a single request
# NOTE(salv-orlando): This might become a version-dependent map should the
# limit be raised in future versions
MAX_PAGE_SIZE = 5000

LOG = log.getLogger(__name__)


class NsxCache(object):
    """A simple Cache for NSX resources.

    Associates resource id with resource hash to rapidly identify
    updated resources.
    Each entry in the cache also stores the following information:
    - changed: the resource in the cache has been altered following
      an update or a delete
    - hit: the resource has been visited during an update (and possibly
      left unchanged)
    - data: current resource data
    - data_bk: backup of resource data prior to its removal
    """
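
    # Illustrative sketch (not from the original source) of a populated
    # entry in one of the per-resource dicts below:
    #   {'<nsx-uuid>': {'hash': <hash of jsonutils.dumps(item)>,
    #                   'hit': True,       # visited during the last update
    #                   'changed': False,  # not altered since the last pass
    #                   'data': {...},     # current NSX resource data
    #                   'data_bk': {...}}} # previous data, kept on removal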

    def __init__(self):
        # Maps a uuid to the dict containing it
        self._uuid_dict_mappings = {}
        # Dicts for NSX cached resources
        self._lswitches = {}
        self._lswitchports = {}
        self._lrouters = {}

    def __getitem__(self, key):
        # uuids are unique across the various types of resources
        # TODO(salv-orlando): Avoid lookups over all dictionaries
        # when retrieving items
        # Fetch lswitches, lports, or lrouters
        resources = self._uuid_dict_mappings[key]
        return resources[key]

    def _clear_changed_flag_and_remove_from_cache(self, resources):
        # Clear the 'changed' attribute for all items
        for uuid, item in resources.items():
            if item.pop('changed', None) and not item.get('data'):
                # The item is not anymore in NSX, so delete it
                del resources[uuid]
                del self._uuid_dict_mappings[uuid]
                LOG.debug("Removed item %s from NSX object cache", uuid)

    def _update_resources(self, resources, new_resources, clear_changed=True):
        if clear_changed:
            self._clear_changed_flag_and_remove_from_cache(resources)

        def do_hash(item):
            return hash(jsonutils.dumps(item))

        # Parse new data and identify new, deleted, and updated resources
        for item in new_resources:
            item_id = item['uuid']
            if resources.get(item_id):
                new_hash = do_hash(item)
                if new_hash != resources[item_id]['hash']:
                    resources[item_id]['hash'] = new_hash
                    resources[item_id]['changed'] = True
                    resources[item_id]['data_bk'] = (
                        resources[item_id]['data'])
                    resources[item_id]['data'] = item
                # Mark the item as hit in any case
                resources[item_id]['hit'] = True
                LOG.debug("Updating item %s in NSX object cache", item_id)
            else:
                resources[item_id] = {'hash': do_hash(item)}
                resources[item_id]['hit'] = True
                resources[item_id]['changed'] = True
                resources[item_id]['data'] = item
                # add a uuid to dict mapping for easy retrieval
                # with __getitem__
                self._uuid_dict_mappings[item_id] = resources
                LOG.debug("Added item %s to NSX object cache", item_id)

    def _delete_resources(self, resources):
        # Mark for removal all the elements which have not been visited.
        # And clear the 'hit' attribute.
        for to_delete in [k for (k, v) in resources.iteritems()
                          if not v.pop('hit', False)]:
            resources[to_delete]['changed'] = True
            resources[to_delete]['data_bk'] = (
                resources[to_delete].pop('data', None))

    def _get_resource_ids(self, resources, changed_only):
        if changed_only:
            return [k for (k, v) in resources.iteritems()
                    if v.get('changed')]
        return resources.keys()

    def get_lswitches(self, changed_only=False):
        return self._get_resource_ids(self._lswitches, changed_only)

    def get_lrouters(self, changed_only=False):
        return self._get_resource_ids(self._lrouters, changed_only)

    def get_lswitchports(self, changed_only=False):
        return self._get_resource_ids(self._lswitchports, changed_only)

    def update_lswitch(self, lswitch):
        self._update_resources(self._lswitches, [lswitch], clear_changed=False)

    def update_lrouter(self, lrouter):
        self._update_resources(self._lrouters, [lrouter], clear_changed=False)

    def update_lswitchport(self, lswitchport):
        self._update_resources(self._lswitchports, [lswitchport],
                               clear_changed=False)

    def process_updates(self, lswitches=None,
                        lrouters=None, lswitchports=None):
        self._update_resources(self._lswitches, lswitches)
        self._update_resources(self._lrouters, lrouters)
        self._update_resources(self._lswitchports, lswitchports)
        return (self._get_resource_ids(self._lswitches, changed_only=True),
                self._get_resource_ids(self._lrouters, changed_only=True),
                self._get_resource_ids(self._lswitchports, changed_only=True))
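
    # Note (added for clarity, not in the original source): process_updates
    # runs for every fetched chunk and marks new or modified entries while
    # setting their 'hit' flag; process_deletes runs once the last chunk of
    # a cycle has been processed and flags entries that were never 'hit' as
    # deleted from the backend.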

    def process_deletes(self):
        self._delete_resources(self._lswitches)
        self._delete_resources(self._lrouters)
        self._delete_resources(self._lswitchports)
        return (self._get_resource_ids(self._lswitches, changed_only=True),
                self._get_resource_ids(self._lrouters, changed_only=True),
                self._get_resource_ids(self._lswitchports, changed_only=True))


class SyncParameters(object):
    """Defines attributes used by the synchronization procedure.

    chunk_size: Actual chunk size
    extra_chunk_size: Additional data to fetch because of chunk size
                      adjustments
    current_chunk: Counter of the current data chunk being synchronized
    Page cursors: markers for the next resource to fetch.
                  'start' means page cursor unset for fetching 1st page
    init_sync_performed: True if the initial synchronization concluded
    """

    def __init__(self, min_chunk_size):
        self.chunk_size = min_chunk_size
        self.extra_chunk_size = 0
        self.current_chunk = 0
        self.ls_cursor = 'start'
        self.lr_cursor = 'start'
        self.lp_cursor = 'start'
        self.init_sync_performed = False


def _start_loopingcall(min_chunk_size, state_sync_interval, func):
    """Start a loopingcall for the synchronization task."""
    # Start a looping call to synchronize operational status
    # for neutron resources
    if not state_sync_interval:
        # do not start the looping call if specified
        # sync interval is 0
        return
    state_synchronizer = loopingcall.DynamicLoopingCall(
        func, sp=SyncParameters(min_chunk_size))
    state_synchronizer.start(
        periodic_interval_max=state_sync_interval)
    return state_synchronizer
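

# Note (added for clarity, not in the original source): when
# state_sync_interval is 0 or None the helper above returns None rather than
# a DynamicLoopingCall, which effectively disables background state
# synchronization.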


class NsxSynchronizer(object):

    LS_URI = nsxlib._build_uri_path(
        switchlib.LSWITCH_RESOURCE, fields='uuid,tags,fabric_status',
        relations='LogicalSwitchStatus')
    LR_URI = nsxlib._build_uri_path(
        routerlib.LROUTER_RESOURCE, fields='uuid,tags,fabric_status',
        relations='LogicalRouterStatus')
    LP_URI = nsxlib._build_uri_path(
        switchlib.LSWITCHPORT_RESOURCE,
        parent_resource_id='*',
        fields='uuid,tags,fabric_status_up',
        relations='LogicalPortStatus')

    def __init__(self, plugin, cluster, state_sync_interval,
                 req_delay, min_chunk_size, max_rand_delay=0):
        self._nsx_cache = NsxCache()
        # Store parameters as instance members
        # NOTE(salv-orlando): apologies if it looks java-ish
        self._plugin = plugin
        self._cluster = cluster
        self._req_delay = req_delay
        self._sync_interval = state_sync_interval
        self._max_rand_delay = max_rand_delay
        # Validate parameters
        if self._sync_interval < self._req_delay:
            err_msg = (_("Minimum request delay:%(req_delay)s must not "
                         "exceed synchronization interval:%(sync_interval)s") %
                       {'req_delay': self._req_delay,
                        'sync_interval': self._sync_interval})
            LOG.error(err_msg)
            raise nsx_exc.NsxPluginException(err_msg=err_msg)
        # Backoff time in case of failures while fetching sync data
        self._sync_backoff = 1
        # Store the looping call in an instance variable to allow unit tests
        # for controlling its lifecycle
        self._sync_looping_call = _start_loopingcall(
            min_chunk_size, state_sync_interval, self._synchronize_state)

    def _get_tag_dict(self, tags):
        return dict((tag.get('scope'), tag['tag']) for tag in tags)
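
    # Illustrative note (not from the original source): NSX returns tags as
    # a list such as [{'scope': 'quantum_net_id', 'tag': '<neutron-uuid>'}];
    # _get_tag_dict flattens it into {'quantum_net_id': '<neutron-uuid>'} so
    # callers can look tags up by scope with a plain dict get.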

    def synchronize_network(self, context, neutron_network_data,
                            lswitches=None):
        """Synchronize a Neutron network with its NSX counterpart.

        This routine synchronizes a set of switches when a Neutron
        network is mapped to multiple lswitches.
        """
        if not lswitches:
            # Try to get logical switches from nsx
            try:
                lswitches = nsx_utils.fetch_nsx_switches(
                    context.session, self._cluster,
                    neutron_network_data['id'])
            except exceptions.NetworkNotFound:
                # TODO(salv-orlando): We should be catching
                # api_exc.ResourceNotFound here
                # The logical switch was not found
                LOG.warning(_LW("Logical switch for neutron network %s not "
                                "found on NSX."), neutron_network_data['id'])
                lswitches = []
            else:
                for lswitch in lswitches:
                    self._nsx_cache.update_lswitch(lswitch)
        # By default assume things go wrong
        status = constants.NET_STATUS_ERROR
        # In most cases lswitches will contain a single element
        for ls in lswitches:
            if not ls:
                # Logical switch was deleted
                break
            ls_status = ls['_relations']['LogicalSwitchStatus']
            if not ls_status['fabric_status']:
                status = constants.NET_STATUS_DOWN
                break
        else:
            # No switch was down or missing. Set status to ACTIVE unless
            # there were no switches in the first place!
            if lswitches:
                status = constants.NET_STATUS_ACTIVE
        # Update db object
        if status == neutron_network_data['status']:
            # do nothing
            return

        with context.session.begin(subtransactions=True):
            try:
                network = self._plugin._get_network(context,
                                                    neutron_network_data['id'])
            except exceptions.NetworkNotFound:
                pass
            else:
                network.status = status
                LOG.debug("Updating status for neutron resource %(q_id)s to:"
                          " %(status)s",
                          {'q_id': neutron_network_data['id'],
                           'status': status})

    def _synchronize_lswitches(self, ctx, ls_uuids, scan_missing=False):
        if not ls_uuids and not scan_missing:
            return
        neutron_net_ids = set()
        neutron_nsx_mappings = {}
        # TODO(salvatore-orlando): Deal with the case the tag
        # has been tampered with
        for ls_uuid in ls_uuids:
            # If the lswitch has been deleted, get backup copy of data
            lswitch = (self._nsx_cache[ls_uuid].get('data') or
                       self._nsx_cache[ls_uuid].get('data_bk'))
            tags = self._get_tag_dict(lswitch['tags'])
            neutron_id = tags.get('quantum_net_id')
            neutron_net_ids.add(neutron_id)
            neutron_nsx_mappings[neutron_id] = (
                neutron_nsx_mappings.get(neutron_id, []) +
                [self._nsx_cache[ls_uuid]])
        # Fetch neutron networks from database
        filters = {'router:external': [False]}
        if not scan_missing:
            filters['id'] = neutron_net_ids

        networks = self._plugin._get_collection(
            ctx, models_v2.Network, self._plugin._make_network_dict,
            filters=filters)

        for network in networks:
            lswitches = neutron_nsx_mappings.get(network['id'], [])
            lswitches = [lsw.get('data') for lsw in lswitches]
            self.synchronize_network(ctx, network, lswitches)

    def synchronize_router(self, context, neutron_router_data,
                           lrouter=None):
        """Synchronize a neutron router with its NSX counterpart."""
        if not lrouter:
            # Try to get router from nsx
            try:
                # This query will return the logical router status too
                nsx_router_id = nsx_utils.get_nsx_router_id(
                    context.session, self._cluster, neutron_router_data['id'])
                if nsx_router_id:
                    lrouter = routerlib.get_lrouter(
                        self._cluster, nsx_router_id)
            except exceptions.NotFound:
                # NOTE(salv-orlando): We should be catching
                # api_exc.ResourceNotFound here
                # The logical router was not found
                LOG.warning(_LW("Logical router for neutron router %s not "
                                "found on NSX."), neutron_router_data['id'])

            if lrouter:
                self._nsx_cache.update_lrouter(lrouter)
        # Note(salv-orlando): It might be worth adding a check to verify
        # neutron resource tag in nsx entity matches a Neutron id.
        # By default assume things go wrong
        status = constants.NET_STATUS_ERROR
        if lrouter:
            lr_status = (lrouter['_relations']
                         ['LogicalRouterStatus']
                         ['fabric_status'])
            status = (lr_status and
                      constants.NET_STATUS_ACTIVE
                      or constants.NET_STATUS_DOWN)
        # Update db object
        if status == neutron_router_data['status']:
            # do nothing
            return

        with context.session.begin(subtransactions=True):
            try:
                router = self._plugin._get_router(context,
                                                  neutron_router_data['id'])
            except l3.RouterNotFound:
                pass
            else:
                router.status = status
                LOG.debug("Updating status for neutron resource %(q_id)s to:"
                          " %(status)s",
                          {'q_id': neutron_router_data['id'],
                           'status': status})

    def _synchronize_lrouters(self, ctx, lr_uuids, scan_missing=False):
        if not lr_uuids and not scan_missing:
            return
        # TODO(salvatore-orlando): Deal with the case the tag
        # has been tampered with
        neutron_router_mappings = {}
        for lr_uuid in lr_uuids:
            lrouter = (self._nsx_cache[lr_uuid].get('data') or
                       self._nsx_cache[lr_uuid].get('data_bk'))
            tags = self._get_tag_dict(lrouter['tags'])
            neutron_router_id = tags.get('q_router_id')
            if neutron_router_id:
                neutron_router_mappings[neutron_router_id] = (
                    self._nsx_cache[lr_uuid])
            else:
                LOG.warn(_LW("Unable to find Neutron router id for "
                             "NSX logical router: %s"), lr_uuid)
        # Fetch neutron routers from database
        filters = ({} if scan_missing else
                   {'id': neutron_router_mappings.keys()})
        routers = self._plugin._get_collection(
            ctx, l3_db.Router, self._plugin._make_router_dict,
            filters=filters)
        for router in routers:
            lrouter = neutron_router_mappings.get(router['id'])
            self.synchronize_router(
                ctx, router, lrouter and lrouter.get('data'))

    def synchronize_port(self, context, neutron_port_data,
                         lswitchport=None, ext_networks=None):
        """Synchronize a Neutron port with its NSX counterpart."""
        # Skip synchronization for ports on external networks
        if not ext_networks:
            ext_networks = [net['id'] for net in context.session.query(
                models_v2.Network).join(
                    external_net_db.ExternalNetwork,
                    (models_v2.Network.id ==
                     external_net_db.ExternalNetwork.network_id))]
        if neutron_port_data['network_id'] in ext_networks:
            with context.session.begin(subtransactions=True):
                neutron_port_data['status'] = constants.PORT_STATUS_ACTIVE
                return

        if not lswitchport:
            # Try to get port from nsx
            try:
                ls_uuid, lp_uuid = nsx_utils.get_nsx_switch_and_port_id(
                    context.session, self._cluster, neutron_port_data['id'])
                if lp_uuid:
                    lswitchport = switchlib.get_port(
                        self._cluster, ls_uuid, lp_uuid,
                        relations='LogicalPortStatus')
            except exceptions.PortNotFoundOnNetwork:
                # NOTE(salv-orlando): We should be catching
                # api_exc.ResourceNotFound here instead
                # of PortNotFoundOnNetwork when the id exists but
                # the logical switch port was not found
                LOG.warning(_LW("Logical switch port for neutron port %s "
                                "not found on NSX."), neutron_port_data['id'])
            else:
                # If lswitchport is not None, update the cache.
                # It could be none if the port was deleted from the backend
                if lswitchport:
                    self._nsx_cache.update_lswitchport(lswitchport)
        # Note(salv-orlando): It might be worth adding a check to verify
        # neutron resource tag in nsx entity matches Neutron id.
        # By default assume things go wrong
        status = constants.PORT_STATUS_ERROR
        if lswitchport:
            lp_status = (lswitchport['_relations']
                         ['LogicalPortStatus']
                         ['fabric_status_up'])
            status = (lp_status and
                      constants.PORT_STATUS_ACTIVE
                      or constants.PORT_STATUS_DOWN)

        # Update db object
        if status == neutron_port_data['status']:
            # do nothing
            return

        with context.session.begin(subtransactions=True):
            try:
                port = self._plugin._get_port(context,
                                              neutron_port_data['id'])
            except exceptions.PortNotFound:
                pass
            else:
                port.status = status
                LOG.debug("Updating status for neutron resource %(q_id)s to:"
                          " %(status)s",
                          {'q_id': neutron_port_data['id'],
                           'status': status})

    def _synchronize_lswitchports(self, ctx, lp_uuids, scan_missing=False):
        if not lp_uuids and not scan_missing:
            return
        # Find Neutron port id by tag - the tag is already
        # loaded in memory, no reason for doing a db query
        # TODO(salvatore-orlando): Deal with the case the tag
        # has been tampered with
        neutron_port_mappings = {}
        for lp_uuid in lp_uuids:
            lport = (self._nsx_cache[lp_uuid].get('data') or
                     self._nsx_cache[lp_uuid].get('data_bk'))
            tags = self._get_tag_dict(lport['tags'])
            neutron_port_id = tags.get('q_port_id')
            if neutron_port_id:
                neutron_port_mappings[neutron_port_id] = (
                    self._nsx_cache[lp_uuid])
        # Fetch neutron ports from database
        # At the first sync we need to fetch all ports
        filters = ({} if scan_missing else
                   {'id': neutron_port_mappings.keys()})
        # TODO(salv-orlando): Work out a solution for avoiding
        # this query
        ext_nets = [net['id'] for net in ctx.session.query(
            models_v2.Network).join(
                external_net_db.ExternalNetwork,
                (models_v2.Network.id ==
                 external_net_db.ExternalNetwork.network_id))]
        ports = self._plugin._get_collection(
            ctx, models_v2.Port, self._plugin._make_port_dict,
            filters=filters)
        for port in ports:
            lswitchport = neutron_port_mappings.get(port['id'])
            self.synchronize_port(
                ctx, port, lswitchport and lswitchport.get('data'),
                ext_networks=ext_nets)

    def _get_chunk_size(self, sp):
        # NOTE(salv-orlando): Try to use __future__ for this routine only?
        ratio = ((float(sp.total_size) / float(sp.chunk_size)) /
                 (float(self._sync_interval) / float(self._req_delay)))
        new_size = max(1.0, ratio) * float(sp.chunk_size)
        return int(new_size) + (new_size - int(new_size) > 0)
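
    # Worked example (illustrative, not from the original source): with
    # total_size=1000, chunk_size=250, sync_interval=120 and req_delay=10,
    # the ratio is (1000/250) / (120/10) = 4/12 ~ 0.33, so max(1.0, ratio)
    # keeps the chunk size at 250; a heavier backlog (e.g. total_size=6000,
    # ratio 2.0) would double it to 500. The trailing term in the return
    # statement simply rounds any fractional size up.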

    def _fetch_data(self, uri, cursor, page_size):
        # If not cursor there is nothing to retrieve
        if cursor:
            if cursor == 'start':
                cursor = None
            # Chunk size tuning might, in some conditions, make it larger
            # than 5,000, which is the maximum page size allowed by the NSX
            # API. In this case the request should be split in multiple
            # requests. This is not ideal, and therefore a log warning will
            # be emitted.
            num_requests = page_size / (MAX_PAGE_SIZE + 1) + 1
            if num_requests > 1:
                LOG.warn(_LW("Requested page size is %(cur_chunk_size)d. "
                             "It might be necessary to do %(num_requests)d "
                             "round-trips to NSX for fetching data. Please "
                             "tune sync parameters to ensure chunk size "
                             "is less than %(max_page_size)d"),
                         {'cur_chunk_size': page_size,
                          'num_requests': num_requests,
                          'max_page_size': MAX_PAGE_SIZE})
            # Only the first request might return the total size,
            # subsequent requests will definitely not
            results, cursor, total_size = nsxlib.get_single_query_page(
                uri, self._cluster, cursor,
                min(page_size, MAX_PAGE_SIZE))
            for _req in range(num_requests - 1):
                # If no cursor is returned break the cycle as there is no
                # actual need to perform multiple requests (all fetched)
                # This happens when the overall size of resources exceeds
                # the maximum page size, but the number for each single
                # resource type is below this threshold
                if not cursor:
                    break
                req_results, cursor = nsxlib.get_single_query_page(
                    uri, self._cluster, cursor,
                    min(page_size, MAX_PAGE_SIZE))[:2]
                results.extend(req_results)
            # reset cursor before returning if we queried just to
            # know the number of entities
            return results, cursor if page_size else 'start', total_size
        return [], cursor, None
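
    # Illustrative paging note (not from the original source): with
    # page_size=12000 and MAX_PAGE_SIZE=5000, num_requests above evaluates
    # to 12000 / 5001 + 1 = 3 (integer division), so up to three round-trips
    # of at most 5000 items each are issued; the loop stops early as soon as
    # NSX stops returning a cursor.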

    def _fetch_nsx_data_chunk(self, sp):
        base_chunk_size = sp.chunk_size
        chunk_size = base_chunk_size + sp.extra_chunk_size
        LOG.info(_LI("Fetching up to %s resources "
                     "from NSX backend"), chunk_size)
        fetched = ls_count = lr_count = lp_count = 0
        lswitches = lrouters = lswitchports = []
        if sp.ls_cursor or sp.ls_cursor == 'start':
            (lswitches, sp.ls_cursor, ls_count) = self._fetch_data(
                self.LS_URI, sp.ls_cursor, chunk_size)
            fetched = len(lswitches)
        if fetched < chunk_size and sp.lr_cursor or sp.lr_cursor == 'start':
            (lrouters, sp.lr_cursor, lr_count) = self._fetch_data(
                self.LR_URI, sp.lr_cursor, max(chunk_size - fetched, 0))
            fetched += len(lrouters)
        if fetched < chunk_size and sp.lp_cursor or sp.lp_cursor == 'start':
            (lswitchports, sp.lp_cursor, lp_count) = self._fetch_data(
                self.LP_URI, sp.lp_cursor, max(chunk_size - fetched, 0))
            fetched += len(lswitchports)
        if sp.current_chunk == 0:
            # No cursors were provided. Then it must be possible to
            # calculate the total amount of data to fetch
            sp.total_size = ls_count + lr_count + lp_count
            LOG.debug("Total data size: %d", sp.total_size)
            sp.chunk_size = self._get_chunk_size(sp)
            # Calculate chunk size adjustment
            sp.extra_chunk_size = sp.chunk_size - base_chunk_size
        LOG.debug("Fetched %(num_lswitches)d logical switches, "
                  "%(num_lswitchports)d logical switch ports, "
                  "%(num_lrouters)d logical routers",
                  {'num_lswitches': len(lswitches),
                   'num_lswitchports': len(lswitchports),
                   'num_lrouters': len(lrouters)})
        return (lswitches, lrouters, lswitchports)
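
    # Note (added for clarity, not in the original source): the chunk budget
    # is consumed in order - switches first, then routers, then ports - and
    # each subsequent fetch only asks for whatever is left of chunk_size, so
    # a single chunk never requests more than chunk_size objects overall.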

    def _synchronize_state(self, sp):
        # If the plugin has been destroyed, stop the LoopingCall
        if not self._plugin:
            raise loopingcall.LoopingCallDone()
        start = timeutils.utcnow()
        # Reset page cursor variables if necessary
        if sp.current_chunk == 0:
            sp.ls_cursor = sp.lr_cursor = sp.lp_cursor = 'start'
        LOG.info(_LI("Running state synchronization task. Chunk: %s"),
                 sp.current_chunk)
        # Fetch chunk_size data from NSX
        try:
            (lswitches, lrouters, lswitchports) = (
                self._fetch_nsx_data_chunk(sp))
        except (api_exc.RequestTimeout, api_exc.NsxApiException):
            sleep_interval = self._sync_backoff
            # Cap max back off to 64 seconds
            self._sync_backoff = min(self._sync_backoff * 2, 64)
            LOG.exception(_LE("An error occurred while communicating with "
                              "NSX backend. Will retry synchronization "
                              "in %d seconds"), sleep_interval)
            return sleep_interval
        LOG.debug("Time elapsed querying NSX: %s",
                  timeutils.utcnow() - start)
        if sp.total_size:
            num_chunks = ((sp.total_size / sp.chunk_size) +
                          (sp.total_size % sp.chunk_size != 0))
        else:
            num_chunks = 1
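        # Worked example (illustrative, not from the original source): with
        # total_size=130 and chunk_size=50, 130 / 50 = 2 with a non-zero
        # remainder, so the sync cycle is spread over 3 chunks.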
        LOG.debug("Number of chunks: %d", num_chunks)
        # Find objects which have changed on NSX side and need
        # to be synchronized
        LOG.debug("Processing NSX cache for updated objects")
        (ls_uuids, lr_uuids, lp_uuids) = self._nsx_cache.process_updates(
            lswitches, lrouters, lswitchports)
        # Process removed objects only at the last chunk
        scan_missing = (sp.current_chunk == num_chunks - 1 and
                        not sp.init_sync_performed)
        if sp.current_chunk == num_chunks - 1:
            LOG.debug("Processing NSX cache for deleted objects")
            self._nsx_cache.process_deletes()
            ls_uuids = self._nsx_cache.get_lswitches(
                changed_only=not scan_missing)
            lr_uuids = self._nsx_cache.get_lrouters(
                changed_only=not scan_missing)
            lp_uuids = self._nsx_cache.get_lswitchports(
                changed_only=not scan_missing)
        LOG.debug("Time elapsed hashing data: %s",
                  timeutils.utcnow() - start)
        # Get an admin context
        ctx = context.get_admin_context()
        # Synchronize with database
        self._synchronize_lswitches(ctx, ls_uuids,
                                    scan_missing=scan_missing)
        self._synchronize_lrouters(ctx, lr_uuids,
                                   scan_missing=scan_missing)
        self._synchronize_lswitchports(ctx, lp_uuids,
                                       scan_missing=scan_missing)
        # Increase chunk counter
        LOG.info(_LI("Synchronization for chunk %(chunk_num)d of "
                     "%(total_chunks)d performed"),
                 {'chunk_num': sp.current_chunk + 1,
                  'total_chunks': num_chunks})
        sp.current_chunk = (sp.current_chunk + 1) % num_chunks
        added_delay = 0
        if sp.current_chunk == 0:
            # Ensure init_sync_performed is True
            if not sp.init_sync_performed:
                sp.init_sync_performed = True
            # Add additional random delay
            added_delay = random.randint(0, self._max_rand_delay)
        LOG.debug("Time elapsed at end of sync: %s",
                  timeutils.utcnow() - start)
        return self._sync_interval / num_chunks + added_delay