~ubuntu-branches/ubuntu/quantal/python-gevent/quantal

« back to all changes in this revision

Viewing changes to greentest/test__server_close.py

  • Committer: Bazaar Package Importer
  • Author(s): Örjan Persson
  • Date: 2011-05-17 16:43:20 UTC
  • mfrom: (6.1.1 sid)
  • Revision ID: james.westby@ubuntu.com-20110517164320-nwhld2xo6sz58p4m
Tags: 0.13.6-1
New upstream version (Closes: #601863).

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
import unittest
2
 
from gevent import socket
3
 
import gevent
4
 
from gevent.server import StreamServer
5
 
import errno
6
 
import os
7
 
from test__server import SimpleStreamServer
8
 
 
9
 
 
10
 
class Test(unittest.TestCase):
    """Check that a server refuses connections once its listening socket is disabled.

    Each test starts a ``ServerSubClass`` instance on 127.0.0.1, disables the
    listening socket in a different way (shutdown, close, closing the raw
    descriptor) and then verifies that a fresh connection attempt is refused.
    """

    # Server implementation under test; subclasses may override it.
    ServerSubClass = SimpleStreamServer

    def makefile(self, timeout=0.1, bufsize=1):
        """Connect to ``self.server`` and return a file object for the connection.

        ``timeout`` is applied to the connected socket and ``bufsize`` is
        forwarded to ``sock.makefile()``.  Any ``socket.error`` raised while
        connecting propagates to the caller.
        """
        sock = socket.create_connection((self.server.server_host, self.server.server_port))
        sock.settimeout(timeout)
        return sock.makefile(bufsize=bufsize)

    def assertConnectionRefused(self):
        """Fail unless connecting to the server is rejected with ECONNREFUSED.

        Raises ``AssertionError`` if the connection succeeds; re-raises any
        ``socket.error`` whose first argument is not ``errno.ECONNREFUSED``.
        """
        # Local import: only this method needs it.
        import sys
        try:
            conn = self.makefile()
        except socket.error:
            # Look the exception up through sys.exc_info() and read the errno
            # from ``args`` instead of indexing the exception object, which
            # is not supported by every Python version.
            ex = sys.exc_info()[1]
            if ex.args[0] != errno.ECONNREFUSED:
                raise
        else:
            # Capture the description before closing, then release the
            # unexpected connection instead of leaking it.
            description = repr(conn._sock)
            conn.close()
            raise AssertionError('Connection was not refused: %s' % description)

    def assertRequestSucceeded(self):
        """Send a ``GET /ping`` request and assert that the reply ends with PONG."""
        conn = self.makefile()
        try:
            conn.write('GET /ping HTTP/1.0\r\n\r\n')
            result = conn.read()
        finally:
            # Always release the connection, even if the exchange failed.
            conn.close()
        assert result.endswith('\r\n\r\nPONG'), repr(result)

    def init_server(self):
        """Create and start ``self.server`` on 127.0.0.1 with an OS-assigned port."""
        self.server = self.ServerSubClass(('127.0.0.1', 0))
        self.server.start()
        # Yield to the gevent hub briefly after starting
        # (presumably so the server gets a chance to begin accepting).
        gevent.sleep(0.01)

    def test_socket_shutdown(self):
        """Shutting down the listening socket refuses connections and stops the server."""
        self.init_server()
        self.server.socket.shutdown(socket.SHUT_RDWR)
        self.assertConnectionRefused()
        assert not self.server.started, self.server

    def test_socket_close(self):
        """Closing the listening socket object makes new connections be refused."""
        self.server = self.ServerSubClass(('127.0.0.1', 0))
        self.server.start()
        self.server.socket.close()
        self.assertConnectionRefused()
        # NOTE: the ``started`` check is disabled for this scenario.
        #assert not self.server.started

    def test_socket_close_fileno(self):
        """Closing the raw file descriptor makes new connections be refused."""
        self.server = self.ServerSubClass(('127.0.0.1', 0))
        self.server.start()
        os.close(self.server.socket.fileno())
        self.assertConnectionRefused()
        # NOTE: the ``started`` check is disabled for this scenario.
        #assert not self.server.started

    def test_socket_file(self):
        """Connections are refused after the descriptor is closed and a file is opened."""
        self.server = self.ServerSubClass(('127.0.0.1', 0))
        self.server.start()
        os.close(self.server.socket.fileno())
        # Presumably opened so the just-freed descriptor number is reused by
        # a regular file -- TODO confirm against upstream intent.
        f = open("/dev/zero", "r")
        try:
            self.assertConnectionRefused()
        finally:
            # Close explicitly so the file is released even when the
            # assertion fails.
            f.close()
65
 
 
66
 
 
67
 
if __name__ == '__main__':
    # Run the tests in this module when the file is executed as a script.
    unittest.main()
69