2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Test cases for timeoutqueue module.
11
from twisted.trial import unittest
12
from twisted.python import timeoutqueue
13
from twisted.internet import reactor, interfaces
15
class TimeoutQueueTest(unittest.TestCase):
18
self.q = timeoutqueue.TimeoutQueue()
27
def testTimeout(self):
32
except timeoutqueue.TimedOut:
35
raise AssertionError, "didn't time out"
40
threading.Thread(target=self.put).start()
42
assert time.time() - start < 2
46
raise AssertionError, "didn't get item we put in"
48
if interfaces.IReactorThreads(reactor, None) is None:
49
testGet.skip = "No thread support, no way to test putting during a blocked get"