~ubuntu-branches/ubuntu/trusty/python-eventlet/trusty-proposed

« back to all changes in this revision

Viewing changes to .pc/retry-on-timeout.patch/tests/greenio_test.py

  • Committer: Package Import Robot
  • Author(s): Dave Walker (Daviey)
  • Date: 2012-05-18 13:36:26 UTC
  • mfrom: (4.1.4 sid)
  • Revision ID: package-import@ubuntu.com-20120518133626-v2hvgj9cza44ub0r
Tags: 0.9.16-2ubuntu1
* Merge from Debian, remaining changes:
  - Drop python-zmq from build depends, it's currently in universe.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
import socket as _orig_sock
2
 
from tests import LimitedTestCase, skip_with_pyevent, main, skipped, s2b, skip_if, skip_on_windows
3
 
from eventlet import event
4
 
from eventlet import greenio
5
 
from eventlet import debug
6
 
from eventlet.support import get_errno
7
 
from eventlet.green import socket
8
 
from eventlet.green import time
9
 
import errno
10
 
 
11
 
import eventlet
12
 
import os
13
 
import sys
14
 
import array
15
 
import tempfile, shutil
16
 
 
17
 
def bufsized(sock, size=1):
18
 
    """ Resize both send and receive buffers on a socket.
19
 
    Useful for testing trampoline.  Returns the socket.
20
 
 
21
 
    >>> import socket
22
 
    >>> sock = bufsized(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
23
 
    """
24
 
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
25
 
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
26
 
    return sock
27
 
 
28
 
def min_buf_size():
29
 
    """Return the minimum buffer size that the platform supports."""
30
 
    test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
31
 
    test_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1)
32
 
    return test_sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
33
 
 
34
 
 
35
 
def using_epoll_hub(_f):
36
 
        from eventlet.hubs import get_hub
37
 
        try:
38
 
            return 'epolls' in type(get_hub()).__module__
39
 
        except Exception:
40
 
            return False
41
 
 
42
 
 
43
 
class TestGreenSocket(LimitedTestCase):
44
 
    def assertWriteToClosedFileRaises(self, fd):
45
 
        if sys.version_info[0]<3:
46
 
            # 2.x socket._fileobjects are odd: writes don't check
47
 
            # whether the socket is closed or not, and you get an
48
 
            # AttributeError during flush if it is closed
49
 
            fd.write('a')
50
 
            self.assertRaises(Exception, fd.flush)
51
 
        else:
52
 
            # 3.x io write to closed file-like pbject raises ValueError
53
 
            self.assertRaises(ValueError, fd.write, 'a')
54
 
 
55
 
    def test_connect_timeout(self):
56
 
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
57
 
        s.settimeout(0.1)
58
 
        gs = greenio.GreenSocket(s)
59
 
        try:
60
 
            gs.connect(('192.0.2.1', 80))
61
 
            self.fail("socket.timeout not raised")
62
 
        except socket.timeout, e:
63
 
            self.assert_(hasattr(e, 'args'))
64
 
            self.assertEqual(e.args[0], 'timed out')
65
 
        except socket.error, e:
66
 
            # unreachable is also a valid outcome
67
 
            if not get_errno(e) in (errno.EHOSTUNREACH, errno.ENETUNREACH):
68
 
                raise
69
 
 
70
 
    def test_accept_timeout(self):
71
 
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
72
 
        s.bind(('', 0))
73
 
        s.listen(50)
74
 
 
75
 
        s.settimeout(0.1)
76
 
        gs = greenio.GreenSocket(s)
77
 
        try:
78
 
            gs.accept()
79
 
            self.fail("socket.timeout not raised")
80
 
        except socket.timeout, e:
81
 
            self.assert_(hasattr(e, 'args'))
82
 
            self.assertEqual(e.args[0], 'timed out')
83
 
 
84
 
    def test_connect_ex_timeout(self):
85
 
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
86
 
        s.settimeout(0.1)
87
 
        gs = greenio.GreenSocket(s)
88
 
        e = gs.connect_ex(('192.0.2.1', 80))
89
 
        if not e in (errno.EHOSTUNREACH, errno.ENETUNREACH):
90
 
            self.assertEquals(e, errno.EAGAIN)
91
 
 
92
 
    def test_recv_timeout(self):
93
 
        listener = greenio.GreenSocket(socket.socket())
94
 
        listener.bind(('', 0))
95
 
        listener.listen(50)
96
 
        
97
 
        evt = event.Event()
98
 
        def server():
99
 
            # accept the connection in another greenlet
100
 
            sock, addr = listener.accept()
101
 
            evt.wait()
102
 
 
103
 
        gt = eventlet.spawn(server)
104
 
 
105
 
        addr = listener.getsockname()
106
 
 
107
 
        client = greenio.GreenSocket(socket.socket())
108
 
        client.settimeout(0.1)
109
 
 
110
 
        client.connect(addr)
111
 
 
112
 
        try:
113
 
            client.recv(8192)
114
 
            self.fail("socket.timeout not raised")
115
 
        except socket.timeout, e:
116
 
            self.assert_(hasattr(e, 'args'))
117
 
            self.assertEqual(e.args[0], 'timed out')
118
 
 
119
 
        evt.send()
120
 
        gt.wait()
121
 
 
122
 
    def test_recvfrom_timeout(self):
123
 
        gs = greenio.GreenSocket(
124
 
            socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
125
 
        gs.settimeout(.1)
126
 
        gs.bind(('', 0))
127
 
 
128
 
        try:
129
 
            gs.recvfrom(8192)
130
 
            self.fail("socket.timeout not raised")
131
 
        except socket.timeout, e:
132
 
            self.assert_(hasattr(e, 'args'))
133
 
            self.assertEqual(e.args[0], 'timed out')
134
 
 
135
 
    def test_recvfrom_into_timeout(self):
136
 
        buf = buffer(array.array('B'))
137
 
 
138
 
        gs = greenio.GreenSocket(
139
 
            socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
140
 
        gs.settimeout(.1)
141
 
        gs.bind(('', 0))
142
 
 
143
 
        try:
144
 
            gs.recvfrom_into(buf)
145
 
            self.fail("socket.timeout not raised")
146
 
        except socket.timeout, e:
147
 
            self.assert_(hasattr(e, 'args'))
148
 
            self.assertEqual(e.args[0], 'timed out')
149
 
 
150
 
    def test_recv_into_timeout(self):
151
 
        buf = buffer(array.array('B'))
152
 
 
153
 
        listener = greenio.GreenSocket(socket.socket())
154
 
        listener.bind(('', 0))
155
 
        listener.listen(50)
156
 
 
157
 
        evt = event.Event()
158
 
        def server():
159
 
            # accept the connection in another greenlet
160
 
            sock, addr = listener.accept()
161
 
            evt.wait()
162
 
 
163
 
        gt = eventlet.spawn(server)
164
 
 
165
 
        addr = listener.getsockname()
166
 
 
167
 
        client = greenio.GreenSocket(socket.socket())
168
 
        client.settimeout(0.1)
169
 
 
170
 
        client.connect(addr)
171
 
 
172
 
        try:
173
 
            client.recv_into(buf)
174
 
            self.fail("socket.timeout not raised")
175
 
        except socket.timeout, e:
176
 
            self.assert_(hasattr(e, 'args'))
177
 
            self.assertEqual(e.args[0], 'timed out')
178
 
 
179
 
        evt.send()
180
 
        gt.wait()
181
 
 
182
 
    def test_send_timeout(self):
183
 
        listener = bufsized(eventlet.listen(('', 0)))
184
 
 
185
 
        evt = event.Event()
186
 
        def server():
187
 
            # accept the connection in another greenlet
188
 
            sock, addr = listener.accept()
189
 
            sock = bufsized(sock)
190
 
            evt.wait()
191
 
 
192
 
        gt = eventlet.spawn(server)
193
 
 
194
 
        addr = listener.getsockname()
195
 
 
196
 
        client = bufsized(greenio.GreenSocket(socket.socket()))
197
 
        client.connect(addr)
198
 
        try:
199
 
            client.settimeout(0.00001)
200
 
            msg = s2b("A")*(100000)  # large enough number to overwhelm most buffers
201
 
 
202
 
            total_sent = 0
203
 
            # want to exceed the size of the OS buffer so it'll block in a
204
 
            # single send
205
 
            for x in range(10):
206
 
                total_sent += client.send(msg)
207
 
            self.fail("socket.timeout not raised")
208
 
        except socket.timeout, e:
209
 
            self.assert_(hasattr(e, 'args'))
210
 
            self.assertEqual(e.args[0], 'timed out')
211
 
 
212
 
        evt.send()
213
 
        gt.wait()
214
 
 
215
 
    def test_sendall_timeout(self):
216
 
        listener = greenio.GreenSocket(socket.socket())
217
 
        listener.bind(('', 0))
218
 
        listener.listen(50)
219
 
 
220
 
        evt = event.Event()
221
 
        def server():
222
 
            # accept the connection in another greenlet
223
 
            sock, addr = listener.accept()
224
 
            evt.wait()
225
 
 
226
 
        gt = eventlet.spawn(server)
227
 
 
228
 
        addr = listener.getsockname()
229
 
 
230
 
        client = greenio.GreenSocket(socket.socket())
231
 
        client.settimeout(0.1)
232
 
        client.connect(addr)
233
 
 
234
 
        try:
235
 
            msg = s2b("A")*(8*1024*1024)
236
 
 
237
 
            # want to exceed the size of the OS buffer so it'll block
238
 
            client.sendall(msg)
239
 
            self.fail("socket.timeout not raised")
240
 
        except socket.timeout, e:
241
 
            self.assert_(hasattr(e, 'args'))
242
 
            self.assertEqual(e.args[0], 'timed out')
243
 
 
244
 
        evt.send()
245
 
        gt.wait()
246
 
 
247
 
    def test_close_with_makefile(self):
248
 
        def accept_close_early(listener):
249
 
            # verify that the makefile and the socket are truly independent
250
 
            # by closing the socket prior to using the made file
251
 
            try:
252
 
                conn, addr = listener.accept()
253
 
                fd = conn.makefile('w')
254
 
                conn.close()
255
 
                fd.write('hello\n')
256
 
                fd.close()
257
 
                self.assertWriteToClosedFileRaises(fd)
258
 
                self.assertRaises(socket.error, conn.send, s2b('b'))
259
 
            finally:
260
 
                listener.close()
261
 
 
262
 
        def accept_close_late(listener):
263
 
            # verify that the makefile and the socket are truly independent
264
 
            # by closing the made file and then sending a character
265
 
            try:
266
 
                conn, addr = listener.accept()
267
 
                fd = conn.makefile('w')
268
 
                fd.write('hello')
269
 
                fd.close()
270
 
                conn.send(s2b('\n'))
271
 
                conn.close()
272
 
                self.assertWriteToClosedFileRaises(fd)
273
 
                self.assertRaises(socket.error, conn.send, s2b('b'))
274
 
            finally:
275
 
                listener.close()
276
 
 
277
 
        def did_it_work(server):
278
 
            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
279
 
            client.connect(('127.0.0.1', server.getsockname()[1]))
280
 
            fd = client.makefile()
281
 
            client.close()
282
 
            assert fd.readline() == 'hello\n'
283
 
            assert fd.read() == ''
284
 
            fd.close()
285
 
 
286
 
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
287
 
        server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
288
 
        server.bind(('0.0.0.0', 0))
289
 
        server.listen(50)
290
 
        killer = eventlet.spawn(accept_close_early, server)
291
 
        did_it_work(server)
292
 
        killer.wait()
293
 
 
294
 
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
295
 
        server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
296
 
        server.bind(('0.0.0.0', 0))
297
 
        server.listen(50)
298
 
        killer = eventlet.spawn(accept_close_late, server)
299
 
        did_it_work(server)
300
 
        killer.wait()
301
 
 
302
 
    def test_del_closes_socket(self):
303
 
        def accept_once(listener):
304
 
            # delete/overwrite the original conn
305
 
            # object, only keeping the file object around
306
 
            # closing the file object should close everything
307
 
            try:
308
 
                conn, addr = listener.accept()
309
 
                conn = conn.makefile('w')
310
 
                conn.write('hello\n')
311
 
                conn.close()
312
 
                self.assertWriteToClosedFileRaises(conn)
313
 
            finally:
314
 
                listener.close()
315
 
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
316
 
        server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
317
 
        server.bind(('127.0.0.1', 0))
318
 
        server.listen(50)
319
 
        killer = eventlet.spawn(accept_once, server)
320
 
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
321
 
        client.connect(('127.0.0.1', server.getsockname()[1]))
322
 
        fd = client.makefile()
323
 
        client.close()
324
 
        assert fd.read() == 'hello\n'
325
 
        assert fd.read() == ''
326
 
 
327
 
        killer.wait()
328
 
 
329
 
    def test_full_duplex(self):
330
 
        large_data = s2b('*') * 10 * min_buf_size()
331
 
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
332
 
        listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
333
 
        listener.bind(('127.0.0.1', 0))
334
 
        listener.listen(50)
335
 
        bufsized(listener)
336
 
 
337
 
        def send_large(sock):
338
 
            sock.sendall(large_data)
339
 
 
340
 
        def read_large(sock):
341
 
            result = sock.recv(len(large_data))
342
 
            while len(result) < len(large_data):
343
 
                result += sock.recv(len(large_data))
344
 
            self.assertEquals(result, large_data)
345
 
 
346
 
        def server():
347
 
            (sock, addr) = listener.accept()
348
 
            sock = bufsized(sock)
349
 
            send_large_coro = eventlet.spawn(send_large, sock)
350
 
            eventlet.sleep(0)
351
 
            result = sock.recv(10)
352
 
            expected = s2b('hello world')
353
 
            while len(result) < len(expected):
354
 
                result += sock.recv(10)
355
 
            self.assertEquals(result, expected)
356
 
            send_large_coro.wait()
357
 
 
358
 
        server_evt = eventlet.spawn(server)
359
 
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
360
 
        client.connect(('127.0.0.1', listener.getsockname()[1]))
361
 
        bufsized(client)
362
 
        large_evt = eventlet.spawn(read_large, client)
363
 
        eventlet.sleep(0)
364
 
        client.sendall(s2b('hello world'))
365
 
        server_evt.wait()
366
 
        large_evt.wait()
367
 
        client.close()
368
 
 
369
 
    def test_sendall(self):
370
 
        # test adapted from Marcus Cavanaugh's email
371
 
        # it may legitimately take a while, but will eventually complete
372
 
        self.timer.cancel()
373
 
        second_bytes = 10
374
 
        def test_sendall_impl(many_bytes):
375
 
            bufsize = max(many_bytes//15, 2)
376
 
            def sender(listener):
377
 
                (sock, addr) = listener.accept()
378
 
                sock = bufsized(sock, size=bufsize)
379
 
                sock.sendall(s2b('x')*many_bytes)
380
 
                sock.sendall(s2b('y')*second_bytes)
381
 
 
382
 
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
383
 
            listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
384
 
            listener.bind(("", 0))
385
 
            listener.listen(50)
386
 
            sender_coro = eventlet.spawn(sender, listener)
387
 
            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
388
 
            client.connect(('127.0.0.1', listener.getsockname()[1]))
389
 
            bufsized(client, size=bufsize)
390
 
            total = 0
391
 
            while total < many_bytes:
392
 
                data = client.recv(min(many_bytes - total, many_bytes//10))
393
 
                if not data:
394
 
                    break
395
 
                total += len(data)
396
 
 
397
 
            total2 = 0
398
 
            while total < second_bytes:
399
 
                data = client.recv(second_bytes)
400
 
                if not data:
401
 
                    break
402
 
                total2 += len(data)
403
 
 
404
 
            sender_coro.wait()
405
 
            client.close()
406
 
 
407
 
        for how_many in (1000, 10000, 100000, 1000000):
408
 
            test_sendall_impl(how_many)
409
 
 
410
 
    def test_wrap_socket(self):
411
 
        try:
412
 
            import ssl
413
 
        except ImportError:
414
 
            pass  # pre-2.6
415
 
        else:
416
 
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
417
 
            sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
418
 
            sock.bind(('127.0.0.1', 0))
419
 
            sock.listen(50)
420
 
            ssl_sock = ssl.wrap_socket(sock)
421
 
 
422
 
    def test_timeout_and_final_write(self):
423
 
        # This test verifies that a write on a socket that we've
424
 
        # stopped listening for doesn't result in an incorrect switch
425
 
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
426
 
        server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
427
 
        server.bind(('127.0.0.1', 0))
428
 
        server.listen(50)
429
 
        bound_port = server.getsockname()[1]
430
 
 
431
 
        def sender(evt):
432
 
            s2, addr = server.accept()
433
 
            wrap_wfile = s2.makefile('w')
434
 
 
435
 
            eventlet.sleep(0.02)
436
 
            wrap_wfile.write('hi')
437
 
            s2.close()
438
 
            evt.send('sent via event')
439
 
 
440
 
        from eventlet import event
441
 
        evt = event.Event()
442
 
        eventlet.spawn(sender, evt)
443
 
        eventlet.sleep(0)  # lets the socket enter accept mode, which
444
 
                      # is necessary for connect to succeed on windows
445
 
        try:
446
 
            # try and get some data off of this pipe
447
 
            # but bail before any is sent
448
 
            eventlet.Timeout(0.01)
449
 
            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
450
 
            client.connect(('127.0.0.1', bound_port))
451
 
            wrap_rfile = client.makefile()
452
 
            _c = wrap_rfile.read(1)
453
 
            self.fail()
454
 
        except eventlet.TimeoutError:
455
 
            pass
456
 
 
457
 
        result = evt.wait()
458
 
        self.assertEquals(result, 'sent via event')
459
 
        server.close()
460
 
        client.close()
461
 
 
462
 
    @skip_with_pyevent
463
 
    def test_raised_multiple_readers(self):
464
 
        debug.hub_prevent_multiple_readers(True)
465
 
 
466
 
        def handle(sock, addr):
467
 
            sock.recv(1)
468
 
            sock.sendall("a")
469
 
            raise eventlet.StopServe()
470
 
        listener = eventlet.listen(('127.0.0.1', 0))
471
 
        server = eventlet.spawn(eventlet.serve, 
472
 
                                listener,
473
 
                                handle)
474
 
        def reader(s):
475
 
            s.recv(1)
476
 
 
477
 
        s = eventlet.connect(('127.0.0.1', listener.getsockname()[1]))
478
 
        a = eventlet.spawn(reader, s)
479
 
        eventlet.sleep(0)
480
 
        self.assertRaises(RuntimeError, s.recv, 1)
481
 
        s.sendall('b')
482
 
        a.wait()
483
 
 
484
 
    @skip_with_pyevent
485
 
    @skip_if(using_epoll_hub)
486
 
    def test_closure(self):
487
 
        def spam_to_me(address):
488
 
            sock = eventlet.connect(address)
489
 
            while True:
490
 
                try:
491
 
                    sock.sendall('hello world')
492
 
                except socket.error, e:
493
 
                    if get_errno(e)== errno.EPIPE:
494
 
                        return
495
 
                    raise
496
 
 
497
 
        server = eventlet.listen(('127.0.0.1', 0))
498
 
        sender = eventlet.spawn(spam_to_me, server.getsockname())
499
 
        client, address = server.accept()
500
 
        server.close()
501
 
 
502
 
        def reader():
503
 
            try:
504
 
                while True:
505
 
                    data = client.recv(1024)
506
 
                    self.assert_(data)
507
 
            except socket.error, e:
508
 
                # we get an EBADF because client is closed in the same process
509
 
                # (but a different greenthread)
510
 
                if get_errno(e) != errno.EBADF:
511
 
                    raise
512
 
 
513
 
        def closer():
514
 
            client.close()
515
 
 
516
 
        reader = eventlet.spawn(reader)
517
 
        eventlet.spawn_n(closer)
518
 
        reader.wait()
519
 
        sender.wait()
520
 
    
521
 
    def test_invalid_connection(self):
522
 
        # find an unused port by creating a socket then closing it
523
 
        port = eventlet.listen(('127.0.0.1', 0)).getsockname()[1]
524
 
        self.assertRaises(socket.error, eventlet.connect, ('127.0.0.1', port))
525
 
    
526
 
class TestGreenPipe(LimitedTestCase):
527
 
    @skip_on_windows
528
 
    def setUp(self):
529
 
        super(self.__class__, self).setUp()
530
 
        self.tempdir = tempfile.mkdtemp('_green_pipe_test')
531
 
 
532
 
    def tearDown(self):
533
 
        shutil.rmtree(self.tempdir)
534
 
        super(self.__class__, self).tearDown()
535
 
 
536
 
    def test_pipe(self):
537
 
        r,w = os.pipe()
538
 
        rf = greenio.GreenPipe(r, 'r');
539
 
        wf = greenio.GreenPipe(w, 'w', 0);
540
 
        def sender(f, content):
541
 
            for ch in content:
542
 
                eventlet.sleep(0.0001)
543
 
                f.write(ch)
544
 
            f.close()
545
 
 
546
 
        one_line = "12345\n";
547
 
        eventlet.spawn(sender, wf, one_line*5)
548
 
        for i in xrange(5):
549
 
            line = rf.readline()
550
 
            eventlet.sleep(0.01)
551
 
            self.assertEquals(line, one_line)
552
 
        self.assertEquals(rf.readline(), '')
553
 
 
554
 
    def test_pipe_read(self):
555
 
        # ensure that 'readline' works properly on GreenPipes when data is not
556
 
        # immediately available (fd is nonblocking, was raising EAGAIN)
557
 
        # also ensures that readline() terminates on '\n' and '\r\n'
558
 
        r, w = os.pipe()
559
 
 
560
 
        r = greenio.GreenPipe(r)
561
 
        w = greenio.GreenPipe(w, 'w')
562
 
 
563
 
        def writer():
564
 
            eventlet.sleep(.1)
565
 
 
566
 
            w.write('line\n')
567
 
            w.flush()
568
 
 
569
 
            w.write('line\r\n')
570
 
            w.flush()
571
 
 
572
 
        gt = eventlet.spawn(writer)
573
 
 
574
 
        eventlet.sleep(0)
575
 
 
576
 
        line = r.readline()
577
 
        self.assertEquals(line, 'line\n')
578
 
 
579
 
        line = r.readline()
580
 
        self.assertEquals(line, 'line\r\n')
581
 
 
582
 
        gt.wait()
583
 
 
584
 
    def test_pipe_writes_large_messages(self):
585
 
        r, w = os.pipe()
586
 
 
587
 
        r = greenio.GreenPipe(r)
588
 
        w = greenio.GreenPipe(w, 'w')
589
 
 
590
 
        large_message = "".join([1024*chr(i) for i in xrange(65)])
591
 
        def writer():
592
 
            w.write(large_message)
593
 
            w.close()
594
 
 
595
 
        gt = eventlet.spawn(writer)
596
 
 
597
 
        for i in xrange(65):
598
 
            buf = r.read(1024)
599
 
            expected = 1024*chr(i) 
600
 
            self.assertEquals(buf, expected, 
601
 
                "expected=%r..%r, found=%r..%r iter=%d" 
602
 
                % (expected[:4], expected[-4:], buf[:4], buf[-4:], i))
603
 
        gt.wait()
604
 
 
605
 
    def test_seek_on_buffered_pipe(self):
606
 
        f = greenio.GreenPipe(self.tempdir+"/TestFile", 'w+', 1024)
607
 
        self.assertEquals(f.tell(),0)
608
 
        f.seek(0,2)
609
 
        self.assertEquals(f.tell(),0)
610
 
        f.write('1234567890')
611
 
        f.seek(0,2)
612
 
        self.assertEquals(f.tell(),10)
613
 
        f.seek(0)
614
 
        value = f.read(1)
615
 
        self.assertEqual(value, '1')
616
 
        self.assertEquals(f.tell(),1)
617
 
        value = f.read(1)
618
 
        self.assertEqual(value, '2')
619
 
        self.assertEquals(f.tell(),2)
620
 
        f.seek(0, 1)
621
 
        self.assertEqual(f.readline(), '34567890')
622
 
        f.seek(0)
623
 
        self.assertEqual(f.readline(), '1234567890')
624
 
        f.seek(0, 2)
625
 
        self.assertEqual(f.readline(), '')
626
 
 
627
 
    def test_truncate(self):
628
 
        f = greenio.GreenPipe(self.tempdir+"/TestFile", 'w+', 1024)
629
 
        f.write('1234567890')
630
 
        f.truncate(9)
631
 
        self.assertEquals(f.tell(), 9)
632
 
 
633
 
 
634
 
class TestGreenIoLong(LimitedTestCase):
635
 
    TEST_TIMEOUT=10  # the test here might take a while depending on the OS
636
 
    @skip_with_pyevent
637
 
    def test_multiple_readers(self, clibufsize=False):
638
 
        debug.hub_prevent_multiple_readers(False)
639
 
        recvsize = 2 * min_buf_size()
640
 
        sendsize = 10 * recvsize
641
 
        # test that we can have multiple coroutines reading
642
 
        # from the same fd.  We make no guarantees about which one gets which
643
 
        # bytes, but they should both get at least some
644
 
        def reader(sock, results):
645
 
            while True:
646
 
                data = sock.recv(recvsize)
647
 
                if not data:
648
 
                    break
649
 
                results.append(data)
650
 
 
651
 
        results1 = []
652
 
        results2 = []
653
 
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
654
 
        listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
655
 
        listener.bind(('127.0.0.1', 0))
656
 
        listener.listen(50)
657
 
        def server():
658
 
            (sock, addr) = listener.accept()
659
 
            sock = bufsized(sock)
660
 
            try:
661
 
                c1 = eventlet.spawn(reader, sock, results1)
662
 
                c2 = eventlet.spawn(reader, sock, results2)
663
 
                try:
664
 
                    c1.wait()
665
 
                    c2.wait()
666
 
                finally:
667
 
                    c1.kill()
668
 
                    c2.kill()
669
 
            finally:
670
 
                sock.close()
671
 
 
672
 
        server_coro = eventlet.spawn(server)
673
 
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
674
 
        client.connect(('127.0.0.1', listener.getsockname()[1]))
675
 
        if clibufsize:
676
 
            bufsized(client, size=sendsize)
677
 
        else:
678
 
            bufsized(client)
679
 
        client.sendall(s2b('*') * sendsize)
680
 
        client.close()
681
 
        server_coro.wait()
682
 
        listener.close()
683
 
        self.assert_(len(results1) > 0)
684
 
        self.assert_(len(results2) > 0)
685
 
        debug.hub_prevent_multiple_readers()
686
 
 
687
 
    @skipped  # by rdw because it fails but it's not clear how to make it pass
688
 
    @skip_with_pyevent
689
 
    def test_multiple_readers2(self):
690
 
        self.test_multiple_readers(clibufsize=True)
691
 
 
692
 
class TestGreenIoStarvation(LimitedTestCase):    
693
 
    # fixme: this doesn't succeed, because of eventlet's predetermined
694
 
    # ordering.  two processes, one with server, one with client eventlets
695
 
    # might be more reliable?
696
 
    
697
 
    TEST_TIMEOUT=300  # the test here might take a while depending on the OS
698
 
    @skipped  # by rdw, because it fails but it's not clear how to make it pass
699
 
    @skip_with_pyevent
700
 
    def test_server_starvation(self, sendloops=15):
701
 
        recvsize = 2 * min_buf_size()
702
 
        sendsize = 10000 * recvsize
703
 
        
704
 
        results = [[] for i in xrange(5)]
705
 
        
706
 
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
707
 
        listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
708
 
        listener.bind(('127.0.0.1', 0))
709
 
        port = listener.getsockname()[1]
710
 
        listener.listen(50)
711
 
        
712
 
        base_time = time.time()
713
 
        
714
 
        def server(my_results):
715
 
            (sock, addr) = listener.accept()
716
 
            
717
 
            datasize = 0
718
 
            
719
 
            t1 = None
720
 
            t2 = None
721
 
            try:
722
 
                while True:
723
 
                    data = sock.recv(recvsize)
724
 
                    if not t1:
725
 
                        t1 = time.time() - base_time
726
 
                    if not data:
727
 
                        t2 = time.time() - base_time
728
 
                        my_results.append(datasize)
729
 
                        my_results.append((t1,t2))
730
 
                        break
731
 
                    datasize += len(data)
732
 
            finally:
733
 
                sock.close()
734
 
 
735
 
        def client():
736
 
            pid = os.fork()
737
 
            if pid:
738
 
                return pid
739
 
    
740
 
            client = _orig_sock.socket(socket.AF_INET, socket.SOCK_STREAM)
741
 
            client.connect(('127.0.0.1', port))
742
 
 
743
 
            bufsized(client, size=sendsize)
744
 
 
745
 
            for i in range(sendloops):
746
 
                client.sendall(s2b('*') * sendsize)
747
 
            client.close()
748
 
            os._exit(0)
749
 
 
750
 
        clients = []
751
 
        servers = []
752
 
        for r in results:
753
 
            servers.append(eventlet.spawn(server, r))
754
 
        for r in results:
755
 
            clients.append(client())
756
 
 
757
 
        for s in servers:
758
 
            s.wait()
759
 
        for c in clients:
760
 
            os.waitpid(c, 0)
761
 
 
762
 
        listener.close()
763
 
 
764
 
        # now test that all of the server receive intervals overlap, and
765
 
        # that there were no errors.
766
 
        for r in results:
767
 
            assert len(r) == 2, "length is %d not 2!: %s\n%s" % (len(r), r, results)
768
 
            assert r[0] == sendsize * sendloops
769
 
            assert len(r[1]) == 2
770
 
            assert r[1][0] is not None
771
 
            assert r[1][1] is not None
772
 
 
773
 
        starttimes = sorted(r[1][0] for r in results)
774
 
        endtimes = sorted(r[1][1] for r in results)
775
 
        runlengths = sorted(r[1][1] - r[1][0] for r in results)
776
 
 
777
 
        # assert that the last task started before the first task ended
778
 
        # (our no-starvation condition)
779
 
        assert starttimes[-1] < endtimes[0], "Not overlapping: starts %s ends %s" % (starttimes, endtimes)
780
 
 
781
 
        maxstartdiff = starttimes[-1] - starttimes[0]
782
 
 
783
 
        assert maxstartdiff * 2 < runlengths[0], "Largest difference in starting times more than twice the shortest running time!"
784
 
        assert runlengths[0] * 2 > runlengths[-1], "Longest runtime more than twice as long as shortest!"
785
 
 
786
 
if __name__ == '__main__':
787
 
    main()