52
52
decoded or is not being handled."""
55
class PostgresListenerRegistrationError(Exception):
    """Error raised when registering a handler fails."""
59
class PostgresListenerUnregistrationError(Exception):
    """Error raised when unregistering a handler fails."""
55
63
@implementer(interfaces.IReadDescriptor)
56
64
class PostgresListenerService(Service, object):
57
65
"""Listens for NOTIFY messages from postgres.
84
92
self.connectionFileno = None
85
93
self.notifications = set()
86
94
self.notifier = task.LoopingCall(self.handleNotifies)
95
self.notifierDone = None
87
96
self.connecting = None
88
97
self.disconnecting = None
89
98
self.registeredChannels = False
122
131
kwargs['system'] = self.logPrefix()
123
132
log.err(*args, **kwargs)
134
def isSystemChannel(self, channel):
    """Return True if channel is a system channel.

    A channel counts as a system channel when its name begins with the
    ``sys_`` prefix (including the underscore).

    :param channel: Name of the channel to test.
    :return: True when `channel` starts with ``sys_``, otherwise False.
    """
    return channel.startswith("sys_")
125
138
def doRead(self):
126
139
"""Poll the connection and process any notifications."""
149
162
notifies = self.connection.connection.notifies
150
163
if len(notifies) != 0:
151
164
for notify in notifies:
152
self.notifications.add((notify.channel, notify.payload))
165
if self.isSystemChannel(notify.channel):
166
# System level message; pass it to the registered
167
# handler immediately.
168
handler = self.listeners[notify.channel][0]
169
handler(notify.channel, notify.payload)
171
# Place non-system messages into the queue to be
173
self.notifications.add(
174
(notify.channel, notify.payload))
153
175
# Delete the contents of the connection's notifies list so
154
176
# that we don't process them a second time.
182
204
When a notification is received for that `channel` the `handler` will
183
205
be called with the action and object id.
185
self.listeners[channel].append(handler)
207
handlers = self.listeners[channel]
208
if self.isSystemChannel(channel) and len(handlers) > 0:
209
# A system can only be registered once. This is because the
210
# message is passed directly to the handler and the `doRead`
211
# method does not wait for it to finish if its a defer. This is
212
# different from normal handlers where we will call each and wait
213
# for all to resolve before continuing to the next event.
214
raise PostgresListenerRegistrationError(
215
"System channel '%s' has already been registered." % channel)
217
handlers.append(handler)
186
218
if self.registeredChannels and self.connection:
187
219
# Channels have already been registered. Register the
188
220
# new channel on the already existing connection.
189
221
self.registerChannel(channel)
223
def unregister(self, channel, handler):
    """Unregister listening for notifications from a channel.

    `handler` needs to be same handler that was registered.

    :param channel: Channel the handler was registered on.
    :param handler: Callable to remove from the channel's handlers.
    :raise PostgresListenerUnregistrationError: When `channel` has no
        entry in `self.listeners`, or `handler` is not among the handlers
        held for that channel.
    """
    if channel not in self.listeners:
        raise PostgresListenerUnregistrationError(
            "Channel '%s' is not registered with the listener." % channel)
    handlers = self.listeners[channel]
    if handler in handlers:
        handlers.remove(handler)
    else:
        raise PostgresListenerUnregistrationError(
            "Handler is not registered on that channel '%s'." % channel)
    if self.registeredChannels and self.connection and len(handlers) == 0:
        # Channels have already been registered. Unregister the channel.
        # Only done once the last handler is gone, so remaining handlers
        # on the same channel keep receiving notifications.
        self.unregisterChannel(channel)
192
242
def createConnection(self):
193
243
"""Create new database connection."""
295
345
def registerChannel(self, channel):
    """Register the channel.

    Issues the ``LISTEN`` statement(s) for `channel` on a cursor taken
    from `self.connection`; the cursor is closed when the block exits.

    :param channel: Name of the channel to listen on. It is interpolated
        directly into the statement text, so it must already be a valid
        identifier.
    """
    with closing(self.connection.cursor()) as cursor:
        if self.isSystemChannel(channel):
            # This is a system channel so listen only called once.
            cursor.execute("LISTEN %s;" % channel)
        else:
            # Not a system channel so listen called once for each action.
            for action in sorted(map_enum(ACTIONS).values()):
                cursor.execute("LISTEN %s_%s;" % (channel, action))
356
def unregisterChannel(self, channel):
    """Unregister the channel.

    Issues the ``UNLISTEN`` statement(s) for `channel` on a cursor taken
    from `self.connection`; the cursor is closed when the block exits.

    :param channel: Name of the channel to stop listening on. It is
        interpolated directly into the statement text, so it must already
        be a valid identifier.
    """
    with closing(self.connection.cursor()) as cursor:
        if self.isSystemChannel(channel):
            # This is a system channel so unlisten only called once.
            cursor.execute("UNLISTEN %s;" % channel)
        else:
            # Not a system channel so unlisten called once for each action.
            for action in sorted(map_enum(ACTIONS).values()):
                cursor.execute("UNLISTEN %s_%s;" % (channel, action))
307
367
def registerChannels(self):
308
368
"""Register the all the channels."""
309
369
for channel in self.listeners.keys():
313
373
def convertChannel(self, channel):
314
374
"""Convert the postgres channel to a registered channel and action.
316
When the channel starts with "sys" then it is a system channel, when
317
not a system channel its structured as {channel}_{action}. This is
318
split to match the correct handler and action for that handler.
320
376
:raise PostgresListenerNotifyError: When {channel} is not registered or
321
377
{action} is not in `ACTIONS`.
323
if channel.startswith("sys"):
326
channel, action = channel.split('_', 1)
327
if channel not in self.listeners:
328
raise PostgresListenerNotifyError(
329
"%s is not a registered channel." % channel)
330
if action not in map_enum(ACTIONS).values():
331
raise PostgresListenerNotifyError(
332
"%s action is not supported." % action)
333
return channel, action
379
channel, action = channel.split('_', 1)
380
if channel not in self.listeners:
381
raise PostgresListenerNotifyError(
382
"%s is not a registered channel." % channel)
383
if action not in map_enum(ACTIONS).values():
384
raise PostgresListenerNotifyError(
385
"%s action is not supported." % action)
386
return channel, action
335
388
def runHandleNotify(self, delay=0, clock=reactor):
    """Defer later the `handleNotify`.

    Starts `self.notifier` unless it is already running, and keeps the
    value returned by `start` in `self.notifierDone` so that
    `cancelHandleNotify` can hand it back to its caller.

    :param delay: Interval passed to `self.notifier.start`.
    :param clock: Not used by this method's body.
    """
    if not self.notifier.running:
        # `now=False` so the first call happens after `delay`, not
        # immediately.
        self.notifierDone = self.notifier.start(delay, now=False)
340
393
def cancelHandleNotify(self):
    """Cancel the deferred `handleNotify` call.

    :return: `self.notifierDone` when the notifier was running and has
        been stopped by this call; otherwise the result of
        `succeed(None)`.
    """
    if self.notifier.running:
        self.notifier.stop()
        return self.notifierDone
    else:
        return succeed(None)