1
# Copyright: (c) 2004 Oliver Schoenborn
2
# License: wxWidgets(based on LGPL,http://www.wxwidgets.org/about/newlicen.htm)
4
#---------------------------------------------------------------------------
6
This module provides a publish-subscribe component that allows
7
listeners to subscribe to messages of a given topic. Contrary to the
8
original wxPython.lib.pubsub module (which it is based on), it uses
9
weak referencing to the subscribers so the lifetime of subscribers
10
is not affected by Publisher. Also, callable objects can be used in
11
addition to functions and bound methods. See Publisher class docs for
14
Thanks to Robb Shecter and Robin Dunn for having provided
15
the basis for this module (which now shares most of the concepts but
16
very little design or implementation with the original
19
The publisher is a singleton instance of the PublisherClass class. You
20
access the instance via the Publisher object available from the module::
22
from wx.lib.pubsub import Publisher
23
Publisher().subscribe(...)
24
Publisher().sendMessage(...)
27
:Author: Oliver Schoenborn
29
:Version: $Id: pubsub.py,v 1.8 2006/06/11 00:12:59 RD Exp $
30
:Copyright: \(c) 2004 Oliver Schoenborn
38
In class Publisher, I represent the topics-listener set as a tree
39
where each node is a topic, and contains a list of listeners of that
40
topic, and a dictionary of subtopics of that topic. When the Publisher
41
is told to send a message for a given topic, it traverses the tree
42
down to the topic for which a message is being generated, all
43
listeners on the way get sent the message.
45
Publisher currently uses a weak listener topic tree to store the
46
topics for each listener, and if a listener dies before being
47
unsubscribed, the tree is notified, and the tree eliminates the
50
Ideally, _TopicTreeNode would be a generic _TreeNode with named
51
subnodes, and _TopicTreeRoot would be a generic _Tree with named
52
nodes, and Publisher would store listeners in each node and a topic
53
tuple would be converted to a path in the tree. This would lead to a
54
much cleaner separation of concerns. But time is over, time to move on.
56
#---------------------------------------------------------------------------
58
# for function and method parameter counting:
59
from types import InstanceType
60
from inspect import getargspec, ismethod, isfunction
61
# for weakly bound methods:
62
from new import instancemethod as InstanceMethod
63
from weakref import ref as WeakRef
65
# -----------------------------------------------------------------------------
68
"""Return true if method is a bound method, false otherwise"""
69
assert ismethod(method)
70
return method.im_self is not None
73
def _paramMinCountFunc(function):
    """Count the positional parameters of a plain function.

    Returns a pair (minCount, numDefaults). numDefaults is how many
    parameters carry a default value. minCount is the number of named
    parameters without a default, plus one if the function accepts
    *args. A **kwargs parameter affects neither number.
    """
    assert isfunction(function)
    spec = getargspec(function)
    named, varArgs, defaults = spec[0], spec[1], spec[3]

    numDefaults = 0
    if defaults:
        numDefaults = len(defaults)

    minCount = len(named or ()) - numDefaults
    if varArgs is not None:
        # a *args parameter is counted as one extra argument
        minCount += 1
    return (minCount, numDefaults)
84
def _paramMinCount(callableObject):
    """
    Given a callable object (function, method or callable instance),
    return pair (min,d) where min is minimum # of args required, and d
    is number of default arguments. The 'self' parameter, in the case
    of methods, is not counted.

    Raises TypeError if callableObject is not a function, a method or
    an (old-style) instance.
    """
    if type(callableObject) is InstanceType:
        # callable instance: the signature of interest is that of __call__
        func = callableObject.__call__.im_func
    elif ismethod(callableObject):
        func = callableObject.im_func
    elif isfunction(callableObject):
        # plain function: there is no 'self' to discount
        return _paramMinCountFunc(callableObject)
    else:
        # A real exception class is required here: string exceptions
        # ("raise 'text'") are rejected by Python 2.6 and later.
        raise TypeError('Cannot determine type of callable: '
                        + repr(callableObject))

    minCount, numDefaults = _paramMinCountFunc(func)
    # do not count the implicit 'self' parameter
    return minCount - 1, numDefaults
103
def _tupleize(items):
104
"""Convert items to tuple if not already one,
105
so items must be a list, tuple or non-sequence"""
106
if isinstance(items, list):
107
raise TypeError, 'Not allowed to tuple-ize a list'
108
elif isinstance(items, (str, unicode)) and items.find('.') != -1:
109
items = tuple(items.split('.'))
110
elif not isinstance(items, tuple):
115
def _getCallableName(callable):
    """Build a short display name (with a trailing space) for a listener.

    A plain function yields its __name__; a method yields
    'owner.funcname', where owner is the string form of the object it
    is bound to; anything else (e.g. a callable instance) yields its
    own string form.
    """
    if isfunction(callable):
        label = callable.__name__
    elif ismethod(callable):
        label = '%s.%s' % (callable.im_self, callable.im_func.func_name)
    else:
        label = callable
    return '%s ' % label
126
def _removeItem(item, fromList):
127
"""Attempt to remove item from fromList, return true
128
if successful, false otherwise."""
130
fromList.remove(item)
136
# -----------------------------------------------------------------------------
139
"""Represent a weak bound method, i.e. a method doesn't keep alive the
140
object that it is bound to. It uses WeakRef which, used on its own,
141
produces weak methods that are dead on creation, not very useful.
142
Typically, you will use the getRef() function instead of using
143
this class directly. """
145
def __init__(self, method, notifyDead = None):
146
"""The method must be bound. notifyDead will be called when
147
object that method is bound to dies. """
148
assert ismethod(method)
149
if method.im_self is None:
150
raise ValueError, "We need a bound method!"
151
if notifyDead is None:
152
self.objRef = WeakRef(method.im_self)
154
self.objRef = WeakRef(method.im_self, notifyDead)
155
self.fun = method.im_func
156
self.cls = method.im_class
159
"""Returns a new.instancemethod if object for method still alive.
160
Otherwise return None. Note that instancemethod causes a
161
strong reference to object to be created, so shouldn't save
162
the return value of this call. Note also that this __call__
163
is required only for compatibility with WeakRef.ref(), otherwise
164
there would be more efficient ways of providing this functionality."""
165
if self.objRef() is None:
168
return InstanceMethod(self.fun, self.objRef(), self.cls)
170
def __eq__(self, method2):
171
"""Two WeakMethod objects compare equal if they refer to the same method
172
of the same instance. Thanks to Josiah Carlson for patch and clarifications
173
on how dict uses eq/cmp and hashing. """
174
if not isinstance(method2, _WeakMethod):
176
return self.fun is method2.fun \
177
and self.objRef() is method2.objRef() \
178
and self.objRef() is not None
181
"""Hash is an optimization for dict searches, it need not
182
return different numbers for every different object. Some objects
183
are not hashable (eg objects of classes derived from dict) so no
184
hash(objRef()) in there, and hash(self.cls) would only be useful
185
in the rare case where instance method was rebound. """
186
return hash(self.fun)
190
if self.objRef() is None:
192
obj = '<%s at %s%s>' % (self.__class__, id(self), dead)
195
def refs(self, weakRef):
196
"""Return true if we are storing same object referred to by weakRef."""
197
return self.objRef == weakRef
200
def _getWeakRef(obj, notifyDead=None):
201
"""Get a weak reference to obj. If obj is a bound method, a _WeakMethod
202
object, that behaves like a WeakRef, is returned, if it is
203
anything else a WeakRef is returned. If obj is an unbound method,
204
a ValueError will be raised."""
206
createRef = _WeakMethod
210
if notifyDead is None:
211
return createRef(obj)
213
return createRef(obj, notifyDead)
216
# -----------------------------------------------------------------------------
218
def getStrAllTopics():
219
"""Function to call if, for whatever reason, you need to know
220
explicitly what is the string to use to indicate 'all topics'."""
224
# alias, easier to see where used
225
ALL_TOPICS = getStrAllTopics()
227
# -----------------------------------------------------------------------------
231
"""Encapsulate a weak reference to a method of a TopicTreeNode
232
in such a way that the method can be called, if the node is
233
still alive, but the callback does not *keep* the node alive.
234
Also, define two methods, preNotify() and noNotify(), which can
235
be redefined to something else, very useful for testing.
238
def __init__(self, obj):
239
self.objRef = _getWeakRef(obj)
241
def __call__(self, weakCB):
242
notify = self.objRef()
243
if notify is not None:
244
self.preNotify(weakCB)
249
def preNotify(self, dead):
250
"""'Gets called just before our callback (self.objRef) is called"""
254
"""Gets called if the TopicTreeNode for this callback is dead"""
258
class _TopicTreeNode:
259
"""A node in the topic tree. This contains a list of callables
260
that are interested in the topic that this node is associated
261
with, and contains a dictionary of subtopics, whose associated
262
values are other _TopicTreeNodes. The topic of a node is not stored
263
in the node, so that the tree can be implemented as a dictionary
264
rather than a list, for ease of use (and, likely, performance).
266
Note that it uses _NodeCallback to encapsulate a callback for
267
when a registered listener dies, possible thanks to WeakRef.
268
Whenever this callback is called, the onDeadListener() function,
269
passed in at construction time, is called (unless it is None).
272
def __init__(self, topicPath, onDeadListenerWeakCB):
273
self.__subtopics = {}
274
self.__callables = []
275
self.__topicPath = topicPath
276
self.__onDeadListenerWeakCB = onDeadListenerWeakCB
278
def getPathname(self):
279
"""The complete node path to us, ie., the topic tuple that would lead to us"""
280
return self.__topicPath
282
def createSubtopic(self, subtopic, topicPath):
    """Return the child node for subtopic, creating it if needed.

    subtopic is the key under which the child is stored in this node.
    topicPath is the path a newly created child reports from
    getPathname(). An already existing child is returned unchanged.
    A new child is given the same dead-listener callback as this node.
    """
    # Look the child up first: dict.setdefault() would construct a
    # throwaway _TopicTreeNode on every call, even when the child exists.
    if subtopic not in self.__subtopics:
        self.__subtopics[subtopic] = _TopicTreeNode(
            topicPath, self.__onDeadListenerWeakCB)
    return self.__subtopics[subtopic]
287
def hasSubtopic(self, subtopic):
    """Return true only if topic string is one of subtopics of this node"""
    # 'in' replaces dict.has_key(), which is deprecated and no longer
    # exists on Python 3 dictionaries.
    return subtopic in self.__subtopics
291
def getNode(self, subtopic):
292
"""Return ref to node associated with subtopic"""
293
return self.__subtopics[subtopic]
295
def addCallable(self, callable):
296
"""Add a callable to list of callables for this topic node"""
298
id = self.__callables.index(_getWeakRef(callable))
299
return self.__callables[id]
301
wrCall = _getWeakRef(callable, _NodeCallback(self.__notifyDead))
302
self.__callables.append(wrCall)
305
def getCallables(self):
    """Get callables associated with this topic node.

    Returns a list of the listeners that are still alive, in the order
    in which they are stored; dead weak references are skipped.
    """
    live = []
    for weakCB in self.__callables:
        # Dereference exactly once. Testing cb() and then calling cb()
        # again leaves a window in which the listener can die, which
        # would put None in the returned list.
        listener = weakCB()
        if listener is not None:
            live.append(listener)
    return live
309
def hasCallable(self, callable):
310
"""Return true if callable in this node"""
312
self.__callables.index(_getWeakRef(callable))
317
def sendMessage(self, message):
318
"""Send a message to our callables"""
320
for cb in self.__callables:
322
if listener is not None:
327
def removeCallable(self, callable):
328
"""Remove weak callable from our node (and return True).
329
Does nothing if not here (and returns False)."""
331
self.__callables.remove(_getWeakRef(callable))
336
def clearCallables(self):
337
"""Abandon list of callables to caller. We no longer have
338
any callables after this method is called."""
339
tmpList = [cb for cb in self.__callables if cb() is not None]
340
self.__callables = []
343
def __notifyDead(self, dead):
344
"""Gets called when a listener dies, thanks to WeakRef"""
345
#print 'TreeNODE', `self`, 'received death certificate for ', dead
347
if self.__onDeadListenerWeakCB is not None:
348
cb = self.__onDeadListenerWeakCB()
352
def __cleanupDead(self):
353
"""Remove all dead objects from list of callables"""
354
self.__callables = [cb for cb in self.__callables if cb() is not None]
357
"""Print us in a not-so-friendly, but readable way, good for debugging."""
359
for callable in self.getCallables():
360
strVal.append(_getCallableName(callable))
361
for topic, node in self.__subtopics.iteritems():
362
strVal.append(' (%s: %s)' %(topic, node))
363
return ''.join(strVal)
366
class _TopicTreeRoot(_TopicTreeNode):
368
The root of the tree knows how to access other node of the
369
tree and is the gateway of the tree user to the tree nodes.
370
It can create topics, add and remove callbacks, etc.
372
For efficiency, it stores a dictionary of listener-topics,
373
so that unsubscribing a listener just requires finding the
374
topics associated to a listener, and finding the corresponding
375
nodes of the tree. Without it, unsubscribing would require
376
that we search the whole tree for all nodes that contain
377
given listener. Since Publisher is a singleton, it will
378
contain all topics in the system so it is likely to be a large
379
tree. However, it is possible that in some runs, unsubscribe()
380
is called very little by the user, in which case most unsubscriptions
381
are automatic, ie caused by the listeners dying. In this case,
382
a flag is set to indicate that the dictionary should be cleaned up
383
at the next opportunity. This is not necessary, it is just an
388
self.__callbackDict = {}
389
self.__callbackDictCleanup = 0
390
# all child nodes will call our __rootNotifyDead method
391
# when one of their registered listeners dies
392
_TopicTreeNode.__init__(self, (ALL_TOPICS,),
393
_getWeakRef(self.__rootNotifyDead))
395
def addTopic(self, topic, listener):
396
"""Add topic to tree if doesnt exist, and add listener to topic node"""
397
assert isinstance(topic, tuple)
398
topicNode = self.__getTreeNode(topic, make=True)
399
weakCB = topicNode.addCallable(listener)
400
assert topicNode.hasCallable(listener)
402
theList = self.__callbackDict.setdefault(weakCB, [])
403
assert self.__callbackDict.has_key(weakCB)
404
# add it only if we don't already have it
406
weakTopicNode = WeakRef(topicNode)
407
theList.index(weakTopicNode)
409
theList.append(weakTopicNode)
410
assert self.__callbackDict[weakCB].index(weakTopicNode) >= 0
412
def getTopics(self, listener):
    """Return the list of topics for given listener.

    Each entry is the getPathname() value of a node the listener is
    recorded under; nodes that no longer exist are skipped. Returns []
    if the listener is not in the callback dictionary.
    """
    weakNodes = self.__callbackDict.get(_getWeakRef(listener), [])
    topics = []
    for weakNode in weakNodes:
        # Dereference once: calling weakNode() for the test and again
        # for the value could hit a node that died in between and fail
        # with AttributeError on None.
        node = weakNode()
        if node is not None:
            topics.append(node.getPathname())
    return topics
418
def isSubscribed(self, listener, topic=None):
419
"""Return true if listener is registered for topic specified.
420
If no topic specified, return true if subscribed to something.
421
Use topic=getStrAllTopics() to determine if a listener will receive
422
messages for all topics."""
423
weakCB = _getWeakRef(listener)
425
return self.__callbackDict.has_key(weakCB)
427
topicPath = _tupleize(topic)
428
for weakNode in self.__callbackDict[weakCB]:
429
if topicPath == weakNode().getPathname():
433
def unsubscribe(self, listener, topicList):
434
"""Remove listener from given list of topics. If topicList
435
doesn't have any topics for which listener has subscribed,
437
weakCB = _getWeakRef(listener)
438
if not self.__callbackDict.has_key(weakCB):
441
cbNodes = self.__callbackDict[weakCB]
442
if topicList is None:
443
for weakNode in cbNodes:
444
weakNode().removeCallable(listener)
445
del self.__callbackDict[weakCB]
448
for weakNode in cbNodes:
450
if node is not None and node.getPathname() in topicList:
451
success = node.removeCallable(listener)
452
assert success == True
453
cbNodes.remove(weakNode)
454
assert not self.isSubscribed(listener, node.getPathname())
456
def unsubAll(self, topicList, onNoSuchTopic):
457
"""Unsubscribe all listeners registered for any topic in
458
topicList. If a topic in the list does not exist, and
459
onNoSuchTopic is not None, a call
460
to onNoSuchTopic(topic) is done for that topic."""
461
for topic in topicList:
462
node = self.__getTreeNode(topic)
464
weakCallables = node.clearCallables()
465
for callable in weakCallables:
466
weakNodes = self.__callbackDict[callable]
467
success = _removeItem(WeakRef(node), weakNodes)
468
assert success == True
470
del self.__callbackDict[callable]
471
elif onNoSuchTopic is not None:
474
def sendMessage(self, topic, message, onTopicNeverCreated):
475
"""Send a message for given topic to all registered listeners. If
476
topic doesn't exist, call onTopicNeverCreated(topic)."""
477
# send to the all-topics listeners
478
deliveryCount = _TopicTreeNode.sendMessage(self, message)
479
# send to those who listen to given topic or any of its supertopics
481
for topicItem in topic:
482
assert topicItem != ''
483
if node.hasSubtopic(topicItem):
484
node = node.getNode(topicItem)
485
deliveryCount += node.sendMessage(message)
486
else: # topic never created, don't bother continuing
487
if onTopicNeverCreated is not None:
488
onTopicNeverCreated(topic)
492
def numListeners(self):
493
"""Return a pair (live, dead) with count of live and dead listeners in tree"""
495
for cb in self.__callbackDict:
502
# clean up the callback dictionary after how many dead listeners
503
callbackDeadLimit = 10
505
def __rootNotifyDead(self, dead):
506
#print 'TreeROOT received death certificate for ', dead
507
self.__callbackDictCleanup += 1
508
if self.__callbackDictCleanup > _TopicTreeRoot.callbackDeadLimit:
509
self.__callbackDictCleanup = 0
510
oldDict = self.__callbackDict
511
self.__callbackDict = {}
512
for weakCB, weakNodes in oldDict.iteritems():
513
if weakCB() is not None:
514
self.__callbackDict[weakCB] = weakNodes
516
def __getTreeNode(self, topic, make=False):
517
"""Return the tree node for 'topic' from the topic tree. If it
518
doesnt exist and make=True, create it first."""
519
# if the all-topics, give root;
520
if topic == (ALL_TOPICS,):
523
# not root, so traverse tree
526
for topicItem in topic:
528
if topicItem == ALL_TOPICS:
529
raise ValueError, 'Topic tuple must not contain ""'
531
node = node.createSubtopic(topicItem, path)
532
elif node.hasSubtopic(topicItem):
533
node = node.getNode(topicItem)
539
def printCallbacks(self):
540
strVal = ['Callbacks:\n']
541
for listener, weakTopicNodes in self.__callbackDict.iteritems():
542
topics = [topic() for topic in weakTopicNodes if topic() is not None]
543
strVal.append(' %s: %s\n' % (_getCallableName(listener()), topics))
544
return ''.join(strVal)
547
return 'all: %s' % _TopicTreeNode.__str__(self)
550
# -----------------------------------------------------------------------------
552
class _SingletonKey: pass
554
class PublisherClass:
556
The publish/subscribe manager. It keeps track of which listeners
557
are interested in which topics (see subscribe()), and sends a
558
Message for a given topic to listeners that have subscribed to
559
that topic, with optional user data (see sendMessage()).
561
The three important concepts for Publisher are:
563
- listener: a function, bound method or
564
callable object that can be called with one parameter
565
(not counting 'self' in the case of methods). The parameter
566
will be a reference to a Message object. E.g., these listeners
570
def __call__(self, a, b=1): pass # can be called with only one arg
571
def meth(self, a): pass # takes only one arg
572
def meth2(self, a=2, b=''): pass # can be called with one arg
574
def func(a, b=''): pass
577
Publisher().subscribe(foo) # functor
578
Publisher().subscribe(foo.meth) # bound method
579
Publisher().subscribe(foo.meth2) # bound method
580
Publisher().subscribe(func) # function
582
The three types of callables all have arguments that allow a call
583
with only one argument. In every case, the parameter 'a' will contain
586
- topic: a single word, a tuple of words, or a string containing a
587
set of words separated by dots, for example: 'sports.baseball'.
588
A tuple or a dotted notation string denotes a hierarchy of
589
topics from most general to least. For example, a listener of
592
('sports','baseball')
594
would receive messages for these topics::
596
('sports', 'baseball') # because same
597
('sports', 'baseball', 'highscores') # because more specific
601
'sports' # because more general
602
('sports',) # because more general
603
() or ('') # because only for those listening to 'all' topics
604
('news') # because different topic
606
- message: this is an instance of Message, containing the topic for
607
which the message was sent, and any data the sender specified.
609
:note: This class is visible to importers of pubsub only as a
610
Singleton. I.e., every time you execute 'Publisher()', it's
611
actually the same instance of PublisherClass that is
612
returned. So to use, just do 'Publisher().method()'.
616
__ALL_TOPICS_TPL = (ALL_TOPICS, )
618
def __init__(self, singletonKey):
    """Construct a Publisher. This can only be done by the pubsub
    module. You just use pubsub.Publisher().

    Raises ValueError if singletonKey is not a _SingletonKey instance.
    """
    if not isinstance(singletonKey, _SingletonKey):
        # The name 'invalid_argument' previously raised here is not
        # defined anywhere visible in this module, so this path died
        # with a NameError instead of showing the message below.
        raise ValueError("Use Publisher() to get access to singleton")
    self.__messageCount = 0
    self.__deliveryCount = 0
    self.__topicTree = _TopicTreeRoot()
631
def getDeliveryCount(self):
632
"""How many listeners have received a message since beginning of run"""
633
return self.__deliveryCount
635
def getMessageCount(self):
636
"""How many times sendMessage() was called since beginning of run"""
637
return self.__messageCount
639
def subscribe(self, listener, topic = ALL_TOPICS):
641
Subscribe listener for given topic. If topic is not specified,
642
listener will be subscribed for all topics (that listener will
643
receive a Message for any topic for which a message is generated).
645
This method may be called multiple times for one listener,
646
registering it with many topics. It can also be invoked many
647
times for a particular topic, each time with a different
648
listener. See the class doc for requirements on listener and
651
:note: The listener is held by Publisher() only by *weak*
652
reference. This means you must ensure you have at
653
least one strong reference to listener, otherwise it
654
will be DOA ("dead on arrival"). This is particularly
655
easy to forget when wrapping a listener method in a
656
proxy object (e.g. to bind some of its parameters),
660
def listener(self, event): pass
662
def __init__(self, fun): self.fun = fun
663
def __call__(self, *args): self.fun(*args)
665
Publisher().subscribe( Wrapper(foo.listener) ) # whoops: DOA!
666
wrapper = Wrapper(foo.listener)
667
Publisher().subscribe(wrapper) # good!
669
:note: Calling this method for the same listener, with two
670
topics in the same branch of the topic hierarchy, will
671
cause the listener to be notified twice when a message
672
for the deepest topic is sent. E.g.
673
subscribe(listener, 't1') and then subscribe(listener,
674
('t1','t2')) means that when calling sendMessage('t1'),
675
listener gets one message, but when calling
676
sendMessage(('t1','t2')), listener gets message twice.
679
self.validate(listener)
682
raise TypeError, 'Topic must be either a word, tuple of '\
683
'words, or getStrAllTopics()'
685
self.__topicTree.addTopic(_tupleize(topic), listener)
687
def isSubscribed(self, listener, topic=None):
688
"""Return true if listener has subscribed to topic specified.
689
If no topic specified, return true if subscribed to something.
690
Use topic=getStrAllTopics() to determine if a listener will receive
691
messages for all topics."""
692
return self.__topicTree.isSubscribed(listener, topic)
694
def validate(self, listener):
695
"""Similar to isValid(), but raises a TypeError exception if not valid"""
697
if not callable(listener):
698
raise TypeError, 'Listener '+`listener`+' must be a '\
699
'function, bound method or instance.'
700
# ok, callable, but if method, is it bound:
701
elif ismethod(listener) and not _isbound(listener):
702
raise TypeError, 'Listener '+`listener`+\
703
' is a method but it is unbound!'
705
# check that it takes the right number of parameters
706
min, d = _paramMinCount(listener)
708
raise TypeError, 'Listener '+`listener`+" can't"\
709
' require more than one parameter!'
710
if min <= 0 and d == 0:
711
raise TypeError, 'Listener '+`listener`+' lacking arguments!'
713
assert (min == 0 and d>0) or (min == 1)
715
def isValid(self, listener):
716
"""Return true only if listener will be able to subscribe to
719
self.validate(listener)
724
def unsubAll(self, topics=None, onNoSuchTopic=None):
725
"""Unsubscribe all listeners subscribed for topics. Topics can
726
be a single topic (string or tuple) or a list of topics (ie
727
list containing strings and/or tuples). If topics is not
728
specified, all listeners for all topics will be unsubscribed,
729
ie. the Publisher singleton will have no topics and no listeners
730
left. If onNoSuchTopic is given, it will be called as
731
onNoSuchTopic(topic) for each topic that is unknown.
735
self.__topicTree = _TopicTreeRoot()
738
# make sure every topics are in tuple form
739
if isinstance(topics, list):
740
topicList = [_tupleize(x) for x in topics]
742
topicList = [_tupleize(topics)]
744
# unsub every listener of topics
745
self.__topicTree.unsubAll(topicList, onNoSuchTopic)
747
def unsubscribe(self, listener, topics=None):
748
"""Unsubscribe listener. If topics not specified, listener is
749
completely unsubscribed. Otherwise, it is unsubscribed only
750
for the topic (the usual tuple) or list of topics (ie a list
751
of tuples) specified. Nothing happens if listener is not actually
752
subscribed to any of the topics.
754
Note that if listener subscribed for two topics (a,b) and (a,c),
755
then unsubscribing for topic (a) will do nothing. You must
756
use getAssociatedTopics(listener) and give unsubscribe() the returned
757
list (or a subset thereof).
759
self.validate(listener)
761
if topics is not None:
762
if isinstance(topics, list):
763
topicList = [_tupleize(x) for x in topics]
765
topicList = [_tupleize(topics)]
767
self.__topicTree.unsubscribe(listener, topicList)
769
def getAssociatedTopics(self, listener):
770
"""Return a list of topics the given listener is registered with.
771
Returns [] if listener never subscribed.
773
:attention: when using the return of this method to compare to
774
expected list of topics, remember that topics that are
775
not in the form of a tuple appear as a one-tuple in
776
the return. E.g. if you have subscribed a listener to
777
'topic1' and ('topic2','subtopic2'), this method
780
associatedTopics = [('topic1',), ('topic2','subtopic2')]
782
return self.__topicTree.getTopics(listener)
784
def sendMessage(self, topic=ALL_TOPICS, data=None, onTopicNeverCreated=None):
    """Send a message for given topic, with optional data, to
    subscribed listeners. If topic is not specified, only the
    listeners that are interested in all topics will receive message.
    The onTopicNeverCreated is an optional callback of your choice that
    will be called if the topic given was never created (i.e. it, or
    one of its subtopics, was never subscribed to by any listener).
    It will be called as onTopicNeverCreated(topic).

    Each call increases the message count by one and the delivery
    count by the number the topic tree reports for this message."""
    aTopic = _tupleize(topic)
    message = Message(aTopic, data)
    self.__messageCount += 1

    # Finish the delivery before touching the counter. Written as
    # "self.__deliveryCount += tree.sendMessage(...)", the old counter
    # value is read before the call, so deliveries counted by a listener
    # that itself calls sendMessage() would be overwritten and lost.
    delivered = self.__topicTree.sendMessage(aTopic, message,
                                             onTopicNeverCreated)
    self.__deliveryCount += delivered
805
"""Allows for singleton"""
809
return str(self.__topicTree)
811
# Create the Publisher singleton. We prevent users from (inadvertently)
812
# instantiating more than one object, by requiring a key that is
813
# accessible only to module. From
814
# this point forward any calls to Publisher() will invoke the __call__
815
# of this instance which just returns itself.
817
# The only flaw with this approach is that you can't derive a new
818
# class from Publisher without jumping through hoops. If this ever
819
# becomes an issue then a new Singleton implementation will need to be
821
_key = _SingletonKey()
822
Publisher = PublisherClass(_key)
825
#---------------------------------------------------------------------------
829
A simple container object for the two components of a message: the
830
topic and the user data. An instance of Message is given to your
831
listener when called by Publisher().sendMessage(topic) (if your
832
listener callback was registered for that topic).
834
def __init__(self, topic, data):
839
return '[Topic: '+`self.topic`+', Data: '+`self.data`+']'
842
#---------------------------------------------------------------------------
846
# Code for a simple command-line test
850
print '----------- Done %s -----------' % funcName
853
def testFunc00(): pass
854
def testFunc21(a,b,c=1): pass
855
def testFuncA(*args): pass
856
def testFuncAK(*args,**kwds): pass
857
def testFuncK(**kwds): pass
860
def testMeth(self,a,b): pass
861
def __call__(self, a): pass
863
def __call__(self, *args): pass
865
assert _paramMinCount(testFunc00)==(0,0)
866
assert _paramMinCount(testFunc21)==(2,1)
867
assert _paramMinCount(testFuncA) ==(1,0)
868
assert _paramMinCount(testFuncAK)==(1,0)
869
assert _paramMinCount(testFuncK) ==(0,0)
871
assert _paramMinCount(Foo.testMeth)==(2,0)
872
assert _paramMinCount(foo.testMeth)==(2,0)
873
assert _paramMinCount(foo)==(1,0)
874
assert _paramMinCount(Foo2())==(1,0)
879
#------------------------
881
_NodeCallback.notified = 0
882
def testPreNotifyNode(self, dead):
883
_NodeCallback.notified += 1
884
print 'testPreNotifyNODE heard notification of', `dead`
885
_NodeCallback.preNotify = testPreNotifyNode
890
def __init__(self, s):
892
def __call__(self, msg):
893
print 'WS#', self.s, ' received msg ', msg
897
def testPreNotifyRoot(dead):
898
print 'testPreNotifyROOT heard notification of', `dead`
900
node = _TopicTreeNode((ALL_TOPICS,), WeakRef(testPreNotifyRoot))
901
boo, baz, bid = WS('boo'), WS('baz'), WS('bid')
902
node.addCallable(boo)
903
node.addCallable(baz)
904
node.addCallable(boo)
905
assert node.getCallables() == [boo,baz]
906
assert node.hasCallable(boo)
908
node.removeCallable(bid) # no-op
909
assert node.hasCallable(baz)
910
assert node.getCallables() == [boo,baz]
912
node.removeCallable(boo)
913
assert node.getCallables() == [baz]
914
assert node.hasCallable(baz)
915
assert not node.hasCallable(boo)
917
node.removeCallable(baz)
918
assert node.getCallables() == []
919
assert not node.hasCallable(baz)
921
node2 = node.createSubtopic('st1', ('st1',))
922
node3 = node.createSubtopic('st2', ('st2',))
923
cb1, cb2, cb = WS('st1_cb1'), WS('st1_cb2'), WS('st2_cb')
924
node2.addCallable(cb1)
925
node2.addCallable(cb2)
926
node3.addCallable(cb)
927
node2.createSubtopic('st3', ('st1','st3'))
928
node2.createSubtopic('st4', ('st1','st4'))
931
assert str(node) == ' (st1: st1_cb1 st1_cb2 (st4: ) (st3: )) (st2: st2_cb )'
933
# verify send message, and that a dead listener does not get sent one
934
delivered = node2.sendMessage('hello')
935
assert delivered == 2
937
delivered = node2.sendMessage('hello')
938
assert delivered == 1
939
assert _NodeCallback.notified == 1
944
#------------------------
948
def __call__(self, a): pass
949
def fun(self, b): pass
950
def fun2(self, b=1): pass
951
def fun3(self, a, b=2): pass
952
def badFun(self): pass
954
def badFun3(self, a, b): pass
959
server.validate(foo.fun)
960
server.validate(foo.fun2)
961
server.validate(foo.fun3)
962
assert not server.isValid(foo.badFun)
963
assert not server.isValid(foo.badFun2)
964
assert not server.isValid(foo.badFun3)
969
#------------------------
971
class SimpleListener:
972
def __init__(self, number):
974
def __call__(self, message = ''):
975
print 'Callable #%s got the message "%s"' %(self.number, message)
976
def notify(self, message):
977
print '%s.notify() got the message "%s"' %(self.number, message)
979
return "SimpleListener_%s" % self.number
982
publisher = Publisher()
985
topic2 = ('history','middle age')
986
topic3 = ('politics','UN')
987
topic4 = ('politics','NATO')
988
topic5 = ('politics','NATO','US')
990
lisnr1 = SimpleListener(1)
991
lisnr2 = SimpleListener(2)
992
def func(message, a=1):
993
print 'Func received message "%s"' % message
995
lisnr4 = lambda x: 'Lambda received message "%s"' % x
997
assert not publisher.isSubscribed(lisnr1)
998
assert not publisher.isSubscribed(lisnr2)
999
assert not publisher.isSubscribed(lisnr3)
1000
assert not publisher.isSubscribed(lisnr4)
1002
publisher.subscribe(lisnr1, topic1)
1003
assert publisher.getAssociatedTopics(lisnr1) == [(topic1,)]
1004
publisher.subscribe(lisnr1, topic2)
1005
publisher.subscribe(lisnr1, topic1) # do it again, should be no-op
1006
assert publisher.getAssociatedTopics(lisnr1) == [(topic1,),topic2]
1007
publisher.subscribe(lisnr2.notify, topic3)
1008
assert publisher.getAssociatedTopics(lisnr2.notify) == [topic3]
1009
assert publisher.getAssociatedTopics(lisnr1) == [(topic1,),topic2]
1010
publisher.subscribe(lisnr3, topic5)
1011
assert publisher.getAssociatedTopics(lisnr3) == [topic5]
1012
assert publisher.getAssociatedTopics(lisnr2.notify) == [topic3]
1013
assert publisher.getAssociatedTopics(lisnr1) == [(topic1,),topic2]
1014
publisher.subscribe(lisnr4)
1016
print "Publisher tree: ", publisher
1017
assert publisher.isSubscribed(lisnr1)
1018
assert publisher.isSubscribed(lisnr1, topic1)
1019
assert publisher.isSubscribed(lisnr1, topic2)
1020
assert publisher.isSubscribed(lisnr2.notify)
1021
assert publisher.isSubscribed(lisnr3, topic5)
1022
assert publisher.isSubscribed(lisnr4, ALL_TOPICS)
1023
expectTopicTree = 'all: <lambda> (politics: SimpleListener_1 (UN: SimpleListener_2.notify ) (NATO: (US: func ))) (history: (middle age: SimpleListener_1 ))'
1024
print "Publisher tree: ", publisher
1025
assert str(publisher) == expectTopicTree
1027
publisher.unsubscribe(lisnr1, 'booboo') # should do nothing
1028
assert publisher.getAssociatedTopics(lisnr1) == [(topic1,),topic2]
1029
assert publisher.getAssociatedTopics(lisnr2.notify) == [topic3]
1030
assert publisher.getAssociatedTopics(lisnr3) == [topic5]
1031
publisher.unsubscribe(lisnr1, topic1)
1032
assert publisher.getAssociatedTopics(lisnr1) == [topic2]
1033
assert publisher.getAssociatedTopics(lisnr2.notify) == [topic3]
1034
assert publisher.getAssociatedTopics(lisnr3) == [topic5]
1035
publisher.unsubscribe(lisnr1, topic2)
1036
publisher.unsubscribe(lisnr1, topic2)
1037
publisher.unsubscribe(lisnr2.notify, topic3)
1038
publisher.unsubscribe(lisnr3, topic5)
1039
assert publisher.getAssociatedTopics(lisnr1) == []
1040
assert publisher.getAssociatedTopics(lisnr2.notify) == []
1041
assert publisher.getAssociatedTopics(lisnr3) == []
1042
publisher.unsubscribe(lisnr4)
1044
expectTopicTree = 'all: (politics: (UN: ) (NATO: (US: ))) (history: (middle age: ))'
1045
print "Publisher tree: ", publisher
1046
assert str(publisher) == expectTopicTree
1047
assert publisher.getDeliveryCount() == 0
1048
assert publisher.getMessageCount() == 0
1050
publisher.unsubAll()
1051
assert str(publisher) == 'all: '
1053
done('testSubscribe')
1056
#------------------------
1059
publisher = Publisher()
1062
topic2 = ('history','middle age')
1063
topic3 = ('politics','UN')
1064
topic4 = ('politics','NATO')
1065
topic5 = ('politics','NATO','US')
1067
lisnr1 = SimpleListener(1)
1068
lisnr2 = SimpleListener(2)
1069
def func(message, a=1):
1070
print 'Func received message "%s"' % message
1072
lisnr4 = lambda x: 'Lambda received message "%s"' % x
1074
publisher.subscribe(lisnr1, topic1)
1075
publisher.subscribe(lisnr1, topic2)
1076
publisher.subscribe(lisnr2.notify, topic3)
1077
publisher.subscribe(lisnr3, topic2)
1078
publisher.subscribe(lisnr3, topic5)
1079
publisher.subscribe(lisnr4)
1081
expectTopicTree = 'all: <lambda> (politics: SimpleListener_1 (UN: SimpleListener_2.notify ) (NATO: (US: func ))) (history: (middle age: SimpleListener_1 func ))'
1082
print "Publisher tree: ", publisher
1083
assert str(publisher) == expectTopicTree
1085
publisher.unsubAll(topic1)
1086
assert publisher.getAssociatedTopics(lisnr1) == [topic2]
1087
assert not publisher.isSubscribed(lisnr1, topic1)
1089
publisher.unsubAll(topic2)
1091
assert publisher.getAssociatedTopics(lisnr1) == []
1092
assert publisher.getAssociatedTopics(lisnr3) == [topic5]
1093
assert not publisher.isSubscribed(lisnr1)
1094
assert publisher.isSubscribed(lisnr3, topic5)
1096
#print "Publisher tree: ", publisher
1097
expectTopicTree = 'all: <lambda> (politics: (UN: SimpleListener_2.notify ) (NATO: (US: func ))) (history: (middle age: ))'
1098
assert str(publisher) == expectTopicTree
1099
publisher.unsubAll(ALL_TOPICS)
1100
#print "Publisher tree: ", publisher
1101
expectTopicTree = 'all: (politics: (UN: SimpleListener_2.notify ) (NATO: (US: func ))) (history: (middle age: ))'
1102
assert str(publisher) == expectTopicTree
1104
publisher.unsubAll()
1105
done('testUnsubAll')
1108
#------------------------
1111
# Message-delivery test: listeners record their tag in ``called`` and the
# assertions below compare that list after each sendMessage().
# NOTE(review): this listing is missing lines.  ``called``, ``TestListener``,
# ``topic1`` and ``topic2`` are used but not defined in the visible span, and
# the bare integers are leftover line numbers from the listing, not test data.
publisher = Publisher()
1115
# Presumably these three methods belong to the ``TestListener`` class that is
# instantiated below; its header is not visible -- confirm.
def __init__(self, num):
1117
# Called when the instance itself is the listener: records 'TL<number>cb'.
def __call__(self, b):
1118
called.append( 'TL%scb' % self.number )
1119
# Called when the bound method is the listener: records 'TL<number>m'.
def notify(self, b):
1120
called.append( 'TL%sm' % self.number )
1121
# Plain-function listener: records 'func'.
def funcListener(b):
1122
called.append('func')
1124
lisnr1 = TestListener(1)
1125
lisnr2 = TestListener(2)
1126
lisnr3 = funcListener
1127
lisnr4 = lambda x: called.append('lambda')
1131
topic3 = ('politics','UN')
1132
topic4 = ('politics','NATO','US')
1133
topic5 = ('politics','NATO')
1135
publisher.subscribe(lisnr1, topic1)
1136
publisher.subscribe(lisnr2, topic2)
1137
publisher.subscribe(lisnr2.notify, topic2)
1138
publisher.subscribe(lisnr3, topic4)
1139
# Subscribed without a topic; 'lambda' is expected first in every list below.
publisher.subscribe(lisnr4)
1143
# setup ok, now test send/receipt
# Each assertion expects only the deliveries of the send just before it, so
# ``called`` is presumably emptied before each send on lines missing from this
# listing -- confirm.
1144
publisher.sendMessage(topic1)
1145
assert called == ['lambda','TL1cb']
1147
publisher.sendMessage(topic2)
1148
assert called == ['lambda','TL2cb','TL2m']
1150
publisher.sendMessage(topic3)
1151
assert called == ['lambda','TL1cb']
1153
publisher.sendMessage(topic4)
1154
assert called == ['lambda','TL1cb','func']
1156
publisher.sendMessage(topic5)
1157
assert called == ['lambda','TL1cb']
1158
# The five expected lists above hold 2 + 3 + 2 + 3 + 2 = 12 entries.
assert publisher.getDeliveryCount() == 12
1159
assert publisher.getMessageCount() == 5
1161
# test weak referencing works:
# NOTE(review): only 'lambda' is expected for topic2 below, so the topic2
# listeners were presumably deleted on a line missing from this listing.
1162
_NodeCallback.notified = 0
1165
publisher.sendMessage(topic2)
1166
assert called == ['lambda']
1167
assert _NodeCallback.notified == 2
1172
# NOTE(review): nothing visible happens between the check for 2 above and the
# check for 5 here; the intervening statements are missing -- confirm.
assert _NodeCallback.notified == 5
1175
# verify if weak references work as expected
# NOTE(review): this listing is missing lines.  In each 'Deleting ...' phase
# below, the statements between resetting ``_NodeCallback.notified`` and the
# assertion on it are not visible.  The bare integers are leftover line
# numbers from the listing, not test data.
1176
print '------ Starting testDead ----------'
1177
# Part 1: four listeners attached directly to a single tree node.
node = _TopicTreeNode('t1', None)
1178
lisnr1 = SimpleListener(1)
1179
lisnr2 = SimpleListener(2)
1180
lisnr3 = SimpleListener(3)
1181
lisnr4 = SimpleListener(4)
1183
node.addCallable(lisnr1)
1184
node.addCallable(lisnr2)
1185
node.addCallable(lisnr3)
1186
node.addCallable(lisnr4)
1188
print 'Deleting listeners first'
1189
_NodeCallback.notified = 0
1192
# Two notifications expected; presumably two listeners were deleted on the
# missing lines -- confirm.
assert _NodeCallback.notified == 2
1194
print 'Deleting node first'
1195
_NodeCallback.notified = 0
1199
# No notification expected in this phase.
assert _NodeCallback.notified == 0
1201
# Fresh listeners for the second part.
lisnr1 = SimpleListener(1)
1202
lisnr2 = SimpleListener(2)
1203
lisnr3 = SimpleListener(3)
1204
lisnr4 = SimpleListener(4)
1206
# try same with root of tree
1207
node = _TopicTreeRoot()
1208
node.addTopic(('',), lisnr1)
1209
node.addTopic(('',), lisnr2)
1210
node.addTopic(('',), lisnr3)
1211
node.addTopic(('',), lisnr4)
1212
# add objects that will die immediately to see if cleanup occurs
1213
# this must be done visually as it is a low-level detail
1214
_NodeCallback.notified = 0
1215
_TopicTreeRoot.callbackDeadLimit = 3
1216
# Nothing else references these temporaries once addTopic() returns.
node.addTopic(('',), SimpleListener(5))
1217
node.addTopic(('',), SimpleListener(6))
1218
node.addTopic(('',), SimpleListener(7))
1219
print node.numListeners()
1220
# numListeners() is compared with a pair: presumably (live, dead) counts, so
# four live and three dead entries here -- confirm.
assert node.numListeners() == (4, 3)
1221
# After a fourth short-lived listener the second count is expected back at 0.
node.addTopic(('',), SimpleListener(8))
1222
assert node.numListeners() == (4, 0)
1223
# Four short-lived listeners (5 to 8) were added since the reset above.
assert _NodeCallback.notified == 4
1225
print 'Deleting listeners first'
1226
_NodeCallback.notified = 0
1229
assert _NodeCallback.notified == 2
1230
print 'Deleting node first'
1231
_NodeCallback.notified = 0
1235
assert _NodeCallback.notified == 0
1241
print 'Exiting tests'
1242
#---------------------------------------------------------------------------
1244
if __name__ == '__main__':