~landscape/zope3/ztk-1.1.3

« back to all changes in this revision

Viewing changes to src/twisted/test/test_timeoutqueue.py

  • Committer: Andreas Hasenack
  • Date: 2009-07-20 17:49:16 UTC
  • Revision ID: andreas@canonical.com-20090720174916-g2tn6qmietz2hn0u
Revert twisted removal, it breaks several dozen tests [trivial]

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
"""
 
6
Test cases for timeoutqueue module.
 
7
"""
 
8
 
 
9
import time
 
10
 
 
11
from twisted.trial import unittest
 
12
from twisted.python import timeoutqueue
 
13
from twisted.internet import reactor, interfaces
 
14
 
 
15
class TimeoutQueueTest(unittest.TestCase):
    """
    Tests for C{timeoutqueue.TimeoutQueue}: timing out while waiting, and
    retrieving an item that another thread puts on the queue.
    """

    def setUp(self):
        """
        Create a fresh C{TimeoutQueue} for each test.
        """
        self.q = timeoutqueue.TimeoutQueue()

    def tearDown(self):
        """
        Drop the reference to the queue created in C{setUp}.
        """
        del self.q

    def put(self):
        """
        Sleep for one second, then put C{1} on the queue.

        Used as the thread target in C{testGet}, so the put happens while
        the test is blocked in C{wait}.
        """
        time.sleep(1)
        self.q.put(1)

    def testTimeout(self):
        """
        Waiting one second on the queue from C{setUp}, to which nothing has
        been put, must raise C{timeoutqueue.TimedOut}.
        """
        self.assertRaises(timeoutqueue.TimedOut, self.q.wait, 1)

    def testGet(self):
        """
        An item put by another thread after one second is seen by a
        C{wait(1.5)} call, the whole thing takes less than two seconds, and
        C{get(0)} then returns that item.
        """
        q = self.q
        start = time.time()
        putter = threading.Thread(target=self.put)
        putter.start()
        try:
            q.wait(1.5)
            # Explicit assertion instead of a bare ``assert`` so the check
            # is not stripped when running under ``python -O``.
            self.assertTrue(time.time() - start < 2,
                            "waiting for the item took too long")
            self.assertEqual(q.get(0), 1, "didn't get item we put in")
        finally:
            # Always wait for the helper thread: otherwise, on failure,
            # tearDown deletes self.q while the thread is still sleeping
            # and its later self.q.put() blows up in the background.
            putter.join()

    if interfaces.IReactorThreads(reactor, None) is None:
        testGet.skip = "No thread support, no way to test putting during a blocked get"
    else:
        # Only import threading when the reactor supports threads; the
        # ``global`` declaration makes the import bind a module-level name
        # (rather than a class attribute) so testGet can see it.
        global threading
        import threading