2
#---------------------------------------------------------------------------
4
This module provides a publish-subscribe component that allows
5
listeners to subscribe to messages of a given topic. Contrary to the
6
original wxPython.lib.pubsub module (which it is based on), it uses
7
weak referencing to the subscribers so the lifetime of subscribers
8
is not affected by Publisher. Also, callable objects can be used in
9
addition to functions and bound methods. See Publisher class docs for
12
Thanks to Robb Shecter and Robin Dunn for having provided
13
the basis for this module (which now shares most of the concepts but
14
very little design or implementation with the original
17
The publisher is a singleton instance of the PublisherClass class. You
18
access the instance via the Publisher object available from the module::
20
from wx.lib.pubsub import Publisher
21
Publisher().subscribe(...)
22
Publisher().sendMessage(...)
25
:Author: Oliver Schoenborn
27
:Version: $Id: pubsub.py 69063 2011-09-11 18:19:32Z CJP $
28
:Copyright: \(c) 2004 Oliver Schoenborn
36
In class Publisher, I represent the topics-listener set as a tree
37
where each node is a topic, and contains a list of listeners of that
38
topic, and a dictionary of subtopics of that topic. When the Publisher
39
is told to send a message for a given topic, it traverses the tree
40
down to the topic for which a message is being generated, all
41
listeners on the way get sent the message.
43
Publisher currently uses a weak listener topic tree to store the
44
topics for each listener, and if a listener dies before being
45
unsubscribed, the tree is notified, and the tree eliminates the
48
Ideally, _TopicTreeNode would be a generic _TreeNode with named
49
subnodes, and _TopicTreeRoot would be a generic _Tree with named
50
nodes, and Publisher would store listeners in each node and a topic
51
tuple would be converted to a path in the tree. This would lead to a
52
much cleaner separation of concerns. But time is over, time to move on.
54
#---------------------------------------------------------------------------
56
# for function and method parameter counting:
57
from types import InstanceType
58
from inspect import getargspec, ismethod, isfunction
59
# for weakly bound methods:
60
from new import instancemethod as InstanceMethod
61
from weakref import ref as WeakRef
63
# -----------------------------------------------------------------------------
66
"""Return true if method is a bound method, false otherwise"""
67
assert ismethod(method)
68
return method.im_self is not None
71
def _paramMinCountFunc(function):
72
"""Given a function, return pair (min,d) where min is minimum # of
73
args required, and d is number of default arguments."""
74
assert isfunction(function)
75
(args, va, kwa, dflt) = getargspec(function)
76
lenDef = len(dflt or ())
77
lenArgs = len(args or ())
78
lenVA = int(va is not None)
79
return (lenArgs - lenDef + lenVA, lenDef)
82
def _paramMinCount(callableObject):
    """
    Given a callable object (function, method or callable instance),
    return pair (min,d) where min is minimum # of args required, and d
    is number of default arguments. The 'self' parameter, in the case
    of methods, is not counted.

    Raises TypeError if callableObject is none of the supported kinds.
    """
    if isfunction(callableObject):
        return _paramMinCountFunc(callableObject)

    if ismethod(callableObject):
        func = callableObject.im_func
    else:
        # Callable instance: inspect its bound __call__ method. Going
        # through getattr (instead of testing for InstanceType) also
        # covers instances of new-style classes.
        callMethod = getattr(callableObject, '__call__', None)
        if not ismethod(callMethod) or callMethod.im_self is None:
            # A real exception class: string exceptions are rejected by
            # the interpreter since Python 2.6.
            raise TypeError('Cannot determine type of callable: '
                            + repr(callableObject))
        func = callMethod.im_func

    minCount, numDefaults = _paramMinCountFunc(func)
    # do not count 'self'
    return minCount - 1, numDefaults
101
def _tupleize(items):
102
"""Convert items to tuple if not already one,
103
so items must be a list, tuple or non-sequence"""
104
if isinstance(items, list):
105
raise TypeError, 'Not allowed to tuple-ize a list'
106
elif isinstance(items, (str, unicode)) and items.find('.') != -1:
107
items = tuple(items.split('.'))
108
elif not isinstance(items, tuple):
113
def _getCallableName(callable):
114
"""Get name for a callable, ie function, bound
115
method or callable instance"""
116
if ismethod(callable):
117
return '%s.%s ' % (callable.im_self, callable.im_func.func_name)
118
elif isfunction(callable):
119
return '%s ' % callable.__name__
121
return '%s ' % callable
124
def _removeItem(item, fromList):
125
"""Attempt to remove item from fromList, return true
126
if successful, false otherwise."""
128
fromList.remove(item)
134
# -----------------------------------------------------------------------------
137
"""Represent a weak bound method, i.e. a method doesn't keep alive the
138
object that it is bound to. It uses WeakRef which, used on its own,
139
produces weak methods that are dead on creation, not very useful.
140
Typically, you will use the getRef() function instead of using
141
this class directly. """
143
def __init__(self, method, notifyDead = None):
144
"""The method must be bound. notifyDead will be called when
145
object that method is bound to dies. """
146
assert ismethod(method)
147
if method.im_self is None:
148
raise ValueError, "We need a bound method!"
149
if notifyDead is None:
150
self.objRef = WeakRef(method.im_self)
152
self.objRef = WeakRef(method.im_self, notifyDead)
153
self.fun = method.im_func
154
self.cls = method.im_class
157
"""Returns a new.instancemethod if object for method still alive.
158
Otherwise return None. Note that instancemethod causes a
159
strong reference to object to be created, so shouldn't save
160
the return value of this call. Note also that this __call__
161
is required only for compatibility with WeakRef.ref(), otherwise
162
there would be more efficient ways of providing this functionality."""
163
if self.objRef() is None:
166
return InstanceMethod(self.fun, self.objRef(), self.cls)
168
def __eq__(self, method2):
169
"""Two WeakMethod objects compare equal if they refer to the same method
170
of the same instance. Thanks to Josiah Carlson for patch and clarifications
171
on how dict uses eq/cmp and hashing. """
172
if not isinstance(method2, _WeakMethod):
174
return self.fun is method2.fun \
175
and self.objRef() is method2.objRef() \
176
and self.objRef() is not None
179
"""Hash is an optimization for dict searches, it need not
180
return different numbers for every different object. Some objects
181
are not hashable (eg objects of classes derived from dict) so no
182
hash(objRef()) in there, and hash(self.cls) would only be useful
183
in the rare case where instance method was rebound. """
184
return hash(self.fun)
188
if self.objRef() is None:
190
obj = '<%s at %s%s>' % (self.__class__, id(self), dead)
193
def refs(self, weakRef):
194
"""Return true if we are storing same object referred to by weakRef."""
195
return self.objRef == weakRef
198
def _getWeakRef(obj, notifyDead=None):
199
"""Get a weak reference to obj. If obj is a bound method, a _WeakMethod
200
object, that behaves like a WeakRef, is returned, if it is
201
anything else a WeakRef is returned. If obj is an unbound method,
202
a ValueError will be raised."""
204
createRef = _WeakMethod
208
if notifyDead is None:
209
return createRef(obj)
211
return createRef(obj, notifyDead)
214
# -----------------------------------------------------------------------------
216
def getStrAllTopics():
    """Function to call if, for whatever reason, you need to know
    explicitly what is the string to use to indicate 'all topics'."""
    return ''
222
# alias, easier to see where used
223
ALL_TOPICS = getStrAllTopics()
225
# -----------------------------------------------------------------------------
229
"""Encapsulate a weak reference to a method of a TopicTreeNode
230
in such a way that the method can be called, if the node is
231
still alive, but the callback does not *keep* the node alive.
232
Also, define two methods, preNotify() and noNotify(), which can
233
be redefined to something else, very useful for testing.
236
def __init__(self, obj):
237
self.objRef = _getWeakRef(obj)
239
def __call__(self, weakCB):
240
notify = self.objRef()
241
if notify is not None:
242
self.preNotify(weakCB)
247
def preNotify(self, dead):
248
"""'Gets called just before our callback (self.objRef) is called"""
252
"""Gets called if the TopicTreeNode for this callback is dead"""
256
class _TopicTreeNode:
257
"""A node in the topic tree. This contains a list of callables
258
that are interested in the topic that this node is associated
259
with, and contains a dictionary of subtopics, whose associated
260
values are other _TopicTreeNodes. The topic of a node is not stored
261
in the node, so that the tree can be implemented as a dictionary
262
rather than a list, for ease of use (and, likely, performance).
264
Note that it uses _NodeCallback to encapsulate a callback for
265
when a registered listener dies, possible thanks to WeakRef.
266
Whenever this callback is called, the onDeadListener() function,
267
passed in at construction time, is called (unless it is None).
270
def __init__(self, topicPath, onDeadListenerWeakCB):
271
self.__subtopics = {}
272
self.__callables = []
273
self.__topicPath = topicPath
274
self.__onDeadListenerWeakCB = onDeadListenerWeakCB
276
def getPathname(self):
277
"""The complete node path to us, ie., the topic tuple that would lead to us"""
278
return self.__topicPath
280
def createSubtopic(self, subtopic, topicPath):
281
"""Create a child node for subtopic"""
282
return self.__subtopics.setdefault(subtopic,
283
_TopicTreeNode(topicPath, self.__onDeadListenerWeakCB))
285
def hasSubtopic(self, subtopic):
286
"""Return true only if topic string is one of subtopics of this node"""
287
return self.__subtopics.has_key(subtopic)
289
def getNode(self, subtopic):
290
"""Return ref to node associated with subtopic"""
291
return self.__subtopics[subtopic]
293
def addCallable(self, callable):
294
"""Add a callable to list of callables for this topic node"""
296
id = self.__callables.index(_getWeakRef(callable))
297
return self.__callables[id]
299
wrCall = _getWeakRef(callable, _NodeCallback(self.__notifyDead))
300
self.__callables.append(wrCall)
303
def getCallables(self):
304
"""Get callables associated with this topic node"""
305
return [cb() for cb in self.__callables if cb() is not None]
307
def hasCallable(self, callable):
308
"""Return true if callable in this node"""
310
self.__callables.index(_getWeakRef(callable))
315
def sendMessage(self, message):
316
"""Send a message to our callables"""
318
for cb in self.__callables:
320
if listener is not None:
325
def removeCallable(self, callable):
326
"""Remove weak callable from our node (and return True).
327
Does nothing if not here (and returns False)."""
329
self.__callables.remove(_getWeakRef(callable))
334
def clearCallables(self):
335
"""Abandon list of callables to caller. We no longer have
336
any callables after this method is called."""
337
tmpList = [cb for cb in self.__callables if cb() is not None]
338
self.__callables = []
341
def __notifyDead(self, dead):
342
"""Gets called when a listener dies, thanks to WeakRef"""
343
#print 'TreeNODE', `self`, 'received death certificate for ', dead
345
if self.__onDeadListenerWeakCB is not None:
346
cb = self.__onDeadListenerWeakCB()
350
def __cleanupDead(self):
351
"""Remove all dead objects from list of callables"""
352
self.__callables = [cb for cb in self.__callables if cb() is not None]
355
"""Print us in a not-so-friendly, but readable way, good for debugging."""
357
for callable in self.getCallables():
358
strVal.append(_getCallableName(callable))
359
for topic, node in self.__subtopics.iteritems():
360
strVal.append(' (%s: %s)' %(topic, node))
361
return ''.join(strVal)
364
class _TopicTreeRoot(_TopicTreeNode):
366
The root of the tree knows how to access other node of the
367
tree and is the gateway of the tree user to the tree nodes.
368
It can create topics, add and remove callbacks, etc.
370
For efficiency, it stores a dictionary of listener-topics,
371
so that unsubscribing a listener just requires finding the
372
topics associated to a listener, and finding the corresponding
373
nodes of the tree. Without it, unsubscribing would require
374
that we search the whole tree for all nodes that contain
375
given listener. Since Publisher is a singleton, it will
376
contain all topics in the system so it is likely to be a large
377
tree. However, it is possible that in some runs, unsubscribe()
378
is called very little by the user, in which case most unsubscriptions
379
are automatic, ie caused by the listeners dying. In this case,
380
a flag is set to indicate that the dictionary should be cleaned up
381
at the next opportunity. This is not necessary, it is just an
386
self.__callbackDict = {}
387
self.__callbackDictCleanup = 0
388
# all child nodes will call our __rootNotifyDead method
389
# when one of their registered listeners dies
390
_TopicTreeNode.__init__(self, (ALL_TOPICS,),
391
_getWeakRef(self.__rootNotifyDead))
393
def addTopic(self, topic, listener):
394
"""Add topic to tree if doesnt exist, and add listener to topic node"""
395
assert isinstance(topic, tuple)
396
topicNode = self.__getTreeNode(topic, make=True)
397
weakCB = topicNode.addCallable(listener)
398
assert topicNode.hasCallable(listener)
400
theList = self.__callbackDict.setdefault(weakCB, [])
401
assert self.__callbackDict.has_key(weakCB)
402
# add it only if we don't already have it
404
weakTopicNode = WeakRef(topicNode)
405
theList.index(weakTopicNode)
407
theList.append(weakTopicNode)
408
assert self.__callbackDict[weakCB].index(weakTopicNode) >= 0
410
def getTopics(self, listener):
    """Return the list of topics for given listener.

    Each topic is the path name of a tree node the listener is
    recorded for; nodes that no longer exist are skipped. Returns []
    if listener is not recorded at all."""
    weakNodes = self.__callbackDict.get(_getWeakRef(listener), [])
    topics = []
    for weakNode in weakNodes:
        # dereference once so the node cannot vanish between check and use
        node = weakNode()
        if node is not None:
            topics.append(node.getPathname())
    return topics
416
def isSubscribed(self, listener, topic=None):
    """Return true if listener is registered for topic specified.
    If no topic specified, return true if subscribed to something.
    Use topic=getStrAllTopics() to determine if a listener will receive
    messages for all topics."""
    weakCB = _getWeakRef(listener)
    if topic is None:
        return weakCB in self.__callbackDict

    topicPath = _tupleize(topic)
    # .get(): a listener that never subscribed is simply "not subscribed",
    # not a KeyError.
    for weakNode in self.__callbackDict.get(weakCB, ()):
        node = weakNode()
        # skip nodes that have already been garbage collected
        if node is not None and topicPath == node.getPathname():
            return True
    return False
431
def unsubscribe(self, listener, topicList):
    """Remove listener from given list of topics.

    topicList is a list of topic tuples, or None to remove listener
    from every topic it is recorded for. If listener is not recorded
    at all, or for none of the topics in topicList, nothing happens.
    """
    weakCB = _getWeakRef(listener)
    if weakCB not in self.__callbackDict:
        return

    cbNodes = self.__callbackDict[weakCB]
    if topicList is None:
        for weakNode in cbNodes:
            node = weakNode()
            if node is not None:  # the node may already be gone
                node.removeCallable(listener)
        del self.__callbackDict[weakCB]
        return

    # Iterate over a copy: entries are removed from cbNodes inside the
    # loop, and removing from a list while iterating it skips elements.
    for weakNode in list(cbNodes):
        node = weakNode()
        if node is not None and node.getPathname() in topicList:
            success = node.removeCallable(listener)
            assert success == True
            cbNodes.remove(weakNode)
            assert not self.isSubscribed(listener, node.getPathname())

    if not cbNodes:
        # No topic left: forget the listener, so that isSubscribed(listener)
        # stops reporting it as subscribed to something.
        del self.__callbackDict[weakCB]
454
def unsubAll(self, topicList, onNoSuchTopic):
455
"""Unsubscribe all listeners registered for any topic in
456
topicList. If a topic in the list does not exist, and
457
onNoSuchTopic is not None, a call
458
to onNoSuchTopic(topic) is done for that topic."""
459
for topic in topicList:
460
node = self.__getTreeNode(topic)
462
weakCallables = node.clearCallables()
463
for callable in weakCallables:
464
weakNodes = self.__callbackDict[callable]
465
success = _removeItem(WeakRef(node), weakNodes)
466
assert success == True
468
del self.__callbackDict[callable]
469
elif onNoSuchTopic is not None:
472
def sendMessage(self, topic, message, onTopicNeverCreated):
473
"""Send a message for given topic to all registered listeners. If
474
topic doesn't exist, call onTopicNeverCreated(topic)."""
475
# send to the all-topics listeners
476
deliveryCount = _TopicTreeNode.sendMessage(self, message)
477
# send to those who listen to given topic or any of its supertopics
479
for topicItem in topic:
480
assert topicItem != ''
481
if node.hasSubtopic(topicItem):
482
node = node.getNode(topicItem)
483
deliveryCount += node.sendMessage(message)
484
else: # topic never created, don't bother continuing
485
if onTopicNeverCreated is not None:
486
onTopicNeverCreated(topic)
490
def numListeners(self):
491
"""Return a pair (live, dead) with count of live and dead listeners in tree"""
493
for cb in self.__callbackDict:
500
# clean up the callback dictionary after how many dead listeners
501
callbackDeadLimit = 10
503
def __rootNotifyDead(self, dead):
504
#print 'TreeROOT received death certificate for ', dead
505
self.__callbackDictCleanup += 1
506
if self.__callbackDictCleanup > _TopicTreeRoot.callbackDeadLimit:
507
self.__callbackDictCleanup = 0
508
oldDict = self.__callbackDict
509
self.__callbackDict = {}
510
for weakCB, weakNodes in oldDict.iteritems():
511
if weakCB() is not None:
512
self.__callbackDict[weakCB] = weakNodes
514
def __getTreeNode(self, topic, make=False):
515
"""Return the tree node for 'topic' from the topic tree. If it
516
doesnt exist and make=True, create it first."""
517
# if the all-topics, give root;
518
if topic == (ALL_TOPICS,):
521
# not root, so traverse tree
524
for topicItem in topic:
526
if topicItem == ALL_TOPICS:
527
raise ValueError, 'Topic tuple must not contain ""'
529
node = node.createSubtopic(topicItem, path)
530
elif node.hasSubtopic(topicItem):
531
node = node.getNode(topicItem)
537
def printCallbacks(self):
538
strVal = ['Callbacks:\n']
539
for listener, weakTopicNodes in self.__callbackDict.iteritems():
540
topics = [topic() for topic in weakTopicNodes if topic() is not None]
541
strVal.append(' %s: %s\n' % (_getCallableName(listener()), topics))
542
return ''.join(strVal)
545
return 'all: %s' % _TopicTreeNode.__str__(self)
548
# -----------------------------------------------------------------------------
550
class _SingletonKey: pass
552
class PublisherClass:
554
The publish/subscribe manager. It keeps track of which listeners
555
are interested in which topics (see subscribe()), and sends a
556
Message for a given topic to listeners that have subscribed to
557
that topic, with optional user data (see sendMessage()).
559
The three important concepts for Publisher are:
561
- listener: a function, bound method or
562
callable object that can be called with one parameter
563
(not counting 'self' in the case of methods). The parameter
564
will be a reference to a Message object. E.g., these listeners
568
def __call__(self, a, b=1): pass # can be called with only one arg
569
def meth(self, a): pass # takes only one arg
570
def meth2(self, a=2, b=''): pass # can be called with one arg
572
def func(a, b=''): pass
575
Publisher().subscribe(foo) # functor
576
Publisher().subscribe(foo.meth) # bound method
577
Publisher().subscribe(foo.meth2) # bound method
578
Publisher().subscribe(func) # function
580
The three types of callables all have arguments that allow a call
581
with only one argument. In every case, the parameter 'a' will contain
584
- topic: a single word, a tuple of words, or a string containing a
585
set of words separated by dots, for example: 'sports.baseball'.
586
A tuple or a dotted notation string denotes a hierarchy of
587
topics from most general to least. For example, a listener of
590
('sports','baseball')
592
would receive messages for these topics::
594
('sports', 'baseball') # because same
595
('sports', 'baseball', 'highscores') # because more specific
599
'sports' # because more general
600
('sports',) # because more general
601
() or ('') # because only for those listening to 'all' topics
602
('news') # because different topic
604
- message: this is an instance of Message, containing the topic for
605
which the message was sent, and any data the sender specified.
607
:note: This class is visible to importers of pubsub only as a
608
Singleton. I.e., every time you execute 'Publisher()', it's
609
actually the same instance of PublisherClass that is
610
returned. So to use, just do'Publisher().method()'.
614
__ALL_TOPICS_TPL = (ALL_TOPICS, )
616
def __init__(self, singletonKey):
    """Construct a Publisher. This can only be done by the pubsub
    module. You just use pubsub.Publisher().

    Raises ValueError if singletonKey is not a _SingletonKey instance."""
    if not isinstance(singletonKey, _SingletonKey):
        # 'invalid_argument' was never defined, so the old guard failed
        # with a NameError instead of showing this message.
        raise ValueError("Use Publisher() to get access to singleton")
    self.__messageCount = 0
    self.__deliveryCount = 0
    self.__topicTree = _TopicTreeRoot()
629
def getDeliveryCount(self):
630
"""How many listeners have received a message since beginning of run"""
631
return self.__deliveryCount
633
def getMessageCount(self):
634
"""How many times sendMessage() was called since beginning of run"""
635
return self.__messageCount
637
def subscribe(self, listener, topic = ALL_TOPICS):
639
Subscribe listener for given topic. If topic is not specified,
640
listener will be subscribed for all topics (that listener will
641
receive a Message for any topic for which a message is generated).
643
This method may be called multiple times for one listener,
644
registering it with many topics. It can also be invoked many
645
times for a particular topic, each time with a different
646
listener. See the class doc for requirements on listener and
649
:note: The listener is held by Publisher() only by *weak*
650
reference. This means you must ensure you have at
651
least one strong reference to listener, otherwise it
652
will be DOA ("dead on arrival"). This is particularly
653
easy to forget when wrapping a listener method in a
654
proxy object (e.g. to bind some of its parameters),
658
def listener(self, event): pass
660
def __init__(self, fun): self.fun = fun
661
def __call__(self, *args): self.fun(*args)
663
Publisher().subscribe( Wrapper(foo.listener) ) # whoops: DOA!
664
wrapper = Wrapper(foo.listener)
665
Publisher().subscribe(wrapper) # good!
667
:note: Calling this method for the same listener, with two
668
topics in the same branch of the topic hierarchy, will
669
cause the listener to be notified twice when a message
670
for the deepest topic is sent. E.g.
671
subscribe(listener, 't1') and then subscribe(listener,
672
('t1','t2')) means that when calling sendMessage('t1'),
673
listener gets one message, but when calling
674
sendMessage(('t1','t2')), listener gets message twice.
677
self.validate(listener)
680
raise TypeError, 'Topic must be either a word, tuple of '\
681
'words, or getStrAllTopics()'
683
self.__topicTree.addTopic(_tupleize(topic), listener)
685
def isSubscribed(self, listener, topic=None):
686
"""Return true if listener has subscribed to topic specified.
687
If no topic specified, return true if subscribed to something.
688
Use topic=getStrAllTopics() to determine if a listener will receive
689
messages for all topics."""
690
return self.__topicTree.isSubscribed(listener, topic)
692
def validate(self, listener):
693
"""Similar to isValid(), but raises a TypeError exception if not valid"""
695
if not callable(listener):
696
raise TypeError, 'Listener '+`listener`+' must be a '\
697
'function, bound method or instance.'
698
# ok, callable, but if method, is it bound:
699
elif ismethod(listener) and not _isbound(listener):
700
raise TypeError, 'Listener '+`listener`+\
701
' is a method but it is unbound!'
703
# check that it takes the right number of parameters
704
min, d = _paramMinCount(listener)
706
raise TypeError, 'Listener '+`listener`+" can't"\
707
' require more than one parameter!'
708
if min <= 0 and d == 0:
709
raise TypeError, 'Listener '+`listener`+' lacking arguments!'
711
assert (min == 0 and d>0) or (min == 1)
713
def isValid(self, listener):
714
"""Return true only if listener will be able to subscribe to
717
self.validate(listener)
722
def unsubAll(self, topics=None, onNoSuchTopic=None):
723
"""Unsubscribe all listeners subscribed for topics. Topics can
724
be a single topic (string or tuple) or a list of topics (ie
725
list containing strings and/or tuples). If topics is not
726
specified, all listeners for all topics will be unsubscribed,
727
ie. the Publisher singleton will have no topics and no listeners
728
left. If onNoSuchTopic is given, it will be called as
729
onNoSuchTopic(topic) for each topic that is unknown.
733
self.__topicTree = _TopicTreeRoot()
736
# make sure every topic is in tuple form
737
if isinstance(topics, list):
738
topicList = [_tupleize(x) for x in topics]
740
topicList = [_tupleize(topics)]
742
# unsub every listener of topics
743
self.__topicTree.unsubAll(topicList, onNoSuchTopic)
745
def unsubscribe(self, listener, topics=None):
746
"""Unsubscribe listener. If topics not specified, listener is
747
completely unsubscribed. Otherwise, it is unsubscribed only
748
for the topic (the usual tuple) or list of topics (ie a list
749
of tuples) specified. Nothing happens if listener is not actually
750
subscribed to any of the topics.
752
Note that if listener subscribed for two topics (a,b) and (a,c),
753
then unsubscribing for topic (a) will do nothing. You must
754
use getAssociatedTopics(listener) and give unsubscribe() the returned
755
list (or a subset thereof).
757
self.validate(listener)
759
if topics is not None:
760
if isinstance(topics, list):
761
topicList = [_tupleize(x) for x in topics]
763
topicList = [_tupleize(topics)]
765
self.__topicTree.unsubscribe(listener, topicList)
767
def getAssociatedTopics(self, listener):
768
"""Return a list of topics the given listener is registered with.
769
Returns [] if listener never subscribed.
771
:attention: when using the return of this method to compare to
772
expected list of topics, remember that topics that are
773
not in the form of a tuple appear as a one-tuple in
774
the return. E.g. if you have subscribed a listener to
775
'topic1' and ('topic2','subtopic2'), this method
778
associatedTopics = [('topic1',), ('topic2','subtopic2')]
780
return self.__topicTree.getTopics(listener)
782
def sendMessage(self, topic=ALL_TOPICS,
783
data=None, onTopicNeverCreated=None,
785
"""Send a message for given topic, with optional data, to
786
subscribed listeners. If topic is not specified, only the
787
listeners that are interested in all topics will receive message.
788
The onTopicNeverCreated is an optional callback of your choice that
789
will be called if the topic given was never created (i.e. it, or
790
one of its subtopics, was never subscribed to by any listener).
791
It will be called as onTopicNeverCreated(topic)."""
792
aTopic = _tupleize(topic)
793
message = Message(aTopic, data, context=context)
794
self.__messageCount += 1
796
# send to those who listen to all topics
797
self.__deliveryCount += \
798
self.__topicTree.sendMessage(aTopic, message, onTopicNeverCreated)
805
"""Allows for singleton"""
809
return str(self.__topicTree)
811
# Create the Publisher singleton. We prevent users from (inadvertently)
812
# instantiating more than one object, by requiring a key that is
813
# accessible only to module. From
814
# this point forward any calls to Publisher() will invoke the __call__
815
# of this instance which just returns itself.
817
# The only flaw with this approach is that you can't derive a new
818
# class from Publisher without jumping through hoops. If this ever
819
# becomes an issue then a new Singleton implementation will need to be
821
_key = _SingletonKey()
822
Publisher = PublisherClass(_key)
825
#---------------------------------------------------------------------------
829
A simple container object for the two components of a message: the
830
topic and the user data. An instance of Message is given to your
831
listener when called by Publisher().sendMessage(topic) (if your
832
listener callback was registered for that topic).
834
def __init__(self, topic, data, context=None):
837
self.context = context
839
# Capitalised property aliases for the lowercase instance attributes.
Data = property(lambda self: self.data,
                lambda self, data: setattr(self, 'data', data))
# The setter must store its own argument; it used to reference an
# undefined name and failed with NameError on assignment.
Context = property(lambda self: self.context,
                   lambda self, ctx: setattr(self, 'context', ctx))
# read-only alias for the topic
Type = property(lambda self: self.topic)
846
return '[Topic: '+`self.topic`+', Data: '+`self.data`+']'
848
def GetContext(self):
849
"""Get the context that this message was sent in"""
853
"""Return the messages data/value"""
857
"""Return the type of message"""
860
#---------------------------------------------------------------------------
864
# Code for a simple command-line test
868
print '----------- Done %s -----------' % funcName
871
def testFunc00(): pass
872
def testFunc21(a,b,c=1): pass
873
def testFuncA(*args): pass
874
def testFuncAK(*args,**kwds): pass
875
def testFuncK(**kwds): pass
878
def testMeth(self,a,b): pass
879
def __call__(self, a): pass
881
def __call__(self, *args): pass
883
assert _paramMinCount(testFunc00)==(0,0)
884
assert _paramMinCount(testFunc21)==(2,1)
885
assert _paramMinCount(testFuncA) ==(1,0)
886
assert _paramMinCount(testFuncAK)==(1,0)
887
assert _paramMinCount(testFuncK) ==(0,0)
889
assert _paramMinCount(Foo.testMeth)==(2,0)
890
assert _paramMinCount(foo.testMeth)==(2,0)
891
assert _paramMinCount(foo)==(1,0)
892
assert _paramMinCount(Foo2())==(1,0)
897
#------------------------
899
_NodeCallback.notified = 0
900
def testPreNotifyNode(self, dead):
901
_NodeCallback.notified += 1
902
print 'testPreNotifyNODE heard notification of', `dead`
903
_NodeCallback.preNotify = testPreNotifyNode
908
def __init__(self, s):
910
def __call__(self, msg):
911
print 'WS#', self.s, ' received msg ', msg
915
def testPreNotifyRoot(dead):
916
print 'testPreNotifyROOT heard notification of', `dead`
918
node = _TopicTreeNode((ALL_TOPICS,), WeakRef(testPreNotifyRoot))
919
boo, baz, bid = WS('boo'), WS('baz'), WS('bid')
920
node.addCallable(boo)
921
node.addCallable(baz)
922
node.addCallable(boo)
923
assert node.getCallables() == [boo,baz]
924
assert node.hasCallable(boo)
926
node.removeCallable(bid) # no-op
927
assert node.hasCallable(baz)
928
assert node.getCallables() == [boo,baz]
930
node.removeCallable(boo)
931
assert node.getCallables() == [baz]
932
assert node.hasCallable(baz)
933
assert not node.hasCallable(boo)
935
node.removeCallable(baz)
936
assert node.getCallables() == []
937
assert not node.hasCallable(baz)
939
node2 = node.createSubtopic('st1', ('st1',))
940
node3 = node.createSubtopic('st2', ('st2',))
941
cb1, cb2, cb = WS('st1_cb1'), WS('st1_cb2'), WS('st2_cb')
942
node2.addCallable(cb1)
943
node2.addCallable(cb2)
944
node3.addCallable(cb)
945
node2.createSubtopic('st3', ('st1','st3'))
946
node2.createSubtopic('st4', ('st1','st4'))
949
assert str(node) == ' (st1: st1_cb1 st1_cb2 (st4: ) (st3: )) (st2: st2_cb )'
951
# verify send message, and that a dead listener does not get sent one
952
delivered = node2.sendMessage('hello')
953
assert delivered == 2
955
delivered = node2.sendMessage('hello')
956
assert delivered == 1
957
assert _NodeCallback.notified == 1
962
#------------------------
966
def __call__(self, a): pass
967
def fun(self, b): pass
968
def fun2(self, b=1): pass
969
def fun3(self, a, b=2): pass
970
def badFun(self): pass
972
def badFun3(self, a, b): pass
977
server.validate(foo.fun)
978
server.validate(foo.fun2)
979
server.validate(foo.fun3)
980
assert not server.isValid(foo.badFun)
981
assert not server.isValid(foo.badFun2)
982
assert not server.isValid(foo.badFun3)
987
#------------------------
989
class SimpleListener:
990
def __init__(self, number):
992
def __call__(self, message = ''):
993
print 'Callable #%s got the message "%s"' %(self.number, message)
994
def notify(self, message):
995
print '%s.notify() got the message "%s"' %(self.number, message)
997
return "SimpleListener_%s" % self.number
1000
# Body of the subscribe/unsubscribe test (reported via done('testSubscribe')
# at the end). It subscribes four kinds of listeners, checks the topics
# associated with each, then unsubscribes them again.
# NOTE(review): the enclosing def line and the assignments of `topic1` and
# `lisnr3` are not visible in this chunk; the bare integers are residue of
# the original line numbers and indentation has been lost.
publisher = Publisher()
1003
# Topics are tuples naming a path in the topic tree.
topic2 = ('history','middle age')
1004
topic3 = ('politics','UN')
1005
topic4 = ('politics','NATO')
1006
topic5 = ('politics','NATO','US')
1008
# Listener kinds: callable instance, bound method (lisnr2.notify),
# plain function and lambda.
lisnr1 = SimpleListener(1)
1009
lisnr2 = SimpleListener(2)
1010
def func(message, a=1):
1011
print 'Func received message "%s"' % message
1013
lisnr4 = lambda x: 'Lambda received message "%s"' % x
1015
# Nothing is subscribed yet.
assert not publisher.isSubscribed(lisnr1)
1016
assert not publisher.isSubscribed(lisnr2)
1017
assert not publisher.isSubscribed(lisnr3)
1018
assert not publisher.isSubscribed(lisnr4)
1020
# Subscribe step by step; after each step the topics of the listeners
# subscribed earlier must be unchanged.
# NOTE(review): topic1 is reported back as (topic1,), so it is presumably
# a plain string rather than a tuple -- confirm where topic1 is assigned.
publisher.subscribe(lisnr1, topic1)
1021
assert publisher.getAssociatedTopics(lisnr1) == [(topic1,)]
1022
publisher.subscribe(lisnr1, topic2)
1023
publisher.subscribe(lisnr1, topic1) # do it again, should be no-op
1024
assert publisher.getAssociatedTopics(lisnr1) == [(topic1,),topic2]
1025
publisher.subscribe(lisnr2.notify, topic3)
1026
assert publisher.getAssociatedTopics(lisnr2.notify) == [topic3]
1027
assert publisher.getAssociatedTopics(lisnr1) == [(topic1,),topic2]
1028
publisher.subscribe(lisnr3, topic5)
1029
assert publisher.getAssociatedTopics(lisnr3) == [topic5]
1030
assert publisher.getAssociatedTopics(lisnr2.notify) == [topic3]
1031
assert publisher.getAssociatedTopics(lisnr1) == [(topic1,),topic2]
1032
# No topic given: the assertion on lisnr4 below expects this to be a
# subscription under ALL_TOPICS.
publisher.subscribe(lisnr4)
1034
print "Publisher tree: ", publisher
1035
assert publisher.isSubscribed(lisnr1)
1036
assert publisher.isSubscribed(lisnr1, topic1)
1037
assert publisher.isSubscribed(lisnr1, topic2)
1038
assert publisher.isSubscribed(lisnr2.notify)
1039
assert publisher.isSubscribed(lisnr3, topic5)
1040
assert publisher.isSubscribed(lisnr4, ALL_TOPICS)
1041
# str(publisher) must render the whole topic tree with its listeners.
expectTopicTree = 'all: <lambda> (politics: SimpleListener_1 (UN: SimpleListener_2.notify ) (NATO: (US: func ))) (history: (middle age: SimpleListener_1 ))'
1042
print "Publisher tree: ", publisher
1043
assert str(publisher) == expectTopicTree
1045
# Unsubscribing from a topic the listener never had must change nothing.
publisher.unsubscribe(lisnr1, 'booboo') # should do nothing
1046
assert publisher.getAssociatedTopics(lisnr1) == [(topic1,),topic2]
1047
assert publisher.getAssociatedTopics(lisnr2.notify) == [topic3]
1048
assert publisher.getAssociatedTopics(lisnr3) == [topic5]
1049
publisher.unsubscribe(lisnr1, topic1)
1050
assert publisher.getAssociatedTopics(lisnr1) == [topic2]
1051
assert publisher.getAssociatedTopics(lisnr2.notify) == [topic3]
1052
assert publisher.getAssociatedTopics(lisnr3) == [topic5]
1053
# The second identical unsubscribe exercises the repeated-call path.
publisher.unsubscribe(lisnr1, topic2)
1054
publisher.unsubscribe(lisnr1, topic2)
1055
publisher.unsubscribe(lisnr2.notify, topic3)
1056
publisher.unsubscribe(lisnr3, topic5)
1057
assert publisher.getAssociatedTopics(lisnr1) == []
1058
assert publisher.getAssociatedTopics(lisnr2.notify) == []
1059
assert publisher.getAssociatedTopics(lisnr3) == []
1060
publisher.unsubscribe(lisnr4)
1062
# Topic nodes are expected to stay in the tree after their listeners are
# removed; only unsubAll() below reduces the tree to 'all: '.
expectTopicTree = 'all: (politics: (UN: ) (NATO: (US: ))) (history: (middle age: ))'
1063
print "Publisher tree: ", publisher
1064
assert str(publisher) == expectTopicTree
1065
# No message was sent in this test, so both counters must still be zero.
assert publisher.getDeliveryCount() == 0
1066
assert publisher.getMessageCount() == 0
1068
publisher.unsubAll()
1069
assert str(publisher) == 'all: '
1071
done('testSubscribe')
1074
#------------------------
1077
# Body of the unsubAll test (reported via done('testUnsubAll') at the end):
# checks unsubAll() for a single topic, for ALL_TOPICS and with no argument.
# NOTE(review): the enclosing def line and the assignments of `topic1` and
# `lisnr3` are not visible in this chunk; the bare integers are residue of
# the original line numbers and indentation has been lost.
publisher = Publisher()
1080
topic2 = ('history','middle age')
1081
topic3 = ('politics','UN')
1082
topic4 = ('politics','NATO')
1083
topic5 = ('politics','NATO','US')
1085
lisnr1 = SimpleListener(1)
1086
lisnr2 = SimpleListener(2)
1087
def func(message, a=1):
1088
print 'Func received message "%s"' % message
1090
lisnr4 = lambda x: 'Lambda received message "%s"' % x
1092
# lisnr1 and lisnr3 are each subscribed to two topics so that removing one
# topic can be told apart from removing the listener altogether.
publisher.subscribe(lisnr1, topic1)
1093
publisher.subscribe(lisnr1, topic2)
1094
publisher.subscribe(lisnr2.notify, topic3)
1095
publisher.subscribe(lisnr3, topic2)
1096
publisher.subscribe(lisnr3, topic5)
1097
publisher.subscribe(lisnr4)
1099
expectTopicTree = 'all: <lambda> (politics: SimpleListener_1 (UN: SimpleListener_2.notify ) (NATO: (US: func ))) (history: (middle age: SimpleListener_1 func ))'
1100
print "Publisher tree: ", publisher
1101
assert str(publisher) == expectTopicTree
1103
# unsubAll(topic) must remove only the listeners of that exact topic.
publisher.unsubAll(topic1)
1104
assert publisher.getAssociatedTopics(lisnr1) == [topic2]
1105
assert not publisher.isSubscribed(lisnr1, topic1)
1107
publisher.unsubAll(topic2)
1109
assert publisher.getAssociatedTopics(lisnr1) == []
1110
assert publisher.getAssociatedTopics(lisnr3) == [topic5]
1111
assert not publisher.isSubscribed(lisnr1)
1112
assert publisher.isSubscribed(lisnr3, topic5)
1114
#print "Publisher tree: ", publisher
1115
# Listeners on subtopics (UN, NATO/US) and on ALL_TOPICS must be untouched.
expectTopicTree = 'all: <lambda> (politics: (UN: SimpleListener_2.notify ) (NATO: (US: func ))) (history: (middle age: ))'
1116
assert str(publisher) == expectTopicTree
1117
# unsubAll(ALL_TOPICS) is expected to drop only the root-level listener
# (<lambda>); the subtopic listeners remain in the expected tree below.
publisher.unsubAll(ALL_TOPICS)
1118
#print "Publisher tree: ", publisher
1119
expectTopicTree = 'all: (politics: (UN: SimpleListener_2.notify ) (NATO: (US: func ))) (history: (middle age: ))'
1120
assert str(publisher) == expectTopicTree
1122
# Final cleanup with no argument; the resulting tree is not asserted here.
publisher.unsubAll()
1123
done('testUnsubAll')
1126
#------------------------
1129
# Body of the send/receipt test: every listener appends a tag to `called`,
# and the assertions compare `called` with the expected tags after each
# sendMessage().
# NOTE(review): the enclosing def line, the `class TestListener` header, the
# creation and resets of `called`, and the assignments of `topic1` and
# `topic2` are not visible in this chunk; the bare integers are residue of
# the original line numbers and indentation has been lost.
publisher = Publisher()
1133
def __init__(self, num):
# NOTE(review): the body of __init__ is not visible; the methods below read
# self.number, so it presumably stores `num` there.
1135
# Calling the instance records 'TL<number>cb'.
def __call__(self, b):
1136
called.append( 'TL%scb' % self.number )
1137
# The bound-method listener records 'TL<number>m'.
def notify(self, b):
1138
called.append( 'TL%sm' % self.number )
1139
def funcListener(b):
1140
called.append('func')
1142
lisnr1 = TestListener(1)
1143
lisnr2 = TestListener(2)
1144
lisnr3 = funcListener
1145
lisnr4 = lambda x: called.append('lambda')
1149
# Unlike the other tests here, topic4 is the three-level topic and topic5
# its two-level parent.
topic3 = ('politics','UN')
1150
topic4 = ('politics','NATO','US')
1151
topic5 = ('politics','NATO')
1153
publisher.subscribe(lisnr1, topic1)
1154
publisher.subscribe(lisnr2, topic2)
1155
publisher.subscribe(lisnr2.notify, topic2)
1156
publisher.subscribe(lisnr3, topic4)
1157
publisher.subscribe(lisnr4)
1161
# setup ok, now test send/receipt
# NOTE(review): each expected list starts again at 'lambda', so `called`
# must be cleared between sends; those resets are not visible in this chunk.
1162
publisher.sendMessage(topic1)
1163
assert called == ['lambda','TL1cb']
1165
publisher.sendMessage(topic2)
1166
assert called == ['lambda','TL2cb','TL2m']
1168
# lisnr1 (subscribed to topic1) is expected to receive messages sent for
# topic3, topic4 and topic5 as well.
publisher.sendMessage(topic3)
1169
assert called == ['lambda','TL1cb']
1171
publisher.sendMessage(topic4)
1172
assert called == ['lambda','TL1cb','func']
1174
publisher.sendMessage(topic5)
1175
assert called == ['lambda','TL1cb']
1176
# 12 is the total number of tags expected above (2 + 3 + 2 + 3 + 2);
# 5 is the number of sendMessage() calls so far.
assert publisher.getDeliveryCount() == 12
1177
assert publisher.getMessageCount() == 5
1179
# test weak referencing works:
# NOTE(review): the statements that drop listeners between the checks below
# are not visible in this chunk.
1180
_NodeCallback.notified = 0
1183
publisher.sendMessage(topic2)
1184
assert called == ['lambda']
1185
assert _NodeCallback.notified == 2
1190
assert _NodeCallback.notified == 5
1193
# verify if weak references work as expected
# The same scenario is run twice: first on a bare _TopicTreeNode, then on a
# _TopicTreeRoot. _NodeCallback.notified is reset before each step and
# compared afterwards.
# NOTE(review): the enclosing def line and the statements between each reset
# and its assertion (presumably `del` of listeners or of the node) are not
# visible in this chunk; the bare integers are residue of the original line
# numbers and indentation has been lost.
1194
print '------ Starting testDead ----------'
1195
node = _TopicTreeNode('t1', None)
1196
lisnr1 = SimpleListener(1)
1197
lisnr2 = SimpleListener(2)
1198
lisnr3 = SimpleListener(3)
1199
lisnr4 = SimpleListener(4)
1201
node.addCallable(lisnr1)
1202
node.addCallable(lisnr2)
1203
node.addCallable(lisnr3)
1204
node.addCallable(lisnr4)
1206
print 'Deleting listeners first'
1207
_NodeCallback.notified = 0
1210
# Two notifications are expected for this step.
assert _NodeCallback.notified == 2
1212
print 'Deleting node first'
1213
_NodeCallback.notified = 0
1217
# No notification is expected for this step.
assert _NodeCallback.notified == 0
1219
# Fresh listeners for the second pass.
lisnr1 = SimpleListener(1)
1220
lisnr2 = SimpleListener(2)
1221
lisnr3 = SimpleListener(3)
1222
lisnr4 = SimpleListener(4)
1224
# try same with root of tree
1225
node = _TopicTreeRoot()
1226
node.addTopic(('',), lisnr1)
1227
node.addTopic(('',), lisnr2)
1228
node.addTopic(('',), lisnr3)
1229
node.addTopic(('',), lisnr4)
1230
# add objects that will die immediately to see if cleanup occurs
1231
# this must be done visually as it is a low-level detail
1232
_NodeCallback.notified = 0
1233
# NOTE(review): this assigns the class attribute, so the limit stays at 3
# for every _TopicTreeRoot after this test -- confirm that is intended.
_TopicTreeRoot.callbackDeadLimit = 3
1234
node.addTopic(('',), SimpleListener(5))
1235
node.addTopic(('',), SimpleListener(6))
1236
node.addTopic(('',), SimpleListener(7))
1237
print node.numListeners()
1238
# NOTE(review): numListeners() presumably returns (live, dead) counts --
# confirm. Three temporaries have been added so far.
assert node.numListeners() == (4, 3)
1239
# After the fourth temporary the second count is expected back at 0 and the
# notification counter at 4.
node.addTopic(('',), SimpleListener(8))
1240
assert node.numListeners() == (4, 0)
1241
assert _NodeCallback.notified == 4
1243
print 'Deleting listeners first'
1244
_NodeCallback.notified = 0
1247
assert _NodeCallback.notified == 2
1248
print 'Deleting node first'
1249
_NodeCallback.notified = 0
1253
assert _NodeCallback.notified == 0
1259
print 'Exiting tests'
1260
#---------------------------------------------------------------------------
1262
if __name__ == '__main__':