~ubuntu-branches/ubuntu/vivid/neutron/vivid-proposed

« back to all changes in this revision

Viewing changes to neutron/plugins/vmware/common/sync.py

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2015-03-30 11:17:19 UTC
  • mfrom: (1.1.21)
  • Revision ID: package-import@ubuntu.com-20150330111719-h0gx7233p4jkkgfh
Tags: 1:2015.1~b3-0ubuntu1
* New upstream milestone release:
  - d/control: Align version requirements with upstream.
  - d/control: Add new dependency on oslo-log.
  - d/p/*: Rebase.
  - d/control,d/neutron-plugin-hyperv*: Dropped, decomposed into
    separate project upstream.
  - d/control,d/neutron-plugin-openflow*: Dropped, decomposed into
    separate project upstream.
  - d/neutron-common.install: Add neutron-rootwrap-daemon and 
    neutron-keepalived-state-change binaries.
  - d/rules: Ignore neutron-hyperv-agent when installing; only for Windows.
  - d/neutron-plugin-cisco.install: Drop neutron-cisco-cfg-agent as
    decomposed into separate project upstream.
  - d/neutron-plugin-vmware.install: Drop neutron-check-nsx-config and
    neutron-nsx-manage as decomposed into separate project upstream.
  - d/control: Add dependency on python-neutron-fwaas to neutron-l3-agent.
* d/pydist-overrides: Add overrides for oslo packages.
* d/control: Fixup type in package description (LP: #1263539).
* d/p/fixup-driver-test-execution.patch: Cherry pick fix from upstream VCS
  to support unit test exection in out-of-tree vendor drivers.
* d/neutron-common.postinst: Allow general access to /etc/neutron but limit
  access to root/neutron to /etc/neutron/neutron.conf to support execution
  of unit tests in decomposed vendor drivers.
* d/control: Add dependency on python-neutron-fwaas to neutron-l3-agent
  package.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2013 VMware, Inc.
2
 
# All Rights Reserved
3
 
#
4
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
5
 
#    not use this file except in compliance with the License. You may obtain
6
 
#    a copy of the License at
7
 
#
8
 
#         http://www.apache.org/licenses/LICENSE-2.0
9
 
#
10
 
#    Unless required by applicable law or agreed to in writing, software
11
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13
 
#    License for the specific language governing permissions and limitations
14
 
#    under the License.
15
 
 
16
 
import random
17
 
 
18
 
from oslo.serialization import jsonutils
19
 
from oslo.utils import timeutils
20
 
 
21
 
from neutron.common import constants
22
 
from neutron.common import exceptions
23
 
from neutron import context
24
 
from neutron.db import external_net_db
25
 
from neutron.db import l3_db
26
 
from neutron.db import models_v2
27
 
from neutron.extensions import l3
28
 
from neutron.i18n import _LE, _LI, _LW
29
 
from neutron.openstack.common import log
30
 
from neutron.openstack.common import loopingcall
31
 
from neutron.plugins.vmware.api_client import exception as api_exc
32
 
from neutron.plugins.vmware.common import exceptions as nsx_exc
33
 
from neutron.plugins.vmware.common import nsx_utils
34
 
from neutron.plugins.vmware import nsxlib
35
 
from neutron.plugins.vmware.nsxlib import router as routerlib
36
 
from neutron.plugins.vmware.nsxlib import switch as switchlib
37
 
 
38
 
# Maximum page size for a single request
39
 
# NOTE(salv-orlando): This might become a version-dependent map should the
40
 
# limit be raised in future versions
41
 
MAX_PAGE_SIZE = 5000
42
 
 
43
 
LOG = log.getLogger(__name__)
44
 
 
45
 
 
46
 
class NsxCache(object):
47
 
    """A simple Cache for NSX resources.
48
 
 
49
 
    Associates resource id with resource hash to rapidly identify
50
 
    updated resources.
51
 
    Each entry in the cache also stores the following information:
52
 
    - changed: the resource in the cache has been altered following
53
 
      an update or a delete
54
 
    - hit: the resource has been visited during an update (and possibly
55
 
      left unchanged)
56
 
    - data: current resource data
57
 
    - data_bk: backup of resource data prior to its removal
58
 
    """
59
 
 
60
 
    def __init__(self):
61
 
        # Maps a uuid to the dict containing it
62
 
        self._uuid_dict_mappings = {}
63
 
        # Dicts for NSX cached resources
64
 
        self._lswitches = {}
65
 
        self._lswitchports = {}
66
 
        self._lrouters = {}
67
 
 
68
 
    def __getitem__(self, key):
69
 
        # uuids are unique across the various types of resources
70
 
        # TODO(salv-orlando): Avoid lookups over all dictionaries
71
 
        # when retrieving items
72
 
        # Fetch lswitches, lports, or lrouters
73
 
        resources = self._uuid_dict_mappings[key]
74
 
        return resources[key]
75
 
 
76
 
    def _clear_changed_flag_and_remove_from_cache(self, resources):
77
 
        # Clear the 'changed' attribute for all items
78
 
        for uuid, item in resources.items():
79
 
            if item.pop('changed', None) and not item.get('data'):
80
 
                # The item is not anymore in NSX, so delete it
81
 
                del resources[uuid]
82
 
                del self._uuid_dict_mappings[uuid]
83
 
                LOG.debug("Removed item %s from NSX object cache", uuid)
84
 
 
85
 
    def _update_resources(self, resources, new_resources, clear_changed=True):
86
 
        if clear_changed:
87
 
            self._clear_changed_flag_and_remove_from_cache(resources)
88
 
 
89
 
        def do_hash(item):
90
 
            return hash(jsonutils.dumps(item))
91
 
 
92
 
        # Parse new data and identify new, deleted, and updated resources
93
 
        for item in new_resources:
94
 
            item_id = item['uuid']
95
 
            if resources.get(item_id):
96
 
                new_hash = do_hash(item)
97
 
                if new_hash != resources[item_id]['hash']:
98
 
                    resources[item_id]['hash'] = new_hash
99
 
                    resources[item_id]['changed'] = True
100
 
                    resources[item_id]['data_bk'] = (
101
 
                        resources[item_id]['data'])
102
 
                    resources[item_id]['data'] = item
103
 
                # Mark the item as hit in any case
104
 
                resources[item_id]['hit'] = True
105
 
                LOG.debug("Updating item %s in NSX object cache", item_id)
106
 
            else:
107
 
                resources[item_id] = {'hash': do_hash(item)}
108
 
                resources[item_id]['hit'] = True
109
 
                resources[item_id]['changed'] = True
110
 
                resources[item_id]['data'] = item
111
 
                # add a uuid to dict mapping for easy retrieval
112
 
                # with __getitem__
113
 
                self._uuid_dict_mappings[item_id] = resources
114
 
                LOG.debug("Added item %s to NSX object cache", item_id)
115
 
 
116
 
    def _delete_resources(self, resources):
117
 
        # Mark for removal all the elements which have not been visited.
118
 
        # And clear the 'hit' attribute.
119
 
        for to_delete in [k for (k, v) in resources.iteritems()
120
 
                          if not v.pop('hit', False)]:
121
 
            resources[to_delete]['changed'] = True
122
 
            resources[to_delete]['data_bk'] = (
123
 
                resources[to_delete].pop('data', None))
124
 
 
125
 
    def _get_resource_ids(self, resources, changed_only):
126
 
        if changed_only:
127
 
            return [k for (k, v) in resources.iteritems()
128
 
                    if v.get('changed')]
129
 
        return resources.keys()
130
 
 
131
 
    def get_lswitches(self, changed_only=False):
132
 
        return self._get_resource_ids(self._lswitches, changed_only)
133
 
 
134
 
    def get_lrouters(self, changed_only=False):
135
 
        return self._get_resource_ids(self._lrouters, changed_only)
136
 
 
137
 
    def get_lswitchports(self, changed_only=False):
138
 
        return self._get_resource_ids(self._lswitchports, changed_only)
139
 
 
140
 
    def update_lswitch(self, lswitch):
141
 
        self._update_resources(self._lswitches, [lswitch], clear_changed=False)
142
 
 
143
 
    def update_lrouter(self, lrouter):
144
 
        self._update_resources(self._lrouters, [lrouter], clear_changed=False)
145
 
 
146
 
    def update_lswitchport(self, lswitchport):
147
 
        self._update_resources(self._lswitchports, [lswitchport],
148
 
                               clear_changed=False)
149
 
 
150
 
    def process_updates(self, lswitches=None,
151
 
                        lrouters=None, lswitchports=None):
152
 
        self._update_resources(self._lswitches, lswitches)
153
 
        self._update_resources(self._lrouters, lrouters)
154
 
        self._update_resources(self._lswitchports, lswitchports)
155
 
        return (self._get_resource_ids(self._lswitches, changed_only=True),
156
 
                self._get_resource_ids(self._lrouters, changed_only=True),
157
 
                self._get_resource_ids(self._lswitchports, changed_only=True))
158
 
 
159
 
    def process_deletes(self):
160
 
        self._delete_resources(self._lswitches)
161
 
        self._delete_resources(self._lrouters)
162
 
        self._delete_resources(self._lswitchports)
163
 
        return (self._get_resource_ids(self._lswitches, changed_only=True),
164
 
                self._get_resource_ids(self._lrouters, changed_only=True),
165
 
                self._get_resource_ids(self._lswitchports, changed_only=True))
166
 
 
167
 
 
168
 
class SyncParameters(object):
169
 
    """Defines attributes used by the synchronization procedure.
170
 
 
171
 
    chunk_size: Actual chunk size
172
 
    extra_chunk_size: Additional data to fetch because of chunk size
173
 
                      adjustment
174
 
    current_chunk: Counter of the current data chunk being synchronized
175
 
    Page cursors: markers for the next resource to fetch.
176
 
                 'start' means page cursor unset for fetching 1st page
177
 
    init_sync_performed: True if the initial synchronization concluded
178
 
    """
179
 
 
180
 
    def __init__(self, min_chunk_size):
181
 
        self.chunk_size = min_chunk_size
182
 
        self.extra_chunk_size = 0
183
 
        self.current_chunk = 0
184
 
        self.ls_cursor = 'start'
185
 
        self.lr_cursor = 'start'
186
 
        self.lp_cursor = 'start'
187
 
        self.init_sync_performed = False
188
 
        self.total_size = 0
189
 
 
190
 
 
191
 
def _start_loopingcall(min_chunk_size, state_sync_interval, func):
192
 
    """Start a loopingcall for the synchronization task."""
193
 
    # Start a looping call to synchronize operational status
194
 
    # for neutron resources
195
 
    if not state_sync_interval:
196
 
        # do not start the looping call if specified
197
 
        # sync interval is 0
198
 
        return
199
 
    state_synchronizer = loopingcall.DynamicLoopingCall(
200
 
        func, sp=SyncParameters(min_chunk_size))
201
 
    state_synchronizer.start(
202
 
        periodic_interval_max=state_sync_interval)
203
 
    return state_synchronizer
204
 
 
205
 
 
206
 
class NsxSynchronizer(object):
207
 
 
208
 
    LS_URI = nsxlib._build_uri_path(
209
 
        switchlib.LSWITCH_RESOURCE, fields='uuid,tags,fabric_status',
210
 
        relations='LogicalSwitchStatus')
211
 
    LR_URI = nsxlib._build_uri_path(
212
 
        routerlib.LROUTER_RESOURCE, fields='uuid,tags,fabric_status',
213
 
        relations='LogicalRouterStatus')
214
 
    LP_URI = nsxlib._build_uri_path(
215
 
        switchlib.LSWITCHPORT_RESOURCE,
216
 
        parent_resource_id='*',
217
 
        fields='uuid,tags,fabric_status_up',
218
 
        relations='LogicalPortStatus')
219
 
 
220
 
    def __init__(self, plugin, cluster, state_sync_interval,
221
 
                 req_delay, min_chunk_size, max_rand_delay=0):
222
 
        random.seed()
223
 
        self._nsx_cache = NsxCache()
224
 
        # Store parameters as instance members
225
 
        # NOTE(salv-orlando): apologies if it looks java-ish
226
 
        self._plugin = plugin
227
 
        self._cluster = cluster
228
 
        self._req_delay = req_delay
229
 
        self._sync_interval = state_sync_interval
230
 
        self._max_rand_delay = max_rand_delay
231
 
        # Validate parameters
232
 
        if self._sync_interval < self._req_delay:
233
 
            err_msg = (_("Minimum request delay:%(req_delay)s must not "
234
 
                         "exceed synchronization interval:%(sync_interval)s") %
235
 
                       {'req_delay': self._req_delay,
236
 
                        'sync_interval': self._sync_interval})
237
 
            LOG.error(err_msg)
238
 
            raise nsx_exc.NsxPluginException(err_msg=err_msg)
239
 
        # Backoff time in case of failures while fetching sync data
240
 
        self._sync_backoff = 1
241
 
        # Store the looping call in an instance variable to allow unit tests
242
 
        # for controlling its lifecycle
243
 
        self._sync_looping_call = _start_loopingcall(
244
 
            min_chunk_size, state_sync_interval, self._synchronize_state)
245
 
 
246
 
    def _get_tag_dict(self, tags):
247
 
        return dict((tag.get('scope'), tag['tag']) for tag in tags)
248
 
 
249
 
    def synchronize_network(self, context, neutron_network_data,
250
 
                            lswitches=None):
251
 
        """Synchronize a Neutron network with its NSX counterpart.
252
 
 
253
 
        This routine synchronizes a set of switches when a Neutron
254
 
        network is mapped to multiple lswitches.
255
 
        """
256
 
        if not lswitches:
257
 
            # Try to get logical switches from nsx
258
 
            try:
259
 
                lswitches = nsx_utils.fetch_nsx_switches(
260
 
                    context.session, self._cluster,
261
 
                    neutron_network_data['id'])
262
 
            except exceptions.NetworkNotFound:
263
 
                # TODO(salv-orlando): We should be catching
264
 
                # api_exc.ResourceNotFound here
265
 
                # The logical switch was not found
266
 
                LOG.warning(_LW("Logical switch for neutron network %s not "
267
 
                                "found on NSX."), neutron_network_data['id'])
268
 
                lswitches = []
269
 
            else:
270
 
                for lswitch in lswitches:
271
 
                    self._nsx_cache.update_lswitch(lswitch)
272
 
        # By default assume things go wrong
273
 
        status = constants.NET_STATUS_ERROR
274
 
        # In most cases lswitches will contain a single element
275
 
        for ls in lswitches:
276
 
            if not ls:
277
 
                # Logical switch was deleted
278
 
                break
279
 
            ls_status = ls['_relations']['LogicalSwitchStatus']
280
 
            if not ls_status['fabric_status']:
281
 
                status = constants.NET_STATUS_DOWN
282
 
                break
283
 
        else:
284
 
            # No switch was down or missing. Set status to ACTIVE unless
285
 
            # there were no switches in the first place!
286
 
            if lswitches:
287
 
                status = constants.NET_STATUS_ACTIVE
288
 
        # Update db object
289
 
        if status == neutron_network_data['status']:
290
 
            # do nothing
291
 
            return
292
 
 
293
 
        with context.session.begin(subtransactions=True):
294
 
            try:
295
 
                network = self._plugin._get_network(context,
296
 
                                                    neutron_network_data['id'])
297
 
            except exceptions.NetworkNotFound:
298
 
                pass
299
 
            else:
300
 
                network.status = status
301
 
                LOG.debug("Updating status for neutron resource %(q_id)s to:"
302
 
                          " %(status)s",
303
 
                          {'q_id': neutron_network_data['id'],
304
 
                           'status': status})
305
 
 
306
 
    def _synchronize_lswitches(self, ctx, ls_uuids, scan_missing=False):
307
 
        if not ls_uuids and not scan_missing:
308
 
            return
309
 
        neutron_net_ids = set()
310
 
        neutron_nsx_mappings = {}
311
 
        # TODO(salvatore-orlando): Deal with the case the tag
312
 
        # has been tampered with
313
 
        for ls_uuid in ls_uuids:
314
 
            # If the lswitch has been deleted, get backup copy of data
315
 
            lswitch = (self._nsx_cache[ls_uuid].get('data') or
316
 
                       self._nsx_cache[ls_uuid].get('data_bk'))
317
 
            tags = self._get_tag_dict(lswitch['tags'])
318
 
            neutron_id = tags.get('quantum_net_id')
319
 
            neutron_net_ids.add(neutron_id)
320
 
            neutron_nsx_mappings[neutron_id] = (
321
 
                neutron_nsx_mappings.get(neutron_id, []) +
322
 
                [self._nsx_cache[ls_uuid]])
323
 
        # Fetch neutron networks from database
324
 
        filters = {'router:external': [False]}
325
 
        if not scan_missing:
326
 
            filters['id'] = neutron_net_ids
327
 
 
328
 
        networks = self._plugin._get_collection(
329
 
            ctx, models_v2.Network, self._plugin._make_network_dict,
330
 
            filters=filters)
331
 
 
332
 
        for network in networks:
333
 
            lswitches = neutron_nsx_mappings.get(network['id'], [])
334
 
            lswitches = [lsw.get('data') for lsw in lswitches]
335
 
            self.synchronize_network(ctx, network, lswitches)
336
 
 
337
 
    def synchronize_router(self, context, neutron_router_data,
338
 
                           lrouter=None):
339
 
        """Synchronize a neutron router with its NSX counterpart."""
340
 
        if not lrouter:
341
 
            # Try to get router from nsx
342
 
            try:
343
 
                # This query will return the logical router status too
344
 
                nsx_router_id = nsx_utils.get_nsx_router_id(
345
 
                    context.session, self._cluster, neutron_router_data['id'])
346
 
                if nsx_router_id:
347
 
                    lrouter = routerlib.get_lrouter(
348
 
                        self._cluster, nsx_router_id)
349
 
            except exceptions.NotFound:
350
 
                # NOTE(salv-orlando): We should be catching
351
 
                # api_exc.ResourceNotFound here
352
 
                # The logical router was not found
353
 
                LOG.warning(_LW("Logical router for neutron router %s not "
354
 
                                "found on NSX."), neutron_router_data['id'])
355
 
            if lrouter:
356
 
                # Update the cache
357
 
                self._nsx_cache.update_lrouter(lrouter)
358
 
 
359
 
        # Note(salv-orlando): It might worth adding a check to verify neutron
360
 
        # resource tag in nsx entity matches a Neutron id.
361
 
        # By default assume things go wrong
362
 
        status = constants.NET_STATUS_ERROR
363
 
        if lrouter:
364
 
            lr_status = (lrouter['_relations']
365
 
                         ['LogicalRouterStatus']
366
 
                         ['fabric_status'])
367
 
            status = (lr_status and
368
 
                      constants.NET_STATUS_ACTIVE
369
 
                      or constants.NET_STATUS_DOWN)
370
 
        # Update db object
371
 
        if status == neutron_router_data['status']:
372
 
            # do nothing
373
 
            return
374
 
 
375
 
        with context.session.begin(subtransactions=True):
376
 
            try:
377
 
                router = self._plugin._get_router(context,
378
 
                                                  neutron_router_data['id'])
379
 
            except l3.RouterNotFound:
380
 
                pass
381
 
            else:
382
 
                router.status = status
383
 
                LOG.debug("Updating status for neutron resource %(q_id)s to:"
384
 
                          " %(status)s",
385
 
                          {'q_id': neutron_router_data['id'],
386
 
                           'status': status})
387
 
 
388
 
    def _synchronize_lrouters(self, ctx, lr_uuids, scan_missing=False):
389
 
        if not lr_uuids and not scan_missing:
390
 
            return
391
 
        # TODO(salvatore-orlando): Deal with the case the tag
392
 
        # has been tampered with
393
 
        neutron_router_mappings = {}
394
 
        for lr_uuid in lr_uuids:
395
 
            lrouter = (self._nsx_cache[lr_uuid].get('data') or
396
 
                       self._nsx_cache[lr_uuid].get('data_bk'))
397
 
            tags = self._get_tag_dict(lrouter['tags'])
398
 
            neutron_router_id = tags.get('q_router_id')
399
 
            if neutron_router_id:
400
 
                neutron_router_mappings[neutron_router_id] = (
401
 
                    self._nsx_cache[lr_uuid])
402
 
            else:
403
 
                LOG.warn(_LW("Unable to find Neutron router id for "
404
 
                             "NSX logical router: %s"), lr_uuid)
405
 
        # Fetch neutron routers from database
406
 
        filters = ({} if scan_missing else
407
 
                   {'id': neutron_router_mappings.keys()})
408
 
        routers = self._plugin._get_collection(
409
 
            ctx, l3_db.Router, self._plugin._make_router_dict,
410
 
            filters=filters)
411
 
        for router in routers:
412
 
            lrouter = neutron_router_mappings.get(router['id'])
413
 
            self.synchronize_router(
414
 
                ctx, router, lrouter and lrouter.get('data'))
415
 
 
416
 
    def synchronize_port(self, context, neutron_port_data,
417
 
                         lswitchport=None, ext_networks=None):
418
 
        """Synchronize a Neutron port with its NSX counterpart."""
419
 
        # Skip synchronization for ports on external networks
420
 
        if not ext_networks:
421
 
            ext_networks = [net['id'] for net in context.session.query(
422
 
                models_v2.Network).join(
423
 
                    external_net_db.ExternalNetwork,
424
 
                    (models_v2.Network.id ==
425
 
                     external_net_db.ExternalNetwork.network_id))]
426
 
        if neutron_port_data['network_id'] in ext_networks:
427
 
            with context.session.begin(subtransactions=True):
428
 
                neutron_port_data['status'] = constants.PORT_STATUS_ACTIVE
429
 
                return
430
 
 
431
 
        if not lswitchport:
432
 
            # Try to get port from nsx
433
 
            try:
434
 
                ls_uuid, lp_uuid = nsx_utils.get_nsx_switch_and_port_id(
435
 
                    context.session, self._cluster, neutron_port_data['id'])
436
 
                if lp_uuid:
437
 
                    lswitchport = switchlib.get_port(
438
 
                        self._cluster, ls_uuid, lp_uuid,
439
 
                        relations='LogicalPortStatus')
440
 
            except (exceptions.PortNotFoundOnNetwork):
441
 
                # NOTE(salv-orlando): We should be catching
442
 
                # api_exc.ResourceNotFound here instead
443
 
                # of PortNotFoundOnNetwork when the id exists but
444
 
                # the logical switch port was not found
445
 
                LOG.warning(_LW("Logical switch port for neutron port %s "
446
 
                                "not found on NSX."), neutron_port_data['id'])
447
 
                lswitchport = None
448
 
            else:
449
 
                # If lswitchport is not None, update the cache.
450
 
                # It could be none if the port was deleted from the backend
451
 
                if lswitchport:
452
 
                    self._nsx_cache.update_lswitchport(lswitchport)
453
 
        # Note(salv-orlando): It might worth adding a check to verify neutron
454
 
        # resource tag in nsx entity matches Neutron id.
455
 
        # By default assume things go wrong
456
 
        status = constants.PORT_STATUS_ERROR
457
 
        if lswitchport:
458
 
            lp_status = (lswitchport['_relations']
459
 
                         ['LogicalPortStatus']
460
 
                         ['fabric_status_up'])
461
 
            status = (lp_status and
462
 
                      constants.PORT_STATUS_ACTIVE
463
 
                      or constants.PORT_STATUS_DOWN)
464
 
 
465
 
        # Update db object
466
 
        if status == neutron_port_data['status']:
467
 
            # do nothing
468
 
            return
469
 
 
470
 
        with context.session.begin(subtransactions=True):
471
 
            try:
472
 
                port = self._plugin._get_port(context,
473
 
                                              neutron_port_data['id'])
474
 
            except exceptions.PortNotFound:
475
 
                pass
476
 
            else:
477
 
                port.status = status
478
 
                LOG.debug("Updating status for neutron resource %(q_id)s to:"
479
 
                          " %(status)s",
480
 
                          {'q_id': neutron_port_data['id'],
481
 
                           'status': status})
482
 
 
483
 
    def _synchronize_lswitchports(self, ctx, lp_uuids, scan_missing=False):
484
 
        if not lp_uuids and not scan_missing:
485
 
            return
486
 
        # Find Neutron port id by tag - the tag is already
487
 
        # loaded in memory, no reason for doing a db query
488
 
        # TODO(salvatore-orlando): Deal with the case the tag
489
 
        # has been tampered with
490
 
        neutron_port_mappings = {}
491
 
        for lp_uuid in lp_uuids:
492
 
            lport = (self._nsx_cache[lp_uuid].get('data') or
493
 
                     self._nsx_cache[lp_uuid].get('data_bk'))
494
 
            tags = self._get_tag_dict(lport['tags'])
495
 
            neutron_port_id = tags.get('q_port_id')
496
 
            if neutron_port_id:
497
 
                neutron_port_mappings[neutron_port_id] = (
498
 
                    self._nsx_cache[lp_uuid])
499
 
        # Fetch neutron ports from database
500
 
        # At the first sync we need to fetch all ports
501
 
        filters = ({} if scan_missing else
502
 
                   {'id': neutron_port_mappings.keys()})
503
 
        # TODO(salv-orlando): Work out a solution for avoiding
504
 
        # this query
505
 
        ext_nets = [net['id'] for net in ctx.session.query(
506
 
            models_v2.Network).join(
507
 
                external_net_db.ExternalNetwork,
508
 
                (models_v2.Network.id ==
509
 
                 external_net_db.ExternalNetwork.network_id))]
510
 
        ports = self._plugin._get_collection(
511
 
            ctx, models_v2.Port, self._plugin._make_port_dict,
512
 
            filters=filters)
513
 
        for port in ports:
514
 
            lswitchport = neutron_port_mappings.get(port['id'])
515
 
            self.synchronize_port(
516
 
                ctx, port, lswitchport and lswitchport.get('data'),
517
 
                ext_networks=ext_nets)
518
 
 
519
 
    def _get_chunk_size(self, sp):
520
 
        # NOTE(salv-orlando): Try to use __future__ for this routine only?
521
 
        ratio = ((float(sp.total_size) / float(sp.chunk_size)) /
522
 
                 (float(self._sync_interval) / float(self._req_delay)))
523
 
        new_size = max(1.0, ratio) * float(sp.chunk_size)
524
 
        return int(new_size) + (new_size - int(new_size) > 0)
525
 
 
526
 
    def _fetch_data(self, uri, cursor, page_size):
527
 
        # If not cursor there is nothing to retrieve
528
 
        if cursor:
529
 
            if cursor == 'start':
530
 
                cursor = None
531
 
            # Chunk size tuning might, in some conditions, make it larger
532
 
            # than 5,000, which is the maximum page size allowed by the NSX
533
 
            # API. In this case the request should be split in multiple
534
 
            # requests. This is not ideal, and therefore a log warning will
535
 
            # be emitted.
536
 
            num_requests = page_size / (MAX_PAGE_SIZE + 1) + 1
537
 
            if num_requests > 1:
538
 
                LOG.warn(_LW("Requested page size is %(cur_chunk_size)d. "
539
 
                             "It might be necessary to do %(num_requests)d "
540
 
                             "round-trips to NSX for fetching data. Please "
541
 
                             "tune sync parameters to ensure chunk size "
542
 
                             "is less than %(max_page_size)d"),
543
 
                         {'cur_chunk_size': page_size,
544
 
                          'num_requests': num_requests,
545
 
                          'max_page_size': MAX_PAGE_SIZE})
546
 
            # Only the first request might return the total size,
547
 
            # subsequent requests will definetely not
548
 
            results, cursor, total_size = nsxlib.get_single_query_page(
549
 
                uri, self._cluster, cursor,
550
 
                min(page_size, MAX_PAGE_SIZE))
551
 
            for _req in range(num_requests - 1):
552
 
                # If no cursor is returned break the cycle as there is no
553
 
                # actual need to perform multiple requests (all fetched)
554
 
                # This happens when the overall size of resources exceeds
555
 
                # the maximum page size, but the number for each single
556
 
                # resource type is below this threshold
557
 
                if not cursor:
558
 
                    break
559
 
                req_results, cursor = nsxlib.get_single_query_page(
560
 
                    uri, self._cluster, cursor,
561
 
                    min(page_size, MAX_PAGE_SIZE))[:2]
562
 
                results.extend(req_results)
563
 
            # reset cursor before returning if we queried just to
564
 
            # know the number of entities
565
 
            return results, cursor if page_size else 'start', total_size
566
 
        return [], cursor, None
567
 
 
568
 
    def _fetch_nsx_data_chunk(self, sp):
569
 
        base_chunk_size = sp.chunk_size
570
 
        chunk_size = base_chunk_size + sp.extra_chunk_size
571
 
        LOG.info(_LI("Fetching up to %s resources "
572
 
                     "from NSX backend"), chunk_size)
573
 
        fetched = ls_count = lr_count = lp_count = 0
574
 
        lswitches = lrouters = lswitchports = []
575
 
        if sp.ls_cursor or sp.ls_cursor == 'start':
576
 
            (lswitches, sp.ls_cursor, ls_count) = self._fetch_data(
577
 
                self.LS_URI, sp.ls_cursor, chunk_size)
578
 
            fetched = len(lswitches)
579
 
        if fetched < chunk_size and sp.lr_cursor or sp.lr_cursor == 'start':
580
 
            (lrouters, sp.lr_cursor, lr_count) = self._fetch_data(
581
 
                self.LR_URI, sp.lr_cursor, max(chunk_size - fetched, 0))
582
 
        fetched += len(lrouters)
583
 
        if fetched < chunk_size and sp.lp_cursor or sp.lp_cursor == 'start':
584
 
            (lswitchports, sp.lp_cursor, lp_count) = self._fetch_data(
585
 
                self.LP_URI, sp.lp_cursor, max(chunk_size - fetched, 0))
586
 
        fetched += len(lswitchports)
587
 
        if sp.current_chunk == 0:
588
 
            # No cursors were provided. Then it must be possible to
589
 
            # calculate the total amount of data to fetch
590
 
            sp.total_size = ls_count + lr_count + lp_count
591
 
        LOG.debug("Total data size: %d", sp.total_size)
592
 
        sp.chunk_size = self._get_chunk_size(sp)
593
 
        # Calculate chunk size adjustment
594
 
        sp.extra_chunk_size = sp.chunk_size - base_chunk_size
595
 
        LOG.debug("Fetched %(num_lswitches)d logical switches, "
596
 
                  "%(num_lswitchports)d logical switch ports,"
597
 
                  "%(num_lrouters)d logical routers",
598
 
                  {'num_lswitches': len(lswitches),
599
 
                   'num_lswitchports': len(lswitchports),
600
 
                   'num_lrouters': len(lrouters)})
601
 
        return (lswitches, lrouters, lswitchports)
602
 
 
603
 
    def _synchronize_state(self, sp):
604
 
        # If the plugin has been destroyed, stop the LoopingCall
605
 
        if not self._plugin:
606
 
            raise loopingcall.LoopingCallDone()
607
 
        start = timeutils.utcnow()
608
 
        # Reset page cursor variables if necessary
609
 
        if sp.current_chunk == 0:
610
 
            sp.ls_cursor = sp.lr_cursor = sp.lp_cursor = 'start'
611
 
        LOG.info(_LI("Running state synchronization task. Chunk: %s"),
612
 
                 sp.current_chunk)
613
 
        # Fetch chunk_size data from NSX
614
 
        try:
615
 
            (lswitches, lrouters, lswitchports) = (
616
 
                self._fetch_nsx_data_chunk(sp))
617
 
        except (api_exc.RequestTimeout, api_exc.NsxApiException):
618
 
            sleep_interval = self._sync_backoff
619
 
            # Cap max back off to 64 seconds
620
 
            self._sync_backoff = min(self._sync_backoff * 2, 64)
621
 
            LOG.exception(_LE("An error occurred while communicating with "
622
 
                              "NSX backend. Will retry synchronization "
623
 
                              "in %d seconds"), sleep_interval)
624
 
            return sleep_interval
625
 
        LOG.debug("Time elapsed querying NSX: %s",
626
 
                  timeutils.utcnow() - start)
627
 
        if sp.total_size:
628
 
            num_chunks = ((sp.total_size / sp.chunk_size) +
629
 
                          (sp.total_size % sp.chunk_size != 0))
630
 
        else:
631
 
            num_chunks = 1
632
 
        LOG.debug("Number of chunks: %d", num_chunks)
633
 
        # Find objects which have changed on NSX side and need
634
 
        # to be synchronized
635
 
        LOG.debug("Processing NSX cache for updated objects")
636
 
        (ls_uuids, lr_uuids, lp_uuids) = self._nsx_cache.process_updates(
637
 
            lswitches, lrouters, lswitchports)
638
 
        # Process removed objects only at the last chunk
639
 
        scan_missing = (sp.current_chunk == num_chunks - 1 and
640
 
                        not sp.init_sync_performed)
641
 
        if sp.current_chunk == num_chunks - 1:
642
 
            LOG.debug("Processing NSX cache for deleted objects")
643
 
            self._nsx_cache.process_deletes()
644
 
            ls_uuids = self._nsx_cache.get_lswitches(
645
 
                changed_only=not scan_missing)
646
 
            lr_uuids = self._nsx_cache.get_lrouters(
647
 
                changed_only=not scan_missing)
648
 
            lp_uuids = self._nsx_cache.get_lswitchports(
649
 
                changed_only=not scan_missing)
650
 
        LOG.debug("Time elapsed hashing data: %s",
651
 
                  timeutils.utcnow() - start)
652
 
        # Get an admin context
653
 
        ctx = context.get_admin_context()
654
 
        # Synchronize with database
655
 
        self._synchronize_lswitches(ctx, ls_uuids,
656
 
                                    scan_missing=scan_missing)
657
 
        self._synchronize_lrouters(ctx, lr_uuids,
658
 
                                   scan_missing=scan_missing)
659
 
        self._synchronize_lswitchports(ctx, lp_uuids,
660
 
                                       scan_missing=scan_missing)
661
 
        # Increase chunk counter
662
 
        LOG.info(_LI("Synchronization for chunk %(chunk_num)d of "
663
 
                     "%(total_chunks)d performed"),
664
 
                 {'chunk_num': sp.current_chunk + 1,
665
 
                  'total_chunks': num_chunks})
666
 
        sp.current_chunk = (sp.current_chunk + 1) % num_chunks
667
 
        added_delay = 0
668
 
        if sp.current_chunk == 0:
669
 
            # Ensure init_sync_performed is True
670
 
            if not sp.init_sync_performed:
671
 
                sp.init_sync_performed = True
672
 
            # Add additional random delay
673
 
            added_delay = random.randint(0, self._max_rand_delay)
674
 
        LOG.debug("Time elapsed at end of sync: %s",
675
 
                  timeutils.utcnow() - start)
676
 
        return self._sync_interval / num_chunks + added_delay