151
151
'VM_UDF_UNSUBSCRIBE_ERROR': ('udf_id', 'error'),
152
152
'VM_UDF_CREATED': ('udf',),
153
153
'VM_UDF_CREATE_ERROR': ('path', 'error'),
154
'VM_SHARE_SUBSCRIBED': ('share',),
155
'VM_SHARE_SUBSCRIBE_ERROR': ('share_id', 'error'),
156
'VM_SHARE_UNSUBSCRIBED': ('share',),
157
'VM_SHARE_UNSUBSCRIBE_ERROR': ('share_id', 'error'),
154
158
'VM_SHARE_CREATED': ('share_id',),
155
159
'VM_SHARE_DELETED': ('share',),
156
160
'VM_SHARE_DELETE_ERROR': ('share_id', 'error'),
163
DEFAULT_HANDLER = "handle_default" # receives (event_name, *args, **kwargs)
167
DEFAULT_HANDLER = "handle_default" # receives (event_name, **kwargs)
166
170
class EventQueue(object):
231
235
if obj not in self._listeners:
232
236
self._listeners.append(obj)
234
def push(self, event_name, *args, **kwargs):
238
def push(self, event_name, **kwargs):
235
239
"""Receives a push for all events.
237
241
The signature for each event is forced on each method, not in this
238
242
'push' arguments.
240
log_msg = "push_event: %s, args:%s, kw:%s"
244
log_msg = "push_event: %s, kwargs: %s"
241
245
if event_name.endswith('DELETE'):
242
246
# log every DELETE in INFO level
243
self.log.info(log_msg, event_name, args, kwargs)
247
self.log.info(log_msg, event_name, kwargs)
244
248
elif event_name == 'SYS_USER_CONNECT':
245
self.log.debug(log_msg, event_name, '*', '*')
249
self.log.debug(log_msg, event_name, '*')
247
self.log.debug(log_msg, event_name, args, kwargs)
251
self.log.debug(log_msg, event_name, kwargs)
249
253
# get the event parameters
255
259
raise InvalidEventError(msg)
257
261
# validate that the received arguments are ok
259
if len(args) > len(event_params):
260
msg = "Too many arguments! (should receive %s)" % event_params
263
event_params = event_params[len(args):]
265
262
s_eventparms = set(event_params)
266
263
s_kwargs = set(kwargs.keys())
267
264
if s_eventparms != s_kwargs:
271
268
raise TypeError(msg)
273
270
# check if we are currently dispatching an event
274
self.dispatch_queue.put((event_name, args, kwargs))
271
self.dispatch_queue.put((event_name, kwargs))
275
272
if not self.dispatching:
276
273
self.dispatching = True
279
event_name, args, kwargs = \
280
self.dispatch_queue.get(block=False)
281
self._dispatch(event_name, *args, **kwargs)
276
event_name, kwargs = self.dispatch_queue.get(block=False)
277
self._dispatch(event_name, **kwargs)
283
279
self.dispatching = False
284
280
for callable in self.empty_event_queue_callbacks.copy():
288
def _dispatch(self, event_name, *args, **kwargs):
284
def _dispatch(self, event_name, **kwargs):
289
285
""" push the event to all listeners. """
290
286
# check listeners to see if have the proper method, and call it
291
287
meth_name = "handle_" + event_name
296
292
method = self._get_listener_method(listener, meth_name, event_name)
297
293
if method is not None:
299
method(*args, **kwargs)
300
296
except Exception:
301
297
self.log.exception("Error encountered while handling: %s"
302
298
" in %s", event_name, listener)