1
# Copyright (C) 2008 by the Free Software Foundation, Inc.
3
# This program is free software; you can redistribute it and/or
4
# modify it under the terms of the GNU General Public License
5
# as published by the Free Software Foundation; either version 2
6
# of the License, or (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
18
"""Various test helpers."""
23
'make_testable_runner',
28
def make_testable_runner(runner_class):
    """Create a queue runner that runs until its queue is empty.

    A subclass of `runner_class` is defined on the fly and instantiated with
    no arguments.  The subclass only overrides `_doperiodic()`.

    :param runner_class: An IRunner
    :return: A runner instance.
    """

    class EmptyingRunner(runner_class):
        """Stop processing when the queue is empty."""

        def _doperiodic(self):
            """Stop when the queue is empty."""
            # The flag is recomputed on every call, so it is True only while
            # the switchboard reports no files.  Presumably the base runner's
            # main loop checks `_stop` to decide when to exit -- confirm
            # against the runner base class.
            self._stop = (len(self._switchboard.files) == 0)

    return EmptyingRunner()
47
def __init__(self, **kws):
    """Store every keyword argument as an attribute of the same name."""
    for name in kws:
        setattr(self, name, kws[name])
52
def get_queue_messages(queue):
53
"""Return and clear all the messages in the given queue.
55
:param queue: An ISwitchboard
56
:return: A list of 2-tuples where each item contains the message and
60
for filebase in queue.files:
61
msg, msgdata = queue.dequeue(filebase)
62
messages.append(_Bag(msg=msg, msgdata=msgdata))
63
queue.finish(filebase)