~cbehrens/nova/lp844160-build-works-with-zones

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/web/test/test_static.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Tests for L{twisted.web.static}.
 
6
"""
 
7
 
 
8
import os, re, StringIO
 
9
 
 
10
from zope.interface.verify import verifyObject
 
11
 
 
12
from twisted.internet import abstract, interfaces
 
13
from twisted.python.compat import set
 
14
from twisted.python.runtime import platform
 
15
from twisted.python.filepath import FilePath
 
16
from twisted.python import log
 
17
from twisted.trial.unittest import TestCase
 
18
from twisted.web import static, http, script, resource
 
19
from twisted.web.server import UnsupportedMethod
 
20
from twisted.web.test.test_web import DummyRequest
 
21
from twisted.web.test._util import _render
 
22
 
 
23
 
 
24
class StaticDataTests(TestCase):
 
25
    """
 
26
    Tests for L{Data}.
 
27
    """
 
28
    def test_headRequest(self):
 
29
        """
 
30
        L{Data.render} returns an empty response body for a I{HEAD} request.
 
31
        """
 
32
        data = static.Data("foo", "bar")
 
33
        request = DummyRequest([''])
 
34
        request.method = 'HEAD'
 
35
        d = _render(data, request)
 
36
        def cbRendered(ignored):
 
37
            self.assertEqual(''.join(request.written), "")
 
38
        d.addCallback(cbRendered)
 
39
        return d
 
40
 
 
41
 
 
42
    def test_invalidMethod(self):
 
43
        """
 
44
        L{Data.render} raises L{UnsupportedMethod} in response to a non-I{GET},
 
45
        non-I{HEAD} request.
 
46
        """
 
47
        data = static.Data("foo", "bar")
 
48
        request = DummyRequest([''])
 
49
        request.method = 'POST'
 
50
        self.assertRaises(UnsupportedMethod, data.render, request)
 
51
 
 
52
 
 
53
 
 
54
class StaticFileTests(TestCase):
 
55
    """
 
56
    Tests for the basic behavior of L{File}.
 
57
    """
 
58
    def _render(self, resource, request):
 
59
        return _render(resource, request)
 
60
 
 
61
 
 
62
    def test_invalidMethod(self):
 
63
        """
 
64
        L{File.render} raises L{UnsupportedMethod} in response to a non-I{GET},
 
65
        non-I{HEAD} request.
 
66
        """
 
67
        request = DummyRequest([''])
 
68
        request.method = 'POST'
 
69
        path = FilePath(self.mktemp())
 
70
        path.setContent("foo")
 
71
        file = static.File(path.path)
 
72
        self.assertRaises(UnsupportedMethod, file.render, request)
 
73
 
 
74
 
 
75
    def test_notFound(self):
 
76
        """
 
77
        If a request is made which encounters a L{File} before a final segment
 
78
        which does not correspond to any file in the path the L{File} was
 
79
        created with, a not found response is sent.
 
80
        """
 
81
        base = FilePath(self.mktemp())
 
82
        base.makedirs()
 
83
        file = static.File(base.path)
 
84
 
 
85
        request = DummyRequest(['foobar'])
 
86
        child = resource.getChildForRequest(file, request)
 
87
 
 
88
        d = self._render(child, request)
 
89
        def cbRendered(ignored):
 
90
            self.assertEqual(request.responseCode, 404)
 
91
        d.addCallback(cbRendered)
 
92
        return d
 
93
 
 
94
 
 
95
    def test_emptyChild(self):
 
96
        """
 
97
        The C{''} child of a L{File} which corresponds to a directory in the
 
98
        filesystem is a L{DirectoryLister}.
 
99
        """
 
100
        base = FilePath(self.mktemp())
 
101
        base.makedirs()
 
102
        file = static.File(base.path)
 
103
 
 
104
        request = DummyRequest([''])
 
105
        child = resource.getChildForRequest(file, request)
 
106
        self.assertIsInstance(child, static.DirectoryLister)
 
107
        self.assertEqual(child.path, base.path)
 
108
 
 
109
 
 
110
    def test_securityViolationNotFound(self):
 
111
        """
 
112
        If a request is made which encounters a L{File} before a final segment
 
113
        which cannot be looked up in the filesystem due to security
 
114
        considerations, a not found response is sent.
 
115
        """
 
116
        base = FilePath(self.mktemp())
 
117
        base.makedirs()
 
118
        file = static.File(base.path)
 
119
 
 
120
        request = DummyRequest(['..'])
 
121
        child = resource.getChildForRequest(file, request)
 
122
 
 
123
        d = self._render(child, request)
 
124
        def cbRendered(ignored):
 
125
            self.assertEqual(request.responseCode, 404)
 
126
        d.addCallback(cbRendered)
 
127
        return d
 
128
 
 
129
 
 
130
    def test_forbiddenResource(self):
 
131
        """
 
132
        If the file in the filesystem which would satisfy a request cannot be
 
133
        read, L{File.render} sets the HTTP response code to I{FORBIDDEN}.
 
134
        """
 
135
        base = FilePath(self.mktemp())
 
136
        base.setContent('')
 
137
        # Make sure we can delete the file later.
 
138
        self.addCleanup(base.chmod, 0700)
 
139
 
 
140
        # Get rid of our own read permission.
 
141
        base.chmod(0)
 
142
 
 
143
        file = static.File(base.path)
 
144
        request = DummyRequest([''])
 
145
        d = self._render(file, request)
 
146
        def cbRendered(ignored):
 
147
            self.assertEqual(request.responseCode, 403)
 
148
        d.addCallback(cbRendered)
 
149
        return d
 
150
    if platform.isWindows():
 
151
        test_forbiddenResource.skip = "Cannot remove read permission on Windows"
 
152
 
 
153
 
 
154
    def test_indexNames(self):
 
155
        """
 
156
        If a request is made which encounters a L{File} before a final empty
 
157
        segment, a file in the L{File} instance's C{indexNames} list which
 
158
        exists in the path the L{File} was created with is served as the
 
159
        response to the request.
 
160
        """
 
161
        base = FilePath(self.mktemp())
 
162
        base.makedirs()
 
163
        base.child("foo.bar").setContent("baz")
 
164
        file = static.File(base.path)
 
165
        file.indexNames = ['foo.bar']
 
166
 
 
167
        request = DummyRequest([''])
 
168
        child = resource.getChildForRequest(file, request)
 
169
 
 
170
        d = self._render(child, request)
 
171
        def cbRendered(ignored):
 
172
            self.assertEqual(''.join(request.written), 'baz')
 
173
            self.assertEqual(request.outgoingHeaders['content-length'], '3')
 
174
        d.addCallback(cbRendered)
 
175
        return d
 
176
 
 
177
 
 
178
    def test_staticFile(self):
 
179
        """
 
180
        If a request is made which encounters a L{File} before a final segment
 
181
        which names a file in the path the L{File} was created with, that file
 
182
        is served as the response to the request.
 
183
        """
 
184
        base = FilePath(self.mktemp())
 
185
        base.makedirs()
 
186
        base.child("foo.bar").setContent("baz")
 
187
        file = static.File(base.path)
 
188
 
 
189
        request = DummyRequest(['foo.bar'])
 
190
        child = resource.getChildForRequest(file, request)
 
191
 
 
192
        d = self._render(child, request)
 
193
        def cbRendered(ignored):
 
194
            self.assertEqual(''.join(request.written), 'baz')
 
195
            self.assertEqual(request.outgoingHeaders['content-length'], '3')
 
196
        d.addCallback(cbRendered)
 
197
        return d
 
198
 
 
199
 
 
200
    def test_staticFileDeletedGetChild(self):
 
201
        """
 
202
        A L{static.File} created for a directory which does not exist should
 
203
        return childNotFound from L{static.File.getChild}.
 
204
        """
 
205
        staticFile = static.File(self.mktemp())
 
206
        request = DummyRequest(['foo.bar'])
 
207
        child = staticFile.getChild("foo.bar", request)
 
208
        self.assertEquals(child, staticFile.childNotFound)
 
209
 
 
210
 
 
211
    def test_staticFileDeletedRender(self):
 
212
        """
 
213
        A L{static.File} created for a file which does not exist should render
 
214
        its C{childNotFound} page.
 
215
        """
 
216
        staticFile = static.File(self.mktemp())
 
217
        request = DummyRequest(['foo.bar'])
 
218
        request2 = DummyRequest(['foo.bar'])
 
219
        d = self._render(staticFile, request)
 
220
        d2 = self._render(staticFile.childNotFound, request2)
 
221
        def cbRendered2(ignored):
 
222
            def cbRendered(ignored):
 
223
                self.assertEquals(''.join(request.written),
 
224
                                  ''.join(request2.written))
 
225
            d.addCallback(cbRendered)
 
226
            return d
 
227
        d2.addCallback(cbRendered2)
 
228
        return d2
 
229
 
 
230
 
 
231
    def test_headRequest(self):
 
232
        """
 
233
        L{static.File.render} returns an empty response body for I{HEAD}
 
234
        requests.
 
235
        """
 
236
        path = FilePath(self.mktemp())
 
237
        path.setContent("foo")
 
238
        file = static.File(path.path)
 
239
        request = DummyRequest([''])
 
240
        request.method = 'HEAD'
 
241
        d = _render(file, request)
 
242
        def cbRendered(ignored):
 
243
            self.assertEqual("".join(request.written), "")
 
244
        d.addCallback(cbRendered)
 
245
        return d
 
246
 
 
247
 
 
248
    def test_processors(self):
 
249
        """
 
250
        If a request is made which encounters a L{File} before a final segment
 
251
        which names a file with an extension which is in the L{File}'s
 
252
        C{processors} mapping, the processor associated with that extension is
 
253
        used to serve the response to the request.
 
254
        """
 
255
        base = FilePath(self.mktemp())
 
256
        base.makedirs()
 
257
        base.child("foo.bar").setContent(
 
258
            "from twisted.web.static import Data\n"
 
259
            "resource = Data('dynamic world','text/plain')\n")
 
260
 
 
261
        file = static.File(base.path)
 
262
        file.processors = {'.bar': script.ResourceScript}
 
263
        request = DummyRequest(["foo.bar"])
 
264
        child = resource.getChildForRequest(file, request)
 
265
 
 
266
        d = self._render(child, request)
 
267
        def cbRendered(ignored):
 
268
            self.assertEqual(''.join(request.written), 'dynamic world')
 
269
            self.assertEqual(request.outgoingHeaders['content-length'], '13')
 
270
        d.addCallback(cbRendered)
 
271
        return d
 
272
 
 
273
 
 
274
    def test_ignoreExt(self):
 
275
        """
 
276
        The list of ignored extensions can be set by passing a value to
 
277
        L{File.__init__} or by calling L{File.ignoreExt} later.
 
278
        """
 
279
        file = static.File(".")
 
280
        self.assertEqual(file.ignoredExts, [])
 
281
        file.ignoreExt(".foo")
 
282
        file.ignoreExt(".bar")
 
283
        self.assertEqual(file.ignoredExts, [".foo", ".bar"])
 
284
 
 
285
        file = static.File(".", ignoredExts=(".bar", ".baz"))
 
286
        self.assertEqual(file.ignoredExts, [".bar", ".baz"])
 
287
 
 
288
 
 
289
    def test_ignoredExtensionsIgnored(self):
 
290
        """
 
291
        A request for the I{base} child of a L{File} succeeds with a resource
 
292
        for the I{base<extension>} file in the path the L{File} was created
 
293
        with if such a file exists and the L{File} has been configured to
 
294
        ignore the I{<extension>} extension.
 
295
        """
 
296
        base = FilePath(self.mktemp())
 
297
        base.makedirs()
 
298
        base.child('foo.bar').setContent('baz')
 
299
        base.child('foo.quux').setContent('foobar')
 
300
        file = static.File(base.path, ignoredExts=(".bar",))
 
301
 
 
302
        request = DummyRequest(["foo"])
 
303
        child = resource.getChildForRequest(file, request)
 
304
 
 
305
        d = self._render(child, request)
 
306
        def cbRendered(ignored):
 
307
            self.assertEqual(''.join(request.written), 'baz')
 
308
        d.addCallback(cbRendered)
 
309
        return d
 
310
 
 
311
 
 
312
    def test_createPickleChild(self):
 
313
        """
 
314
        L{static.File.createPickleChild} is deprecated.
 
315
        """
 
316
        path = FilePath(self.mktemp())
 
317
        path.makedirs()
 
318
        static.File(path.path).createPickleChild("foo", None)
 
319
        warnings = self.flushWarnings([self.test_createPickleChild])
 
320
        self.assertEqual(warnings[0]['category'], DeprecationWarning)
 
321
        self.assertEqual(
 
322
            warnings[0]['message'],
 
323
            "File.createPickleChild is deprecated since Twisted 9.0.  "
 
324
            "Resource persistence is beyond the scope of Twisted Web.")
 
325
        self.assertEqual(len(warnings), 1)
 
326
 
 
327
 
 
328
 
 
329
class StaticMakeProducerTests(TestCase):
 
330
    """
 
331
    Tests for L{File.makeProducer}.
 
332
    """
 
333
 
 
334
 
 
335
    def makeResourceWithContent(self, content, type=None, encoding=None):
 
336
        """
 
337
        Make a L{static.File} resource that has C{content} for its content.
 
338
 
 
339
        @param content: The bytes to use as the contents of the resource.
 
340
        @param type: Optional value for the content type of the resource.
 
341
        """
 
342
        fileName = self.mktemp()
 
343
        fileObject = open(fileName, 'w')
 
344
        fileObject.write(content)
 
345
        fileObject.close()
 
346
        resource = static.File(fileName)
 
347
        resource.encoding = encoding
 
348
        resource.type = type
 
349
        return resource
 
350
 
 
351
 
 
352
    def contentHeaders(self, request):
 
353
        """
 
354
        Extract the content-* headers from the L{DummyRequest} C{request}.
 
355
 
 
356
        This returns the subset of C{request.outgoingHeaders} of headers that
 
357
        start with 'content-'.
 
358
        """
 
359
        contentHeaders = {}
 
360
        for k, v in request.outgoingHeaders.iteritems():
 
361
            if k.startswith('content-'):
 
362
                contentHeaders[k] = v
 
363
        return contentHeaders
 
364
 
 
365
 
 
366
    def test_noRangeHeaderGivesNoRangeStaticProducer(self):
 
367
        """
 
368
        makeProducer when no Range header is set returns an instance of
 
369
        NoRangeStaticProducer.
 
370
        """
 
371
        resource = self.makeResourceWithContent('')
 
372
        request = DummyRequest([])
 
373
        producer = resource.makeProducer(request, resource.openForReading())
 
374
        self.assertIsInstance(producer, static.NoRangeStaticProducer)
 
375
 
 
376
 
 
377
    def test_noRangeHeaderSets200OK(self):
 
378
        """
 
379
        makeProducer when no Range header is set sets the responseCode on the
 
380
        request to 'OK'.
 
381
        """
 
382
        resource = self.makeResourceWithContent('')
 
383
        request = DummyRequest([])
 
384
        resource.makeProducer(request, resource.openForReading())
 
385
        self.assertEqual(http.OK, request.responseCode)
 
386
 
 
387
 
 
388
    def test_noRangeHeaderSetsContentHeaders(self):
 
389
        """
 
390
        makeProducer when no Range header is set sets the Content-* headers
 
391
        for the response.
 
392
        """
 
393
        length = 123
 
394
        contentType = "text/plain"
 
395
        contentEncoding = 'gzip'
 
396
        resource = self.makeResourceWithContent(
 
397
            'a'*length, type=contentType, encoding=contentEncoding)
 
398
        request = DummyRequest([])
 
399
        resource.makeProducer(request, resource.openForReading())
 
400
        self.assertEqual(
 
401
            {'content-type': contentType, 'content-length': str(length),
 
402
             'content-encoding': contentEncoding},
 
403
            self.contentHeaders(request))
 
404
 
 
405
 
 
406
    def test_singleRangeGivesSingleRangeStaticProducer(self):
 
407
        """
 
408
        makeProducer when the Range header requests a single byte range
 
409
        returns an instance of SingleRangeStaticProducer.
 
410
        """
 
411
        request = DummyRequest([])
 
412
        request.headers['range'] = 'bytes=1-3'
 
413
        resource = self.makeResourceWithContent('abcdef')
 
414
        producer = resource.makeProducer(request, resource.openForReading())
 
415
        self.assertIsInstance(producer, static.SingleRangeStaticProducer)
 
416
 
 
417
 
 
418
    def test_singleRangeSets206PartialContent(self):
 
419
        """
 
420
        makeProducer when the Range header requests a single, satisfiable byte
 
421
        range sets the response code on the request to 'Partial Content'.
 
422
        """
 
423
        request = DummyRequest([])
 
424
        request.headers['range'] = 'bytes=1-3'
 
425
        resource = self.makeResourceWithContent('abcdef')
 
426
        resource.makeProducer(request, resource.openForReading())
 
427
        self.assertEqual(
 
428
            http.PARTIAL_CONTENT, request.responseCode)
 
429
 
 
430
 
 
431
    def test_singleRangeSetsContentHeaders(self):
 
432
        """
 
433
        makeProducer when the Range header requests a single, satisfiable byte
 
434
        range sets the Content-* headers appropriately.
 
435
        """
 
436
        request = DummyRequest([])
 
437
        request.headers['range'] = 'bytes=1-3'
 
438
        contentType = "text/plain"
 
439
        contentEncoding = 'gzip'
 
440
        resource = self.makeResourceWithContent('abcdef', type=contentType, encoding=contentEncoding)
 
441
        resource.makeProducer(request, resource.openForReading())
 
442
        self.assertEqual(
 
443
            {'content-type': contentType, 'content-encoding': contentEncoding,
 
444
             'content-range': 'bytes 1-3/6', 'content-length': '3'},
 
445
            self.contentHeaders(request))
 
446
 
 
447
 
 
448
    def test_singleUnsatisfiableRangeReturnsSingleRangeStaticProducer(self):
 
449
        """
 
450
        makeProducer still returns an instance of L{SingleRangeStaticProducer}
 
451
        when the Range header requests a single unsatisfiable byte range.
 
452
        """
 
453
        request = DummyRequest([])
 
454
        request.headers['range'] = 'bytes=4-10'
 
455
        resource = self.makeResourceWithContent('abc')
 
456
        producer = resource.makeProducer(request, resource.openForReading())
 
457
        self.assertIsInstance(producer, static.SingleRangeStaticProducer)
 
458
 
 
459
 
 
460
    def test_singleUnsatisfiableRangeSets416ReqestedRangeNotSatisfiable(self):
 
461
        """
 
462
        makeProducer sets the response code of the request to of 'Requested
 
463
        Range Not Satisfiable' when the Range header requests a single
 
464
        unsatisfiable byte range.
 
465
        """
 
466
        request = DummyRequest([])
 
467
        request.headers['range'] = 'bytes=4-10'
 
468
        resource = self.makeResourceWithContent('abc')
 
469
        resource.makeProducer(request, resource.openForReading())
 
470
        self.assertEqual(
 
471
            http.REQUESTED_RANGE_NOT_SATISFIABLE, request.responseCode)
 
472
 
 
473
 
 
474
    def test_singleUnsatisfiableRangeSetsContentHeaders(self):
 
475
        """
 
476
        makeProducer when the Range header requests a single, unsatisfiable
 
477
        byte range sets the Content-* headers appropriately.
 
478
        """
 
479
        request = DummyRequest([])
 
480
        request.headers['range'] = 'bytes=4-10'
 
481
        contentType = "text/plain"
 
482
        resource = self.makeResourceWithContent('abc', type=contentType)
 
483
        resource.makeProducer(request, resource.openForReading())
 
484
        self.assertEqual(
 
485
            {'content-type': 'text/plain', 'content-length': '0',
 
486
             'content-range': 'bytes */3'},
 
487
            self.contentHeaders(request))
 
488
 
 
489
 
 
490
    def test_singlePartiallyOverlappingRangeSetsContentHeaders(self):
 
491
        """
 
492
        makeProducer when the Range header requests a single byte range that
 
493
        partly overlaps the resource sets the Content-* headers appropriately.
 
494
        """
 
495
        request = DummyRequest([])
 
496
        request.headers['range'] = 'bytes=2-10'
 
497
        contentType = "text/plain"
 
498
        resource = self.makeResourceWithContent('abc', type=contentType)
 
499
        resource.makeProducer(request, resource.openForReading())
 
500
        self.assertEqual(
 
501
            {'content-type': 'text/plain', 'content-length': '1',
 
502
             'content-range': 'bytes 2-2/3'},
 
503
            self.contentHeaders(request))
 
504
 
 
505
 
 
506
    def test_multipleRangeGivesMultipleRangeStaticProducer(self):
 
507
        """
 
508
        makeProducer when the Range header requests a single byte range
 
509
        returns an instance of MultipleRangeStaticProducer.
 
510
        """
 
511
        request = DummyRequest([])
 
512
        request.headers['range'] = 'bytes=1-3,5-6'
 
513
        resource = self.makeResourceWithContent('abcdef')
 
514
        producer = resource.makeProducer(request, resource.openForReading())
 
515
        self.assertIsInstance(producer, static.MultipleRangeStaticProducer)
 
516
 
 
517
 
 
518
    def test_multipleRangeSets206PartialContent(self):
 
519
        """
 
520
        makeProducer when the Range header requests a multiple satisfiable
 
521
        byte ranges sets the response code on the request to 'Partial
 
522
        Content'.
 
523
        """
 
524
        request = DummyRequest([])
 
525
        request.headers['range'] = 'bytes=1-3,5-6'
 
526
        resource = self.makeResourceWithContent('abcdef')
 
527
        resource.makeProducer(request, resource.openForReading())
 
528
        self.assertEqual(
 
529
            http.PARTIAL_CONTENT, request.responseCode)
 
530
 
 
531
 
 
532
    def test_mutipleRangeSetsContentHeaders(self):
 
533
        """
 
534
        makeProducer when the Range header requests a single, satisfiable byte
 
535
        range sets the Content-* headers appropriately.
 
536
        """
 
537
        request = DummyRequest([])
 
538
        request.headers['range'] = 'bytes=1-3,5-6'
 
539
        resource = self.makeResourceWithContent(
 
540
            'abcdefghijkl', encoding='gzip')
 
541
        producer = resource.makeProducer(request, resource.openForReading())
 
542
        contentHeaders = self.contentHeaders(request)
 
543
        # The only content-* headers set are content-type and content-length.
 
544
        self.assertEqual(
 
545
            set(['content-length', 'content-type']),
 
546
            set(contentHeaders.keys()))
 
547
        # The content-length depends on the boundary used in the response.
 
548
        expectedLength = 5
 
549
        for boundary, offset, size in producer.rangeInfo:
 
550
            expectedLength += len(boundary)
 
551
        self.assertEqual(expectedLength, contentHeaders['content-length'])
 
552
        # Content-type should be set to a value indicating a multipart
 
553
        # response and the boundary used to separate the parts.
 
554
        self.assertIn('content-type', contentHeaders)
 
555
        contentType = contentHeaders['content-type']
 
556
        self.assertNotIdentical(
 
557
            None, re.match(
 
558
                'multipart/byteranges; boundary="[^"]*"\Z', contentType))
 
559
        # Content-encoding is not set in the response to a multiple range
 
560
        # response, which is a bit wussy but works well enough with the way
 
561
        # static.File does content-encodings...
 
562
        self.assertNotIn('content-encoding', contentHeaders)
 
563
 
 
564
 
 
565
    def test_multipleUnsatisfiableRangesReturnsMultipleRangeStaticProducer(self):
 
566
        """
 
567
        makeProducer still returns an instance of L{SingleRangeStaticProducer}
 
568
        when the Range header requests multiple ranges, none of which are
 
569
        satisfiable.
 
570
        """
 
571
        request = DummyRequest([])
 
572
        request.headers['range'] = 'bytes=10-12,15-20'
 
573
        resource = self.makeResourceWithContent('abc')
 
574
        producer = resource.makeProducer(request, resource.openForReading())
 
575
        self.assertIsInstance(producer, static.MultipleRangeStaticProducer)
 
576
 
 
577
 
 
578
    def test_multipleUnsatisfiableRangesSets416ReqestedRangeNotSatisfiable(self):
 
579
        """
 
580
        makeProducer sets the response code of the request to of 'Requested
 
581
        Range Not Satisfiable' when the Range header requests multiple ranges,
 
582
        none of which are satisfiable.
 
583
        """
 
584
        request = DummyRequest([])
 
585
        request.headers['range'] = 'bytes=10-12,15-20'
 
586
        resource = self.makeResourceWithContent('abc')
 
587
        resource.makeProducer(request, resource.openForReading())
 
588
        self.assertEqual(
 
589
            http.REQUESTED_RANGE_NOT_SATISFIABLE, request.responseCode)
 
590
 
 
591
 
 
592
    def test_multipleUnsatisfiableRangeSetsContentHeaders(self):
 
593
        """
 
594
        makeProducer when the Range header requests multiple ranges, none of
 
595
        which are satisfiable, sets the Content-* headers appropriately.
 
596
        """
 
597
        request = DummyRequest([])
 
598
        request.headers['range'] = 'bytes=4-10'
 
599
        contentType = "text/plain"
 
600
        request.headers['range'] = 'bytes=10-12,15-20'
 
601
        resource = self.makeResourceWithContent('abc', type=contentType)
 
602
        resource.makeProducer(request, resource.openForReading())
 
603
        self.assertEqual(
 
604
            {'content-length': '0', 'content-range': 'bytes */3'},
 
605
            self.contentHeaders(request))
 
606
 
 
607
 
 
608
    def test_oneSatisfiableRangeIsEnough(self):
 
609
        """
 
610
        makeProducer when the Range header requests multiple ranges, at least
 
611
        one of which matches, sets the response code to 'Partial Content'.
 
612
        """
 
613
        request = DummyRequest([])
 
614
        request.headers['range'] = 'bytes=1-3,100-200'
 
615
        resource = self.makeResourceWithContent('abcdef')
 
616
        resource.makeProducer(request, resource.openForReading())
 
617
        self.assertEqual(
 
618
            http.PARTIAL_CONTENT, request.responseCode)
 
619
 
 
620
 
 
621
 
 
622
class StaticProducerTests(TestCase):
 
623
    """
 
624
    Tests for the abstract L{StaticProducer}.
 
625
    """
 
626
 
 
627
    def test_stopProducingClosesFile(self):
 
628
        """
 
629
        L{StaticProducer.stopProducing} closes the file object the producer is
 
630
        producing data from.
 
631
        """
 
632
        fileObject = StringIO.StringIO()
 
633
        producer = static.StaticProducer(None, fileObject)
 
634
        producer.stopProducing()
 
635
        self.assertTrue(fileObject.closed)
 
636
 
 
637
 
 
638
    def test_stopProducingSetsRequestToNone(self):
 
639
        """
 
640
        L{StaticProducer.stopProducing} sets the request instance variable to
 
641
        None, which indicates to subclasses' resumeProducing methods that no
 
642
        more data should be produced.
 
643
        """
 
644
        fileObject = StringIO.StringIO()
 
645
        producer = static.StaticProducer(DummyRequest([]), fileObject)
 
646
        producer.stopProducing()
 
647
        self.assertIdentical(None, producer.request)
 
648
 
 
649
 
 
650
 
 
651
class NoRangeStaticProducerTests(TestCase):
 
652
    """
 
653
    Tests for L{NoRangeStaticProducer}.
 
654
    """
 
655
 
 
656
    def test_implementsIPullProducer(self):
 
657
        """
 
658
        L{NoRangeStaticProducer} implements L{IPullProducer}.
 
659
        """
 
660
        verifyObject(
 
661
            interfaces.IPullProducer,
 
662
            static.NoRangeStaticProducer(None, None))
 
663
 
 
664
 
 
665
    def test_resumeProducingProducesContent(self):
 
666
        """
 
667
        L{NoRangeStaticProducer.resumeProducing} writes content from the
 
668
        resource to the request.
 
669
        """
 
670
        request = DummyRequest([])
 
671
        content = 'abcdef'
 
672
        producer = static.NoRangeStaticProducer(
 
673
            request, StringIO.StringIO(content))
 
674
        # start calls registerProducer on the DummyRequest, which pulls all
 
675
        # output from the producer and so we just need this one call.
 
676
        producer.start()
 
677
        self.assertEqual(content, ''.join(request.written))
 
678
 
 
679
 
 
680
    def test_resumeProducingBuffersOutput(self):
 
681
        """
 
682
        L{NoRangeStaticProducer.start} writes at most
 
683
        C{abstract.FileDescriptor.bufferSize} bytes of content from the
 
684
        resource to the request at once.
 
685
        """
 
686
        request = DummyRequest([])
 
687
        bufferSize = abstract.FileDescriptor.bufferSize
 
688
        content = 'a' * (2*bufferSize + 1)
 
689
        producer = static.NoRangeStaticProducer(
 
690
            request, StringIO.StringIO(content))
 
691
        # start calls registerProducer on the DummyRequest, which pulls all
 
692
        # output from the producer and so we just need this one call.
 
693
        producer.start()
 
694
        expected = [
 
695
            content[0:bufferSize],
 
696
            content[bufferSize:2*bufferSize],
 
697
            content[2*bufferSize:]
 
698
            ]
 
699
        self.assertEqual(expected, request.written)
 
700
 
 
701
 
 
702
    def test_finishCalledWhenDone(self):
 
703
        """
 
704
        L{NoRangeStaticProducer.resumeProducing} calls finish() on the request
 
705
        after it is done producing content.
 
706
        """
 
707
        request = DummyRequest([])
 
708
        finishDeferred = request.notifyFinish()
 
709
        callbackList = []
 
710
        finishDeferred.addCallback(callbackList.append)
 
711
        producer = static.NoRangeStaticProducer(
 
712
            request, StringIO.StringIO('abcdef'))
 
713
        # start calls registerProducer on the DummyRequest, which pulls all
 
714
        # output from the producer and so we just need this one call.
 
715
        producer.start()
 
716
        self.assertEqual([None], callbackList)
 
717
 
 
718
 
 
719
 
 
720
class SingleRangeStaticProducerTests(TestCase):
 
721
    """
 
722
    Tests for L{SingleRangeStaticProducer}.
 
723
    """
 
724
 
 
725
    def test_implementsIPullProducer(self):
 
726
        """
 
727
        L{SingleRangeStaticProducer} implements L{IPullProducer}.
 
728
        """
 
729
        verifyObject(
 
730
            interfaces.IPullProducer,
 
731
            static.SingleRangeStaticProducer(None, None, None, None))
 
732
 
 
733
 
 
734
    def test_resumeProducingProducesContent(self):
 
735
        """
 
736
        L{SingleRangeStaticProducer.resumeProducing} writes the given amount
 
737
        of content, starting at the given offset, from the resource to the
 
738
        request.
 
739
        """
 
740
        request = DummyRequest([])
 
741
        content = 'abcdef'
 
742
        producer = static.SingleRangeStaticProducer(
 
743
            request, StringIO.StringIO(content), 1, 3)
 
744
        # DummyRequest.registerProducer pulls all output from the producer, so
 
745
        # we just need to call start.
 
746
        producer.start()
 
747
        self.assertEqual(content[1:4], ''.join(request.written))
 
748
 
 
749
 
 
750
    def test_resumeProducingBuffersOutput(self):
 
751
        """
 
752
        L{SingleRangeStaticProducer.start} writes at most
 
753
        C{abstract.FileDescriptor.bufferSize} bytes of content from the
 
754
        resource to the request at once.
 
755
        """
 
756
        request = DummyRequest([])
 
757
        bufferSize = abstract.FileDescriptor.bufferSize
 
758
        content = 'abc' * bufferSize
 
759
        producer = static.SingleRangeStaticProducer(
 
760
            request, StringIO.StringIO(content), 1, bufferSize+10)
 
761
        # DummyRequest.registerProducer pulls all output from the producer, so
 
762
        # we just need to call start.
 
763
        producer.start()
 
764
        expected = [
 
765
            content[1:bufferSize+1],
 
766
            content[bufferSize+1:bufferSize+11],
 
767
            ]
 
768
        self.assertEqual(expected, request.written)
 
769
 
 
770
 
 
771
    def test_finishCalledWhenDone(self):
 
772
        """
 
773
        L{SingleRangeStaticProducer.resumeProducing} calls finish() on the
 
774
        request after it is done producing content.
 
775
        """
 
776
        request = DummyRequest([])
 
777
        finishDeferred = request.notifyFinish()
 
778
        callbackList = []
 
779
        finishDeferred.addCallback(callbackList.append)
 
780
        producer = static.SingleRangeStaticProducer(
 
781
            request, StringIO.StringIO('abcdef'), 1, 1)
 
782
        # start calls registerProducer on the DummyRequest, which pulls all
 
783
        # output from the producer and so we just need this one call.
 
784
        producer.start()
 
785
        self.assertEqual([None], callbackList)
 
786
 
 
787
 
 
788
 
 
789
class MultipleRangeStaticProducerTests(TestCase):
 
790
    """
 
791
    Tests for L{MultipleRangeStaticProducer}.
 
792
    """
 
793
 
 
794
    def test_implementsIPullProducer(self):
 
795
        """
 
796
        L{MultipleRangeStaticProducer} implements L{IPullProducer}.
 
797
        """
 
798
        verifyObject(
 
799
            interfaces.IPullProducer,
 
800
            static.MultipleRangeStaticProducer(None, None, None))
 
801
 
 
802
 
 
803
    def test_resumeProducingProducesContent(self):
 
804
        """
 
805
        L{MultipleRangeStaticProducer.resumeProducing} writes the requested
 
806
        chunks of content from the resource to the request, with the supplied
 
807
        boundaries in between each chunk.
 
808
        """
 
809
        request = DummyRequest([])
 
810
        content = 'abcdef'
 
811
        producer = static.MultipleRangeStaticProducer(
 
812
            request, StringIO.StringIO(content), [('1', 1, 3), ('2', 5, 1)])
 
813
        # DummyRequest.registerProducer pulls all output from the producer, so
 
814
        # we just need to call start.
 
815
        producer.start()
 
816
        self.assertEqual('1bcd2f', ''.join(request.written))
 
817
 
 
818
 
 
819
    def test_resumeProducingBuffersOutput(self):
 
820
        """
 
821
        L{MultipleRangeStaticProducer.start} writes about
 
822
        C{abstract.FileDescriptor.bufferSize} bytes of content from the
 
823
        resource to the request at once.
 
824
 
 
825
        To be specific about the 'about' above: it can write slightly more,
 
826
        for example in the case where the first boundary plus the first chunk
 
827
        is less than C{bufferSize} but first boundary plus the first chunk
 
828
        plus the second boundary is more, but this is unimportant as in
 
829
        practice the boundaries are fairly small.  On the other side, it is
 
830
        important for performance to bundle up several small chunks into one
 
831
        call to request.write.
 
832
        """
 
833
        request = DummyRequest([])
 
834
        content = '0123456789' * 2
 
835
        producer = static.MultipleRangeStaticProducer(
 
836
            request, StringIO.StringIO(content),
 
837
            [('a', 0, 2), ('b', 5, 10), ('c', 0, 0)])
 
838
        producer.bufferSize = 10
 
839
        # DummyRequest.registerProducer pulls all output from the producer, so
 
840
        # we just need to call start.
 
841
        producer.start()
 
842
        expected = [
 
843
            'a' + content[0:2] + 'b' + content[5:11],
 
844
            content[11:15] + 'c',
 
845
            ]
 
846
        self.assertEqual(expected, request.written)
 
847
 
 
848
 
 
849
    def test_finishCalledWhenDone(self):
 
850
        """
 
851
        L{MultipleRangeStaticProducer.resumeProducing} calls finish() on the
 
852
        request after it is done producing content.
 
853
        """
 
854
        request = DummyRequest([])
 
855
        finishDeferred = request.notifyFinish()
 
856
        callbackList = []
 
857
        finishDeferred.addCallback(callbackList.append)
 
858
        producer = static.MultipleRangeStaticProducer(
 
859
            request, StringIO.StringIO('abcdef'), [('', 1, 2)])
 
860
        # start calls registerProducer on the DummyRequest, which pulls all
 
861
        # output from the producer and so we just need this one call.
 
862
        producer.start()
 
863
        self.assertEqual([None], callbackList)
 
864
 
 
865
 
 
866
 
 
867
class RangeTests(TestCase):
 
868
    """
 
869
    Tests for I{Range-Header} support in L{twisted.web.static.File}.
 
870
 
 
871
    @type file: L{file}
 
872
    @ivar file: Temporary (binary) file containing the content to be served.
 
873
 
 
874
    @type resource: L{static.File}
 
875
    @ivar resource: A leaf web resource using C{file} as content.
 
876
 
 
877
    @type request: L{DummyRequest}
 
878
    @ivar request: A fake request, requesting C{resource}.
 
879
 
 
880
    @type catcher: L{list}
 
881
    @ivar catcher: List which gathers all log information.
 
882
    """
 
883
    def setUp(self):
 
884
        """
 
885
        Create a temporary file with a fixed payload of 64 bytes.  Create a
 
886
        resource for that file and create a request which will be for that
 
887
        resource.  Each test can set a different range header to test different
 
888
        aspects of the implementation.
 
889
        """
 
890
        path = FilePath(self.mktemp())
 
891
        # This is just a jumble of random stuff.  It's supposed to be a good
 
892
        # set of data for this test, particularly in order to avoid
 
893
        # accidentally seeing the right result by having a byte sequence
 
894
        # repeated at different locations or by having byte values which are
 
895
        # somehow correlated with their position in the string.
 
896
        self.payload = ('\xf8u\xf3E\x8c7\xce\x00\x9e\xb6a0y0S\xf0\xef\xac\xb7'
 
897
                        '\xbe\xb5\x17M\x1e\x136k{\x1e\xbe\x0c\x07\x07\t\xd0'
 
898
                        '\xbckY\xf5I\x0b\xb8\x88oZ\x1d\x85b\x1a\xcdk\xf2\x1d'
 
899
                        '&\xfd%\xdd\x82q/A\x10Y\x8b')
 
900
        path.setContent(self.payload)
 
901
        self.file = path.open()
 
902
        self.resource = static.File(self.file.name)
 
903
        self.resource.isLeaf = 1
 
904
        self.request = DummyRequest([''])
 
905
        self.request.uri = self.file.name
 
906
        self.catcher = []
 
907
        log.addObserver(self.catcher.append)
 
908
 
 
909
 
 
910
    def tearDown(self):
 
911
        """
 
912
        Clean up the resource file and the log observer.
 
913
        """
 
914
        self.file.close()
 
915
        log.removeObserver(self.catcher.append)
 
916
 
 
917
 
 
918
    def _assertLogged(self, expected):
 
919
        """
 
920
        Asserts that a given log message occurred with an expected message.
 
921
        """
 
922
        logItem = self.catcher.pop()
 
923
        self.assertEquals(logItem["message"][0], expected)
 
924
        self.assertEqual(
 
925
            self.catcher, [], "An additional log occured: %r" % (logItem,))
 
926
 
 
927
 
 
928
    def test_invalidRanges(self):
 
929
        """
 
930
        L{File._parseRangeHeader} raises L{ValueError} when passed
 
931
        syntactically invalid byte ranges.
 
932
        """
 
933
        f = self.resource._parseRangeHeader
 
934
 
 
935
        # there's no =
 
936
        self.assertRaises(ValueError, f, 'bytes')
 
937
 
 
938
        # unknown isn't a valid Bytes-Unit
 
939
        self.assertRaises(ValueError, f, 'unknown=1-2')
 
940
 
 
941
        # there's no - in =stuff
 
942
        self.assertRaises(ValueError, f, 'bytes=3')
 
943
 
 
944
        # both start and end are empty
 
945
        self.assertRaises(ValueError, f, 'bytes=-')
 
946
 
 
947
        # start isn't an integer
 
948
        self.assertRaises(ValueError, f, 'bytes=foo-')
 
949
 
 
950
        # end isn't an integer
 
951
        self.assertRaises(ValueError, f, 'bytes=-foo')
 
952
 
 
953
        # end isn't equal to or greater than start
 
954
        self.assertRaises(ValueError, f, 'bytes=5-4')
 
955
 
 
956
 
 
957
    def test_rangeMissingStop(self):
 
958
        """
 
959
        A single bytes range without an explicit stop position is parsed into a
 
960
        two-tuple giving the start position and C{None}.
 
961
        """
 
962
        self.assertEqual(
 
963
            self.resource._parseRangeHeader('bytes=0-'), [(0, None)])
 
964
 
 
965
 
 
966
    def test_rangeMissingStart(self):
 
967
        """
 
968
        A single bytes range without an explicit start position is parsed into
 
969
        a two-tuple of C{None} and the end position.
 
970
        """
 
971
        self.assertEqual(
 
972
            self.resource._parseRangeHeader('bytes=-3'), [(None, 3)])
 
973
 
 
974
 
 
975
    def test_range(self):
 
976
        """
 
977
        A single bytes range with explicit start and stop positions is parsed
 
978
        into a two-tuple of those positions.
 
979
        """
 
980
        self.assertEqual(
 
981
            self.resource._parseRangeHeader('bytes=2-5'), [(2, 5)])
 
982
 
 
983
 
 
984
    def test_rangeWithSpace(self):
 
985
        """
 
986
        A single bytes range with whitespace in allowed places is parsed in
 
987
        the same way as it would be without the whitespace.
 
988
        """
 
989
        self.assertEqual(
 
990
            self.resource._parseRangeHeader(' bytes=1-2 '), [(1, 2)])
 
991
        self.assertEqual(
 
992
            self.resource._parseRangeHeader('bytes =1-2 '), [(1, 2)])
 
993
        self.assertEqual(
 
994
            self.resource._parseRangeHeader('bytes= 1-2'), [(1, 2)])
 
995
        self.assertEqual(
 
996
            self.resource._parseRangeHeader('bytes=1 -2'), [(1, 2)])
 
997
        self.assertEqual(
 
998
            self.resource._parseRangeHeader('bytes=1- 2'), [(1, 2)])
 
999
        self.assertEqual(
 
1000
            self.resource._parseRangeHeader('bytes=1-2 '), [(1, 2)])
 
1001
 
 
1002
 
 
1003
    def test_nullRangeElements(self):
 
1004
        """
 
1005
        If there are multiple byte ranges but only one is non-null, the
 
1006
        non-null range is parsed and its start and stop returned.
 
1007
        """
 
1008
        self.assertEqual(
 
1009
            self.resource._parseRangeHeader('bytes=1-2,\r\n, ,\t'), [(1, 2)])
 
1010
 
 
1011
 
 
1012
    def test_multipleRanges(self):
 
1013
        """
 
1014
        If multiple byte ranges are specified their starts and stops are
 
1015
        returned.
 
1016
        """
 
1017
        self.assertEqual(
 
1018
            self.resource._parseRangeHeader('bytes=1-2,3-4'),
 
1019
            [(1, 2), (3, 4)])
 
1020
 
 
1021
 
 
1022
    def test_bodyLength(self):
 
1023
        """
 
1024
        A correct response to a range request is as long as the length of the
 
1025
        requested range.
 
1026
        """
 
1027
        self.request.headers['range'] = 'bytes=0-43'
 
1028
        self.resource.render(self.request)
 
1029
        self.assertEquals(len(''.join(self.request.written)), 44)
 
1030
 
 
1031
 
 
1032
    def test_invalidRangeRequest(self):
 
1033
        """
 
1034
        An incorrect range request (RFC 2616 defines a correct range request as
 
1035
        a Bytes-Unit followed by a '=' character followed by a specific range.
 
1036
        Only 'bytes' is defined) results in the range header value being logged
 
1037
        and a normal 200 response being sent.
 
1038
        """
 
1039
        self.request.headers['range'] = range = 'foobar=0-43'
 
1040
        self.resource.render(self.request)
 
1041
        expected = "Ignoring malformed Range header %r" % (range,)
 
1042
        self._assertLogged(expected)
 
1043
        self.assertEquals(''.join(self.request.written), self.payload)
 
1044
        self.assertEquals(self.request.responseCode, http.OK)
 
1045
        self.assertEquals(
 
1046
            self.request.outgoingHeaders['content-length'],
 
1047
            str(len(self.payload)))
 
1048
 
 
1049
 
 
1050
    def parseMultipartBody(self, body, boundary):
 
1051
        """
 
1052
        Parse C{body} as a multipart MIME response separated by C{boundary}.
 
1053
 
 
1054
        Note that this with fail the calling test on certain syntactic
 
1055
        problems.
 
1056
        """
 
1057
        sep = "\r\n--" + boundary
 
1058
        parts = ''.join(body).split(sep)
 
1059
        self.assertEquals('', parts[0])
 
1060
        self.assertEquals('--\r\n', parts[-1])
 
1061
        parsed_parts = []
 
1062
        for part in parts[1:-1]:
 
1063
            before, header1, header2, blank, partBody = part.split('\r\n', 4)
 
1064
            headers = header1 + '\n' + header2
 
1065
            self.assertEqual('', before)
 
1066
            self.assertEqual('', blank)
 
1067
            partContentTypeValue = re.search(
 
1068
                '^content-type: (.*)$', headers, re.I|re.M).group(1)
 
1069
            start, end, size = re.search(
 
1070
                '^content-range: bytes ([0-9]+)-([0-9]+)/([0-9]+)$',
 
1071
                headers, re.I|re.M).groups()
 
1072
            parsed_parts.append(
 
1073
                {'contentType': partContentTypeValue,
 
1074
                 'contentRange': (start, end, size),
 
1075
                 'body': partBody})
 
1076
        return parsed_parts
 
1077
 
 
1078
 
 
1079
    def test_multipleRangeRequest(self):
 
1080
        """
 
1081
        The response to a request for multipe bytes ranges is a MIME-ish
 
1082
        multipart response.
 
1083
        """
 
1084
        startEnds = [(0, 2), (20, 30), (40, 50)]
 
1085
        rangeHeaderValue = ','.join(["%s-%s"%(s,e) for (s, e) in startEnds])
 
1086
        self.request.headers['range'] = 'bytes=' + rangeHeaderValue
 
1087
        self.resource.render(self.request)
 
1088
        self.assertEquals(self.request.responseCode, http.PARTIAL_CONTENT)
 
1089
        boundary = re.match(
 
1090
            '^multipart/byteranges; boundary="(.*)"$',
 
1091
            self.request.outgoingHeaders['content-type']).group(1)
 
1092
        parts = self.parseMultipartBody(''.join(self.request.written), boundary)
 
1093
        self.assertEquals(len(startEnds), len(parts))
 
1094
        for part, (s, e) in zip(parts, startEnds):
 
1095
            self.assertEqual(self.resource.type, part['contentType'])
 
1096
            start, end, size = part['contentRange']
 
1097
            self.assertEqual(int(start), s)
 
1098
            self.assertEqual(int(end), e)
 
1099
            self.assertEqual(int(size), self.resource.getFileSize())
 
1100
            self.assertEqual(self.payload[s:e+1], part['body'])
 
1101
 
 
1102
 
 
1103
    def test_multipleRangeRequestWithRangeOverlappingEnd(self):
 
1104
        """
 
1105
        The response to a request for multipe bytes ranges is a MIME-ish
 
1106
        multipart response, even when one of the ranged falls off the end of
 
1107
        the resource.
 
1108
        """
 
1109
        startEnds = [(0, 2), (40, len(self.payload) + 10)]
 
1110
        rangeHeaderValue = ','.join(["%s-%s"%(s,e) for (s, e) in startEnds])
 
1111
        self.request.headers['range'] = 'bytes=' + rangeHeaderValue
 
1112
        self.resource.render(self.request)
 
1113
        self.assertEquals(self.request.responseCode, http.PARTIAL_CONTENT)
 
1114
        boundary = re.match(
 
1115
            '^multipart/byteranges; boundary="(.*)"$',
 
1116
            self.request.outgoingHeaders['content-type']).group(1)
 
1117
        parts = self.parseMultipartBody(''.join(self.request.written), boundary)
 
1118
        self.assertEquals(len(startEnds), len(parts))
 
1119
        for part, (s, e) in zip(parts, startEnds):
 
1120
            self.assertEqual(self.resource.type, part['contentType'])
 
1121
            start, end, size = part['contentRange']
 
1122
            self.assertEqual(int(start), s)
 
1123
            self.assertEqual(int(end), min(e, self.resource.getFileSize()-1))
 
1124
            self.assertEqual(int(size), self.resource.getFileSize())
 
1125
            self.assertEqual(self.payload[s:e+1], part['body'])
 
1126
 
 
1127
 
 
1128
    def test_implicitEnd(self):
 
1129
        """
 
1130
        If the end byte position is omitted, then it is treated as if the
 
1131
        length of the resource was specified by the end byte position.
 
1132
        """
 
1133
        self.request.headers['range'] = 'bytes=23-'
 
1134
        self.resource.render(self.request)
 
1135
        self.assertEquals(''.join(self.request.written), self.payload[23:])
 
1136
        self.assertEquals(len(''.join(self.request.written)), 41)
 
1137
        self.assertEquals(self.request.responseCode, http.PARTIAL_CONTENT)
 
1138
        self.assertEquals(
 
1139
            self.request.outgoingHeaders['content-range'], 'bytes 23-63/64')
 
1140
        self.assertEquals(self.request.outgoingHeaders['content-length'], '41')
 
1141
 
 
1142
 
 
1143
    def test_implicitStart(self):
 
1144
        """
 
1145
        If the start byte position is omitted but the end byte position is
 
1146
        supplied, then the range is treated as requesting the last -N bytes of
 
1147
        the resource, where N is the end byte position.
 
1148
        """
 
1149
        self.request.headers['range'] = 'bytes=-17'
 
1150
        self.resource.render(self.request)
 
1151
        self.assertEquals(''.join(self.request.written), self.payload[-17:])
 
1152
        self.assertEquals(len(''.join(self.request.written)), 17)
 
1153
        self.assertEquals(self.request.responseCode, http.PARTIAL_CONTENT)
 
1154
        self.assertEquals(
 
1155
            self.request.outgoingHeaders['content-range'], 'bytes 47-63/64')
 
1156
        self.assertEquals(self.request.outgoingHeaders['content-length'], '17')
 
1157
 
 
1158
 
 
1159
    def test_explicitRange(self):
 
1160
        """
 
1161
        A correct response to a bytes range header request from A to B starts
 
1162
        with the A'th byte and ends with (including) the B'th byte. The first
 
1163
        byte of a page is numbered with 0.
 
1164
        """
 
1165
        self.request.headers['range'] = 'bytes=3-43'
 
1166
        self.resource.render(self.request)
 
1167
        written = ''.join(self.request.written)
 
1168
        self.assertEquals(written, self.payload[3:44])
 
1169
        self.assertEquals(self.request.responseCode, http.PARTIAL_CONTENT)
 
1170
        self.assertEquals(
 
1171
            self.request.outgoingHeaders['content-range'], 'bytes 3-43/64')
 
1172
        self.assertEquals(
 
1173
            str(len(written)), self.request.outgoingHeaders['content-length'])
 
1174
 
 
1175
 
 
1176
    def test_explicitRangeOverlappingEnd(self):
 
1177
        """
 
1178
        A correct response to a bytes range header request from A to B when B
 
1179
        is past the end of the resource starts with the A'th byte and ends
 
1180
        with the last byte of the resource. The first byte of a page is
 
1181
        numbered with 0.
 
1182
        """
 
1183
        self.request.headers['range'] = 'bytes=40-100'
 
1184
        self.resource.render(self.request)
 
1185
        written = ''.join(self.request.written)
 
1186
        self.assertEquals(written, self.payload[40:])
 
1187
        self.assertEquals(self.request.responseCode, http.PARTIAL_CONTENT)
 
1188
        self.assertEquals(
 
1189
            self.request.outgoingHeaders['content-range'], 'bytes 40-63/64')
 
1190
        self.assertEquals(
 
1191
            str(len(written)), self.request.outgoingHeaders['content-length'])
 
1192
 
 
1193
 
 
1194
    def test_statusCodeRequestedRangeNotSatisfiable(self):
 
1195
        """
 
1196
        If a range is syntactically invalid due to the start being greater than
 
1197
        the end, the range header is ignored (the request is responded to as if
 
1198
        it were not present).
 
1199
        """
 
1200
        self.request.headers['range'] = 'bytes=20-13'
 
1201
        self.resource.render(self.request)
 
1202
        self.assertEquals(self.request.responseCode, http.OK)
 
1203
        self.assertEquals(''.join(self.request.written), self.payload)
 
1204
        self.assertEquals(
 
1205
            self.request.outgoingHeaders['content-length'],
 
1206
            str(len(self.payload)))
 
1207
 
 
1208
 
 
1209
    def test_invalidStartBytePos(self):
 
1210
        """
 
1211
        If a range is unsatisfiable due to the start not being less than the
 
1212
        length of the resource, the response is 416 (Requested range not
 
1213
        satisfiable) and no data is written to the response body (RFC 2616,
 
1214
        section 14.35.1).
 
1215
        """
 
1216
        self.request.headers['range'] = 'bytes=67-108'
 
1217
        self.resource.render(self.request)
 
1218
        self.assertEquals(
 
1219
            self.request.responseCode, http.REQUESTED_RANGE_NOT_SATISFIABLE)
 
1220
        self.assertEquals(''.join(self.request.written), '')
 
1221
        self.assertEquals(self.request.outgoingHeaders['content-length'], '0')
 
1222
        # Sections 10.4.17 and 14.16
 
1223
        self.assertEquals(
 
1224
            self.request.outgoingHeaders['content-range'],
 
1225
            'bytes */%d' % (len(self.payload),))
 
1226
 
 
1227
 
 
1228
 
 
1229
class DirectoryListerTest(TestCase):
 
1230
    """
 
1231
    Tests for L{static.DirectoryLister}.
 
1232
    """
 
1233
    def _request(self, uri):
 
1234
        request = DummyRequest([''])
 
1235
        request.uri = uri
 
1236
        return request
 
1237
 
 
1238
 
 
1239
    def test_renderHeader(self):
 
1240
        """
 
1241
        L{static.DirectoryLister} prints the request uri as header of the
 
1242
        rendered content.
 
1243
        """
 
1244
        path = FilePath(self.mktemp())
 
1245
        path.makedirs()
 
1246
 
 
1247
        lister = static.DirectoryLister(path.path)
 
1248
        data = lister.render(self._request('foo'))
 
1249
        self.assertIn("<h1>Directory listing for foo</h1>", data)
 
1250
        self.assertIn("<title>Directory listing for foo</title>", data)
 
1251
 
 
1252
 
 
1253
    def test_renderUnquoteHeader(self):
 
1254
        """
 
1255
        L{static.DirectoryLister} unquote the request uri before printing it.
 
1256
        """
 
1257
        path = FilePath(self.mktemp())
 
1258
        path.makedirs()
 
1259
 
 
1260
        lister = static.DirectoryLister(path.path)
 
1261
        data = lister.render(self._request('foo%20bar'))
 
1262
        self.assertIn("<h1>Directory listing for foo bar</h1>", data)
 
1263
        self.assertIn("<title>Directory listing for foo bar</title>", data)
 
1264
 
 
1265
 
 
1266
    def test_escapeHeader(self):
 
1267
        """
 
1268
        L{static.DirectoryLister} escape "&", "<" and ">" after unquoting the
 
1269
        request uri.
 
1270
        """
 
1271
        path = FilePath(self.mktemp())
 
1272
        path.makedirs()
 
1273
 
 
1274
        lister = static.DirectoryLister(path.path)
 
1275
        data = lister.render(self._request('foo%26bar'))
 
1276
        self.assertIn("<h1>Directory listing for foo&amp;bar</h1>", data)
 
1277
        self.assertIn("<title>Directory listing for foo&amp;bar</title>", data)
 
1278
 
 
1279
 
 
1280
    def test_renderFiles(self):
 
1281
        """
 
1282
        L{static.DirectoryLister} is able to list all the files inside a
 
1283
        directory.
 
1284
        """
 
1285
        path = FilePath(self.mktemp())
 
1286
        path.makedirs()
 
1287
        path.child('file1').setContent("content1")
 
1288
        path.child('file2').setContent("content2" * 1000)
 
1289
 
 
1290
        lister = static.DirectoryLister(path.path)
 
1291
        data = lister.render(self._request('foo'))
 
1292
        body = """<tr class="odd">
 
1293
    <td><a href="file1">file1</a></td>
 
1294
    <td>8B</td>
 
1295
    <td>[text/html]</td>
 
1296
    <td></td>
 
1297
</tr>
 
1298
<tr class="even">
 
1299
    <td><a href="file2">file2</a></td>
 
1300
    <td>7K</td>
 
1301
    <td>[text/html]</td>
 
1302
    <td></td>
 
1303
</tr>"""
 
1304
        self.assertIn(body, data)
 
1305
 
 
1306
 
 
1307
    def test_renderDirectories(self):
 
1308
        """
 
1309
        L{static.DirectoryLister} is able to list all the directories inside
 
1310
        a directory.
 
1311
        """
 
1312
        path = FilePath(self.mktemp())
 
1313
        path.makedirs()
 
1314
        path.child('dir1').makedirs()
 
1315
        path.child('dir2 & 3').makedirs()
 
1316
 
 
1317
        lister = static.DirectoryLister(path.path)
 
1318
        data = lister.render(self._request('foo'))
 
1319
        body = """<tr class="odd">
 
1320
    <td><a href="dir1/">dir1/</a></td>
 
1321
    <td></td>
 
1322
    <td>[Directory]</td>
 
1323
    <td></td>
 
1324
</tr>
 
1325
<tr class="even">
 
1326
    <td><a href="dir2%20%26%203/">dir2 &amp; 3/</a></td>
 
1327
    <td></td>
 
1328
    <td>[Directory]</td>
 
1329
    <td></td>
 
1330
</tr>"""
 
1331
        self.assertIn(body, data)
 
1332
 
 
1333
 
 
1334
    def test_renderFiltered(self):
 
1335
        """
 
1336
        L{static.DirectoryLister} takes a optional C{dirs} argument that
 
1337
        filter out the list of of directories and files printed.
 
1338
        """
 
1339
        path = FilePath(self.mktemp())
 
1340
        path.makedirs()
 
1341
        path.child('dir1').makedirs()
 
1342
        path.child('dir2').makedirs()
 
1343
        path.child('dir3').makedirs()
 
1344
        lister = static.DirectoryLister(path.path, dirs=["dir1", "dir3"])
 
1345
        data = lister.render(self._request('foo'))
 
1346
        body = """<tr class="odd">
 
1347
    <td><a href="dir1/">dir1/</a></td>
 
1348
    <td></td>
 
1349
    <td>[Directory]</td>
 
1350
    <td></td>
 
1351
</tr>
 
1352
<tr class="even">
 
1353
    <td><a href="dir3/">dir3/</a></td>
 
1354
    <td></td>
 
1355
    <td>[Directory]</td>
 
1356
    <td></td>
 
1357
</tr>"""
 
1358
        self.assertIn(body, data)
 
1359
 
 
1360
 
 
1361
    def test_oddAndEven(self):
 
1362
        """
 
1363
        L{static.DirectoryLister} gives an alternate class for each odd and
 
1364
        even rows in the table.
 
1365
        """
 
1366
        lister = static.DirectoryLister(None)
 
1367
        elements = [{"href": "", "text": "", "size": "", "type": "",
 
1368
                     "encoding": ""}  for i in xrange(5)]
 
1369
        content = lister._buildTableContent(elements)
 
1370
 
 
1371
        self.assertEquals(len(content), 5)
 
1372
        self.assertTrue(content[0].startswith('<tr class="odd">'))
 
1373
        self.assertTrue(content[1].startswith('<tr class="even">'))
 
1374
        self.assertTrue(content[2].startswith('<tr class="odd">'))
 
1375
        self.assertTrue(content[3].startswith('<tr class="even">'))
 
1376
        self.assertTrue(content[4].startswith('<tr class="odd">'))
 
1377
 
 
1378
 
 
1379
    def test_mimeTypeAndEncodings(self):
 
1380
        """
 
1381
        L{static.DirectoryLister} is able to detect mimetype and encoding of
 
1382
        listed files.
 
1383
        """
 
1384
        path = FilePath(self.mktemp())
 
1385
        path.makedirs()
 
1386
        path.child('file1.txt').setContent("file1")
 
1387
        path.child('file2.py').setContent("python")
 
1388
        path.child('file3.conf.gz').setContent("conf compressed")
 
1389
        path.child('file4.diff.bz2').setContent("diff compressed")
 
1390
        directory = os.listdir(path.path)
 
1391
        directory.sort()
 
1392
 
 
1393
        contentTypes = {
 
1394
            ".txt": "text/plain",
 
1395
            ".py": "text/python",
 
1396
            ".conf": "text/configuration",
 
1397
            ".diff": "text/diff"
 
1398
        }
 
1399
 
 
1400
        lister = static.DirectoryLister(path.path, contentTypes=contentTypes)
 
1401
        dirs, files = lister._getFilesAndDirectories(directory)
 
1402
        self.assertEquals(dirs, [])
 
1403
        self.assertEquals(files, [
 
1404
            {'encoding': '',
 
1405
             'href': 'file1.txt',
 
1406
             'size': '5B',
 
1407
             'text': 'file1.txt',
 
1408
             'type': '[text/plain]'},
 
1409
            {'encoding': '',
 
1410
             'href': 'file2.py',
 
1411
             'size': '6B',
 
1412
             'text': 'file2.py',
 
1413
             'type': '[text/python]'},
 
1414
            {'encoding': '[gzip]',
 
1415
             'href': 'file3.conf.gz',
 
1416
             'size': '15B',
 
1417
             'text': 'file3.conf.gz',
 
1418
             'type': '[text/configuration]'},
 
1419
            {'encoding': '[bzip2]',
 
1420
             'href': 'file4.diff.bz2',
 
1421
             'size': '15B',
 
1422
             'text': 'file4.diff.bz2',
 
1423
             'type': '[text/diff]'}])
 
1424
 
 
1425
 
 
1426
    def test_brokenSymlink(self):
 
1427
        """
 
1428
        If on the file in the listing points to a broken symlink, it should not
 
1429
        be returned by L{static.DirectoryLister._getFilesAndDirectories}.
 
1430
        """
 
1431
        path = FilePath(self.mktemp())
 
1432
        path.makedirs()
 
1433
        file1 = path.child('file1')
 
1434
        file1.setContent("file1")
 
1435
        file1.linkTo(path.child("file2"))
 
1436
        file1.remove()
 
1437
 
 
1438
        lister = static.DirectoryLister(path.path)
 
1439
        directory = os.listdir(path.path)
 
1440
        directory.sort()
 
1441
        dirs, files = lister._getFilesAndDirectories(directory)
 
1442
        self.assertEquals(dirs, [])
 
1443
        self.assertEquals(files, [])
 
1444
 
 
1445
    if getattr(os, "symlink", None) is None:
 
1446
        test_brokenSymlink.skip = "No symlink support"
 
1447
 
 
1448
 
 
1449
    def test_childrenNotFound(self):
 
1450
        """
 
1451
        Any child resource of L{static.DirectoryLister} renders an HTTP
 
1452
        I{NOT FOUND} response code.
 
1453
        """
 
1454
        path = FilePath(self.mktemp())
 
1455
        path.makedirs()
 
1456
        lister = static.DirectoryLister(path.path)
 
1457
        request = self._request('')
 
1458
        child = resource.getChildForRequest(lister, request)
 
1459
        result = _render(child, request)
 
1460
        def cbRendered(ignored):
 
1461
            self.assertEquals(request.responseCode, http.NOT_FOUND)
 
1462
        result.addCallback(cbRendered)
 
1463
        return result
 
1464
 
 
1465
 
 
1466
    def test_repr(self):
 
1467
        """
 
1468
        L{static.DirectoryLister.__repr__} gives the path of the lister.
 
1469
        """
 
1470
        path = FilePath(self.mktemp())
 
1471
        lister = static.DirectoryLister(path.path)
 
1472
        self.assertEquals(repr(lister),
 
1473
                          "<DirectoryLister of %r>" % (path.path,))
 
1474
        self.assertEquals(str(lister),
 
1475
                          "<DirectoryLister of %r>" % (path.path,))
 
1476
 
 
1477
    def test_formatFileSize(self):
 
1478
        """
 
1479
        L{static.formatFileSize} format an amount of bytes into a more readable
 
1480
        format.
 
1481
        """
 
1482
        self.assertEquals(static.formatFileSize(0), "0B")
 
1483
        self.assertEquals(static.formatFileSize(123), "123B")
 
1484
        self.assertEquals(static.formatFileSize(4567), "4K")
 
1485
        self.assertEquals(static.formatFileSize(8900000), "8M")
 
1486
        self.assertEquals(static.formatFileSize(1234000000), "1G")
 
1487
        self.assertEquals(static.formatFileSize(1234567890000), "1149G")
 
1488
 
 
1489
 
 
1490
 
 
1491
class TestFileTransferDeprecated(TestCase):
 
1492
    """
 
1493
    L{static.FileTransfer} is deprecated.
 
1494
    """
 
1495
 
 
1496
    def test_deprecation(self):
 
1497
        """
 
1498
        Instantiation of L{FileTransfer} produces a deprecation warning.
 
1499
        """
 
1500
        static.FileTransfer(StringIO.StringIO(), 0, DummyRequest([]))
 
1501
        warnings = self.flushWarnings([self.test_deprecation])
 
1502
        self.assertEqual(len(warnings), 1)
 
1503
        self.assertEqual(warnings[0]['category'], DeprecationWarning)
 
1504
        self.assertEqual(
 
1505
            warnings[0]['message'],
 
1506
            'FileTransfer is deprecated since Twisted 9.0. '
 
1507
            'Use a subclass of StaticProducer instead.')