~allenap/maas/xxx-a-thon

« back to all changes in this revision

Viewing changes to src/maasserver/listener.py

  • Committer: Gavin Panella
  • Date: 2016-03-22 21:14:34 UTC
  • mfrom: (4657.1.157 maas)
  • Revision ID: gavin.panella@canonical.com-20160322211434-xzuovio86zvzo2js
Merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
52
52
    decoded or is not being handled."""
53
53
 
54
54
 
 
55
class PostgresListenerRegistrationError(Exception):
 
56
    """Error raised when registering a handler fails."""
 
57
 
 
58
 
 
59
class PostgresListenerUnregistrationError(Exception):
 
60
    """Error raised when unregistering a handler fails."""
 
61
 
 
62
 
55
63
@implementer(interfaces.IReadDescriptor)
56
64
class PostgresListenerService(Service, object):
57
65
    """Listens for NOTIFY messages from postgres.
84
92
        self.connectionFileno = None
85
93
        self.notifications = set()
86
94
        self.notifier = task.LoopingCall(self.handleNotifies)
 
95
        self.notifierDone = None
87
96
        self.connecting = None
88
97
        self.disconnecting = None
89
98
        self.registeredChannels = False
122
131
        kwargs['system'] = self.logPrefix()
123
132
        log.err(*args, **kwargs)
124
133
 
 
134
    def isSystemChannel(self, channel):
 
135
        """Return True if channel is a system channel."""
 
136
        return channel.startswith("sys_")
 
137
 
125
138
    def doRead(self):
126
139
        """Poll the connection and process any notifications."""
127
140
        try:
149
162
            notifies = self.connection.connection.notifies
150
163
            if len(notifies) != 0:
151
164
                for notify in notifies:
152
 
                    self.notifications.add((notify.channel, notify.payload))
 
165
                    if self.isSystemChannel(notify.channel):
 
166
                        # System level message; pass it to the registered
 
167
                        # handler immediately.
 
168
                        handler = self.listeners[notify.channel][0]
 
169
                        handler(notify.channel, notify.payload)
 
170
                    else:
 
171
                        # Place non-system messages into the queue to be
 
172
                        # processed.
 
173
                        self.notifications.add(
 
174
                            (notify.channel, notify.payload))
153
175
                # Delete the contents of the connection's notifies list so
154
176
                # that we don't process them a second time.
155
177
                del notifies[:]
182
204
        When a notification is received for that `channel` the `handler` will
183
205
        be called with the action and object id.
184
206
        """
185
 
        self.listeners[channel].append(handler)
 
207
        handlers = self.listeners[channel]
 
208
        if self.isSystemChannel(channel) and len(handlers) > 0:
 
209
            # A system can only be registered once. This is because the
 
210
            # message is passed directly to the handler and the `doRead`
 
211
            # method does not wait for it to finish if its a defer. This is
 
212
            # different from normal handlers where we will call each and wait
 
213
            # for all to resolve before continuing to the next event.
 
214
            raise PostgresListenerRegistrationError(
 
215
                "System channel '%s' has already been registered." % channel)
 
216
        else:
 
217
            handlers.append(handler)
186
218
        if self.registeredChannels and self.connection:
187
219
            # Channels have already been registered. Register the
188
220
            # new channel on the already existing connection.
189
221
            self.registerChannel(channel)
190
222
 
 
223
    def unregister(self, channel, handler):
 
224
        """Unregister listening for notifications from a channel.
 
225
 
 
226
        `handler` needs to be same handler that was registered.
 
227
        """
 
228
        if channel not in self.listeners:
 
229
            raise PostgresListenerUnregistrationError(
 
230
                "Channel '%s' is not registered with the listener." % channel)
 
231
        handlers = self.listeners[channel]
 
232
        if handler in handlers:
 
233
            handlers.remove(handler)
 
234
        else:
 
235
            raise PostgresListenerUnregistrationError(
 
236
                "Handler is not registered on that channel '%s'." % channel)
 
237
        if self.registeredChannels and self.connection and len(handlers) == 0:
 
238
            # Channels have already been registered. Unregister the channel.
 
239
            self.unregisterChannel(channel)
 
240
 
191
241
    @synchronous
192
242
    def createConnection(self):
193
243
        """Create new database connection."""
295
345
    def registerChannel(self, channel):
296
346
        """Register the channel."""
297
347
        with closing(self.connection.cursor()) as cursor:
298
 
            if channel.startswith("sys"):
299
 
                # This is a system channel so register it only once not one
300
 
                # for each action.
 
348
            if self.isSystemChannel(channel):
 
349
                # This is a system channel so listen only called once.
301
350
                cursor.execute("LISTEN %s;" % channel)
302
351
            else:
303
 
                # Not a system channel so register one for each action.
304
 
                for action in map_enum(ACTIONS).values():
 
352
                # Not a system channel so listen called once for each action.
 
353
                for action in sorted(map_enum(ACTIONS).values()):
305
354
                    cursor.execute("LISTEN %s_%s;" % (channel, action))
306
355
 
 
356
    def unregisterChannel(self, channel):
 
357
        """Unregister the channel."""
 
358
        with closing(self.connection.cursor()) as cursor:
 
359
            if self.isSystemChannel(channel):
 
360
                # This is a system channel so unlisten only called once.
 
361
                cursor.execute("UNLISTEN %s;" % channel)
 
362
            else:
 
363
                # Not a system channel so unlisten called once for each action.
 
364
                for action in sorted(map_enum(ACTIONS).values()):
 
365
                    cursor.execute("UNLISTEN %s_%s;" % (channel, action))
 
366
 
307
367
    def registerChannels(self):
308
368
        """Register the all the channels."""
309
369
        for channel in self.listeners.keys():
313
373
    def convertChannel(self, channel):
314
374
        """Convert the postgres channel to a registered channel and action.
315
375
 
316
 
        When the channel starts with "sys" then it is a system channel, when
317
 
        not a system channel its structured as {channel}_{action}. This is
318
 
        split to match the correct handler and action for that handler.
319
 
 
320
376
        :raise PostgresListenerNotifyError: When {channel} is not registered or
321
377
            {action} is not in `ACTIONS`.
322
378
        """
323
 
        if channel.startswith("sys"):
324
 
            return channel, None
325
 
        else:
326
 
            channel, action = channel.split('_', 1)
327
 
            if channel not in self.listeners:
328
 
                raise PostgresListenerNotifyError(
329
 
                    "%s is not a registered channel." % channel)
330
 
            if action not in map_enum(ACTIONS).values():
331
 
                raise PostgresListenerNotifyError(
332
 
                    "%s action is not supported." % action)
333
 
            return channel, action
 
379
        channel, action = channel.split('_', 1)
 
380
        if channel not in self.listeners:
 
381
            raise PostgresListenerNotifyError(
 
382
                "%s is not a registered channel." % channel)
 
383
        if action not in map_enum(ACTIONS).values():
 
384
            raise PostgresListenerNotifyError(
 
385
                "%s action is not supported." % action)
 
386
        return channel, action
334
387
 
335
388
    def runHandleNotify(self, delay=0, clock=reactor):
336
389
        """Defer later the `handleNotify`."""
337
390
        if not self.notifier.running:
338
 
            self.notifier.start(delay, now=False)
 
391
            self.notifierDone = self.notifier.start(delay, now=False)
339
392
 
340
393
    def cancelHandleNotify(self):
341
394
        """Cancel the deferred `handleNotify` call."""
342
395
        if self.notifier.running:
343
 
            done = self.notifier.deferred
344
396
            self.notifier.stop()
345
 
            return done
 
397
            return self.notifierDone
346
398
        else:
347
399
            return succeed(None)
348
400