3
from pypy.conftest import gettestobjspace, option
4
from pypy.interpreter.gateway import ObjSpace, W_Root, interp2app_temp
7
# Interp-level helper: repeatedly sleep and call the app-level callable
# `w_condition` until `timeout` seconds have passed.
#   space       -- the object space used for all app-level calls below
#   w_condition -- wrapped callable, invoked with no arguments on each pass
#   timeout     -- seconds added to time.time() to form the deadline
# NOTE(review): this excerpt looks incomplete -- the binding of
# `adaptivedelay`, the body of the `if` below and the import of `time` are not
# visible here, and the bare integer lines look like leftover line numbers.
# Confirm against the full file before relying on these comments.
def waitfor(space, w_condition, timeout=300.0):
8
# Obtain the app-level `time.sleep` function by running a small app-level
# snippet in the object space.
w_sleep = space.appexec([], "():\n import time; return time.sleep")
10
# Absolute deadline, measured with the host-level clock.
limit = time.time() + timeout
11
while time.time() <= limit:
12
# Sleep at app level; the delay is wrapped so it can be passed to w_sleep.
space.call_function(w_sleep, space.wrap(adaptivedelay))
14
# Call the condition and test the truth value of its wrapped result.
# Presumably a true result ends the wait -- the branch body is not visible.
if space.is_true(space.call_function(w_condition)):
17
# Presumably only reached when the deadline passes first -- TODO confirm.
print '*** timed out ***'
18
# Argument spec attached to the function: (ObjSpace, W_Root, float) lines up
# with (space, w_condition, timeout). Presumably read by the gateway when the
# function is exposed through interp2app_temp -- verify.
waitfor.unwrap_spec = [ObjSpace, W_Root, float]
21
# Test class that prepares an object space plus the `w_waitfor` and
# `w_busywait` attributes on `cls`.
# NOTE(review): the enclosing `def` that binds `cls`, the `else:` between the
# two `cls.w_waitfor` assignments, and parts of `plain_waitfor` are not visible
# in this excerpt; the bare integer lines look like leftover line numbers.
# Confirm against the full file.
class GenericTestThread:
24
# Object space built with the 'thread' and 'time' modules requested.
space = gettestobjspace(usemodules=('thread', 'time'))
27
# With the `runappdirect` option set, waiting is done by a plain Python
# function that calls time.sleep directly instead of going through the space.
if option.runappdirect:
28
# Host-level counterpart of the module-level waitfor(): same default
# timeout and the same deadline loop, taking an unwrapped `condition`.
def plain_waitfor(condition, timeout=300.0):
30
limit = time.time() + timeout
31
while time.time() <= limit:
32
# `adaptivedelay` is not bound in the visible lines -- TODO confirm.
time.sleep(adaptivedelay)
37
print '*** timed out ***'
39
cls.w_waitfor = plain_waitfor
41
# Presumably the non-runappdirect branch: expose the interp-level waitfor()
# as a wrapped app-level function.
cls.w_waitfor = space.wrap(interp2app_temp(waitfor))
42
# w_busywait is whatever the app-level snippet started on the next line
# returns; its body is not visible in this excerpt.
cls.w_busywait = space.appexec([], """():
47
## cls.w_waitfor = space.appexec([], """():
49
## def waitfor(expr, timeout=10.0):
50
## limit = time.time() + timeout
51
## while time.time() <= limit:
55
## print '*** timed out ***'
58
## cls.w_busywait = space.appexec([], """():
61
## limit = time.time() + t
62
## while time.time() <= limit:
67
## space.appexec([], """():
69
## sys.setcheckinterval(1)