~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/tornado/tornado/httpclient.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
#
 
3
# Copyright 2009 Facebook
 
4
#
 
5
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 
6
# not use this file except in compliance with the License. You may obtain
 
7
# a copy of the License at
 
8
#
 
9
#     http://www.apache.org/licenses/LICENSE-2.0
 
10
#
 
11
# Unless required by applicable law or agreed to in writing, software
 
12
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
13
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
14
# License for the specific language governing permissions and limitations
 
15
# under the License.
 
16
 
 
17
"""Blocking and non-blocking HTTP client implementations using pycurl."""
 
18
 
 
19
import calendar
 
20
import collections
 
21
import cStringIO
 
22
import email.utils
 
23
import errno
 
24
import functools
 
25
import httplib
 
26
import ioloop
 
27
import logging
 
28
import pycurl
 
29
import time
 
30
import weakref
 
31
 
 
32
# Logger used throughout this module, registered as 'tornado.httpclient'.
_log = logging.getLogger('tornado.httpclient')
 
33
 
 
34
class HTTPClient(object):
 
35
    """A blocking HTTP client backed with pycurl.
 
36
 
 
37
    Typical usage looks like this:
 
38
 
 
39
        http_client = httpclient.HTTPClient()
 
40
        try:
 
41
            response = http_client.fetch("http://www.google.com/")
 
42
            print response.body
 
43
        except httpclient.HTTPError, e:
 
44
            print "Error:", e
 
45
 
 
46
    fetch() can take a string URL or an HTTPRequest instance, which offers
 
47
    more options, like executing POST/PUT/DELETE requests.
 
48
    """
 
49
    def __init__(self, max_simultaneous_connections=None):
 
50
        self._curl = _curl_create(max_simultaneous_connections)
 
51
 
 
52
    def __del__(self):
 
53
        self._curl.close()
 
54
 
 
55
    def fetch(self, request, **kwargs):
 
56
        """Executes an HTTPRequest, returning an HTTPResponse.
 
57
 
 
58
        If an error occurs during the fetch, we raise an HTTPError.
 
59
        """
 
60
        if not isinstance(request, HTTPRequest):
 
61
           request = HTTPRequest(url=request, **kwargs)
 
62
        buffer = cStringIO.StringIO()
 
63
        headers = {}
 
64
        try:
 
65
            _curl_setup_request(self._curl, request, buffer, headers)
 
66
            self._curl.perform()
 
67
            code = self._curl.getinfo(pycurl.HTTP_CODE)
 
68
            if code < 200 or code >= 300:
 
69
                raise HTTPError(code)
 
70
            effective_url = self._curl.getinfo(pycurl.EFFECTIVE_URL)
 
71
            return HTTPResponse(
 
72
                request=request, code=code, headers=headers,
 
73
                body=buffer.getvalue(), effective_url=effective_url)
 
74
        except pycurl.error, e:
 
75
            raise CurlError(*e)
 
76
        finally:
 
77
            buffer.close()
 
78
 
 
79
 
 
80
class AsyncHTTPClient(object):
 
81
    """An non-blocking HTTP client backed with pycurl.
 
82
 
 
83
    Example usage:
 
84
 
 
85
        import ioloop
 
86
 
 
87
        def handle_request(response):
 
88
            if response.error:
 
89
                print "Error:", response.error
 
90
            else:
 
91
                print response.body
 
92
            ioloop.IOLoop.instance().stop()
 
93
 
 
94
        http_client = httpclient.AsyncHTTPClient()
 
95
        http_client.fetch("http://www.google.com/", handle_request)
 
96
        ioloop.IOLoop.instance().start()
 
97
 
 
98
    fetch() can take a string URL or an HTTPRequest instance, which offers
 
99
    more options, like executing POST/PUT/DELETE requests.
 
100
 
 
101
    The keyword argument max_clients to the AsyncHTTPClient constructor
 
102
    determines the maximum number of simultaneous fetch() operations that
 
103
    can execute in parallel on each IOLoop.
 
104
    """
 
105
    _ASYNC_CLIENTS = weakref.WeakKeyDictionary()
 
106
 
 
107
    def __new__(cls, io_loop=None, max_clients=10,
 
108
                max_simultaneous_connections=None):
 
109
        # There is one client per IOLoop since they share curl instances
 
110
        io_loop = io_loop or ioloop.IOLoop.instance()
 
111
        if io_loop in cls._ASYNC_CLIENTS:
 
112
            return cls._ASYNC_CLIENTS[io_loop]
 
113
        else:
 
114
            instance = super(AsyncHTTPClient, cls).__new__(cls)
 
115
            instance.io_loop = io_loop
 
116
            instance._multi = pycurl.CurlMulti()
 
117
            instance._curls = [_curl_create(max_simultaneous_connections)
 
118
                               for i in xrange(max_clients)]
 
119
            instance._free_list = instance._curls[:]
 
120
            instance._requests = collections.deque()
 
121
            instance._fds = {}
 
122
            instance._events = {}
 
123
            instance._added_perform_callback = False
 
124
            instance._timeout = None
 
125
            cls._ASYNC_CLIENTS[io_loop] = instance
 
126
            return instance
 
127
 
 
128
    def close(self):
 
129
        """Destroys this http client, freeing any file descriptors used.
 
130
        Not needed in normal use, but may be helpful in unittests that
 
131
        create and destroy http clients.  No other methods may be called
 
132
        on the AsyncHTTPClient after close().
 
133
        """
 
134
        del AsyncHTTPClient._ASYNC_CLIENTS[self.io_loop]
 
135
        for curl in self._curls:
 
136
            curl.close()
 
137
        self._multi.close()
 
138
 
 
139
    def fetch(self, request, callback, **kwargs):
 
140
        """Executes an HTTPRequest, calling callback with an HTTPResponse.
 
141
 
 
142
        If an error occurs during the fetch, the HTTPResponse given to the
 
143
        callback has a non-None error attribute that contains the exception
 
144
        encountered during the request. You can call response.reraise() to
 
145
        throw the exception (if any) in the callback.
 
146
        """
 
147
        if not isinstance(request, HTTPRequest):
 
148
           request = HTTPRequest(url=request, **kwargs)
 
149
        self._requests.append((request, callback))
 
150
        self._add_perform_callback()
 
151
 
 
152
    def _add_perform_callback(self):
 
153
        if not self._added_perform_callback:
 
154
            self.io_loop.add_callback(self._perform)
 
155
            self._added_perform_callback = True
 
156
 
 
157
    def _handle_events(self, fd, events):
 
158
        self._events[fd] = events
 
159
        self._add_perform_callback()
 
160
 
 
161
    def _handle_timeout(self):
 
162
        self._timeout = None
 
163
        self._perform()
 
164
 
 
165
    def _perform(self):
 
166
        self._added_perform_callback = False
 
167
 
 
168
        while True:
 
169
            while True:
 
170
                ret, num_handles = self._multi.perform()
 
171
                if ret != pycurl.E_CALL_MULTI_PERFORM:
 
172
                    break
 
173
 
 
174
            # Handle completed fetches
 
175
            completed = 0
 
176
            while True:
 
177
                num_q, ok_list, err_list = self._multi.info_read()
 
178
                for curl in ok_list:
 
179
                    self._finish(curl)
 
180
                    completed += 1
 
181
                for curl, errnum, errmsg in err_list:
 
182
                    self._finish(curl, errnum, errmsg)
 
183
                    completed += 1
 
184
                if num_q == 0:
 
185
                    break
 
186
 
 
187
            # Start fetching new URLs
 
188
            started = 0
 
189
            while self._free_list and self._requests:
 
190
                started += 1
 
191
                curl = self._free_list.pop()
 
192
                (request, callback) = self._requests.popleft()
 
193
                curl.info = {
 
194
                    "headers": {},
 
195
                    "buffer": cStringIO.StringIO(),
 
196
                    "request": request,
 
197
                    "callback": callback,
 
198
                    "start_time": time.time(),
 
199
                }
 
200
                _curl_setup_request(curl, request, curl.info["buffer"],
 
201
                                    curl.info["headers"])
 
202
                self._multi.add_handle(curl)
 
203
 
 
204
            if not started and not completed:
 
205
                break
 
206
 
 
207
        if self._timeout is not None:
 
208
            self.io_loop.remove_timeout(self._timeout)
 
209
            self._timeout = None
 
210
 
 
211
        if num_handles:
 
212
            self._timeout = self.io_loop.add_timeout(
 
213
                time.time() + 0.2, self._handle_timeout)
 
214
 
 
215
        # Wait for more I/O
 
216
        fds = {}
 
217
        (readable, writable, exceptable) = self._multi.fdset()
 
218
        for fd in readable:
 
219
            fds[fd] = fds.get(fd, 0) | 0x1 | 0x2
 
220
        for fd in writable:
 
221
            fds[fd] = fds.get(fd, 0) | 0x4
 
222
        for fd in exceptable:
 
223
            fds[fd] = fds.get(fd, 0) | 0x8 | 0x10
 
224
 
 
225
        for fd in self._fds:
 
226
            if fd not in fds:
 
227
                try:
 
228
                    self.io_loop.remove_handler(fd)
 
229
                except (OSError, IOError), e:
 
230
                    if e[0] != errno.ENOENT:
 
231
                        raise
 
232
 
 
233
        for fd, events in fds.iteritems():
 
234
            old_events = self._fds.get(fd, None)
 
235
            if old_events is None:
 
236
                self.io_loop.add_handler(fd, self._handle_events, events)
 
237
            elif old_events != events:
 
238
                try:
 
239
                    self.io_loop.update_handler(fd, events)
 
240
                except (OSError, IOError), e:
 
241
                    if e[0] == errno.ENOENT:
 
242
                        self.io_loop.add_handler(fd, self._handle_events,
 
243
                                                 events)
 
244
                    else:
 
245
                        raise
 
246
        self._fds = fds
 
247
 
 
248
    def _finish(self, curl, curl_error=None, curl_message=None):
 
249
        info = curl.info
 
250
        curl.info = None
 
251
        self._multi.remove_handle(curl)
 
252
        self._free_list.append(curl)
 
253
        if curl_error:
 
254
            error = CurlError(curl_error, curl_message)
 
255
            code = error.code
 
256
            body = None
 
257
            effective_url = None
 
258
        else:
 
259
            error = None
 
260
            code = curl.getinfo(pycurl.HTTP_CODE)
 
261
            body = info["buffer"].getvalue()
 
262
            effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
 
263
        info["buffer"].close()
 
264
        info["callback"](HTTPResponse(
 
265
            request=info["request"], code=code, headers=info["headers"],
 
266
            body=body, effective_url=effective_url, error=error,
 
267
            request_time=time.time() - info["start_time"]))
 
268
 
 
269
 
 
270
class HTTPRequest(object):
    """Describes a single HTTP request for HTTPClient or AsyncHTTPClient.

    The constructor arguments are stored as attributes of the same name
    and are read by _curl_setup_request() when the request is executed.
    url, auth_username and auth_password are passed through _utf8().
    """
    def __init__(self, url, method="GET", headers=None, body=None,
                 auth_username=None, auth_password=None,
                 connect_timeout=20.0, request_timeout=20.0,
                 if_modified_since=None, follow_redirects=True,
                 max_redirects=5, user_agent=None, use_gzip=True,
                 network_interface=None, streaming_callback=None,
                 header_callback=None, prepare_curl_callback=None):
        """Stores the request parameters.

        headers is a dict of request headers.  A dict supplied by the
        caller is stored as-is and updated in place with the entries added
        below.  if_modified_since, when given, must provide
        utctimetuple(); it is rendered as an If-Modified-Since header.
        """
        if headers is None:
            # Build a fresh dict per request.  A shared `{}` default would
            # carry Pragma / If-Modified-Since over into later requests.
            headers = {}
        if if_modified_since:
            timestamp = calendar.timegm(if_modified_since.utctimetuple())
            headers["If-Modified-Since"] = email.utils.formatdate(
                timestamp, localtime=False, usegmt=True)
        # NOTE(review): presumably an empty Pragma suppresses a header that
        # curl would otherwise add by default -- confirm.
        if "Pragma" not in headers:
            headers["Pragma"] = ""
        self.url = _utf8(url)
        self.method = method
        self.headers = headers
        self.body = body
        self.auth_username = _utf8(auth_username)
        self.auth_password = _utf8(auth_password)
        self.connect_timeout = connect_timeout
        self.request_timeout = request_timeout
        self.follow_redirects = follow_redirects
        self.max_redirects = max_redirects
        self.user_agent = user_agent
        self.use_gzip = use_gzip
        self.network_interface = network_interface
        self.streaming_callback = streaming_callback
        self.header_callback = header_callback
        self.prepare_curl_callback = prepare_curl_callback
 
300
 
 
301
 
 
302
class HTTPResponse(object):
    """The outcome of an HTTP fetch.

    Attributes: request, code, headers, body, effective_url, error and
    request_time, all taken from the constructor arguments.
    """
    def __init__(self, request, code, headers=None, body="",
                 effective_url=None, error=None, request_time=None):
        """Stores the response fields.

        effective_url falls back to request.url when not given.  When no
        error is passed, a code outside 200-299 produces an HTTPError in
        self.error; otherwise self.error is None.
        """
        self.request = request
        self.code = code
        # A fresh dict per response: a shared `{}` default would let one
        # response's header mutations show up on every other response.
        if headers is None:
            headers = {}
        self.headers = headers
        self.body = body
        if effective_url is None:
            self.effective_url = request.url
        else:
            self.effective_url = effective_url
        if error is None:
            if self.code < 200 or self.code >= 300:
                self.error = HTTPError(self.code)
            else:
                self.error = None
        else:
            self.error = error
        self.request_time = request_time

    def rethrow(self):
        """Raises self.error if there is one; otherwise does nothing."""
        if self.error:
            raise self.error

    def __repr__(self):
        """Returns ClassName(attr=value,...) built from the instance dict."""
        args = ",".join("%s=%r" % i for i in self.__dict__.items())
        return "%s(%s)" % (self.__class__.__name__, args)
 
329
 
 
330
 
 
331
class HTTPError(Exception):
    """Exception for a failed HTTP request.

    code is kept on the instance.  The exception text is
    "HTTP <code>: <message>", where a missing message is looked up in
    httplib.responses and falls back to "Unknown".
    """
    def __init__(self, code, message=None):
        if not message:
            message = httplib.responses.get(code, "Unknown")
        self.code = code
        Exception.__init__(self, "HTTP %d: %s" % (code, message))
 
336
 
 
337
 
 
338
class CurlError(HTTPError):
    """HTTPError for failures reported by curl.

    The HTTP code is always 599; curl's own error number is kept in the
    errno attribute.
    """
    def __init__(self, errno, message):
        super(CurlError, self).__init__(599, message)
        self.errno = errno
 
342
 
 
343
 
 
344
def _curl_create(max_simultaneous_connections=None):
    """Returns a new pycurl.Curl handle with the module's base options.

    When the module logger is enabled for DEBUG, curl's verbose output is
    turned on and routed to _curl_debug.  MAXCONNECTS is set to
    max_simultaneous_connections, or to 5 when that is None or 0.
    """
    handle = pycurl.Curl()
    debug_enabled = _log.isEnabledFor(logging.DEBUG)
    if debug_enabled:
        handle.setopt(pycurl.VERBOSE, 1)
        handle.setopt(pycurl.DEBUGFUNCTION, _curl_debug)
    max_connects = max_simultaneous_connections or 5
    handle.setopt(pycurl.MAXCONNECTS, max_connects)
    return handle
 
351
 
 
352
 
 
353
def _curl_timeout_seconds(timeout):
    """Rounds a timeout up to whole seconds for curl's integer options."""
    seconds = int(timeout)
    if seconds < timeout:
        # Plain truncation would turn e.g. 0.5 into 0, which curl does not
        # treat as "expire immediately".
        seconds += 1
    return seconds


def _curl_setup_request(curl, request, buffer, headers):
    """Configures a (possibly reused) curl handle for one HTTPRequest.

    curl is the pycurl handle to configure.  buffer receives the response
    body through its write() method unless request.streaming_callback is
    set.  headers is filled by _curl_header_callback unless
    request.header_callback is set.

    Raises KeyError when request.method is not GET, POST, PUT, HEAD or
    DELETE.
    """
    curl.setopt(pycurl.URL, request.url)
    curl.setopt(pycurl.HTTPHEADER,
                ["%s: %s" % i for i in request.headers.items()])
    try:
        if request.header_callback:
            curl.setopt(pycurl.HEADERFUNCTION, request.header_callback)
        else:
            curl.setopt(pycurl.HEADERFUNCTION,
                        functools.partial(_curl_header_callback, headers))
    except Exception:
        # Old version of curl; response will not include headers.
        # Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        pass
    if request.streaming_callback:
        curl.setopt(pycurl.WRITEFUNCTION, request.streaming_callback)
    else:
        curl.setopt(pycurl.WRITEFUNCTION, buffer.write)
    curl.setopt(pycurl.FOLLOWLOCATION, request.follow_redirects)
    curl.setopt(pycurl.MAXREDIRS, request.max_redirects)
    # These options take whole seconds, so fractions are rounded up.
    curl.setopt(pycurl.CONNECTTIMEOUT,
                _curl_timeout_seconds(request.connect_timeout))
    curl.setopt(pycurl.TIMEOUT,
                _curl_timeout_seconds(request.request_timeout))
    if request.user_agent:
        curl.setopt(pycurl.USERAGENT, request.user_agent)
    else:
        curl.setopt(pycurl.USERAGENT, "Mozilla/5.0 (compatible; pycurl)")
    if request.network_interface:
        curl.setopt(pycurl.INTERFACE, request.network_interface)
    if request.use_gzip:
        curl.setopt(pycurl.ENCODING, "gzip,deflate")
    else:
        curl.setopt(pycurl.ENCODING, "none")

    # Set the request method.  curl uses a separate, differently named
    # option for almost every HTTP method.
    curl_options = {
        "GET": pycurl.HTTPGET,
        "POST": pycurl.POST,
        "PUT": pycurl.UPLOAD,
        "HEAD": pycurl.NOBODY,
    }
    custom_methods = set(["DELETE"])
    # Clear every method flag first: the handle may still carry the method
    # of the previous request.
    for o in curl_options.values():
        curl.setopt(o, False)
    if request.method in curl_options:
        curl.unsetopt(pycurl.CUSTOMREQUEST)
        curl.setopt(curl_options[request.method], True)
    elif request.method in custom_methods:
        curl.setopt(pycurl.CUSTOMREQUEST, request.method)
    else:
        raise KeyError('unknown method ' + request.method)

    # Handle curl's cryptic options for every individual HTTP method
    if request.method in ("POST", "PUT"):
        request_buffer = cStringIO.StringIO(request.body)
        curl.setopt(pycurl.READFUNCTION, request_buffer.read)
        if request.method == "POST":
            def ioctl(cmd):
                # Rewind the body when curl asks to restart reading.
                if cmd == curl.IOCMD_RESTARTREAD:
                    request_buffer.seek(0)
            curl.setopt(pycurl.IOCTLFUNCTION, ioctl)
            curl.setopt(pycurl.POSTFIELDSIZE, len(request.body))
        else:
            curl.setopt(pycurl.INFILESIZE, len(request.body))

    # Basic auth is only configured when both username and password are set.
    if request.auth_username and request.auth_password:
        userpwd = "%s:%s" % (request.auth_username, request.auth_password)
        curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
        curl.setopt(pycurl.USERPWD, userpwd)
        _log.info("%s %s (username: %r)", request.method, request.url,
                  request.auth_username)
    else:
        curl.unsetopt(pycurl.USERPWD)
        _log.info("%s %s", request.method, request.url)
    # Last hook: lets the caller adjust the handle after everything above.
    if request.prepare_curl_callback is not None:
        request.prepare_curl_callback(curl)
 
428
 
 
429
 
 
430
def _curl_header_callback(headers, header_line):
 
431
    if header_line.startswith("HTTP/"):
 
432
        headers.clear()
 
433
        return
 
434
    if header_line == "\r\n":
 
435
        return
 
436
    parts = header_line.split(": ")
 
437
    if len(parts) != 2:
 
438
        _log.warning("Invalid HTTP response header line %r", header_line)
 
439
        return
 
440
    name = parts[0].strip()
 
441
    value = parts[1].strip()
 
442
    if name in headers:
 
443
        headers[name] = headers[name] + ',' + value
 
444
    else:
 
445
        headers[name] = value
 
446
 
 
447
 
 
448
def _curl_debug(debug_type, debug_msg):
 
449
    debug_types = ('I', '<', '>', '<', '>')
 
450
    if debug_type == 0:
 
451
        _log.debug('%s', debug_msg.strip())
 
452
    elif debug_type in (1, 2):
 
453
        for line in debug_msg.splitlines():
 
454
            _log.debug('%s %s', debug_types[debug_type], line)
 
455
    elif debug_type == 4:
 
456
        _log.debug('%s %r', debug_types[debug_type], debug_msg)
 
457
 
 
458
 
 
459
def _utf8(value):
 
460
    if value is None:
 
461
        return value
 
462
    if isinstance(value, unicode):
 
463
        return value.encode("utf-8")
 
464
    assert isinstance(value, str)
 
465
    return value