1
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Tests for L{twisted.web.static}.
8
import os, re, StringIO
10
from zope.interface.verify import verifyObject
12
from twisted.internet import abstract, interfaces
13
from twisted.python.compat import set
14
from twisted.python.runtime import platform
15
from twisted.python.filepath import FilePath
16
from twisted.python import log
17
from twisted.trial.unittest import TestCase
18
from twisted.web import static, http, script, resource
19
from twisted.web.server import UnsupportedMethod
20
from twisted.web.test.test_web import DummyRequest
21
from twisted.web.test._util import _render
24
class StaticDataTests(TestCase):
28
def test_headRequest(self):
30
L{Data.render} returns an empty response body for a I{HEAD} request.
32
data = static.Data("foo", "bar")
33
request = DummyRequest([''])
34
request.method = 'HEAD'
35
d = _render(data, request)
36
def cbRendered(ignored):
37
self.assertEqual(''.join(request.written), "")
38
d.addCallback(cbRendered)
42
def test_invalidMethod(self):
44
L{Data.render} raises L{UnsupportedMethod} in response to a non-I{GET},
47
data = static.Data("foo", "bar")
48
request = DummyRequest([''])
49
request.method = 'POST'
50
self.assertRaises(UnsupportedMethod, data.render, request)
54
class StaticFileTests(TestCase):
56
Tests for the basic behavior of L{File}.
58
def _render(self, resource, request):
59
return _render(resource, request)
62
def test_invalidMethod(self):
64
L{File.render} raises L{UnsupportedMethod} in response to a non-I{GET},
67
request = DummyRequest([''])
68
request.method = 'POST'
69
path = FilePath(self.mktemp())
70
path.setContent("foo")
71
file = static.File(path.path)
72
self.assertRaises(UnsupportedMethod, file.render, request)
75
def test_notFound(self):
77
If a request is made which encounters a L{File} before a final segment
78
which does not correspond to any file in the path the L{File} was
79
created with, a not found response is sent.
81
base = FilePath(self.mktemp())
83
file = static.File(base.path)
85
request = DummyRequest(['foobar'])
86
child = resource.getChildForRequest(file, request)
88
d = self._render(child, request)
89
def cbRendered(ignored):
90
self.assertEqual(request.responseCode, 404)
91
d.addCallback(cbRendered)
95
def test_emptyChild(self):
97
The C{''} child of a L{File} which corresponds to a directory in the
98
filesystem is a L{DirectoryLister}.
100
base = FilePath(self.mktemp())
102
file = static.File(base.path)
104
request = DummyRequest([''])
105
child = resource.getChildForRequest(file, request)
106
self.assertIsInstance(child, static.DirectoryLister)
107
self.assertEqual(child.path, base.path)
110
def test_securityViolationNotFound(self):
112
If a request is made which encounters a L{File} before a final segment
113
which cannot be looked up in the filesystem due to security
114
considerations, a not found response is sent.
116
base = FilePath(self.mktemp())
118
file = static.File(base.path)
120
request = DummyRequest(['..'])
121
child = resource.getChildForRequest(file, request)
123
d = self._render(child, request)
124
def cbRendered(ignored):
125
self.assertEqual(request.responseCode, 404)
126
d.addCallback(cbRendered)
130
def test_forbiddenResource(self):
132
If the file in the filesystem which would satisfy a request cannot be
133
read, L{File.render} sets the HTTP response code to I{FORBIDDEN}.
135
base = FilePath(self.mktemp())
137
# Make sure we can delete the file later.
138
self.addCleanup(base.chmod, 0700)
140
# Get rid of our own read permission.
143
file = static.File(base.path)
144
request = DummyRequest([''])
145
d = self._render(file, request)
146
def cbRendered(ignored):
147
self.assertEqual(request.responseCode, 403)
148
d.addCallback(cbRendered)
150
if platform.isWindows():
151
test_forbiddenResource.skip = "Cannot remove read permission on Windows"
154
def test_indexNames(self):
156
If a request is made which encounters a L{File} before a final empty
157
segment, a file in the L{File} instance's C{indexNames} list which
158
exists in the path the L{File} was created with is served as the
159
response to the request.
161
base = FilePath(self.mktemp())
163
base.child("foo.bar").setContent("baz")
164
file = static.File(base.path)
165
file.indexNames = ['foo.bar']
167
request = DummyRequest([''])
168
child = resource.getChildForRequest(file, request)
170
d = self._render(child, request)
171
def cbRendered(ignored):
172
self.assertEqual(''.join(request.written), 'baz')
173
self.assertEqual(request.outgoingHeaders['content-length'], '3')
174
d.addCallback(cbRendered)
178
def test_staticFile(self):
180
If a request is made which encounters a L{File} before a final segment
181
which names a file in the path the L{File} was created with, that file
182
is served as the response to the request.
184
base = FilePath(self.mktemp())
186
base.child("foo.bar").setContent("baz")
187
file = static.File(base.path)
189
request = DummyRequest(['foo.bar'])
190
child = resource.getChildForRequest(file, request)
192
d = self._render(child, request)
193
def cbRendered(ignored):
194
self.assertEqual(''.join(request.written), 'baz')
195
self.assertEqual(request.outgoingHeaders['content-length'], '3')
196
d.addCallback(cbRendered)
200
def test_staticFileDeletedGetChild(self):
202
A L{static.File} created for a directory which does not exist should
203
return childNotFound from L{static.File.getChild}.
205
staticFile = static.File(self.mktemp())
206
request = DummyRequest(['foo.bar'])
207
child = staticFile.getChild("foo.bar", request)
208
self.assertEquals(child, staticFile.childNotFound)
211
def test_staticFileDeletedRender(self):
213
A L{static.File} created for a file which does not exist should render
214
its C{childNotFound} page.
216
staticFile = static.File(self.mktemp())
217
request = DummyRequest(['foo.bar'])
218
request2 = DummyRequest(['foo.bar'])
219
d = self._render(staticFile, request)
220
d2 = self._render(staticFile.childNotFound, request2)
221
def cbRendered2(ignored):
222
def cbRendered(ignored):
223
self.assertEquals(''.join(request.written),
224
''.join(request2.written))
225
d.addCallback(cbRendered)
227
d2.addCallback(cbRendered2)
231
def test_headRequest(self):
233
L{static.File.render} returns an empty response body for I{HEAD}
236
path = FilePath(self.mktemp())
237
path.setContent("foo")
238
file = static.File(path.path)
239
request = DummyRequest([''])
240
request.method = 'HEAD'
241
d = _render(file, request)
242
def cbRendered(ignored):
243
self.assertEqual("".join(request.written), "")
244
d.addCallback(cbRendered)
248
def test_processors(self):
250
If a request is made which encounters a L{File} before a final segment
251
which names a file with an extension which is in the L{File}'s
252
C{processors} mapping, the processor associated with that extension is
253
used to serve the response to the request.
255
base = FilePath(self.mktemp())
257
base.child("foo.bar").setContent(
258
"from twisted.web.static import Data\n"
259
"resource = Data('dynamic world','text/plain')\n")
261
file = static.File(base.path)
262
file.processors = {'.bar': script.ResourceScript}
263
request = DummyRequest(["foo.bar"])
264
child = resource.getChildForRequest(file, request)
266
d = self._render(child, request)
267
def cbRendered(ignored):
268
self.assertEqual(''.join(request.written), 'dynamic world')
269
self.assertEqual(request.outgoingHeaders['content-length'], '13')
270
d.addCallback(cbRendered)
274
def test_ignoreExt(self):
276
The list of ignored extensions can be set by passing a value to
277
L{File.__init__} or by calling L{File.ignoreExt} later.
279
file = static.File(".")
280
self.assertEqual(file.ignoredExts, [])
281
file.ignoreExt(".foo")
282
file.ignoreExt(".bar")
283
self.assertEqual(file.ignoredExts, [".foo", ".bar"])
285
file = static.File(".", ignoredExts=(".bar", ".baz"))
286
self.assertEqual(file.ignoredExts, [".bar", ".baz"])
289
def test_ignoredExtensionsIgnored(self):
291
A request for the I{base} child of a L{File} succeeds with a resource
292
for the I{base<extension>} file in the path the L{File} was created
293
with if such a file exists and the L{File} has been configured to
294
ignore the I{<extension>} extension.
296
base = FilePath(self.mktemp())
298
base.child('foo.bar').setContent('baz')
299
base.child('foo.quux').setContent('foobar')
300
file = static.File(base.path, ignoredExts=(".bar",))
302
request = DummyRequest(["foo"])
303
child = resource.getChildForRequest(file, request)
305
d = self._render(child, request)
306
def cbRendered(ignored):
307
self.assertEqual(''.join(request.written), 'baz')
308
d.addCallback(cbRendered)
312
def test_createPickleChild(self):
314
L{static.File.createPickleChild} is deprecated.
316
path = FilePath(self.mktemp())
318
static.File(path.path).createPickleChild("foo", None)
319
warnings = self.flushWarnings([self.test_createPickleChild])
320
self.assertEqual(warnings[0]['category'], DeprecationWarning)
322
warnings[0]['message'],
323
"File.createPickleChild is deprecated since Twisted 9.0. "
324
"Resource persistence is beyond the scope of Twisted Web.")
325
self.assertEqual(len(warnings), 1)
329
class StaticMakeProducerTests(TestCase):
331
Tests for L{File.makeProducer}.
335
def makeResourceWithContent(self, content, type=None, encoding=None):
337
Make a L{static.File} resource that has C{content} for its content.
339
@param content: The bytes to use as the contents of the resource.
340
@param type: Optional value for the content type of the resource.
342
fileName = self.mktemp()
343
fileObject = open(fileName, 'w')
344
fileObject.write(content)
346
resource = static.File(fileName)
347
resource.encoding = encoding
352
def contentHeaders(self, request):
354
Extract the content-* headers from the L{DummyRequest} C{request}.
356
This returns the subset of C{request.outgoingHeaders} of headers that
357
start with 'content-'.
360
for k, v in request.outgoingHeaders.iteritems():
361
if k.startswith('content-'):
362
contentHeaders[k] = v
363
return contentHeaders
366
def test_noRangeHeaderGivesNoRangeStaticProducer(self):
368
makeProducer when no Range header is set returns an instance of
369
NoRangeStaticProducer.
371
resource = self.makeResourceWithContent('')
372
request = DummyRequest([])
373
producer = resource.makeProducer(request, resource.openForReading())
374
self.assertIsInstance(producer, static.NoRangeStaticProducer)
377
def test_noRangeHeaderSets200OK(self):
379
makeProducer when no Range header is set sets the responseCode on the
382
resource = self.makeResourceWithContent('')
383
request = DummyRequest([])
384
resource.makeProducer(request, resource.openForReading())
385
self.assertEqual(http.OK, request.responseCode)
388
def test_noRangeHeaderSetsContentHeaders(self):
390
makeProducer when no Range header is set sets the Content-* headers
394
contentType = "text/plain"
395
contentEncoding = 'gzip'
396
resource = self.makeResourceWithContent(
397
'a'*length, type=contentType, encoding=contentEncoding)
398
request = DummyRequest([])
399
resource.makeProducer(request, resource.openForReading())
401
{'content-type': contentType, 'content-length': str(length),
402
'content-encoding': contentEncoding},
403
self.contentHeaders(request))
406
def test_singleRangeGivesSingleRangeStaticProducer(self):
408
makeProducer when the Range header requests a single byte range
409
returns an instance of SingleRangeStaticProducer.
411
request = DummyRequest([])
412
request.headers['range'] = 'bytes=1-3'
413
resource = self.makeResourceWithContent('abcdef')
414
producer = resource.makeProducer(request, resource.openForReading())
415
self.assertIsInstance(producer, static.SingleRangeStaticProducer)
418
def test_singleRangeSets206PartialContent(self):
420
makeProducer when the Range header requests a single, satisfiable byte
421
range sets the response code on the request to 'Partial Content'.
423
request = DummyRequest([])
424
request.headers['range'] = 'bytes=1-3'
425
resource = self.makeResourceWithContent('abcdef')
426
resource.makeProducer(request, resource.openForReading())
428
http.PARTIAL_CONTENT, request.responseCode)
431
def test_singleRangeSetsContentHeaders(self):
433
makeProducer when the Range header requests a single, satisfiable byte
434
range sets the Content-* headers appropriately.
436
request = DummyRequest([])
437
request.headers['range'] = 'bytes=1-3'
438
contentType = "text/plain"
439
contentEncoding = 'gzip'
440
resource = self.makeResourceWithContent('abcdef', type=contentType, encoding=contentEncoding)
441
resource.makeProducer(request, resource.openForReading())
443
{'content-type': contentType, 'content-encoding': contentEncoding,
444
'content-range': 'bytes 1-3/6', 'content-length': '3'},
445
self.contentHeaders(request))
448
def test_singleUnsatisfiableRangeReturnsSingleRangeStaticProducer(self):
450
makeProducer still returns an instance of L{SingleRangeStaticProducer}
451
when the Range header requests a single unsatisfiable byte range.
453
request = DummyRequest([])
454
request.headers['range'] = 'bytes=4-10'
455
resource = self.makeResourceWithContent('abc')
456
producer = resource.makeProducer(request, resource.openForReading())
457
self.assertIsInstance(producer, static.SingleRangeStaticProducer)
460
def test_singleUnsatisfiableRangeSets416ReqestedRangeNotSatisfiable(self):
462
makeProducer sets the response code of the request to of 'Requested
463
Range Not Satisfiable' when the Range header requests a single
464
unsatisfiable byte range.
466
request = DummyRequest([])
467
request.headers['range'] = 'bytes=4-10'
468
resource = self.makeResourceWithContent('abc')
469
resource.makeProducer(request, resource.openForReading())
471
http.REQUESTED_RANGE_NOT_SATISFIABLE, request.responseCode)
474
def test_singleUnsatisfiableRangeSetsContentHeaders(self):
476
makeProducer when the Range header requests a single, unsatisfiable
477
byte range sets the Content-* headers appropriately.
479
request = DummyRequest([])
480
request.headers['range'] = 'bytes=4-10'
481
contentType = "text/plain"
482
resource = self.makeResourceWithContent('abc', type=contentType)
483
resource.makeProducer(request, resource.openForReading())
485
{'content-type': 'text/plain', 'content-length': '0',
486
'content-range': 'bytes */3'},
487
self.contentHeaders(request))
490
def test_singlePartiallyOverlappingRangeSetsContentHeaders(self):
492
makeProducer when the Range header requests a single byte range that
493
partly overlaps the resource sets the Content-* headers appropriately.
495
request = DummyRequest([])
496
request.headers['range'] = 'bytes=2-10'
497
contentType = "text/plain"
498
resource = self.makeResourceWithContent('abc', type=contentType)
499
resource.makeProducer(request, resource.openForReading())
501
{'content-type': 'text/plain', 'content-length': '1',
502
'content-range': 'bytes 2-2/3'},
503
self.contentHeaders(request))
506
def test_multipleRangeGivesMultipleRangeStaticProducer(self):
508
makeProducer when the Range header requests a single byte range
509
returns an instance of MultipleRangeStaticProducer.
511
request = DummyRequest([])
512
request.headers['range'] = 'bytes=1-3,5-6'
513
resource = self.makeResourceWithContent('abcdef')
514
producer = resource.makeProducer(request, resource.openForReading())
515
self.assertIsInstance(producer, static.MultipleRangeStaticProducer)
518
def test_multipleRangeSets206PartialContent(self):
520
makeProducer when the Range header requests a multiple satisfiable
521
byte ranges sets the response code on the request to 'Partial
524
request = DummyRequest([])
525
request.headers['range'] = 'bytes=1-3,5-6'
526
resource = self.makeResourceWithContent('abcdef')
527
resource.makeProducer(request, resource.openForReading())
529
http.PARTIAL_CONTENT, request.responseCode)
532
def test_mutipleRangeSetsContentHeaders(self):
534
makeProducer when the Range header requests a single, satisfiable byte
535
range sets the Content-* headers appropriately.
537
request = DummyRequest([])
538
request.headers['range'] = 'bytes=1-3,5-6'
539
resource = self.makeResourceWithContent(
540
'abcdefghijkl', encoding='gzip')
541
producer = resource.makeProducer(request, resource.openForReading())
542
contentHeaders = self.contentHeaders(request)
543
# The only content-* headers set are content-type and content-length.
545
set(['content-length', 'content-type']),
546
set(contentHeaders.keys()))
547
# The content-length depends on the boundary used in the response.
549
for boundary, offset, size in producer.rangeInfo:
550
expectedLength += len(boundary)
551
self.assertEqual(expectedLength, contentHeaders['content-length'])
552
# Content-type should be set to a value indicating a multipart
553
# response and the boundary used to separate the parts.
554
self.assertIn('content-type', contentHeaders)
555
contentType = contentHeaders['content-type']
556
self.assertNotIdentical(
558
'multipart/byteranges; boundary="[^"]*"\Z', contentType))
559
# Content-encoding is not set in the response to a multiple range
560
# response, which is a bit wussy but works well enough with the way
561
# static.File does content-encodings...
562
self.assertNotIn('content-encoding', contentHeaders)
565
def test_multipleUnsatisfiableRangesReturnsMultipleRangeStaticProducer(self):
567
makeProducer still returns an instance of L{SingleRangeStaticProducer}
568
when the Range header requests multiple ranges, none of which are
571
request = DummyRequest([])
572
request.headers['range'] = 'bytes=10-12,15-20'
573
resource = self.makeResourceWithContent('abc')
574
producer = resource.makeProducer(request, resource.openForReading())
575
self.assertIsInstance(producer, static.MultipleRangeStaticProducer)
578
def test_multipleUnsatisfiableRangesSets416ReqestedRangeNotSatisfiable(self):
580
makeProducer sets the response code of the request to of 'Requested
581
Range Not Satisfiable' when the Range header requests multiple ranges,
582
none of which are satisfiable.
584
request = DummyRequest([])
585
request.headers['range'] = 'bytes=10-12,15-20'
586
resource = self.makeResourceWithContent('abc')
587
resource.makeProducer(request, resource.openForReading())
589
http.REQUESTED_RANGE_NOT_SATISFIABLE, request.responseCode)
592
def test_multipleUnsatisfiableRangeSetsContentHeaders(self):
594
makeProducer when the Range header requests multiple ranges, none of
595
which are satisfiable, sets the Content-* headers appropriately.
597
request = DummyRequest([])
598
request.headers['range'] = 'bytes=4-10'
599
contentType = "text/plain"
600
request.headers['range'] = 'bytes=10-12,15-20'
601
resource = self.makeResourceWithContent('abc', type=contentType)
602
resource.makeProducer(request, resource.openForReading())
604
{'content-length': '0', 'content-range': 'bytes */3'},
605
self.contentHeaders(request))
608
def test_oneSatisfiableRangeIsEnough(self):
610
makeProducer when the Range header requests multiple ranges, at least
611
one of which matches, sets the response code to 'Partial Content'.
613
request = DummyRequest([])
614
request.headers['range'] = 'bytes=1-3,100-200'
615
resource = self.makeResourceWithContent('abcdef')
616
resource.makeProducer(request, resource.openForReading())
618
http.PARTIAL_CONTENT, request.responseCode)
622
class StaticProducerTests(TestCase):
624
Tests for the abstract L{StaticProducer}.
627
def test_stopProducingClosesFile(self):
629
L{StaticProducer.stopProducing} closes the file object the producer is
632
fileObject = StringIO.StringIO()
633
producer = static.StaticProducer(None, fileObject)
634
producer.stopProducing()
635
self.assertTrue(fileObject.closed)
638
def test_stopProducingSetsRequestToNone(self):
640
L{StaticProducer.stopProducing} sets the request instance variable to
641
None, which indicates to subclasses' resumeProducing methods that no
642
more data should be produced.
644
fileObject = StringIO.StringIO()
645
producer = static.StaticProducer(DummyRequest([]), fileObject)
646
producer.stopProducing()
647
self.assertIdentical(None, producer.request)
651
class NoRangeStaticProducerTests(TestCase):
653
Tests for L{NoRangeStaticProducer}.
656
def test_implementsIPullProducer(self):
658
L{NoRangeStaticProducer} implements L{IPullProducer}.
661
interfaces.IPullProducer,
662
static.NoRangeStaticProducer(None, None))
665
def test_resumeProducingProducesContent(self):
667
L{NoRangeStaticProducer.resumeProducing} writes content from the
668
resource to the request.
670
request = DummyRequest([])
672
producer = static.NoRangeStaticProducer(
673
request, StringIO.StringIO(content))
674
# start calls registerProducer on the DummyRequest, which pulls all
675
# output from the producer and so we just need this one call.
677
self.assertEqual(content, ''.join(request.written))
680
def test_resumeProducingBuffersOutput(self):
682
L{NoRangeStaticProducer.start} writes at most
683
C{abstract.FileDescriptor.bufferSize} bytes of content from the
684
resource to the request at once.
686
request = DummyRequest([])
687
bufferSize = abstract.FileDescriptor.bufferSize
688
content = 'a' * (2*bufferSize + 1)
689
producer = static.NoRangeStaticProducer(
690
request, StringIO.StringIO(content))
691
# start calls registerProducer on the DummyRequest, which pulls all
692
# output from the producer and so we just need this one call.
695
content[0:bufferSize],
696
content[bufferSize:2*bufferSize],
697
content[2*bufferSize:]
699
self.assertEqual(expected, request.written)
702
def test_finishCalledWhenDone(self):
704
L{NoRangeStaticProducer.resumeProducing} calls finish() on the request
705
after it is done producing content.
707
request = DummyRequest([])
708
finishDeferred = request.notifyFinish()
710
finishDeferred.addCallback(callbackList.append)
711
producer = static.NoRangeStaticProducer(
712
request, StringIO.StringIO('abcdef'))
713
# start calls registerProducer on the DummyRequest, which pulls all
714
# output from the producer and so we just need this one call.
716
self.assertEqual([None], callbackList)
720
class SingleRangeStaticProducerTests(TestCase):
722
Tests for L{SingleRangeStaticProducer}.
725
def test_implementsIPullProducer(self):
727
L{SingleRangeStaticProducer} implements L{IPullProducer}.
730
interfaces.IPullProducer,
731
static.SingleRangeStaticProducer(None, None, None, None))
734
def test_resumeProducingProducesContent(self):
736
L{SingleRangeStaticProducer.resumeProducing} writes the given amount
737
of content, starting at the given offset, from the resource to the
740
request = DummyRequest([])
742
producer = static.SingleRangeStaticProducer(
743
request, StringIO.StringIO(content), 1, 3)
744
# DummyRequest.registerProducer pulls all output from the producer, so
745
# we just need to call start.
747
self.assertEqual(content[1:4], ''.join(request.written))
750
def test_resumeProducingBuffersOutput(self):
752
L{SingleRangeStaticProducer.start} writes at most
753
C{abstract.FileDescriptor.bufferSize} bytes of content from the
754
resource to the request at once.
756
request = DummyRequest([])
757
bufferSize = abstract.FileDescriptor.bufferSize
758
content = 'abc' * bufferSize
759
producer = static.SingleRangeStaticProducer(
760
request, StringIO.StringIO(content), 1, bufferSize+10)
761
# DummyRequest.registerProducer pulls all output from the producer, so
762
# we just need to call start.
765
content[1:bufferSize+1],
766
content[bufferSize+1:bufferSize+11],
768
self.assertEqual(expected, request.written)
771
def test_finishCalledWhenDone(self):
773
L{SingleRangeStaticProducer.resumeProducing} calls finish() on the
774
request after it is done producing content.
776
request = DummyRequest([])
777
finishDeferred = request.notifyFinish()
779
finishDeferred.addCallback(callbackList.append)
780
producer = static.SingleRangeStaticProducer(
781
request, StringIO.StringIO('abcdef'), 1, 1)
782
# start calls registerProducer on the DummyRequest, which pulls all
783
# output from the producer and so we just need this one call.
785
self.assertEqual([None], callbackList)
789
class MultipleRangeStaticProducerTests(TestCase):
791
Tests for L{MultipleRangeStaticProducer}.
794
def test_implementsIPullProducer(self):
796
L{MultipleRangeStaticProducer} implements L{IPullProducer}.
799
interfaces.IPullProducer,
800
static.MultipleRangeStaticProducer(None, None, None))
803
def test_resumeProducingProducesContent(self):
805
L{MultipleRangeStaticProducer.resumeProducing} writes the requested
806
chunks of content from the resource to the request, with the supplied
807
boundaries in between each chunk.
809
request = DummyRequest([])
811
producer = static.MultipleRangeStaticProducer(
812
request, StringIO.StringIO(content), [('1', 1, 3), ('2', 5, 1)])
813
# DummyRequest.registerProducer pulls all output from the producer, so
814
# we just need to call start.
816
self.assertEqual('1bcd2f', ''.join(request.written))
819
def test_resumeProducingBuffersOutput(self):
821
L{MultipleRangeStaticProducer.start} writes about
822
C{abstract.FileDescriptor.bufferSize} bytes of content from the
823
resource to the request at once.
825
To be specific about the 'about' above: it can write slightly more,
826
for example in the case where the first boundary plus the first chunk
827
is less than C{bufferSize} but first boundary plus the first chunk
828
plus the second boundary is more, but this is unimportant as in
829
practice the boundaries are fairly small. On the other side, it is
830
important for performance to bundle up several small chunks into one
831
call to request.write.
833
request = DummyRequest([])
834
content = '0123456789' * 2
835
producer = static.MultipleRangeStaticProducer(
836
request, StringIO.StringIO(content),
837
[('a', 0, 2), ('b', 5, 10), ('c', 0, 0)])
838
producer.bufferSize = 10
839
# DummyRequest.registerProducer pulls all output from the producer, so
840
# we just need to call start.
843
'a' + content[0:2] + 'b' + content[5:11],
844
content[11:15] + 'c',
846
self.assertEqual(expected, request.written)
849
def test_finishCalledWhenDone(self):
851
L{MultipleRangeStaticProducer.resumeProducing} calls finish() on the
852
request after it is done producing content.
854
request = DummyRequest([])
855
finishDeferred = request.notifyFinish()
857
finishDeferred.addCallback(callbackList.append)
858
producer = static.MultipleRangeStaticProducer(
859
request, StringIO.StringIO('abcdef'), [('', 1, 2)])
860
# start calls registerProducer on the DummyRequest, which pulls all
861
# output from the producer and so we just need this one call.
863
self.assertEqual([None], callbackList)
867
class RangeTests(TestCase):
869
Tests for I{Range-Header} support in L{twisted.web.static.File}.
872
@ivar file: Temporary (binary) file containing the content to be served.
874
@type resource: L{static.File}
875
@ivar resource: A leaf web resource using C{file} as content.
877
@type request: L{DummyRequest}
878
@ivar request: A fake request, requesting C{resource}.
880
@type catcher: L{list}
881
@ivar catcher: List which gathers all log information.
885
Create a temporary file with a fixed payload of 64 bytes. Create a
886
resource for that file and create a request which will be for that
887
resource. Each test can set a different range header to test different
888
aspects of the implementation.
890
path = FilePath(self.mktemp())
891
# This is just a jumble of random stuff. It's supposed to be a good
892
# set of data for this test, particularly in order to avoid
893
# accidentally seeing the right result by having a byte sequence
894
# repeated at different locations or by having byte values which are
895
# somehow correlated with their position in the string.
896
self.payload = ('\xf8u\xf3E\x8c7\xce\x00\x9e\xb6a0y0S\xf0\xef\xac\xb7'
897
'\xbe\xb5\x17M\x1e\x136k{\x1e\xbe\x0c\x07\x07\t\xd0'
898
'\xbckY\xf5I\x0b\xb8\x88oZ\x1d\x85b\x1a\xcdk\xf2\x1d'
899
'&\xfd%\xdd\x82q/A\x10Y\x8b')
900
path.setContent(self.payload)
901
self.file = path.open()
902
self.resource = static.File(self.file.name)
903
self.resource.isLeaf = 1
904
self.request = DummyRequest([''])
905
self.request.uri = self.file.name
907
log.addObserver(self.catcher.append)
912
Clean up the resource file and the log observer.
915
log.removeObserver(self.catcher.append)
918
def _assertLogged(self, expected):
920
Asserts that a given log message occurred with an expected message.
922
logItem = self.catcher.pop()
923
self.assertEquals(logItem["message"][0], expected)
925
self.catcher, [], "An additional log occured: %r" % (logItem,))
928
def test_invalidRanges(self):
930
L{File._parseRangeHeader} raises L{ValueError} when passed
931
syntactically invalid byte ranges.
933
f = self.resource._parseRangeHeader
936
self.assertRaises(ValueError, f, 'bytes')
938
# unknown isn't a valid Bytes-Unit
939
self.assertRaises(ValueError, f, 'unknown=1-2')
941
# there's no - in =stuff
942
self.assertRaises(ValueError, f, 'bytes=3')
944
# both start and end are empty
945
self.assertRaises(ValueError, f, 'bytes=-')
947
# start isn't an integer
948
self.assertRaises(ValueError, f, 'bytes=foo-')
950
# end isn't an integer
951
self.assertRaises(ValueError, f, 'bytes=-foo')
953
# end isn't equal to or greater than start
954
self.assertRaises(ValueError, f, 'bytes=5-4')
957
def test_rangeMissingStop(self):
959
A single bytes range without an explicit stop position is parsed into a
960
two-tuple giving the start position and C{None}.
963
self.resource._parseRangeHeader('bytes=0-'), [(0, None)])
966
def test_rangeMissingStart(self):
968
A single bytes range without an explicit start position is parsed into
969
a two-tuple of C{None} and the end position.
972
self.resource._parseRangeHeader('bytes=-3'), [(None, 3)])
975
def test_range(self):
977
A single bytes range with explicit start and stop positions is parsed
978
into a two-tuple of those positions.
981
self.resource._parseRangeHeader('bytes=2-5'), [(2, 5)])
984
def test_rangeWithSpace(self):
986
A single bytes range with whitespace in allowed places is parsed in
987
the same way as it would be without the whitespace.
990
self.resource._parseRangeHeader(' bytes=1-2 '), [(1, 2)])
992
self.resource._parseRangeHeader('bytes =1-2 '), [(1, 2)])
994
self.resource._parseRangeHeader('bytes= 1-2'), [(1, 2)])
996
self.resource._parseRangeHeader('bytes=1 -2'), [(1, 2)])
998
self.resource._parseRangeHeader('bytes=1- 2'), [(1, 2)])
1000
self.resource._parseRangeHeader('bytes=1-2 '), [(1, 2)])
1003
def test_nullRangeElements(self):
1005
If there are multiple byte ranges but only one is non-null, the
1006
non-null range is parsed and its start and stop returned.
1009
self.resource._parseRangeHeader('bytes=1-2,\r\n, ,\t'), [(1, 2)])
1012
def test_multipleRanges(self):
1014
If multiple byte ranges are specified their starts and stops are
1018
self.resource._parseRangeHeader('bytes=1-2,3-4'),
1022
def test_bodyLength(self):
1024
A correct response to a range request is as long as the length of the
1027
self.request.headers['range'] = 'bytes=0-43'
1028
self.resource.render(self.request)
1029
self.assertEquals(len(''.join(self.request.written)), 44)
1032
def test_invalidRangeRequest(self):
1034
An incorrect range request (RFC 2616 defines a correct range request as
1035
a Bytes-Unit followed by a '=' character followed by a specific range.
1036
Only 'bytes' is defined) results in the range header value being logged
1037
and a normal 200 response being sent.
1039
self.request.headers['range'] = range = 'foobar=0-43'
1040
self.resource.render(self.request)
1041
expected = "Ignoring malformed Range header %r" % (range,)
1042
self._assertLogged(expected)
1043
self.assertEquals(''.join(self.request.written), self.payload)
1044
self.assertEquals(self.request.responseCode, http.OK)
1046
self.request.outgoingHeaders['content-length'],
1047
str(len(self.payload)))
1050
def parseMultipartBody(self, body, boundary):
1052
Parse C{body} as a multipart MIME response separated by C{boundary}.
1054
Note that this with fail the calling test on certain syntactic
1057
sep = "\r\n--" + boundary
1058
parts = ''.join(body).split(sep)
1059
self.assertEquals('', parts[0])
1060
self.assertEquals('--\r\n', parts[-1])
1062
for part in parts[1:-1]:
1063
before, header1, header2, blank, partBody = part.split('\r\n', 4)
1064
headers = header1 + '\n' + header2
1065
self.assertEqual('', before)
1066
self.assertEqual('', blank)
1067
partContentTypeValue = re.search(
1068
'^content-type: (.*)$', headers, re.I|re.M).group(1)
1069
start, end, size = re.search(
1070
'^content-range: bytes ([0-9]+)-([0-9]+)/([0-9]+)$',
1071
headers, re.I|re.M).groups()
1072
parsed_parts.append(
1073
{'contentType': partContentTypeValue,
1074
'contentRange': (start, end, size),
1079
def test_multipleRangeRequest(self):
1081
The response to a request for multipe bytes ranges is a MIME-ish
1084
startEnds = [(0, 2), (20, 30), (40, 50)]
1085
rangeHeaderValue = ','.join(["%s-%s"%(s,e) for (s, e) in startEnds])
1086
self.request.headers['range'] = 'bytes=' + rangeHeaderValue
1087
self.resource.render(self.request)
1088
self.assertEquals(self.request.responseCode, http.PARTIAL_CONTENT)
1089
boundary = re.match(
1090
'^multipart/byteranges; boundary="(.*)"$',
1091
self.request.outgoingHeaders['content-type']).group(1)
1092
parts = self.parseMultipartBody(''.join(self.request.written), boundary)
1093
self.assertEquals(len(startEnds), len(parts))
1094
for part, (s, e) in zip(parts, startEnds):
1095
self.assertEqual(self.resource.type, part['contentType'])
1096
start, end, size = part['contentRange']
1097
self.assertEqual(int(start), s)
1098
self.assertEqual(int(end), e)
1099
self.assertEqual(int(size), self.resource.getFileSize())
1100
self.assertEqual(self.payload[s:e+1], part['body'])
1103
def test_multipleRangeRequestWithRangeOverlappingEnd(self):
1105
The response to a request for multipe bytes ranges is a MIME-ish
1106
multipart response, even when one of the ranged falls off the end of
1109
startEnds = [(0, 2), (40, len(self.payload) + 10)]
1110
rangeHeaderValue = ','.join(["%s-%s"%(s,e) for (s, e) in startEnds])
1111
self.request.headers['range'] = 'bytes=' + rangeHeaderValue
1112
self.resource.render(self.request)
1113
self.assertEquals(self.request.responseCode, http.PARTIAL_CONTENT)
1114
boundary = re.match(
1115
'^multipart/byteranges; boundary="(.*)"$',
1116
self.request.outgoingHeaders['content-type']).group(1)
1117
parts = self.parseMultipartBody(''.join(self.request.written), boundary)
1118
self.assertEquals(len(startEnds), len(parts))
1119
for part, (s, e) in zip(parts, startEnds):
1120
self.assertEqual(self.resource.type, part['contentType'])
1121
start, end, size = part['contentRange']
1122
self.assertEqual(int(start), s)
1123
self.assertEqual(int(end), min(e, self.resource.getFileSize()-1))
1124
self.assertEqual(int(size), self.resource.getFileSize())
1125
self.assertEqual(self.payload[s:e+1], part['body'])
1128
def test_implicitEnd(self):
1130
If the end byte position is omitted, then it is treated as if the
1131
length of the resource was specified by the end byte position.
1133
self.request.headers['range'] = 'bytes=23-'
1134
self.resource.render(self.request)
1135
self.assertEquals(''.join(self.request.written), self.payload[23:])
1136
self.assertEquals(len(''.join(self.request.written)), 41)
1137
self.assertEquals(self.request.responseCode, http.PARTIAL_CONTENT)
1139
self.request.outgoingHeaders['content-range'], 'bytes 23-63/64')
1140
self.assertEquals(self.request.outgoingHeaders['content-length'], '41')
1143
def test_implicitStart(self):
1145
If the start byte position is omitted but the end byte position is
1146
supplied, then the range is treated as requesting the last -N bytes of
1147
the resource, where N is the end byte position.
1149
self.request.headers['range'] = 'bytes=-17'
1150
self.resource.render(self.request)
1151
self.assertEquals(''.join(self.request.written), self.payload[-17:])
1152
self.assertEquals(len(''.join(self.request.written)), 17)
1153
self.assertEquals(self.request.responseCode, http.PARTIAL_CONTENT)
1155
self.request.outgoingHeaders['content-range'], 'bytes 47-63/64')
1156
self.assertEquals(self.request.outgoingHeaders['content-length'], '17')
1159
def test_explicitRange(self):
1161
A correct response to a bytes range header request from A to B starts
1162
with the A'th byte and ends with (including) the B'th byte. The first
1163
byte of a page is numbered with 0.
1165
self.request.headers['range'] = 'bytes=3-43'
1166
self.resource.render(self.request)
1167
written = ''.join(self.request.written)
1168
self.assertEquals(written, self.payload[3:44])
1169
self.assertEquals(self.request.responseCode, http.PARTIAL_CONTENT)
1171
self.request.outgoingHeaders['content-range'], 'bytes 3-43/64')
1173
str(len(written)), self.request.outgoingHeaders['content-length'])
1176
def test_explicitRangeOverlappingEnd(self):
1178
A correct response to a bytes range header request from A to B when B
1179
is past the end of the resource starts with the A'th byte and ends
1180
with the last byte of the resource. The first byte of a page is
1183
self.request.headers['range'] = 'bytes=40-100'
1184
self.resource.render(self.request)
1185
written = ''.join(self.request.written)
1186
self.assertEquals(written, self.payload[40:])
1187
self.assertEquals(self.request.responseCode, http.PARTIAL_CONTENT)
1189
self.request.outgoingHeaders['content-range'], 'bytes 40-63/64')
1191
str(len(written)), self.request.outgoingHeaders['content-length'])
1194
def test_statusCodeRequestedRangeNotSatisfiable(self):
1196
If a range is syntactically invalid due to the start being greater than
1197
the end, the range header is ignored (the request is responded to as if
1198
it were not present).
1200
self.request.headers['range'] = 'bytes=20-13'
1201
self.resource.render(self.request)
1202
self.assertEquals(self.request.responseCode, http.OK)
1203
self.assertEquals(''.join(self.request.written), self.payload)
1205
self.request.outgoingHeaders['content-length'],
1206
str(len(self.payload)))
1209
def test_invalidStartBytePos(self):
1211
If a range is unsatisfiable due to the start not being less than the
1212
length of the resource, the response is 416 (Requested range not
1213
satisfiable) and no data is written to the response body (RFC 2616,
1216
self.request.headers['range'] = 'bytes=67-108'
1217
self.resource.render(self.request)
1219
self.request.responseCode, http.REQUESTED_RANGE_NOT_SATISFIABLE)
1220
self.assertEquals(''.join(self.request.written), '')
1221
self.assertEquals(self.request.outgoingHeaders['content-length'], '0')
1222
# Sections 10.4.17 and 14.16
1224
self.request.outgoingHeaders['content-range'],
1225
'bytes */%d' % (len(self.payload),))
1229
class DirectoryListerTest(TestCase):
1231
Tests for L{static.DirectoryLister}.
1233
def _request(self, uri):
1234
request = DummyRequest([''])
1239
def test_renderHeader(self):
1241
L{static.DirectoryLister} prints the request uri as header of the
1244
path = FilePath(self.mktemp())
1247
lister = static.DirectoryLister(path.path)
1248
data = lister.render(self._request('foo'))
1249
self.assertIn("<h1>Directory listing for foo</h1>", data)
1250
self.assertIn("<title>Directory listing for foo</title>", data)
1253
def test_renderUnquoteHeader(self):
1255
L{static.DirectoryLister} unquote the request uri before printing it.
1257
path = FilePath(self.mktemp())
1260
lister = static.DirectoryLister(path.path)
1261
data = lister.render(self._request('foo%20bar'))
1262
self.assertIn("<h1>Directory listing for foo bar</h1>", data)
1263
self.assertIn("<title>Directory listing for foo bar</title>", data)
1266
def test_escapeHeader(self):
1268
L{static.DirectoryLister} escape "&", "<" and ">" after unquoting the
1271
path = FilePath(self.mktemp())
1274
lister = static.DirectoryLister(path.path)
1275
data = lister.render(self._request('foo%26bar'))
1276
self.assertIn("<h1>Directory listing for foo&bar</h1>", data)
1277
self.assertIn("<title>Directory listing for foo&bar</title>", data)
1280
def test_renderFiles(self):
1282
L{static.DirectoryLister} is able to list all the files inside a
1285
path = FilePath(self.mktemp())
1287
path.child('file1').setContent("content1")
1288
path.child('file2').setContent("content2" * 1000)
1290
lister = static.DirectoryLister(path.path)
1291
data = lister.render(self._request('foo'))
1292
body = """<tr class="odd">
1293
<td><a href="file1">file1</a></td>
1295
<td>[text/html]</td>
1299
<td><a href="file2">file2</a></td>
1301
<td>[text/html]</td>
1304
self.assertIn(body, data)
1307
def test_renderDirectories(self):
1309
L{static.DirectoryLister} is able to list all the directories inside
1312
path = FilePath(self.mktemp())
1314
path.child('dir1').makedirs()
1315
path.child('dir2 & 3').makedirs()
1317
lister = static.DirectoryLister(path.path)
1318
data = lister.render(self._request('foo'))
1319
body = """<tr class="odd">
1320
<td><a href="dir1/">dir1/</a></td>
1322
<td>[Directory]</td>
1326
<td><a href="dir2%20%26%203/">dir2 & 3/</a></td>
1328
<td>[Directory]</td>
1331
self.assertIn(body, data)
1334
def test_renderFiltered(self):
1336
L{static.DirectoryLister} takes a optional C{dirs} argument that
1337
filter out the list of of directories and files printed.
1339
path = FilePath(self.mktemp())
1341
path.child('dir1').makedirs()
1342
path.child('dir2').makedirs()
1343
path.child('dir3').makedirs()
1344
lister = static.DirectoryLister(path.path, dirs=["dir1", "dir3"])
1345
data = lister.render(self._request('foo'))
1346
body = """<tr class="odd">
1347
<td><a href="dir1/">dir1/</a></td>
1349
<td>[Directory]</td>
1353
<td><a href="dir3/">dir3/</a></td>
1355
<td>[Directory]</td>
1358
self.assertIn(body, data)
1361
def test_oddAndEven(self):
1363
L{static.DirectoryLister} gives an alternate class for each odd and
1364
even rows in the table.
1366
lister = static.DirectoryLister(None)
1367
elements = [{"href": "", "text": "", "size": "", "type": "",
1368
"encoding": ""} for i in xrange(5)]
1369
content = lister._buildTableContent(elements)
1371
self.assertEquals(len(content), 5)
1372
self.assertTrue(content[0].startswith('<tr class="odd">'))
1373
self.assertTrue(content[1].startswith('<tr class="even">'))
1374
self.assertTrue(content[2].startswith('<tr class="odd">'))
1375
self.assertTrue(content[3].startswith('<tr class="even">'))
1376
self.assertTrue(content[4].startswith('<tr class="odd">'))
1379
def test_mimeTypeAndEncodings(self):
1381
L{static.DirectoryLister} is able to detect mimetype and encoding of
1384
path = FilePath(self.mktemp())
1386
path.child('file1.txt').setContent("file1")
1387
path.child('file2.py').setContent("python")
1388
path.child('file3.conf.gz').setContent("conf compressed")
1389
path.child('file4.diff.bz2').setContent("diff compressed")
1390
directory = os.listdir(path.path)
1394
".txt": "text/plain",
1395
".py": "text/python",
1396
".conf": "text/configuration",
1397
".diff": "text/diff"
1400
lister = static.DirectoryLister(path.path, contentTypes=contentTypes)
1401
dirs, files = lister._getFilesAndDirectories(directory)
1402
self.assertEquals(dirs, [])
1403
self.assertEquals(files, [
1405
'href': 'file1.txt',
1407
'text': 'file1.txt',
1408
'type': '[text/plain]'},
1413
'type': '[text/python]'},
1414
{'encoding': '[gzip]',
1415
'href': 'file3.conf.gz',
1417
'text': 'file3.conf.gz',
1418
'type': '[text/configuration]'},
1419
{'encoding': '[bzip2]',
1420
'href': 'file4.diff.bz2',
1422
'text': 'file4.diff.bz2',
1423
'type': '[text/diff]'}])
1426
def test_brokenSymlink(self):
1428
If on the file in the listing points to a broken symlink, it should not
1429
be returned by L{static.DirectoryLister._getFilesAndDirectories}.
1431
path = FilePath(self.mktemp())
1433
file1 = path.child('file1')
1434
file1.setContent("file1")
1435
file1.linkTo(path.child("file2"))
1438
lister = static.DirectoryLister(path.path)
1439
directory = os.listdir(path.path)
1441
dirs, files = lister._getFilesAndDirectories(directory)
1442
self.assertEquals(dirs, [])
1443
self.assertEquals(files, [])
1445
if getattr(os, "symlink", None) is None:
1446
test_brokenSymlink.skip = "No symlink support"
1449
def test_childrenNotFound(self):
1451
Any child resource of L{static.DirectoryLister} renders an HTTP
1452
I{NOT FOUND} response code.
1454
path = FilePath(self.mktemp())
1456
lister = static.DirectoryLister(path.path)
1457
request = self._request('')
1458
child = resource.getChildForRequest(lister, request)
1459
result = _render(child, request)
1460
def cbRendered(ignored):
1461
self.assertEquals(request.responseCode, http.NOT_FOUND)
1462
result.addCallback(cbRendered)
1466
def test_repr(self):
1468
L{static.DirectoryLister.__repr__} gives the path of the lister.
1470
path = FilePath(self.mktemp())
1471
lister = static.DirectoryLister(path.path)
1472
self.assertEquals(repr(lister),
1473
"<DirectoryLister of %r>" % (path.path,))
1474
self.assertEquals(str(lister),
1475
"<DirectoryLister of %r>" % (path.path,))
1477
def test_formatFileSize(self):
1479
L{static.formatFileSize} format an amount of bytes into a more readable
1482
self.assertEquals(static.formatFileSize(0), "0B")
1483
self.assertEquals(static.formatFileSize(123), "123B")
1484
self.assertEquals(static.formatFileSize(4567), "4K")
1485
self.assertEquals(static.formatFileSize(8900000), "8M")
1486
self.assertEquals(static.formatFileSize(1234000000), "1G")
1487
self.assertEquals(static.formatFileSize(1234567890000), "1149G")
1491
class TestFileTransferDeprecated(TestCase):
1493
L{static.FileTransfer} is deprecated.
1496
def test_deprecation(self):
1498
Instantiation of L{FileTransfer} produces a deprecation warning.
1500
static.FileTransfer(StringIO.StringIO(), 0, DummyRequest([]))
1501
warnings = self.flushWarnings([self.test_deprecation])
1502
self.assertEqual(len(warnings), 1)
1503
self.assertEqual(warnings[0]['category'], DeprecationWarning)
1505
warnings[0]['message'],
1506
'FileTransfer is deprecated since Twisted 9.0. '
1507
'Use a subclass of StaticProducer instead.')