~ubuntu-branches/ubuntu/quantal/python-gevent/quantal

« back to all changes in this revision

Viewing changes to greentest/test__http.py

  • Committer: Bazaar Package Importer
  • Author(s): Örjan Persson
  • Date: 2011-05-17 16:43:20 UTC
  • mfrom: (6.1.1 sid)
  • Revision ID: james.westby@ubuntu.com-20110517164320-nwhld2xo6sz58p4m
Tags: 0.13.6-1
New upstream version (Closes: #601863).

Show diffs side-by-side

added added

removed removed

Lines of Context:
3
3
from gevent import http
4
4
import greentest
5
5
import os
6
 
from urllib2 import urlopen, HTTPError
7
6
import socket
8
7
import errno
 
8
from test__pywsgi import read_http
9
9
 
10
10
# add test for "chunked POST input -> chunked output"
11
11
 
15
15
    address = ('127.0.0.1', 0)
16
16
 
17
17
    def setUp(self):
 
18
        greentest.TestCase.setUp(self)
18
19
        self.server = http.HTTPServer(self.address, self.handle)
19
20
        self.server.start()
20
21
 
25
26
            self.server.stop()
26
27
        finally:
27
28
            timeout.cancel()
28
 
        #self.print_netstat('after stop')
29
29
        self.check_refused()
 
30
        greentest.TestCase.tearDown(self)
30
31
 
31
32
    def print_netstat(self, comment=''):
32
 
        cmd ='sudo netstat -anp | grep %s' % self.server.server_port
33
 
        print cmd, ' # %s' % comment
 
33
        cmd = 'echo "%s" && netstat -anp 2>&1 | grep %s' % (comment, self.server.server_port)
34
34
        os.system(cmd)
35
35
 
36
 
    @property
37
 
    def url(self):
38
 
        return 'http://%s:%s' % (self.server.server_host, self.server.server_port)
39
 
 
40
36
    def connect(self):
41
37
        s = socket.socket()
42
38
        s.connect((self.server.server_host, self.server.server_port))
43
39
        return s
44
40
 
 
41
    def urlopen(self, *args, **kwargs):
 
42
        fd = self.connect().makefile(bufsize=1)
 
43
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
 
44
        return read_http(fd, *args, **kwargs)
 
45
 
45
46
    def check_refused(self):
46
47
        try:
47
48
            self.connect()
52
53
            print 'WARNING: instead of ECONNREFUSED got IOError: %s' % e
53
54
 
54
55
 
 
56
class TestNoop(BoundTestCase):
 
57
 
 
58
    def handle(self, r):
 
59
        pass
 
60
 
 
61
    def test(self):
 
62
        self.urlopen(code=500)
 
63
 
 
64
 
55
65
class TestClientCloses(BoundTestCase):
56
66
 
57
67
    # this test is useless. currently there's no way to know that the client closed the connection,
87
97
        return gevent.spawn_later(0.01, self.reply, r)
88
98
 
89
99
    def test(self):
 
100
        server = http.HTTPServer(self.address, self.handle)
 
101
        server.start()
90
102
        s = self.connect()
91
103
        s.sendall('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
92
104
        s.sendall('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
93
105
        s.close()
94
 
        self.server.stop()
 
106
        server.stop()
95
107
        gevent.sleep(0.02)
96
108
        # stopping what already stopped is OK
97
 
        self.server.stop()
 
109
        server.stop()
98
110
 
99
111
 
100
112
class TestSendReply(BoundTestCase):
103
115
        r.send_reply(200, 'OK', 'hello world')
104
116
 
105
117
    def test(self):
106
 
        response = urlopen(self.url)
107
 
        assert response.code == 200, response
108
 
        assert response.msg == 'OK', response
109
 
        data = response.read()
110
 
        assert data == 'hello world', data
 
118
        self.urlopen(body='hello world')
111
119
 
112
120
    def test_keepalive(self):
113
121
        s = self.connect()
121
129
        raise greentest.ExpectedException('TestException.handle')
122
130
 
123
131
    def test(self):
124
 
        try:
125
 
            urlopen(self.url)
126
 
        except HTTPError, e:
127
 
            assert e.code == 500, e
128
 
            assert e.msg == 'Internal Server Error', e
 
132
        self.urlopen(code=500)
129
133
 
130
134
 
131
135
class TestSendReplyLater(BoundTestCase):
135
139
        r.send_reply(200, 'OK', 'hello world')
136
140
 
137
141
    def test(self):
138
 
        response = urlopen(self.url)
139
 
        #print 'connected to %s' % self.url
140
 
        assert response.code == 200, response
141
 
        assert response.msg == 'OK', response
142
 
        data = response.read()
143
 
        #print 'read data from %s' % self.url
144
 
        assert data == 'hello world', data
 
142
        self.urlopen(body='hello world')
145
143
 
146
144
    def test_client_closes_10(self):
147
145
        s = self.connect()
171
169
        assert input.read() == ''
172
170
        assert output.read() == ''
173
171
        self.handled = True
174
 
        self.current.throw(Exception('test done'))
 
172
        gevent.kill(self.current, Exception('test done'))
175
173
 
176
174
    def test(self):
177
175
        self.current = gevent.getcurrent()
178
176
        try:
179
177
            try:
180
 
                urlopen(self.url)
 
178
                self.urlopen()
181
179
            except Exception, ex:
182
180
                assert str(ex) == 'test done', ex
183
181
        finally:
222
220
 
223
221
if __name__ == '__main__':
224
222
    greentest.main()
225