1
import socket as _orig_sock
2
from tests import LimitedTestCase, skip_with_pyevent, main, skipped, s2b, skip_if, skip_on_windows
3
from eventlet import event
4
from eventlet import greenio
5
from eventlet import debug
6
from eventlet.support import get_errno
7
from eventlet.green import socket
8
from eventlet.green import time
15
import tempfile, shutil
17
def bufsized(sock, size=1):
    """Resize both the send and receive buffers on a socket.

    Useful for testing trampoline.

    :param sock: socket-like object exposing ``setsockopt``.
    :param size: value requested for both ``SO_SNDBUF`` and ``SO_RCVBUF``
        (defaults to 1).
    :returns: the same ``sock`` object, so the call can wrap an expression.

    >>> sock = bufsized(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
    """
    # Apply the same requested size to the send buffer, then the receive buffer.
    for option in (socket.SO_SNDBUF, socket.SO_RCVBUF):
        sock.setsockopt(socket.SOL_SOCKET, option, size)
    return sock
def min_buf_size():
    """Return the minimum buffer size that the platform supports.

    Requests a send buffer of 1 on a throwaway TCP socket and reports the
    ``SO_SNDBUF`` value the socket returns afterwards.

    :returns: the ``SO_SNDBUF`` value read back from the probe socket.
    """
    test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        test_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1)
        return test_sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
    finally:
        # The probe socket exists only for this query; close it rather than
        # leaving the descriptor open until garbage collection.
        test_sock.close()
35
def using_epoll_hub(_f):
    """Tell whether the current hub's class lives in a module named like 'epolls'.

    Passed to ``skip_if`` as the skip predicate for ``test_closure``.

    :param _f: unused by this function (presumably supplied by ``skip_if``
        when it evaluates the predicate -- confirm against ``tests.skip_if``).
    :returns: True when ``'epolls'`` occurs in the ``__module__`` of the
        active hub's type, False otherwise.
    """
    # Imported lazily so merely importing this test module does not pull in
    # the hub machinery.
    from eventlet.hubs import get_hub
    # NOTE(review): the extracted source skips original lines 37 and 39-42
    # around the return below; confirm whether a guard (e.g. try/except
    # falling back to False) was dropped and restore it if so.
    hub_module = type(get_hub()).__module__
    return 'epolls' in hub_module
43
class TestGreenSocket(LimitedTestCase):
44
def assertWriteToClosedFileRaises(self, fd):
45
if sys.version_info[0]<3:
46
# 2.x socket._fileobjects are odd: writes don't check
47
# whether the socket is closed or not, and you get an
48
# AttributeError during flush if it is closed
50
self.assertRaises(Exception, fd.flush)
52
# 3.x io write to closed file-like object raises ValueError
53
self.assertRaises(ValueError, fd.write, 'a')
55
def test_connect_timeout(self):
56
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
58
gs = greenio.GreenSocket(s)
60
gs.connect(('192.0.2.1', 80))
61
self.fail("socket.timeout not raised")
62
except socket.timeout, e:
63
self.assert_(hasattr(e, 'args'))
64
self.assertEqual(e.args[0], 'timed out')
65
except socket.error, e:
66
# unreachable is also a valid outcome
67
if not get_errno(e) in (errno.EHOSTUNREACH, errno.ENETUNREACH):
70
def test_accept_timeout(self):
71
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
76
gs = greenio.GreenSocket(s)
79
self.fail("socket.timeout not raised")
80
except socket.timeout, e:
81
self.assert_(hasattr(e, 'args'))
82
self.assertEqual(e.args[0], 'timed out')
84
def test_connect_ex_timeout(self):
85
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
87
gs = greenio.GreenSocket(s)
88
e = gs.connect_ex(('192.0.2.1', 80))
89
if not e in (errno.EHOSTUNREACH, errno.ENETUNREACH):
90
self.assertEquals(e, errno.EAGAIN)
92
def test_recv_timeout(self):
93
listener = greenio.GreenSocket(socket.socket())
94
listener.bind(('', 0))
99
# accept the connection in another greenlet
100
sock, addr = listener.accept()
103
gt = eventlet.spawn(server)
105
addr = listener.getsockname()
107
client = greenio.GreenSocket(socket.socket())
108
client.settimeout(0.1)
114
self.fail("socket.timeout not raised")
115
except socket.timeout, e:
116
self.assert_(hasattr(e, 'args'))
117
self.assertEqual(e.args[0], 'timed out')
122
def test_recvfrom_timeout(self):
123
gs = greenio.GreenSocket(
124
socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
130
self.fail("socket.timeout not raised")
131
except socket.timeout, e:
132
self.assert_(hasattr(e, 'args'))
133
self.assertEqual(e.args[0], 'timed out')
135
def test_recvfrom_into_timeout(self):
136
buf = buffer(array.array('B'))
138
gs = greenio.GreenSocket(
139
socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
144
gs.recvfrom_into(buf)
145
self.fail("socket.timeout not raised")
146
except socket.timeout, e:
147
self.assert_(hasattr(e, 'args'))
148
self.assertEqual(e.args[0], 'timed out')
150
def test_recv_into_timeout(self):
151
buf = buffer(array.array('B'))
153
listener = greenio.GreenSocket(socket.socket())
154
listener.bind(('', 0))
159
# accept the connection in another greenlet
160
sock, addr = listener.accept()
163
gt = eventlet.spawn(server)
165
addr = listener.getsockname()
167
client = greenio.GreenSocket(socket.socket())
168
client.settimeout(0.1)
173
client.recv_into(buf)
174
self.fail("socket.timeout not raised")
175
except socket.timeout, e:
176
self.assert_(hasattr(e, 'args'))
177
self.assertEqual(e.args[0], 'timed out')
182
def test_send_timeout(self):
183
listener = bufsized(eventlet.listen(('', 0)))
187
# accept the connection in another greenlet
188
sock, addr = listener.accept()
189
sock = bufsized(sock)
192
gt = eventlet.spawn(server)
194
addr = listener.getsockname()
196
client = bufsized(greenio.GreenSocket(socket.socket()))
199
client.settimeout(0.00001)
200
msg = s2b("A")*(100000) # large enough number to overwhelm most buffers
203
# want to exceed the size of the OS buffer so it'll block in a
206
total_sent += client.send(msg)
207
self.fail("socket.timeout not raised")
208
except socket.timeout, e:
209
self.assert_(hasattr(e, 'args'))
210
self.assertEqual(e.args[0], 'timed out')
215
def test_sendall_timeout(self):
216
listener = greenio.GreenSocket(socket.socket())
217
listener.bind(('', 0))
222
# accept the connection in another greenlet
223
sock, addr = listener.accept()
226
gt = eventlet.spawn(server)
228
addr = listener.getsockname()
230
client = greenio.GreenSocket(socket.socket())
231
client.settimeout(0.1)
235
msg = s2b("A")*(8*1024*1024)
237
# want to exceed the size of the OS buffer so it'll block
239
self.fail("socket.timeout not raised")
240
except socket.timeout, e:
241
self.assert_(hasattr(e, 'args'))
242
self.assertEqual(e.args[0], 'timed out')
247
def test_close_with_makefile(self):
248
def accept_close_early(listener):
249
# verify that the makefile and the socket are truly independent
250
# by closing the socket prior to using the made file
252
conn, addr = listener.accept()
253
fd = conn.makefile('w')
257
self.assertWriteToClosedFileRaises(fd)
258
self.assertRaises(socket.error, conn.send, s2b('b'))
262
def accept_close_late(listener):
263
# verify that the makefile and the socket are truly independent
264
# by closing the made file and then sending a character
266
conn, addr = listener.accept()
267
fd = conn.makefile('w')
272
self.assertWriteToClosedFileRaises(fd)
273
self.assertRaises(socket.error, conn.send, s2b('b'))
277
def did_it_work(server):
278
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
279
client.connect(('127.0.0.1', server.getsockname()[1]))
280
fd = client.makefile()
282
assert fd.readline() == 'hello\n'
283
assert fd.read() == ''
286
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
287
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
288
server.bind(('0.0.0.0', 0))
290
killer = eventlet.spawn(accept_close_early, server)
294
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
295
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
296
server.bind(('0.0.0.0', 0))
298
killer = eventlet.spawn(accept_close_late, server)
302
def test_del_closes_socket(self):
303
def accept_once(listener):
304
# delete/overwrite the original conn
305
# object, only keeping the file object around
306
# closing the file object should close everything
308
conn, addr = listener.accept()
309
conn = conn.makefile('w')
310
conn.write('hello\n')
312
self.assertWriteToClosedFileRaises(conn)
315
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
316
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
317
server.bind(('127.0.0.1', 0))
319
killer = eventlet.spawn(accept_once, server)
320
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
321
client.connect(('127.0.0.1', server.getsockname()[1]))
322
fd = client.makefile()
324
assert fd.read() == 'hello\n'
325
assert fd.read() == ''
329
def test_full_duplex(self):
330
large_data = s2b('*') * 10 * min_buf_size()
331
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
332
listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
333
listener.bind(('127.0.0.1', 0))
337
def send_large(sock):
338
sock.sendall(large_data)
340
def read_large(sock):
341
result = sock.recv(len(large_data))
342
while len(result) < len(large_data):
343
result += sock.recv(len(large_data))
344
self.assertEquals(result, large_data)
347
(sock, addr) = listener.accept()
348
sock = bufsized(sock)
349
send_large_coro = eventlet.spawn(send_large, sock)
351
result = sock.recv(10)
352
expected = s2b('hello world')
353
while len(result) < len(expected):
354
result += sock.recv(10)
355
self.assertEquals(result, expected)
356
send_large_coro.wait()
358
server_evt = eventlet.spawn(server)
359
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
360
client.connect(('127.0.0.1', listener.getsockname()[1]))
362
large_evt = eventlet.spawn(read_large, client)
364
client.sendall(s2b('hello world'))
369
def test_sendall(self):
370
# test adapted from Marcus Cavanaugh's email
371
# it may legitimately take a while, but will eventually complete
374
def test_sendall_impl(many_bytes):
375
bufsize = max(many_bytes//15, 2)
376
def sender(listener):
377
(sock, addr) = listener.accept()
378
sock = bufsized(sock, size=bufsize)
379
sock.sendall(s2b('x')*many_bytes)
380
sock.sendall(s2b('y')*second_bytes)
382
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
383
listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
384
listener.bind(("", 0))
386
sender_coro = eventlet.spawn(sender, listener)
387
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
388
client.connect(('127.0.0.1', listener.getsockname()[1]))
389
bufsized(client, size=bufsize)
391
while total < many_bytes:
392
data = client.recv(min(many_bytes - total, many_bytes//10))
398
while total < second_bytes:
399
data = client.recv(second_bytes)
407
for how_many in (1000, 10000, 100000, 1000000):
408
test_sendall_impl(how_many)
410
def test_wrap_socket(self):
416
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
417
sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
418
sock.bind(('127.0.0.1', 0))
420
ssl_sock = ssl.wrap_socket(sock)
422
def test_timeout_and_final_write(self):
423
# This test verifies that a write on a socket that we've
424
# stopped listening for doesn't result in an incorrect switch
425
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
426
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
427
server.bind(('127.0.0.1', 0))
429
bound_port = server.getsockname()[1]
432
s2, addr = server.accept()
433
wrap_wfile = s2.makefile('w')
436
wrap_wfile.write('hi')
438
evt.send('sent via event')
440
from eventlet import event
442
eventlet.spawn(sender, evt)
443
eventlet.sleep(0) # lets the socket enter accept mode, which
444
# is necessary for connect to succeed on windows
446
# try and get some data off of this pipe
447
# but bail before any is sent
448
eventlet.Timeout(0.01)
449
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
450
client.connect(('127.0.0.1', bound_port))
451
wrap_rfile = client.makefile()
452
_c = wrap_rfile.read(1)
454
except eventlet.TimeoutError:
458
self.assertEquals(result, 'sent via event')
463
def test_raised_multiple_readers(self):
464
debug.hub_prevent_multiple_readers(True)
466
def handle(sock, addr):
469
raise eventlet.StopServe()
470
listener = eventlet.listen(('127.0.0.1', 0))
471
server = eventlet.spawn(eventlet.serve,
477
s = eventlet.connect(('127.0.0.1', listener.getsockname()[1]))
478
a = eventlet.spawn(reader, s)
480
self.assertRaises(RuntimeError, s.recv, 1)
485
@skip_if(using_epoll_hub)
486
def test_closure(self):
487
def spam_to_me(address):
488
sock = eventlet.connect(address)
491
sock.sendall('hello world')
492
except socket.error, e:
493
if get_errno(e)== errno.EPIPE:
497
server = eventlet.listen(('127.0.0.1', 0))
498
sender = eventlet.spawn(spam_to_me, server.getsockname())
499
client, address = server.accept()
505
data = client.recv(1024)
507
except socket.error, e:
508
# we get an EBADF because client is closed in the same process
509
# (but a different greenthread)
510
if get_errno(e) != errno.EBADF:
516
reader = eventlet.spawn(reader)
517
eventlet.spawn_n(closer)
521
def test_invalid_connection(self):
    """Connecting to a port with no listener must raise ``socket.error``."""
    # Find an unused port by binding a listener to an ephemeral port, then
    # close it explicitly so the port is released before the connect
    # attempt. Previously the temporary socket was only dropped, which
    # leaves the close to garbage-collection timing.
    listener = eventlet.listen(('127.0.0.1', 0))
    try:
        port = listener.getsockname()[1]
    finally:
        listener.close()
    self.assertRaises(socket.error, eventlet.connect, ('127.0.0.1', port))
526
class TestGreenPipe(LimitedTestCase):
529
super(self.__class__, self).setUp()
530
self.tempdir = tempfile.mkdtemp('_green_pipe_test')
533
shutil.rmtree(self.tempdir)
534
super(self.__class__, self).tearDown()
538
rf = greenio.GreenPipe(r, 'r');
539
wf = greenio.GreenPipe(w, 'w', 0);
540
def sender(f, content):
542
eventlet.sleep(0.0001)
546
one_line = "12345\n";
547
eventlet.spawn(sender, wf, one_line*5)
551
self.assertEquals(line, one_line)
552
self.assertEquals(rf.readline(), '')
554
def test_pipe_read(self):
555
# ensure that 'readline' works properly on GreenPipes when data is not
556
# immediately available (fd is nonblocking, was raising EAGAIN)
557
# also ensures that readline() terminates on '\n' and '\r\n'
560
r = greenio.GreenPipe(r)
561
w = greenio.GreenPipe(w, 'w')
572
gt = eventlet.spawn(writer)
577
self.assertEquals(line, 'line\n')
580
self.assertEquals(line, 'line\r\n')
584
def test_pipe_writes_large_messages(self):
587
r = greenio.GreenPipe(r)
588
w = greenio.GreenPipe(w, 'w')
590
large_message = "".join([1024*chr(i) for i in xrange(65)])
592
w.write(large_message)
595
gt = eventlet.spawn(writer)
599
expected = 1024*chr(i)
600
self.assertEquals(buf, expected,
601
"expected=%r..%r, found=%r..%r iter=%d"
602
% (expected[:4], expected[-4:], buf[:4], buf[-4:], i))
605
def test_seek_on_buffered_pipe(self):
606
f = greenio.GreenPipe(self.tempdir+"/TestFile", 'w+', 1024)
607
self.assertEquals(f.tell(),0)
609
self.assertEquals(f.tell(),0)
610
f.write('1234567890')
612
self.assertEquals(f.tell(),10)
615
self.assertEqual(value, '1')
616
self.assertEquals(f.tell(),1)
618
self.assertEqual(value, '2')
619
self.assertEquals(f.tell(),2)
621
self.assertEqual(f.readline(), '34567890')
623
self.assertEqual(f.readline(), '1234567890')
625
self.assertEqual(f.readline(), '')
627
def test_truncate(self):
628
f = greenio.GreenPipe(self.tempdir+"/TestFile", 'w+', 1024)
629
f.write('1234567890')
631
self.assertEquals(f.tell(), 9)
634
class TestGreenIoLong(LimitedTestCase):
635
TEST_TIMEOUT=10 # the test here might take a while depending on the OS
637
def test_multiple_readers(self, clibufsize=False):
638
debug.hub_prevent_multiple_readers(False)
639
recvsize = 2 * min_buf_size()
640
sendsize = 10 * recvsize
641
# test that we can have multiple coroutines reading
642
# from the same fd. We make no guarantees about which one gets which
643
# bytes, but they should both get at least some
644
def reader(sock, results):
646
data = sock.recv(recvsize)
653
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
654
listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
655
listener.bind(('127.0.0.1', 0))
658
(sock, addr) = listener.accept()
659
sock = bufsized(sock)
661
c1 = eventlet.spawn(reader, sock, results1)
662
c2 = eventlet.spawn(reader, sock, results2)
672
server_coro = eventlet.spawn(server)
673
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
674
client.connect(('127.0.0.1', listener.getsockname()[1]))
676
bufsized(client, size=sendsize)
679
client.sendall(s2b('*') * sendsize)
683
self.assert_(len(results1) > 0)
684
self.assert_(len(results2) > 0)
685
debug.hub_prevent_multiple_readers()
687
@skipped # by rdw because it fails but it's not clear how to make it pass
689
def test_multiple_readers2(self):
690
self.test_multiple_readers(clibufsize=True)
692
class TestGreenIoStarvation(LimitedTestCase):
693
# fixme: this doesn't succeed, because of eventlet's predetermined
694
# ordering. two processes, one with server, one with client eventlets
695
# might be more reliable?
697
TEST_TIMEOUT=300 # the test here might take a while depending on the OS
698
@skipped # by rdw, because it fails but it's not clear how to make it pass
700
def test_server_starvation(self, sendloops=15):
701
recvsize = 2 * min_buf_size()
702
sendsize = 10000 * recvsize
704
results = [[] for i in xrange(5)]
706
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
707
listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
708
listener.bind(('127.0.0.1', 0))
709
port = listener.getsockname()[1]
712
base_time = time.time()
714
def server(my_results):
715
(sock, addr) = listener.accept()
723
data = sock.recv(recvsize)
725
t1 = time.time() - base_time
727
t2 = time.time() - base_time
728
my_results.append(datasize)
729
my_results.append((t1,t2))
731
datasize += len(data)
740
client = _orig_sock.socket(socket.AF_INET, socket.SOCK_STREAM)
741
client.connect(('127.0.0.1', port))
743
bufsized(client, size=sendsize)
745
for i in range(sendloops):
746
client.sendall(s2b('*') * sendsize)
753
servers.append(eventlet.spawn(server, r))
755
clients.append(client())
764
# now test that all of the server receive intervals overlap, and
765
# that there were no errors.
767
assert len(r) == 2, "length is %d not 2!: %s\n%s" % (len(r), r, results)
768
assert r[0] == sendsize * sendloops
769
assert len(r[1]) == 2
770
assert r[1][0] is not None
771
assert r[1][1] is not None
773
starttimes = sorted(r[1][0] for r in results)
774
endtimes = sorted(r[1][1] for r in results)
775
runlengths = sorted(r[1][1] - r[1][0] for r in results)
777
# assert that the last task started before the first task ended
778
# (our no-starvation condition)
779
assert starttimes[-1] < endtimes[0], "Not overlapping: starts %s ends %s" % (starttimes, endtimes)
781
maxstartdiff = starttimes[-1] - starttimes[0]
783
assert maxstartdiff * 2 < runlengths[0], "Largest difference in starting times more than twice the shortest running time!"
784
assert runlengths[0] * 2 > runlengths[-1], "Longest runtime more than twice as long as shortest!"
786
if __name__ == '__main__':