7
from tests import LimitedTestCase, main, skip_with_pyevent
9
base_module_contents = """
12
print "base", socket, urllib
15
patching_module_contents = """
16
from eventlet.green import socket
17
from eventlet.green import urllib
18
from eventlet import patcher
19
print 'patcher', socket, urllib
20
patcher.inject('base', globals(), ('socket', socket), ('urllib', urllib))
24
import_module_contents = """
27
print "importing", patching, socket, patching.socket, patching.urllib
30
class ProcessBase(LimitedTestCase):
31
TEST_TIMEOUT=3 # starting processes is time-consuming
33
self._saved_syspath = sys.path
34
self.tempdir = tempfile.mkdtemp('_patcher_test')
37
sys.path = self._saved_syspath
38
shutil.rmtree(self.tempdir)
40
def write_to_tempfile(self, name, contents):
41
filename = os.path.join(self.tempdir, name + '.py')
42
fd = open(filename, "w")
46
def launch_subprocess(self, filename):
47
python_path = os.pathsep.join(sys.path + [self.tempdir])
48
new_env = os.environ.copy()
49
new_env['PYTHONPATH'] = python_path
50
if not filename.endswith('.py'):
51
filename = filename + '.py'
52
p = subprocess.Popen([sys.executable,
53
os.path.join(self.tempdir, filename)],
54
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env)
55
output, _ = p.communicate()
56
lines = output.split("\n")
59
def run_script(self, contents, modname=None):
62
self.write_to_tempfile(modname, contents)
63
return self.launch_subprocess(modname)
66
class ImportPatched(ProcessBase):
67
def test_patch_a_module(self):
68
self.write_to_tempfile("base", base_module_contents)
69
self.write_to_tempfile("patching", patching_module_contents)
70
self.write_to_tempfile("importing", import_module_contents)
71
output, lines = self.launch_subprocess('importing.py')
72
self.assert_(lines[0].startswith('patcher'), repr(output))
73
self.assert_(lines[1].startswith('base'), repr(output))
74
self.assert_(lines[2].startswith('importing'), repr(output))
75
self.assert_('eventlet.green.socket' in lines[1], repr(output))
76
self.assert_('eventlet.green.urllib' in lines[1], repr(output))
77
self.assert_('eventlet.green.socket' in lines[2], repr(output))
78
self.assert_('eventlet.green.urllib' in lines[2], repr(output))
79
self.assert_('eventlet.green.httplib' not in lines[2], repr(output))
81
def test_import_patched_defaults(self):
82
self.write_to_tempfile("base", base_module_contents)
84
from eventlet import patcher
85
base = patcher.import_patched('base')
86
print "newmod", base, base.socket, base.urllib.socket.socket
88
self.write_to_tempfile("newmod", new_mod)
89
output, lines = self.launch_subprocess('newmod.py')
90
self.assert_(lines[0].startswith('base'), repr(output))
91
self.assert_(lines[1].startswith('newmod'), repr(output))
92
self.assert_('eventlet.green.socket' in lines[1], repr(output))
93
self.assert_('GreenSocket' in lines[1], repr(output))
96
class MonkeyPatch(ProcessBase):
97
def test_patched_modules(self):
99
from eventlet import patcher
100
patcher.monkey_patch()
103
print "newmod", socket.socket, urllib.socket.socket
105
self.write_to_tempfile("newmod", new_mod)
106
output, lines = self.launch_subprocess('newmod.py')
107
self.assert_(lines[0].startswith('newmod'), repr(output))
108
self.assertEqual(lines[0].count('GreenSocket'), 2, repr(output))
110
def test_early_patching(self):
112
from eventlet import patcher
113
patcher.monkey_patch()
118
self.write_to_tempfile("newmod", new_mod)
119
output, lines = self.launch_subprocess('newmod.py')
120
self.assertEqual(len(lines), 2, repr(output))
121
self.assert_(lines[0].startswith('newmod'), repr(output))
123
def test_late_patching(self):
127
from eventlet import patcher
128
patcher.monkey_patch()
132
self.write_to_tempfile("newmod", new_mod)
133
output, lines = self.launch_subprocess('newmod.py')
134
self.assertEqual(len(lines), 2, repr(output))
135
self.assert_(lines[0].startswith('newmod'), repr(output))
138
def test_typeerror(self):
140
from eventlet import patcher
141
patcher.monkey_patch(finagle=True)
143
self.write_to_tempfile("newmod", new_mod)
144
output, lines = self.launch_subprocess('newmod.py')
145
self.assert_(lines[-2].startswith('TypeError'), repr(output))
146
self.assert_('finagle' in lines[-2], repr(output))
149
def assert_boolean_logic(self, call, expected, not_expected=''):
150
expected_list = ", ".join(['"%s"' % x for x in expected.split(',') if len(x)])
151
not_expected_list = ", ".join(['"%s"' % x for x in not_expected.split(',') if len(x)])
153
from eventlet import patcher
156
assert patcher.is_monkey_patched(mod), mod
158
assert not patcher.is_monkey_patched(mod), mod
159
print "already_patched", ",".join(sorted(patcher.already_patched.keys()))
160
""" % (call, expected_list, not_expected_list)
161
self.write_to_tempfile("newmod", new_mod)
162
output, lines = self.launch_subprocess('newmod.py')
163
ap = 'already_patched'
164
self.assert_(lines[0].startswith(ap), repr(output))
165
patched_modules = lines[0][len(ap):].strip()
166
# psycopg might or might not be patched based on installed modules
167
patched_modules = patched_modules.replace("psycopg,", "")
169
patched_modules = patched_modules.replace("MySQLdb,", "")
170
self.assertEqual(patched_modules, expected,
171
"Logic:%s\nExpected: %s != %s" %(call, expected,
174
def test_boolean(self):
175
self.assert_boolean_logic("patcher.monkey_patch()",
176
'os,select,socket,thread,time')
178
def test_boolean_all(self):
179
self.assert_boolean_logic("patcher.monkey_patch(all=True)",
180
'os,select,socket,thread,time')
182
def test_boolean_all_single(self):
183
self.assert_boolean_logic("patcher.monkey_patch(all=True, socket=True)",
184
'os,select,socket,thread,time')
186
def test_boolean_all_negative(self):
187
self.assert_boolean_logic("patcher.monkey_patch(all=False, "\
188
"socket=False, select=True)",
191
def test_boolean_single(self):
192
self.assert_boolean_logic("patcher.monkey_patch(socket=True)",
195
def test_boolean_double(self):
196
self.assert_boolean_logic("patcher.monkey_patch(socket=True,"\
200
def test_boolean_negative(self):
201
self.assert_boolean_logic("patcher.monkey_patch(socket=False)",
202
'os,select,thread,time')
204
def test_boolean_negative2(self):
205
self.assert_boolean_logic("patcher.monkey_patch(socket=False,"\
209
def test_conflicting_specifications(self):
210
self.assert_boolean_logic("patcher.monkey_patch(socket=False, "\
215
test_monkey_patch_threading = """
216
def test_monkey_patch_threading():
219
for i in xrange(1000):
224
tpool.execute(time.sleep, 0.5)
227
w1 = eventlet.spawn(do_sleep)
230
assert tickcount[0] > 900
234
class Tpool(ProcessBase):
238
def test_simple(self):
241
from eventlet import patcher
242
patcher.monkey_patch()
243
from eventlet import tpool
244
print "newmod", tpool.execute(len, "hi")
245
print "newmod", tpool.execute(len, "hi2")
248
self.write_to_tempfile("newmod", new_mod)
249
output, lines = self.launch_subprocess('newmod.py')
250
self.assertEqual(len(lines), 3, output)
251
self.assert_(lines[0].startswith('newmod'), repr(output))
252
self.assert_('2' in lines[0], repr(output))
253
self.assert_('3' in lines[1], repr(output))
256
def test_unpatched_thread(self):
257
new_mod = """import eventlet
258
eventlet.monkey_patch(time=False, thread=False)
259
from eventlet import tpool
262
new_mod += test_monkey_patch_threading
263
new_mod += "\ntest_monkey_patch_threading()\n"
264
self.write_to_tempfile("newmod", new_mod)
265
output, lines = self.launch_subprocess('newmod.py')
266
self.assertEqual(len(lines), 2, lines)
269
def test_patched_thread(self):
270
new_mod = """import eventlet
271
eventlet.monkey_patch(time=False, thread=True)
272
from eventlet import tpool
275
new_mod += test_monkey_patch_threading
276
new_mod += "\ntest_monkey_patch_threading()\n"
277
self.write_to_tempfile("newmod", new_mod)
278
output, lines = self.launch_subprocess('newmod.py')
279
self.assertEqual(len(lines), 2, "\n".join(lines))
282
class Subprocess(ProcessBase):
283
def test_monkeypatched_subprocess(self):
284
new_mod = """import eventlet
285
eventlet.monkey_patch()
286
from eventlet.green import subprocess
288
subprocess.Popen(['/bin/true'], stdin=subprocess.PIPE)
291
self.write_to_tempfile("newmod", new_mod)
292
output, lines = self.launch_subprocess('newmod')
293
self.assertEqual(output, "done\n", output)
296
if __name__ == '__main__':