# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import collections
import weakref

from oslo_log import log as logging
from oslo_utils import reflection

from neutron.callbacks import events
from neutron.callbacks import exceptions
from neutron.callbacks import resources
from neutron.i18n import _LE, _LI

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
27
class CallbacksManager(object):
    """A callback system that allows objects to cooperate in a loose manner.

    Two structures are maintained:

    * ``_callbacks``: ``resource -> event -> callback_id -> callback proxy``,
      used to dispatch notifications.
    * ``_index``: ``callback_id -> resource -> set(events)``, a reverse index
      used to make the unsubscribe operations cheap.
    """

    def __init__(self):
        self.clear()

    def subscribe(self, callback, resource, event):
        """Subscribe callback for a resource event.

        The same callback may register for more than one event.

        :param callback: the callback. It must raise or return a boolean.
        :param resource: the resource. It must be a valid resource.
        :param event: the event. It must be a valid event.
        :raises exceptions.Invalid: if the resource or the event is not
            listed in ``resources.VALID`` / ``events.VALID``.
        """
        LOG.debug("Subscribe: %(callback)s %(resource)s %(event)s",
                  {'callback': callback, 'resource': resource, 'event': event})
        if resource not in resources.VALID:
            raise exceptions.Invalid(element='resource', value=resource)
        if event not in events.VALID:
            raise exceptions.Invalid(element='event', value=event)

        callback_id = _get_id(callback)
        # NOTE: only a weak proxy is stored, so the manager does not keep
        # the callback alive; the subscriber must hold a strong reference
        # for as long as it wants to be notified.
        self._callbacks[resource][event][callback_id] = weakref.proxy(callback)
        # We keep a copy of callbacks to speed the unsubscribe operation.
        if callback_id not in self._index:
            self._index[callback_id] = collections.defaultdict(set)
        self._index[callback_id][resource].add(event)

    def unsubscribe(self, callback, resource, event):
        """Unsubscribe callback from the registry.

        A callback that was never subscribed is ignored.

        :param callback: the callback.
        :param resource: the resource.
        :param event: the event.
        :raises exceptions.Invalid: if the callback is known but either the
            resource or the event is empty.
        :raises KeyError: if the callback is known but was not subscribed
            for this particular resource/event pair.
        """
        LOG.debug("Unsubscribe: %(callback)s %(resource)s %(event)s",
                  {'callback': callback, 'resource': resource, 'event': event})

        callback_id = self._find(callback)
        if callback_id is None:
            # Log the callback itself: the id is None on this path and
            # would carry no information.
            LOG.debug("Callback %s not found", callback)
            return
        if resource and event:
            del self._callbacks[resource][event][callback_id]
            self._index[callback_id][resource].discard(event)
            # Prune empty levels of the reverse index so that _find()
            # stops reporting a callback with no subscriptions left.
            if not self._index[callback_id][resource]:
                del self._index[callback_id][resource]
                if not self._index[callback_id]:
                    del self._index[callback_id]
        else:
            value = '%s,%s' % (resource, event)
            raise exceptions.Invalid(element='resource,event', value=value)

    def unsubscribe_by_resource(self, callback, resource):
        """Unsubscribe callback for any event associated to the resource.

        Unknown callbacks, and resources the callback is not subscribed to,
        are ignored.

        :param callback: the callback.
        :param resource: the resource.
        """
        callback_id = self._find(callback)
        if callback_id is None:
            return
        if resource in self._index[callback_id]:
            for event in self._index[callback_id][resource]:
                del self._callbacks[resource][event][callback_id]
            del self._index[callback_id][resource]
            if not self._index[callback_id]:
                del self._index[callback_id]

    def unsubscribe_all(self, callback):
        """Unsubscribe callback for all events and all resources.

        Unknown callbacks are ignored.

        :param callback: the callback.
        """
        callback_id = self._find(callback)
        if callback_id is None:
            return
        for resource, resource_events in self._index[callback_id].items():
            for event in resource_events:
                del self._callbacks[resource][event][callback_id]
        del self._index[callback_id]

    def notify(self, resource, event, trigger, **kwargs):
        """Notify all subscribed callback(s).

        Dispatch the resource's event to the subscribed callbacks.

        :param resource: the resource.
        :param event: the event.
        :param trigger: the trigger. A reference to the sender of the event.
        :param kwargs: extra keyword arguments forwarded to each callback.
        :raises exceptions.CallbackFailure: if at least one callback failed
            and the event name starts with ``events.BEFORE``. Failures for
            any other event are logged by the notification loop but not
            raised.
        """
        errors = self._notify_loop(resource, event, trigger, **kwargs)
        if errors and event.startswith(events.BEFORE):
            abort_event = event.replace(
                events.BEFORE, events.ABORT)
            # The abort callbacks are invoked without the original kwargs,
            # and any error they produce is not added to the failure.
            self._notify_loop(resource, abort_event, trigger)
            raise exceptions.CallbackFailure(errors=errors)

    def clear(self):
        """Brings the manager to a clean slate."""
        self._callbacks = collections.defaultdict(dict)
        self._index = collections.defaultdict(dict)
        for resource in resources.VALID:
            for event in events.VALID:
                self._callbacks[resource][event] = collections.defaultdict()

    def _notify_loop(self, resource, event, trigger, **kwargs):
        """The notification loop.

        Call every callback subscribed to (resource, event). An exception
        raised by a callback is logged and collected; it does not prevent
        the remaining callbacks from being called.

        :returns: a list of ``exceptions.NotificationError``, one per
            failed callback (empty if all callbacks succeeded).
        """
        LOG.info(_LI("Notify callbacks for %(resource)s, %(event)s"),
                 {'resource': resource, 'event': event})

        errors = []
        # Iterate over a snapshot: a callback may subscribe or unsubscribe
        # while being notified, and mutating a dict during iteration over
        # its live view raises RuntimeError.
        callbacks = list(self._callbacks[resource][event].items())
        # TODO(armax): consider using a GreenPile
        for callback_id, callback in callbacks:
            try:
                LOG.info(_LI("Calling callback %s"), callback_id)
                callback(resource, event, trigger, **kwargs)
            except Exception as e:
                LOG.exception(_LE("Error during notification for "
                                  "%(callback)s %(resource)s, %(event)s"),
                              {'callback': callback_id,
                               'resource': resource,
                               'event': event})
                errors.append(exceptions.NotificationError(callback_id, e))
        return errors

    def _find(self, callback):
        """Return the callback_id if found, None otherwise."""
        callback_id = _get_id(callback)
        return callback_id if callback_id in self._index else None


def _get_id(callback):
    """Return a unique identifier for the callback.

    The identifier is whatever ``reflection.get_callable_name`` reports for
    the callable, so two callables reporting the same name share the same
    identifier.

    :param callback: the callable to identify.
    :returns: the name of the callable.
    """
    # TODO(armax): consider using something other than names
    # https://www.python.org/dev/peps/pep-3155/, but this
    # might be okay for now.
    return reflection.get_callable_name(callback)