~ubuntu-branches/ubuntu/karmic/pypy/karmic

« back to all changes in this revision

Viewing changes to pypy/module/thread/test/support.py

  • Committer: Bazaar Package Importer
  • Author(s): Alexandre Fayolle
  • Date: 2007-04-13 09:33:09 UTC
  • Revision ID: james.westby@ubuntu.com-20070413093309-yoojh4jcoocu2krz
Tags: upstream-1.0.0
ImportĀ upstreamĀ versionĀ 1.0.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import py
 
2
import time, gc
 
3
from pypy.conftest import gettestobjspace, option
 
4
from pypy.interpreter.gateway import ObjSpace, W_Root, interp2app_temp
 
5
 
 
6
 
 
7
def waitfor(space, w_condition, timeout=300.0):
 
8
    w_sleep = space.appexec([], "():\n import time; return time.sleep")
 
9
    adaptivedelay = 0.04
 
10
    limit = time.time() + timeout
 
11
    while time.time() <= limit:
 
12
        space.call_function(w_sleep, space.wrap(adaptivedelay))
 
13
        gc.collect()
 
14
        if space.is_true(space.call_function(w_condition)):
 
15
            return
 
16
        adaptivedelay *= 1.05
 
17
    print '*** timed out ***'
 
18
waitfor.unwrap_spec = [ObjSpace, W_Root, float]
 
19
 
 
20
 
 
21
class GenericTestThread:
 
22
 
 
23
    def setup_class(cls):
 
24
        space = gettestobjspace(usemodules=('thread', 'time'))
 
25
        cls.space = space
 
26
 
 
27
        if option.runappdirect:
 
28
            def plain_waitfor(condition, timeout=300.0):
 
29
                adaptivedelay = 0.04
 
30
                limit = time.time() + timeout
 
31
                while time.time() <= limit:
 
32
                    time.sleep(adaptivedelay)
 
33
                    gc.collect()
 
34
                    if condition():
 
35
                        return
 
36
                    adaptivedelay *= 1.05
 
37
                print '*** timed out ***'
 
38
                
 
39
            cls.w_waitfor = plain_waitfor
 
40
        else:
 
41
            cls.w_waitfor = space.wrap(interp2app_temp(waitfor))
 
42
        cls.w_busywait = space.appexec([], """():
 
43
            import time
 
44
            return time.sleep
 
45
        """)
 
46
 
 
47
##        cls.w_waitfor = space.appexec([], """():
 
48
##            import time
 
49
##            def waitfor(expr, timeout=10.0):
 
50
##                limit = time.time() + timeout
 
51
##                while time.time() <= limit:
 
52
##                    time.sleep(0.002)
 
53
##                    if expr():
 
54
##                        return
 
55
##                print '*** timed out ***'
 
56
##            return waitfor
 
57
##        """)
 
58
##        cls.w_busywait = space.appexec([], """():
 
59
##            import time
 
60
##            def busywait(t):
 
61
##                limit = time.time() + t
 
62
##                while time.time() <= limit:
 
63
##                    time.sleep(0.002)
 
64
##            return busywait
 
65
##        """)
 
66
 
 
67
##        space.appexec([], """():
 
68
##            import sys
 
69
##            sys.setcheckinterval(1)
 
70
##        """)