~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to sandbox/foom/web/test_http.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2004-06-21 22:01:11 UTC
  • mto: (2.2.3 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20040621220111-vkf909euqnyrp3nr
Tags: upstream-1.3.0
ImportĀ upstreamĀ versionĀ 1.3.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# This library is free software; you can redistribute it and/or
 
2
# modify it under the terms of version 2.1 of the GNU Lesser General Public
 
3
# License as published by the Free Software Foundation.
 
4
#
 
5
# This library is distributed in the hope that it will be useful,
 
6
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
7
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
8
# Lesser General Public License for more details.
 
9
#
 
10
# You should have received a copy of the GNU Lesser General Public
 
11
# License along with this library; if not, write to the Free Software
 
12
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
13
 
 
14
"""Test HTTP support."""
 
15
 
 
16
from __future__ import nested_scopes
 
17
 
 
18
import string, random, urllib, cgi
 
19
from twisted.trial import unittest
 
20
from twisted.protocols import loopback, basic
 
21
from twisted.internet import protocol
 
22
from twisted.test.test_protocols import StringIOWithoutClosing
 
23
from twisted.python.util import OrderedDict
 
24
 
 
25
import http
 
26
 
 
27
class SuxHTTPClient(basic.LineReceiver):
 
28
    """A client for HTTP 1.0
 
29
 
 
30
    Notes:
 
31
    You probably want to send a 'Host' header with the name of
 
32
    the site you're connecting to, in order to not break name
 
33
    based virtual hosting.
 
34
    """
 
35
    length = None
 
36
    firstLine = 1
 
37
    __buffer = ''
 
38
 
 
39
    def sendCommand(self, command, path):
 
40
        self.transport.write('%s %s HTTP/1.0\r\n' % (command, path))
 
41
 
 
42
    def sendHeader(self, name, value):
 
43
        self.transport.write('%s: %s\r\n' % (name, value))
 
44
 
 
45
    def endHeaders(self):
 
46
        self.transport.write('\r\n')
 
47
 
 
48
    def lineReceived(self, line):
 
49
        if self.firstLine:
 
50
            self.firstLine = 0
 
51
            try:
 
52
                version, status, message = line.split(None, 2)
 
53
            except ValueError:
 
54
                # sometimes there is no message
 
55
                version, status = line.split(None, 1)
 
56
                message = ""
 
57
            self.handleStatus(version, status, message)
 
58
            return
 
59
        if line:
 
60
            key, val = line.split(':', 1)
 
61
            val = val.lstrip()
 
62
            self.handleHeader(key, val)
 
63
            if key.lower() == 'content-length':
 
64
                self.length = int(val)
 
65
        else:
 
66
            self.handleEndHeaders()
 
67
            self.setRawMode()
 
68
 
 
69
    def connectionLost(self, reason):
 
70
        self.handleResponseEnd()
 
71
 
 
72
    def handleResponseEnd(self):
 
73
        if self.__buffer != None:
 
74
            b = self.__buffer
 
75
            self.__buffer = None
 
76
            self.handleResponse(b)
 
77
    
 
78
    def handleResponsePart(self, data):
 
79
        self.__buffer += data
 
80
 
 
81
    def connectionMade(self):
 
82
        pass
 
83
 
 
84
    handleStatus = handleHeader = handleEndHeaders = lambda *args: None
 
85
 
 
86
    def rawDataReceived(self, data):
 
87
        if self.length is not None:
 
88
            data, rest = data[:self.length], data[self.length:]
 
89
            self.length -= len(data)
 
90
        else:
 
91
            rest = ''
 
92
        self.handleResponsePart(data)
 
93
        if self.length == 0:
 
94
            self.handleResponseEnd()
 
95
            self.setLineMode(rest)
 
96
 
 
97
 
 
98
class DummyHTTPHandler(http.Request):
 
99
 
 
100
    def process(self):
 
101
        self.headers = OrderedDict(self.headers)
 
102
        self.content.seek(0, 0)
 
103
        data = self.content.read()
 
104
        length = self.getHeader('content-length')
 
105
        request = "'''\n"+str(length)+"\n"+data+"'''\n"
 
106
        self.setResponseCode(200)
 
107
        self.setHeader("Request", self.uri)
 
108
        self.setHeader("Command", self.method)
 
109
        self.setHeader("Version", self.clientproto)
 
110
        self.setHeader("Content-Length", len(request))
 
111
        self.write(request)
 
112
        self.finish()
 
113
 
 
114
 
 
115
class LoopbackHTTPClient(SuxHTTPClient):
 
116
 
 
117
    def connectionMade(self):
 
118
        self.sendCommand("GET", "/foo/bar")
 
119
        self.sendHeader("Content-Length", 10)
 
120
        self.endHeaders()
 
121
        self.transport.write("0123456789")
 
122
 
 
123
 
 
124
class HTTP1_0TestCase(unittest.TestCase):
 
125
 
 
126
    requests = '''\
 
127
GET / HTTP/1.0
 
128
 
 
129
GET / HTTP/1.1
 
130
Accept: text/html
 
131
 
 
132
'''
 
133
    requests = string.replace(requests, '\n', '\r\n')
 
134
 
 
135
    expected_response = "HTTP/1.0 200 OK\015\012Request: /\015\012Command: GET\015\012Version: HTTP/1.0\015\012Content-length: 13\015\012\015\012'''\012None\012'''\012"
 
136
 
 
137
    def testBuffer(self):
 
138
        b = StringIOWithoutClosing()
 
139
        a = http.HTTPChannel()
 
140
        a.requestFactory = DummyHTTPHandler
 
141
        a.makeConnection(protocol.FileWrapper(b))
 
142
        # one byte at a time, to stress it.
 
143
        for byte in self.requests:
 
144
            a.dataReceived(byte)
 
145
        a.connectionLost(IOError("all one"))
 
146
        value = b.getvalue()
 
147
        if value != self.expected_response:
 
148
            for i in range(len(value)):
 
149
                if len(self.expected_response) <= i:
 
150
                    print `value[i-5:i+10]`, `self.expected_response[i-5:i+10]`
 
151
                elif value[i] != self.expected_response[i]:
 
152
                    print `value[i-5:i+10]`, `self.expected_response[i-5:i+10]`
 
153
                    break
 
154
            print '---VALUE---'
 
155
            print repr(value)
 
156
            print '---EXPECTED---'
 
157
            print repr(self.expected_response)
 
158
            raise AssertionError
 
159
 
 
160
 
 
161
class HTTP1_1TestCase(HTTP1_0TestCase):
 
162
 
 
163
    requests = '''\
 
164
GET / HTTP/1.1
 
165
Accept: text/html
 
166
 
 
167
POST / HTTP/1.1
 
168
Content-Length: 10
 
169
 
 
170
0123456789POST / HTTP/1.1
 
171
Content-Length: 10
 
172
 
 
173
0123456789HEAD / HTTP/1.1
 
174
 
 
175
'''
 
176
    requests = string.replace(requests, '\n', '\r\n')
 
177
 
 
178
    expected_response = "HTTP/1.1 200 OK\015\012Request: /\015\012Command: GET\015\012Version: HTTP/1.1\015\012Content-length: 13\015\012\015\012'''\012None\012'''\012HTTP/1.1 200 OK\015\012Request: /\015\012Command: POST\015\012Version: HTTP/1.1\015\012Content-length: 21\015\012\015\012'''\01210\0120123456789'''\012HTTP/1.1 200 OK\015\012Request: /\015\012Command: POST\015\012Version: HTTP/1.1\015\012Content-length: 21\015\012\015\012'''\01210\0120123456789'''\012HTTP/1.1 200 OK\015\012Request: /\015\012Command: HEAD\015\012Version: HTTP/1.1\015\012Content-length: 13\015\012\015\012"
 
179
 
 
180
class HTTP1_1_close_TestCase(HTTP1_0TestCase):
 
181
 
 
182
    requests = '''\
 
183
GET / HTTP/1.1
 
184
Accept: text/html
 
185
Connection: close
 
186
 
 
187
GET / HTTP/1.0
 
188
 
 
189
'''
 
190
 
 
191
    requests = string.replace(requests, '\n', '\r\n')
 
192
 
 
193
    expected_response = "HTTP/1.1 200 OK\015\012Connection: close\015\012Request: /\015\012Command: GET\015\012Version: HTTP/1.1\015\012Content-length: 13\015\012\015\012'''\012None\012'''\012"
 
194
 
 
195
 
 
196
class HTTP0_9TestCase(HTTP1_0TestCase):
 
197
 
 
198
    requests = '''\
 
199
GET /
 
200
'''
 
201
    requests = string.replace(requests, '\n', '\r\n')
 
202
 
 
203
    expected_response = "HTTP/1.1 400 Bad Request\r\n\r\n"
 
204
 
 
205
 
 
206
class HTTPLoopbackTestCase(unittest.TestCase):
 
207
 
 
208
    expectedHeaders = {'request' : '/foo/bar',
 
209
                       'command' : 'GET',
 
210
                       'version' : 'HTTP/1.0',
 
211
                       'content-length' : '21'}
 
212
    numHeaders = 0
 
213
    gotStatus = 0
 
214
    gotResponse = 0
 
215
    gotEndHeaders = 0
 
216
 
 
217
    def _handleStatus(self, version, status, message):
 
218
        self.gotStatus = 1
 
219
        self.assertEquals(version, "HTTP/1.0")
 
220
        self.assertEquals(status, "200")
 
221
 
 
222
    def _handleResponse(self, data):
 
223
        self.gotResponse = 1
 
224
        self.assertEquals(data, "'''\n10\n0123456789'''\n")
 
225
 
 
226
    def _handleHeader(self, key, value):
 
227
        self.numHeaders = self.numHeaders + 1
 
228
        self.assertEquals(self.expectedHeaders[string.lower(key)], value)
 
229
 
 
230
    def _handleEndHeaders(self):
 
231
        self.gotEndHeaders = 1
 
232
        self.assertEquals(self.numHeaders, 4)
 
233
 
 
234
    def testLoopback(self):
 
235
        server = http.HTTPChannel()
 
236
        server.requestFactory = DummyHTTPHandler
 
237
        client = LoopbackHTTPClient()
 
238
        client.handleResponse = self._handleResponse
 
239
        client.handleHeader = self._handleHeader
 
240
        client.handleEndHeaders = self._handleEndHeaders
 
241
        client.handleStatus = self._handleStatus
 
242
        loopback.loopback(server, client)
 
243
        if not (self.gotStatus and self.gotResponse and self.gotEndHeaders):
 
244
            raise RuntimeError, "didn't got all callbacks %s" % [self.gotStatus, self.gotResponse, self.gotEndHeaders]
 
245
        del self.gotEndHeaders
 
246
        del self.gotResponse
 
247
        del self.gotStatus
 
248
        del self.numHeaders
 
249
 
 
250
 
 
251
class PRequest:
 
252
    """Dummy request for persistence tests."""
 
253
 
 
254
    def __init__(self, **headers):
 
255
        self.received_headers = headers
 
256
        self.headers = {}
 
257
 
 
258
    def getHeader(self, k):
 
259
        return self.received_headers.get(k, '')
 
260
 
 
261
    def setHeader(self, k, v):
 
262
        self.headers[k] = v
 
263
 
 
264
 
 
265
class PersistenceTestCase(unittest.TestCase):
 
266
    """Tests for persistent HTTP connections."""
 
267
 
 
268
    ptests = [#(PRequest(connection="Keep-Alive"), "HTTP/1.0", 1, {'connection' : 'Keep-Alive'}),
 
269
              (PRequest(), "HTTP/1.0", 0, {'connection': None}),
 
270
              (PRequest(connection="close"), "HTTP/1.1", 0, {'connection' : 'close'}),
 
271
              (PRequest(), "HTTP/1.1", 1, {'connection': None}),
 
272
              (PRequest(), "HTTP/0.9", 0, {'connection': None}),
 
273
              ]
 
274
 
 
275
 
 
276
    def testAlgorithm(self):
 
277
        c = http.HTTPChannel()
 
278
        for req, version, correctResult, resultHeaders in self.ptests:
 
279
            result = c.checkPersistence(req, version)
 
280
            self.assertEquals(result, correctResult)
 
281
            for header in resultHeaders.keys():
 
282
                self.assertEquals(req.headers.get(header, None), resultHeaders[header])
 
283
 
 
284
 
 
285
class ChunkingTestCase(unittest.TestCase):
 
286
 
 
287
    strings = ["abcv", "", "fdfsd423", "Ffasfas\r\n",
 
288
               "523523\n\rfsdf", "4234"]
 
289
 
 
290
    def testChunks(self):
 
291
        for s in self.strings:
 
292
            self.assertEquals((s, ''), http.fromChunk(''.join(http.toChunk(s))))
 
293
 
 
294
    def testConcatenatedChunks(self):
 
295
        chunked = ''.join([''.join(http.toChunk(t)) for t in self.strings])
 
296
        result = []
 
297
        buffer = ""
 
298
        for c in chunked:
 
299
            buffer = buffer + c
 
300
            try:
 
301
                data, buffer = http.fromChunk(buffer)
 
302
                result.append(data)
 
303
            except ValueError:
 
304
                pass
 
305
        self.assertEquals(result, self.strings)
 
306
 
 
307
 
 
308
 
 
309
class ParsingTestCase(unittest.TestCase):
 
310
 
 
311
    def runRequest(self, httpRequest, requestClass, success=1):
 
312
        httpRequest = httpRequest.replace("\n", "\r\n")
 
313
        b = StringIOWithoutClosing()
 
314
        a = http.HTTPChannel()
 
315
        a.requestFactory = requestClass
 
316
        a.makeConnection(protocol.FileWrapper(b))
 
317
        # one byte at a time, to stress it.
 
318
        for byte in httpRequest:
 
319
            if a.transport.closed:
 
320
                break
 
321
            a.dataReceived(byte)
 
322
        a.connectionLost(IOError("all done"))
 
323
        if success:
 
324
            self.assertEquals(self.didRequest, 1)
 
325
            del self.didRequest
 
326
        else:
 
327
            self.assert_(not hasattr(self, "didRequest"))
 
328
 
 
329
    def testBasicAuth(self):
 
330
        testcase = self
 
331
        class Request(http.Request):
 
332
            l = []
 
333
            def process(self):
 
334
                testcase.assertEquals(self.getUser(), self.l[0])
 
335
                testcase.assertEquals(self.getPassword(), self.l[1])
 
336
        for u, p in [("foo", "bar"), ("hello", "there:z")]:
 
337
            Request.l[:] = [u, p]
 
338
            s = "%s:%s" % (u, p)
 
339
            f = "GET / HTTP/1.0\nAuthorization: Basic %s\n\n" % (s.encode("base64").strip(), )
 
340
            self.runRequest(f, Request, 0)
 
341
    
 
342
    def testTooManyHeaders(self):
 
343
        httpRequest = "GET / HTTP/1.0\n"
 
344
        for i in range(502):
 
345
            httpRequest += "%s: foo\n" % i
 
346
        httpRequest += "\n"
 
347
        class MyRequest(http.Request):
 
348
            def process(self):
 
349
                raise RuntimeError, "should not get called"
 
350
        self.runRequest(httpRequest, MyRequest, 0)
 
351
        
 
352
    def testHeaders(self):
 
353
        httpRequest = """\
 
354
GET / HTTP/1.0
 
355
Foo: bar
 
356
baz: 1 2 3
 
357
 
 
358
"""
 
359
        testcase = self
 
360
 
 
361
        class MyRequest(http.Request):
 
362
            def process(self):
 
363
                testcase.assertEquals(self.getHeader('foo'), 'bar')
 
364
                testcase.assertEquals(self.getHeader('Foo'), 'bar')
 
365
                testcase.assertEquals(self.getHeader('bAz'), '1 2 3')
 
366
                testcase.didRequest = 1
 
367
                self.finish()
 
368
 
 
369
        self.runRequest(httpRequest, MyRequest)
 
370
 
 
371
    def testCookies(self):
 
372
        httpRequest = '''\
 
373
GET / HTTP/1.0
 
374
Cookie: rabbit="eat carrot"; ninja=secret
 
375
 
 
376
'''
 
377
        testcase = self
 
378
 
 
379
        class MyRequest(http.Request):
 
380
            def process(self):
 
381
                testcase.assertEquals(self.getCookie('rabbit'), '"eat carrot"')
 
382
                testcase.assertEquals(self.getCookie('ninja'), 'secret')
 
383
                testcase.didRequest = 1
 
384
                self.finish()
 
385
 
 
386
        self.runRequest(httpRequest, MyRequest)
 
387
 
 
388
    def testGET(self):
 
389
        httpRequest = '''\
 
390
GET /?key=value&multiple=two+words&multiple=more%20words&empty= HTTP/1.0
 
391
 
 
392
'''
 
393
        testcase = self
 
394
        class MyRequest(http.Request):
 
395
            def process(self):
 
396
                testcase.assertEquals(self.method, "GET")
 
397
                testcase.assertEquals(self.args["key"], ["value"])
 
398
                testcase.assertEquals(self.args["empty"], [""])
 
399
                testcase.assertEquals(self.args["multiple"], ["two words", "more words"])
 
400
                testcase.didRequest = 1
 
401
                self.finish()
 
402
 
 
403
        self.runRequest(httpRequest, MyRequest)
 
404
 
 
405
    def testPOST(self):
 
406
        query = 'key=value&multiple=two+words&multiple=more%20words&empty='
 
407
        httpRequest = '''\
 
408
POST / HTTP/1.0
 
409
Content-Length: %d
 
410
Content-Type: application/x-www-form-urlencoded
 
411
 
 
412
%s''' % (len(query), query)
 
413
 
 
414
        testcase = self
 
415
        class MyRequest(http.Request):
 
416
            def process(self):
 
417
                testcase.assertEquals(self.method, "POST")
 
418
                testcase.assertEquals(self.args["key"], ["value"])
 
419
                testcase.assertEquals(self.args["empty"], [""])
 
420
                testcase.assertEquals(self.args["multiple"], ["two words", "more words"])
 
421
                testcase.didRequest = 1
 
422
                self.finish()
 
423
 
 
424
        self.runRequest(httpRequest, MyRequest)
 
425
 
 
426
class QueryArgumentsTestCase(unittest.TestCase):
 
427
    def testUnquote(self):
 
428
        try:
 
429
            from twisted.protocols import _c_urlarg
 
430
        except ImportError:
 
431
            raise unittest.SkipTest
 
432
        # work exactly like urllib.unquote, including stupid things
 
433
        # % followed by a non-hexdigit in the middle and in the end
 
434
        self.failUnlessEqual(urllib.unquote("%notreally%n"),
 
435
            _c_urlarg.unquote("%notreally%n"))
 
436
        # % followed by hexdigit, followed by non-hexdigit
 
437
        self.failUnlessEqual(urllib.unquote("%1quite%1"),
 
438
            _c_urlarg.unquote("%1quite%1"))
 
439
        # unquoted text, followed by some quoted chars, ends in a trailing %
 
440
        self.failUnlessEqual(urllib.unquote("blah%21%40%23blah%"), 
 
441
            _c_urlarg.unquote("blah%21%40%23blah%"))
 
442
 
 
443
    def testParseqs(self):
 
444
        self.failUnlessEqual(cgi.parse_qs("a=b&d=c;+=f"),
 
445
            http.parse_qs("a=b&d=c;+=f"))
 
446
        self.failUnlessRaises(ValueError, http.parse_qs, "blah",
 
447
            strict_parsing = 1)
 
448
        self.failUnlessEqual(cgi.parse_qs("a=&b=c", keep_blank_values = 1),
 
449
            http.parse_qs("a=&b=c", keep_blank_values = 1))
 
450
        self.failUnlessEqual(cgi.parse_qs("a=&b=c"),
 
451
            http.parse_qs("a=&b=c"))
 
452
 
 
453
    def testEscchar(self):
 
454
        try:
 
455
            from twisted.protocols import _c_urlarg
 
456
        except ImportError:
 
457
            raise unittest.SkipTest
 
458
        self.failUnlessEqual("!@#+b",
 
459
            _c_urlarg.unquote("+21+40+23+b", "+"))
 
460