~landscape/zope3/ztk-1.1.3

« back to all changes in this revision

Viewing changes to src/twisted/web2/wsgi.py

  • Committer: Andreas Hasenack
  • Date: 2009-07-20 17:49:16 UTC
  • Revision ID: andreas@canonical.com-20090720174916-g2tn6qmietz2hn0u
Revert twisted removal, it breaks several dozen tests [trivial]

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
An implementation of PEP 333: Python Web Server Gateway Interface (WSGI).
 
3
"""
 
4
 
 
5
import os, threading
 
6
import Queue
 
7
from zope.interface import implements
 
8
 
 
9
from twisted.internet import defer
 
10
from twisted.python import log, failure
 
11
from twisted.web2 import http
 
12
from twisted.web2 import iweb
 
13
from twisted.web2 import server
 
14
from twisted.web2 import stream
 
15
from twisted.web2.twcgi import createCGIEnvironment
 
16
 
 
17
 
 
18
class AlreadyStartedResponse(Exception):
 
19
    pass
 
20
 
 
21
 
 
22
# This isn't a subclass of resource.Resource, because it shouldn't do
 
23
# any method-specific actions at all. All that stuff is totally up to
 
24
# the contained wsgi application
 
25
class WSGIResource(object):
 
26
    """
 
27
    A web2 Resource which wraps the given WSGI application callable.
 
28
 
 
29
    The WSGI application will be called in a separate thread (using
 
30
    the reactor threadpool) whenever a request for this resource or
 
31
    any lower part of the url hierarchy is received.
 
32
    """
 
33
    implements(iweb.IResource)
 
34
    
 
35
    def __init__(self, application):
 
36
        self.application = application
 
37
 
 
38
    def renderHTTP(self, req):
 
39
        from twisted.internet import reactor
 
40
        # Do stuff with WSGIHandler.
 
41
        handler = WSGIHandler(self.application, req)
 
42
        # Get deferred
 
43
        d = handler.responseDeferred
 
44
        # Run it in a thread
 
45
        reactor.callInThread(handler.run)
 
46
        return d
 
47
    
 
48
    def locateChild(self, request, segments):
 
49
        return self, server.StopTraversal
 
50
            
 
51
def callInReactor(__f, *__a, **__kw):
 
52
    from twisted.internet import reactor
 
53
    queue = Queue.Queue()
 
54
    reactor.callFromThread(__callFromThread, queue, __f, __a, __kw)
 
55
    result = queue.get()
 
56
    if isinstance(result, failure.Failure):
 
57
        result.raiseException()
 
58
    return result
 
59
 
 
60
def __callFromThread(queue, f, a, kw):
 
61
    result = defer.maybeDeferred(f, *a, **kw)
 
62
    result.addBoth(queue.put)
 
63
 
 
64
class InputStream(object):
 
65
    """
 
66
    This class implements the 'wsgi.input' object. The methods are
 
67
    expected to have the same behavior as the same-named methods for
 
68
    python's builtin file object.
 
69
    """
 
70
    
 
71
    def __init__(self, newstream):
 
72
        # Called in IO thread
 
73
        self.stream = stream.BufferedStream(newstream)
 
74
        
 
75
    def read(self, size=None):
 
76
        """
 
77
        Read at most size bytes from the input, or less if EOF is
 
78
        encountered. If size is ommitted or negative, read until EOF.
 
79
        """
 
80
        # Called in application thread
 
81
        if size < 0:
 
82
            size = None
 
83
        return callInReactor(self.stream.readExactly, size)
 
84
 
 
85
    def readline(self, size=None):
 
86
        """
 
87
        Read a line, delimited by a newline. If the stream reaches EOF
 
88
        or size bytes have been read before reaching a newline (if
 
89
        size is given), the partial line is returned.
 
90
 
 
91
        COMPATIBILITY NOTE: the size argument is excluded from the
 
92
        WSGI specification, but is provided here anyhow, because
 
93
        useful libraries such as python stdlib's cgi.py assume their
 
94
        input file-like-object supports readline with a size
 
95
        argument. If you use it, be aware your application may not be
 
96
        portable to other conformant WSGI servers.
 
97
        """
 
98
        # Called in application thread
 
99
        if size < 0:
 
100
            # E.g. -1, which is the default readline size for *some*
 
101
            # other file-like-objects...
 
102
            size = None
 
103
 
 
104
        return callInReactor(self.stream.readline, '\n', size = size)
 
105
    
 
106
    def readlines(self, hint=None):
 
107
        """
 
108
        Read until EOF, collecting all lines in a list, and returns
 
109
        that list. The hint argument is ignored (as is allowed in the
 
110
        API specification)
 
111
        """
 
112
        # Called in application thread
 
113
        data = self.read()
 
114
        lines = data.split('\n')
 
115
        last = lines.pop()
 
116
        lines = [s+'\n' for s in lines]
 
117
        if last != '':
 
118
            lines.append(last)
 
119
        return lines
 
120
 
 
121
    def __iter__(self):
 
122
        """
 
123
        Returns an iterator, each iteration of which returns the
 
124
        result of readline(), and stops when readline() returns an
 
125
        empty string.
 
126
        """
 
127
        while 1:
 
128
            line = self.readline()
 
129
            if not line:
 
130
                return
 
131
            yield line
 
132
                
 
133
    
 
134
class ErrorStream(object):
 
135
    """
 
136
    This class implements the 'wsgi.error' object.
 
137
    """
 
138
    def flush(self):
 
139
        # Called in application thread
 
140
        return
 
141
 
 
142
    def write(self, s):
 
143
        # Called in application thread
 
144
        log.msg("WSGI app error: "+s, isError=True)
 
145
 
 
146
    def writelines(self, seq):
 
147
        # Called in application thread
 
148
        s = ''.join(seq)
 
149
        log.msg("WSGI app error: "+s, isError=True)
 
150
 
 
151
class WSGIHandler(object):
 
152
    headersSent = False
 
153
    stopped = False
 
154
    stream = None
 
155
    
 
156
    def __init__(self, application, request):
 
157
        # Called in IO thread
 
158
        self.setupEnvironment(request)
 
159
        self.application = application
 
160
        self.request = request
 
161
        self.response = None
 
162
        self.responseDeferred = defer.Deferred()
 
163
 
 
164
    def setupEnvironment(self, request):
 
165
        # Called in IO thread
 
166
        env = createCGIEnvironment(request)
 
167
        env['wsgi.version']      = (1, 0)
 
168
        env['wsgi.url_scheme']   = env['REQUEST_SCHEME']
 
169
        env['wsgi.input']        = InputStream(request.stream)
 
170
        env['wsgi.errors']       = ErrorStream()
 
171
        env['wsgi.multithread']  = True
 
172
        env['wsgi.multiprocess'] = False
 
173
        env['wsgi.run_once']     = False
 
174
        env['wsgi.file_wrapper'] = FileWrapper
 
175
        self.environment = env
 
176
        
 
177
    def startWSGIResponse(self, status, response_headers, exc_info=None):
 
178
        # Called in application thread
 
179
        if exc_info is not None:
 
180
            try:
 
181
                if self.headersSent:
 
182
                    raise exc_info[0], exc_info[1], exc_info[2]
 
183
            finally:
 
184
                exc_info = None
 
185
        elif self.response is not None:
 
186
            raise AlreadyStartedResponse, 'startWSGIResponse(%r)' % status
 
187
        status = int(status.split(' ')[0])
 
188
        self.response = http.Response(status)
 
189
        for key, value in response_headers:
 
190
            self.response.headers.addRawHeader(key, value)
 
191
        return self.write
 
192
 
 
193
 
 
194
    def run(self):
 
195
        from twisted.internet import reactor
 
196
        # Called in application thread
 
197
        try:
 
198
            result = self.application(self.environment, self.startWSGIResponse)
 
199
            self.handleResult(result)
 
200
        except:
 
201
            if not self.headersSent:
 
202
                reactor.callFromThread(self.__error, failure.Failure())
 
203
            else:
 
204
                reactor.callFromThread(self.stream.finish, failure.Failure())
 
205
 
 
206
    def __callback(self):
 
207
        # Called in IO thread
 
208
        self.responseDeferred.callback(self.response)
 
209
        self.responseDeferred = None
 
210
 
 
211
    def __error(self, f):
 
212
        # Called in IO thread
 
213
        self.responseDeferred.errback(f)
 
214
        self.responseDeferred = None
 
215
            
 
216
    def write(self, output):
 
217
        # Called in application thread
 
218
        from twisted.internet import reactor
 
219
        if self.response is None:
 
220
            raise RuntimeError(
 
221
                "Application didn't call startResponse before writing data!")
 
222
        if not self.headersSent:
 
223
            self.stream=self.response.stream=stream.ProducerStream()
 
224
            self.headersSent = True
 
225
            
 
226
            # threadsafe event object to communicate paused state.
 
227
            self.unpaused = threading.Event()
 
228
            
 
229
            # After this, we cannot touch self.response from this
 
230
            # thread any more
 
231
            def _start():
 
232
                # Called in IO thread
 
233
                self.stream.registerProducer(self, True)
 
234
                self.__callback()
 
235
                # Notify application thread to start writing
 
236
                self.unpaused.set()
 
237
            reactor.callFromThread(_start)
 
238
        # Wait for unpaused to be true
 
239
        self.unpaused.wait()
 
240
        reactor.callFromThread(self.stream.write, output)
 
241
 
 
242
    def writeAll(self, result):
 
243
        # Called in application thread
 
244
        from twisted.internet import reactor
 
245
        if not self.headersSent:
 
246
            if self.response is None:
 
247
                raise RuntimeError(
 
248
                    "Application didn't call startResponse before writing data!")
 
249
            l = 0
 
250
            for item in result:
 
251
                l += len(item)
 
252
            self.response.stream=stream.ProducerStream(length=l)
 
253
            self.response.stream.buffer = list(result)
 
254
            self.response.stream.finish()
 
255
            reactor.callFromThread(self.__callback)
 
256
        else:
 
257
            # Has already been started, cannot replace the stream
 
258
            def _write():
 
259
                # Called in IO thread
 
260
                for s in result:
 
261
                    self.stream.write(s)
 
262
                self.stream.finish()
 
263
            reactor.callFromThread(_write)
 
264
            
 
265
            
 
266
    def handleResult(self, result):
 
267
        # Called in application thread
 
268
        try:
 
269
            from twisted.internet import reactor
 
270
            if (isinstance(result, FileWrapper) and 
 
271
                   hasattr(result.filelike, 'fileno') and
 
272
                   not self.headersSent):
 
273
                if self.response is None:
 
274
                    raise RuntimeError(
 
275
                        "Application didn't call startResponse before writing data!")
 
276
                self.headersSent = True
 
277
                # Make FileStream and output it. We make a new file
 
278
                # object from the fd, just in case the original one
 
279
                # isn't an actual file object.
 
280
                self.response.stream = stream.FileStream(
 
281
                    os.fdopen(os.dup(result.filelike.fileno())))
 
282
                reactor.callFromThread(self.__callback)
 
283
                return
 
284
 
 
285
            if type(result) in (list,tuple):
 
286
                # If it's a list or tuple (exactly, not subtype!),
 
287
                # then send the entire thing down to Twisted at once,
 
288
                # and free up this thread to do other work.
 
289
                self.writeAll(result)
 
290
                return
 
291
            
 
292
            # Otherwise, this thread has to keep running to provide the
 
293
            # data.
 
294
            for data in result:
 
295
                if self.stopped:
 
296
                    return
 
297
                self.write(data)
 
298
            
 
299
            if not self.headersSent:
 
300
                if self.response is None:
 
301
                    raise RuntimeError(
 
302
                        "Application didn't call startResponse, and didn't send any data!")
 
303
                
 
304
                self.headersSent = True
 
305
                reactor.callFromThread(self.__callback)
 
306
            else:
 
307
                reactor.callFromThread(self.stream.finish)
 
308
                
 
309
        finally:
 
310
            if hasattr(result,'close'):
 
311
                result.close()
 
312
 
 
313
    def pauseProducing(self):
 
314
        # Called in IO thread
 
315
        self.unpaused.set()
 
316
 
 
317
    def resumeProducing(self):
 
318
        # Called in IO thread
 
319
        self.unpaused.clear()
 
320
        
 
321
    def stopProducing(self):
 
322
        self.stopped = True
 
323
        
 
324
class FileWrapper(object):
 
325
    """
 
326
    Wrapper to convert file-like objects to iterables, to implement
 
327
    the optional 'wsgi.file_wrapper' object.
 
328
    """
 
329
 
 
330
    def __init__(self, filelike, blksize=8192):
 
331
        self.filelike = filelike
 
332
        self.blksize = blksize
 
333
        if hasattr(filelike,'close'):
 
334
            self.close = filelike.close
 
335
            
 
336
    def __iter__(self):
 
337
        return self
 
338
        
 
339
    def next(self):
 
340
        data = self.filelike.read(self.blksize)
 
341
        if data:
 
342
            return data
 
343
        raise StopIteration
 
344
 
 
345
__all__ = ['WSGIResource']