2
# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Test cases for timeoutqueue module.
11
from twisted.python import timeoutqueue
12
from twisted.trial import unittest, util
13
from twisted.internet import reactor, interfaces
15
timeoutqueueSuppression = util.suppress(
16
message="timeoutqueue is deprecated since Twisted 8.0",
17
category=DeprecationWarning)
20
class TimeoutQueueTest(unittest.TestCase):
22
Test L{timeoutqueue.TimeoutQueue} class.
32
def test_timeout(self):
33
q = self.q = timeoutqueue.TimeoutQueue()
37
except timeoutqueue.TimedOut:
40
self.fail("Didn't time out")
41
test_timeout.suppress = [timeoutqueueSuppression]
44
q = self.q = timeoutqueue.TimeoutQueue()
47
threading.Thread(target=self.put).start()
49
assert time.time() - start < 2
53
self.fail("Didn't get item we put in")
54
test_get.suppress = [timeoutqueueSuppression]
56
def test_deprecation(self):
58
Test that L{timeoutqueue.TimeoutQueue} prints a warning message.
61
return timeoutqueue.TimeoutQueue()
62
self.q = self.assertWarns(
64
"timeoutqueue is deprecated since Twisted 8.0",
68
if interfaces.IReactorThreads(reactor, None) is None:
69
test_get.skip = "No thread support, no way to test putting during a blocked get"