import socket
import warnings
from cStringIO import StringIO

from zope.interface import implements

from twisted.internet import interfaces, protocol, reactor
from twisted.protocols import policies, basic
from twisted.python import log

from twisted.web2 import responsecode
from twisted.web2 import http_headers
from twisted.web2 import http
13
PERSIST_NO_PIPELINE, PERSIST_PIPELINE = (1,2)
16
def _cachedGetHostByAddr(hostaddr):
17
hostname = _cachedHostNames.get(hostaddr)
20
hostname = socket.gethostbyaddr(hostaddr)[0]
23
_cachedHostNames[hostaddr]=hostname
26
class StringTransport(object):
28
I am a StringIO wrapper that conforms for the transport API. I support
29
the 'writeSequence' method.
33
def writeSequence(self, seq):
34
self.s.write(''.join(seq))
35
def __getattr__(self, attr):
36
return getattr(self.__dict__['s'], attr)
38
class AbortedException(Exception):
42
class HTTPParser(object):
43
"""This class handles the parsing side of HTTP processing. With a suitable
44
subclass, it can parse either the client side or the server side of the
49
parseCloseAsEnd = False
58
finishedReading = False
70
# handleContentChunk(data)
71
# handleContentComplete()
73
# Needs functions to exist on .channel
74
# channel.maxHeaderLength
75
# channel.requestReadFinished(self)
76
# channel.setReadPersistent(self, persistent)
77
# (from LineReceiver):
78
# channel.setRawMode()
79
# channel.setLineMode(extraneous)
80
# channel.pauseProducing()
81
# channel.resumeProducing()
82
# channel.stopProducing()
85
def __init__(self, channel):
86
self.inHeaders = http_headers.Headers()
87
self.channel = channel
89
def lineReceived(self, line):
91
# Parsing a chunked input
92
if self.chunkedIn == 1:
93
# First we get a line like "chunk-size [';' chunk-extension]"
94
# (where chunk extension is just random crap as far as we're concerned)
95
# RFC says to ignore any extensions you don't recognize -- that's all of them.
96
chunksize = line.split(';', 1)[0]
98
self.length = int(chunksize, 16)
100
self._abortWithError(responsecode.BAD_REQUEST, "Invalid chunk size, not a hex number: %s!" % chunksize)
102
self._abortWithError(responsecode.BAD_REQUEST, "Invalid chunk size, negative.")
105
# We're done, parse the trailers line
108
# Read self.length bytes of raw data
109
self.channel.setRawMode()
110
elif self.chunkedIn == 2:
111
# After we got data bytes of the appropriate length, we end up here,
112
# waiting for the CRLF, then go back to get the next chunk size.
114
self._abortWithError(responsecode.BAD_REQUEST, "Excess %d bytes sent in chunk transfer mode" % len(line))
116
elif self.chunkedIn == 3:
117
# TODO: support Trailers (maybe! but maybe not!)
119
# After getting the final "0" chunk we're here, and we *EAT MERCILESSLY*
120
# any trailer headers sent, and wait for the blank line to terminate the
123
self.allContentReceived()
124
# END of chunk handling
126
# Empty line => End of headers
127
if self.partialHeader:
128
self.headerReceived(self.partialHeader)
129
self.partialHeader = ''
130
self.allHeadersReceived() # can set chunkedIn
133
# stay in linemode waiting for chunk header
135
elif self.length == 0:
136
# no content expected
137
self.allContentReceived()
139
# await raw data as content
140
self.channel.setRawMode()
141
# Should I do self.pauseProducing() here?
142
self.processRequest()
144
self.headerlen += len(line)
145
if self.headerlen > self.channel.maxHeaderLength:
146
self._abortWithError(responsecode.BAD_REQUEST, 'Headers too long.')
149
# Append a header continuation
150
self.partialHeader += line
152
if self.partialHeader:
153
self.headerReceived(self.partialHeader)
154
self.partialHeader = line
156
def rawDataReceived(self, data):
157
"""Handle incoming content."""
159
if datalen < self.length:
160
self.handleContentChunk(data)
161
self.length = self.length - datalen
163
self.handleContentChunk(data[:self.length])
164
extraneous = data[self.length:]
165
channel = self.channel # could go away from allContentReceived.
166
if not self.chunkedIn:
167
self.allContentReceived()
169
# NOTE: in chunked mode, self.length is the size of the current chunk,
170
# so we still have more to read.
171
self.chunkedIn = 2 # Read next chunksize
173
channel.setLineMode(extraneous)
175
def headerReceived(self, line):
176
"""Store this header away. Check for too much header data
177
(> channel.maxHeaderLength) and abort the connection if so.
179
nameval = line.split(':', 1)
180
if len(nameval) != 2:
181
self._abortWithError(responsecode.BAD_REQUEST, "No ':' in header.")
184
val = val.lstrip(' \t')
185
self.inHeaders.addRawHeader(name, val)
188
def allHeadersReceived(self):
189
# Split off connection-related headers
190
connHeaders = self.splitConnectionHeaders()
192
# Set connection parameters from headers
193
self.setConnectionParams(connHeaders)
194
self.connHeaders = connHeaders
196
def allContentReceived(self):
197
self.finishedReading = True
198
self.channel.requestReadFinished(self)
199
self.handleContentComplete()
202
def splitConnectionHeaders(self):
203
# Split off headers for the connection from headers for the request.
206
h = inHeaders.getRawHeaders(name, None)
208
inHeaders.removeHeader(name)
209
connHeaders.setRawHeaders(name, h)
211
# NOTE: According to HTTP spec, we're supposed to eat the
212
# 'Proxy-Authenticate' and 'Proxy-Authorization' headers also, but that
213
# doesn't sound like a good idea to me, because it makes it impossible
214
# to have a non-authenticating transparent proxy in front of an
215
# authenticating proxy. An authenticating proxy can eat them itself.
216
# 'Proxy-Connection' is an undocumented HTTP 1.0 abomination.
217
connHeaderNames = ['connection', 'content-length', 'keep-alive', 'te',
218
'trailers', 'transfer-encoding', 'upgrade',
220
inHeaders = self.inHeaders
221
connHeaders = http_headers.Headers()
224
if self.version < (1,1):
225
# Remove all headers mentioned in Connection, because a HTTP 1.0
226
# proxy might have erroneously forwarded it from a 1.1 client.
227
for name in connHeaders.getHeader('connection', ()):
228
if inHeaders.hasHeader(name):
229
inHeaders.removeHeader(name)
231
# Otherwise, just add the headers listed to the list of those to move
232
connHeaderNames.extend(connHeaders.getHeader('connection', ()))
234
for headername in connHeaderNames:
239
def setConnectionParams(self, connHeaders):
240
# Figure out persistent connection stuff
241
if self.version >= (1,1):
242
if 'close' in connHeaders.getHeader('connection', ()):
243
readPersistent = False
245
readPersistent = PERSIST_PIPELINE
246
elif 'keep-alive' in connHeaders.getHeader('connection', ()):
247
readPersistent = PERSIST_NO_PIPELINE
249
readPersistent = False
252
# Okay, now implement section 4.4 Message Length to determine
253
# how to find the end of the incoming HTTP message.
254
transferEncoding = connHeaders.getHeader('transfer-encoding')
257
if transferEncoding[-1] == 'chunked':
260
# Cut off the chunked encoding (cause it's special)
261
transferEncoding = transferEncoding[:-1]
262
elif not self.parseCloseAsEnd:
263
# Would close on end of connection, except this can't happen for
264
# client->server data. (Well..it could actually, since TCP has half-close
265
# but the HTTP spec says it can't, so we'll pretend it's right.)
266
self._abortWithError(responsecode.BAD_REQUEST, "Transfer-Encoding received without chunked in last position.")
268
# TODO: support gzip/etc encodings.
269
# FOR NOW: report an error if the client uses any encodings.
270
# They shouldn't, because we didn't send a TE: header saying it's okay.
272
self._abortWithError(responsecode.NOT_IMPLEMENTED, "Transfer-Encoding %s not supported." % transferEncoding)
274
# No transfer-coding.
276
if self.parseCloseAsEnd:
277
# If no Content-Length, indeterminate length data
278
# (unless the responsecode was one of the special ones, or
279
# the request method was HEAD.
280
# If the request was HEAD, self.length has been set to 0 by
281
# HTTPClientRequest.submit)
282
if self.code in http.NO_BODY_CODES:
285
self.length = connHeaders.getHeader('content-length', self.length)
287
# If it's an indeterminate stream without transfer encoding, it must be
289
if self.length is None:
290
readPersistent = False
292
# If no Content-Length either, assume no content.
293
self.length = connHeaders.getHeader('content-length', 0)
295
# Set the calculated persistence
296
self.channel.setReadPersistent(readPersistent)
298
def abortParse(self):
299
# If we're erroring out while still reading the request
300
if not self.finishedReading:
301
self.finishedReading = True
302
self.channel.setReadPersistent(False)
303
self.channel.requestReadFinished(self)
306
def pauseProducing(self):
307
if not self.finishedReading:
308
self.channel.pauseProducing()
310
def resumeProducing(self):
311
if not self.finishedReading:
312
self.channel.resumeProducing()
314
def stopProducing(self):
315
if not self.finishedReading:
316
self.channel.stopProducing()
318
class HTTPChannelRequest(HTTPParser):
319
"""This class handles the state and parsing for one HTTP request.
320
It is responsible for all the low-level connection oriented behavior.
321
Thus, it takes care of keep-alive, de-chunking, etc., and passes
322
the non-connection headers on to the user-level Request object."""
324
command = path = version = None
328
out_version = "HTTP/1.1"
330
def __init__(self, channel, queued=0):
331
HTTPParser.__init__(self, channel)
334
# Buffer writes to a string until we're first in line
335
# to write a response
337
self.transport = StringTransport()
339
self.transport = self.channel.transport
341
# set the version to a fallback for error generation
345
def gotInitialLine(self, initialLine):
346
parts = initialLine.split()
348
# Parse the initial request line
352
if len(parts) == 2 and parts[1][0] == '/':
353
parts.append('HTTP/0.9')
355
self._abortWithError(responsecode.BAD_REQUEST, 'Bad request line: %s' % initialLine)
357
self.command, self.path, strversion = parts
359
protovers = http.parseVersion(strversion)
360
if protovers[0] != 'http':
363
self._abortWithError(responsecode.BAD_REQUEST, "Unknown protocol: %s" % strversion)
365
self.version = protovers[1:3]
367
# Ensure HTTP 0 or HTTP 1.
368
if self.version[0] > 1:
369
self._abortWithError(responsecode.HTTP_VERSION_NOT_SUPPORTED, 'Only HTTP 0.9 and HTTP 1.x are supported.')
371
if self.version[0] == 0:
372
# simulate end of headers, as HTTP 0 doesn't have headers.
373
self.lineReceived('')
375
def lineLengthExceeded(self, line, wasFirst=False):
376
code = wasFirst and responsecode.REQUEST_URI_TOO_LONG or responsecode.BAD_REQUEST
377
self._abortWithError(code, 'Header line too long.')
379
def createRequest(self):
380
self.request = self.channel.requestFactory(self, self.command, self.path, self.version, self.length, self.inHeaders)
383
def processRequest(self):
384
self.request.process()
386
def handleContentChunk(self, data):
387
self.request.handleContentChunk(data)
389
def handleContentComplete(self):
390
self.request.handleContentComplete()
392
############## HTTPChannelRequest *RESPONSE* methods #############
397
##### Request Callbacks #####
398
def writeIntermediateResponse(self, code, headers=None):
399
if self.version >= (1,1):
400
self._writeHeaders(code, headers, False)
402
def writeHeaders(self, code, headers):
403
self._writeHeaders(code, headers, True)
405
def _writeHeaders(self, code, headers, addConnectionHeaders):
406
# HTTP 0.9 doesn't have headers.
407
if self.version[0] == 0:
411
code_message = responsecode.RESPONSES.get(code, "Unknown Status")
413
l.append('%s %s %s\r\n' % (self.out_version, code,
415
if headers is not None:
416
for name, valuelist in headers.getAllRawHeaders():
417
for value in valuelist:
418
l.append("%s: %s\r\n" % (name, value))
420
if addConnectionHeaders:
421
# if we don't have a content length, we send data in
422
# chunked mode, so that we can support persistent connections.
423
if (headers.getHeader('content-length') is None and
424
self.command != "HEAD" and code not in http.NO_BODY_CODES):
425
if self.version >= (1,1):
426
l.append("%s: %s\r\n" % ('Transfer-Encoding', 'chunked'))
427
self.chunkedOut = True
429
# Cannot use persistent connections if we can't do chunking
430
self.channel.dropQueuedRequests()
432
if self.channel.isLastRequest(self):
433
l.append("%s: %s\r\n" % ('Connection', 'close'))
434
elif self.version < (1,1):
435
l.append("%s: %s\r\n" % ('Connection', 'Keep-Alive'))
438
self.transport.writeSequence(l)
441
def write(self, data):
444
elif self.chunkedOut:
445
self.transport.writeSequence(("%X\r\n" % len(data), data, "\r\n"))
447
self.transport.write(data)
450
"""We are finished writing data."""
452
warnings.warn("Warning! request.finish called twice.", stacklevel=2)
456
# write last chunk and closing CRLF
457
self.transport.write("0\r\n\r\n")
464
def abortConnection(self, closeWrite=True):
465
"""Abort the HTTP connection because of some kind of unrecoverable
466
error. If closeWrite=False, then only abort reading, but leave
467
the writing side alone. This is mostly for internal use by
468
the HTTP request parsing logic, so that it can call an error
471
Otherwise, completely shut down the connection.
476
self.producer.stopProducing()
477
self.unregisterProducer()
481
self.transport.reset()
482
self.transport.truncate()
486
def getHostInfo(self):
487
t=self.channel.transport
488
secure = interfaces.ISSLTransport(t, None) is not None
490
host.host = _cachedGetHostByAddr(host.host)
493
def getRemoteHost(self):
494
return self.channel.transport.getPeer()
496
##### End Request Callbacks #####
498
def _abortWithError(self, errorcode, text=''):
499
"""Handle low level protocol errors."""
500
headers = http_headers.Headers()
501
headers.setHeader('content-length', len(text)+1)
503
self.abortConnection(closeWrite=False)
504
self.writeHeaders(errorcode, headers)
508
raise AbortedException
511
"""Called when have finished responding and are no longer queued."""
513
log.err(RuntimeError("Producer was not unregistered for %s" % self))
514
self.unregisterProducer()
515
self.channel.requestWriteFinished(self)
518
# methods for channel - end users should not use these
520
def noLongerQueued(self):
521
"""Notify the object that it is no longer queued.
523
We start writing whatever data we have to the transport, etc.
525
This method is not intended for users.
528
raise RuntimeError, "noLongerQueued() got called unnecessarily."
532
# set transport to real one and send any buffer data
533
data = self.transport.getvalue()
534
self.transport = self.channel.transport
536
self.transport.write(data)
538
# if we have producer, register it with transport
539
if (self.producer is not None) and not self.finished:
540
self.transport.registerProducer(self.producer, True)
542
# if we're finished, clean up
548
def registerProducer(self, producer, streaming):
549
"""Register a producer.
553
raise ValueError, "registering producer %s before previous one (%s) was unregistered" % (producer, self.producer)
555
self.producer = producer
558
producer.pauseProducing()
560
self.transport.registerProducer(producer, streaming)
562
def unregisterProducer(self):
563
"""Unregister the producer."""
565
self.transport.unregisterProducer()
568
def connectionLost(self, reason):
569
"""connection was lost"""
570
if self.queued and self.producer:
571
self.producer.stopProducing()
574
self.request.connectionLost(reason)
576
class HTTPChannel(basic.LineReceiver, policies.TimeoutMixin, object):
577
"""A receiver for HTTP requests. Handles splitting up the connection
578
for the multiple HTTPChannelRequests that may be in progress on this
581
@ivar timeOut: number of seconds to wait before terminating an
584
@ivar maxPipeline: number of outstanding in-progress requests
585
to allow before pausing the input.
587
@ivar maxHeaderLength: number of bytes of header to accept from
592
implements(interfaces.IHalfCloseableProtocol)
594
## Configuration parameters. Set in instances or subclasses.
596
# How many simultaneous requests to handle.
599
# Timeout when between two requests
600
betweenRequestsTimeOut = 15
601
# Timeout between lines or bytes while reading a request
602
inputTimeOut = 60 * 4
604
# maximum length of headers (10KiB)
605
maxHeaderLength = 10240
607
# Allow persistent connections?
608
allowPersistentConnections = True
611
chanRequestFactory = HTTPChannelRequest
612
requestFactory = http.Request
616
readPersistent = PERSIST_PIPELINE
624
def _callLater(self, secs, fun):
625
reactor.callLater(secs, fun)
631
def connectionMade(self):
632
self.setTimeout(self.inputTimeOut)
633
self.factory.outstandingRequests+=1
635
def lineReceived(self, line):
637
self.setTimeout(self.inputTimeOut)
638
# if this connection is not persistent, drop any data which
639
# the client (illegally) sent after the last request.
640
if not self.readPersistent:
641
self.dataReceived = self.lineReceived = lambda *args: None
644
# IE sends an extraneous empty line (\r\n) after a POST request;
645
# eat up such a line, but only ONCE
646
if not line and self._first_line == 1:
652
if not self.allowPersistentConnections:
653
# Don't allow a second request
654
self.readPersistent = False
657
self.chanRequest = self.chanRequestFactory(self, len(self.requests))
658
self.requests.append(self.chanRequest)
659
self.chanRequest.gotInitialLine(line)
660
except AbortedException:
664
self.chanRequest.lineReceived(line)
665
except AbortedException:
668
def lineLengthExceeded(self, line):
670
# Fabricate a request object to respond to the line length violation.
671
self.chanRequest = self.chanRequestFactory(self,
673
self.requests.append(self.chanRequest)
674
self.chanRequest.gotInitialLine("GET fake HTTP/1.0")
676
self.chanRequest.lineLengthExceeded(line, self._first_line)
677
except AbortedException:
680
def rawDataReceived(self, data):
681
self.setTimeout(self.inputTimeOut)
683
self.chanRequest.rawDataReceived(data)
684
except AbortedException:
687
def requestReadFinished(self, request):
688
if(self.readPersistent is PERSIST_NO_PIPELINE or
689
len(self.requests) >= self.maxPipeline):
690
self.pauseProducing()
692
# reset state variables
694
self.chanRequest = None
697
# Disable the idle timeout, in case this request takes a long
698
# time to finish generating output.
699
if len(self.requests) > 0:
700
self.setTimeout(None)
702
def _startNextRequest(self):
703
# notify next request, if present, it can start writing
707
self.transport.loseConnection()
709
self.requests[0].noLongerQueued()
711
# resume reading if allowed to
712
if(not self._readLost and
713
self.readPersistent is not PERSIST_NO_PIPELINE and
714
len(self.requests) < self.maxPipeline):
715
self.resumeProducing()
717
# No more incoming data, they already closed!
718
self.transport.loseConnection()
720
# no requests in queue, resume reading
721
self.setTimeout(self.betweenRequestsTimeOut)
722
self.resumeProducing()
724
def setReadPersistent(self, persistent):
725
if self.readPersistent:
726
# only allow it to be set if it's not currently False
727
self.readPersistent = persistent
729
def dropQueuedRequests(self):
730
"""Called when a response is written that forces a connection close."""
731
self.readPersistent = False
732
# Tell all requests but first to abort.
733
for request in self.requests[1:]:
734
request.connectionLost(None)
735
del self.requests[1:]
737
def isLastRequest(self, request):
738
# Is this channel handling the last possible request
739
return not self.readPersistent and self.requests[-1] == request
741
def requestWriteFinished(self, request):
742
"""Called by first request in queue when it is done."""
743
if request != self.requests[0]: raise TypeError
745
# Don't del because we haven't finished cleanup, so,
746
# don't want queue len to be 0 yet.
747
self.requests[0] = None
749
if self.readPersistent or len(self.requests) > 1:
750
# Do this in the next reactor loop so as to
751
# not cause huge call stacks with fast
753
self._callLater(0, self._startNextRequest)
755
self.lingeringClose()
757
def timeoutConnection(self):
758
#log.msg("Timing out client: %s" % str(self.transport.getPeer()))
759
policies.TimeoutMixin.timeoutConnection(self)
761
def lingeringClose(self):
763
This is a bit complicated. This process is necessary to ensure proper
764
workingness when HTTP pipelining is in use.
766
Here is what it wants to do:
768
1. Finish writing any buffered data, then close our write side.
769
While doing so, read and discard any incoming data.
771
2. When that happens (writeConnectionLost called), wait up to 20
772
seconds for the remote end to close their write side (our read
776
- If they do (readConnectionLost called), close the socket,
777
and cancel the timeout.
779
- If that doesn't happen, the timer fires, and makes the
780
socket close anyways.
784
self.transport.loseWriteConnection()
786
# Throw out any incoming data
787
self.dataReceived = self.lineReceived = lambda *args: None
788
self.transport.resumeProducing()
790
def writeConnectionLost(self):
791
# Okay, all data has been written
792
# In 20 seconds, actually close the socket
793
self._lingerTimer = reactor.callLater(20, self._lingerClose)
794
self._writeLost = True
796
def _lingerClose(self):
797
self._lingerTimer = None
798
self.transport.loseConnection()
800
def readConnectionLost(self):
801
"""Read connection lost"""
802
# If in the lingering-close state, lose the socket.
803
if self._lingerTimer:
804
self._lingerTimer.cancel()
805
self._lingerTimer = None
806
self.transport.loseConnection()
809
# If between requests, drop connection
810
# when all current requests have written their data.
811
self._readLost = True
812
if not self.requests:
813
# No requests in progress, lose now.
814
self.transport.loseConnection()
816
# If currently in the process of reading a request, this is
817
# probably a client abort, so lose the connection.
819
self.transport.loseConnection()
821
def connectionLost(self, reason):
822
self.factory.outstandingRequests-=1
824
self._writeLost = True
825
self.readConnectionLost()
826
self.setTimeout(None)
828
# Tell all requests to abort.
829
for request in self.requests:
830
if request is not None:
831
request.connectionLost(reason)
833
class OverloadedServerProtocol(protocol.Protocol):
834
def connectionMade(self):
835
self.transport.write("HTTP/1.0 503 Service Unavailable\r\n"
836
"Content-Type: text/html\r\n"
837
"Connection: close\r\n\r\n"
838
"<html><head><title>503 Service Unavailable</title></head>"
839
"<body><h1>Service Unavailable</h1>"
840
"The server is currently overloaded, "
841
"please try again later.</body></html>")
842
self.transport.loseConnection()
844
class HTTPFactory(protocol.ServerFactory):
845
"""Factory for HTTP server."""
847
protocol = HTTPChannel
851
outstandingRequests = 0
853
def __init__(self, requestFactory, maxRequests=600, **kwargs):
854
self.maxRequests=maxRequests
855
self.protocolArgs = kwargs
856
self.protocolArgs['requestFactory']=requestFactory
858
def buildProtocol(self, addr):
859
if self.outstandingRequests >= self.maxRequests:
860
return OverloadedServerProtocol()
862
p = protocol.ServerFactory.buildProtocol(self, addr)
864
for arg,value in self.protocolArgs.iteritems():
865
setattr(p, arg, value)
868
__all__ = ['HTTPFactory', ]