# Copyright 2009 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Blocking and non-blocking HTTP client implementations using pycurl."""

import calendar
import collections
import cStringIO
import email.utils
import errno
import functools
import httplib
import logging
import time
import weakref

import pycurl

# NOTE(review): assumes ioloop is importable as a sibling module (Python 2
# implicit relative import) -- confirm against the package layout.
import ioloop

# Module-wide logger used by the curl helper functions in this file.
_log = logging.getLogger('tornado.httpclient')
34
class HTTPClient(object):
35
"""A blocking HTTP client backed with pycurl.
37
Typical usage looks like this:
39
http_client = httpclient.HTTPClient()
41
response = http_client.fetch("http://www.google.com/")
43
except httpclient.HTTPError, e:
46
fetch() can take a string URL or an HTTPRequest instance, which offers
47
more options, like executing POST/PUT/DELETE requests.
49
def __init__(self, max_simultaneous_connections=None):
50
self._curl = _curl_create(max_simultaneous_connections)
55
def fetch(self, request, **kwargs):
56
"""Executes an HTTPRequest, returning an HTTPResponse.
58
If an error occurs during the fetch, we raise an HTTPError.
60
if not isinstance(request, HTTPRequest):
61
request = HTTPRequest(url=request, **kwargs)
62
buffer = cStringIO.StringIO()
65
_curl_setup_request(self._curl, request, buffer, headers)
67
code = self._curl.getinfo(pycurl.HTTP_CODE)
68
if code < 200 or code >= 300:
70
effective_url = self._curl.getinfo(pycurl.EFFECTIVE_URL)
72
request=request, code=code, headers=headers,
73
body=buffer.getvalue(), effective_url=effective_url)
74
except pycurl.error, e:
80
class AsyncHTTPClient(object):
81
"""An non-blocking HTTP client backed with pycurl.
87
def handle_request(response):
89
print "Error:", response.error
92
ioloop.IOLoop.instance().stop()
94
http_client = httpclient.AsyncHTTPClient()
95
http_client.fetch("http://www.google.com/", handle_request)
96
ioloop.IOLoop.instance().start()
98
fetch() can take a string URL or an HTTPRequest instance, which offers
99
more options, like executing POST/PUT/DELETE requests.
101
The keyword argument max_clients to the AsyncHTTPClient constructor
102
determines the maximum number of simultaneous fetch() operations that
103
can execute in parallel on each IOLoop.
105
_ASYNC_CLIENTS = weakref.WeakKeyDictionary()
107
def __new__(cls, io_loop=None, max_clients=10,
108
max_simultaneous_connections=None):
109
# There is one client per IOLoop since they share curl instances
110
io_loop = io_loop or ioloop.IOLoop.instance()
111
if io_loop in cls._ASYNC_CLIENTS:
112
return cls._ASYNC_CLIENTS[io_loop]
114
instance = super(AsyncHTTPClient, cls).__new__(cls)
115
instance.io_loop = io_loop
116
instance._multi = pycurl.CurlMulti()
117
instance._curls = [_curl_create(max_simultaneous_connections)
118
for i in xrange(max_clients)]
119
instance._free_list = instance._curls[:]
120
instance._requests = collections.deque()
122
instance._events = {}
123
instance._added_perform_callback = False
124
instance._timeout = None
125
cls._ASYNC_CLIENTS[io_loop] = instance
129
"""Destroys this http client, freeing any file descriptors used.
130
Not needed in normal use, but may be helpful in unittests that
131
create and destroy http clients. No other methods may be called
132
on the AsyncHTTPClient after close().
134
del AsyncHTTPClient._ASYNC_CLIENTS[self.io_loop]
135
for curl in self._curls:
139
def fetch(self, request, callback, **kwargs):
140
"""Executes an HTTPRequest, calling callback with an HTTPResponse.
142
If an error occurs during the fetch, the HTTPResponse given to the
143
callback has a non-None error attribute that contains the exception
144
encountered during the request. You can call response.reraise() to
145
throw the exception (if any) in the callback.
147
if not isinstance(request, HTTPRequest):
148
request = HTTPRequest(url=request, **kwargs)
149
self._requests.append((request, callback))
150
self._add_perform_callback()
152
def _add_perform_callback(self):
153
if not self._added_perform_callback:
154
self.io_loop.add_callback(self._perform)
155
self._added_perform_callback = True
157
def _handle_events(self, fd, events):
158
self._events[fd] = events
159
self._add_perform_callback()
161
def _handle_timeout(self):
166
self._added_perform_callback = False
170
ret, num_handles = self._multi.perform()
171
if ret != pycurl.E_CALL_MULTI_PERFORM:
174
# Handle completed fetches
177
num_q, ok_list, err_list = self._multi.info_read()
181
for curl, errnum, errmsg in err_list:
182
self._finish(curl, errnum, errmsg)
187
# Start fetching new URLs
189
while self._free_list and self._requests:
191
curl = self._free_list.pop()
192
(request, callback) = self._requests.popleft()
195
"buffer": cStringIO.StringIO(),
197
"callback": callback,
198
"start_time": time.time(),
200
_curl_setup_request(curl, request, curl.info["buffer"],
201
curl.info["headers"])
202
self._multi.add_handle(curl)
204
if not started and not completed:
207
if self._timeout is not None:
208
self.io_loop.remove_timeout(self._timeout)
212
self._timeout = self.io_loop.add_timeout(
213
time.time() + 0.2, self._handle_timeout)
217
(readable, writable, exceptable) = self._multi.fdset()
219
fds[fd] = fds.get(fd, 0) | 0x1 | 0x2
221
fds[fd] = fds.get(fd, 0) | 0x4
222
for fd in exceptable:
223
fds[fd] = fds.get(fd, 0) | 0x8 | 0x10
228
self.io_loop.remove_handler(fd)
229
except (OSError, IOError), e:
230
if e[0] != errno.ENOENT:
233
for fd, events in fds.iteritems():
234
old_events = self._fds.get(fd, None)
235
if old_events is None:
236
self.io_loop.add_handler(fd, self._handle_events, events)
237
elif old_events != events:
239
self.io_loop.update_handler(fd, events)
240
except (OSError, IOError), e:
241
if e[0] == errno.ENOENT:
242
self.io_loop.add_handler(fd, self._handle_events,
248
def _finish(self, curl, curl_error=None, curl_message=None):
251
self._multi.remove_handle(curl)
252
self._free_list.append(curl)
254
error = CurlError(curl_error, curl_message)
260
code = curl.getinfo(pycurl.HTTP_CODE)
261
body = info["buffer"].getvalue()
262
effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
263
info["buffer"].close()
264
info["callback"](HTTPResponse(
265
request=info["request"], code=code, headers=info["headers"],
266
body=body, effective_url=effective_url, error=error,
267
request_time=time.time() - info["start_time"]))
270
class HTTPRequest(object):
    """Describes one HTTP request: URL, method, headers, body and options.

    The attributes stored here are read by the curl setup code in this
    module when the request is executed.
    """
    def __init__(self, url, method="GET", headers=None, body=None,
                 auth_username=None, auth_password=None,
                 connect_timeout=20.0, request_timeout=20.0,
                 if_modified_since=None, follow_redirects=True,
                 max_redirects=5, user_agent=None, use_gzip=True,
                 network_interface=None, streaming_callback=None,
                 header_callback=None, prepare_curl_callback=None):
        """Stores the request parameters.

        headers is a dict of request headers.  A dict passed by the caller
        is stored as-is and updated in place with the headers added below;
        when omitted, a new dict is created for this request.

        if_modified_since, when given, is an object with utctimetuple()
        and is turned into an If-Modified-Since header.

        url, auth_username and auth_password are passed through _utf8()
        before being stored.
        """
        # Create the dict per call: a shared default would carry the
        # headers added below from one request over to the next.
        if headers is None:
            headers = {}
        if if_modified_since:
            timestamp = calendar.timegm(if_modified_since.utctimetuple())
            headers["If-Modified-Since"] = email.utils.formatdate(
                timestamp, localtime=False, usegmt=True)
        # NOTE(review): presumably an empty Pragma header stops curl from
        # adding its own default Pragma header -- confirm.
        if "Pragma" not in headers:
            headers["Pragma"] = ""
        self.url = _utf8(url)
        self.method = method
        self.headers = headers
        self.body = body
        self.auth_username = _utf8(auth_username)
        self.auth_password = _utf8(auth_password)
        self.connect_timeout = connect_timeout
        self.request_timeout = request_timeout
        self.follow_redirects = follow_redirects
        self.max_redirects = max_redirects
        self.user_agent = user_agent
        self.use_gzip = use_gzip
        self.network_interface = network_interface
        self.streaming_callback = streaming_callback
        self.header_callback = header_callback
        self.prepare_curl_callback = prepare_curl_callback
302
class HTTPResponse(object):
    """Holds the outcome of a fetch: status code, headers, body and error."""
    def __init__(self, request, code, headers=None, body="", effective_url=None,
                 error=None, request_time=None):
        """Stores the response fields.

        headers defaults to a new empty dict for each response.
        effective_url falls back to request.url when it is None.
        error, when None, is set to an HTTPError for any code outside the
        2xx range and stays None otherwise; an explicit error is kept.
        """
        self.request = request
        self.code = code
        # Create the dict per response so that instances never share (and
        # accidentally modify) one default headers dict.
        if headers is None:
            headers = {}
        self.headers = headers
        self.body = body
        if effective_url is None:
            self.effective_url = request.url
        else:
            self.effective_url = effective_url
        if error is None:
            if self.code < 200 or self.code >= 300:
                self.error = HTTPError(self.code)
            else:
                self.error = None
        else:
            self.error = error
        self.request_time = request_time

    def rethrow(self):
        """Raises the stored error, if there is one."""
        if self.error:
            raise self.error

    def __repr__(self):
        args = ",".join("%s=%r" % i for i in self.__dict__.iteritems())
        return "%s(%s)" % (self.__class__.__name__, args)
331
class HTTPError(Exception):
    """Error for an unsuccessful HTTP request.

    code holds the numeric status code.  The exception message has the
    form "HTTP <code>: <message>".
    """
    def __init__(self, code, message=None):
        """Stores code and builds the message.

        When message is empty or None, the standard reason phrase from
        httplib.responses is used, or "Unknown" if the code is not listed.
        """
        self.code = code
        message = message or httplib.responses.get(code, "Unknown")
        Exception.__init__(self, "HTTP %d: %s" % (self.code, message))
338
class CurlError(HTTPError):
    """HTTPError for failures reported by curl; always uses code 599."""
    def __init__(self, errno, message):
        """Stores the curl error number and passes message to HTTPError."""
        HTTPError.__init__(self, 599, message)
        self.errno = errno
344
def _curl_create(max_simultaneous_connections=None):
    """Creates and returns a pycurl.Curl handle.

    MAXCONNECTS is set to max_simultaneous_connections, or to 5 when that
    argument is None or 0.  Verbose output is routed to _curl_debug when
    the module logger is enabled for DEBUG.
    """
    curl = pycurl.Curl()
    if _log.isEnabledFor(logging.DEBUG):
        curl.setopt(pycurl.VERBOSE, 1)
        curl.setopt(pycurl.DEBUGFUNCTION, _curl_debug)
    curl.setopt(pycurl.MAXCONNECTS, max_simultaneous_connections or 5)
    return curl
353
def _curl_setup_request(curl, request, buffer, headers):
    """Configures the curl handle to execute the given HTTPRequest.

    curl is the handle to configure; it may have been used for an earlier
    request, so method and credential options are reset explicitly.
    buffer receives the response body unless request.streaming_callback
    is set.  headers is the dict that collects response headers unless
    request.header_callback is set.

    Raises KeyError when request.method is not a supported method.
    """
    curl.setopt(pycurl.URL, request.url)
    curl.setopt(pycurl.HTTPHEADER,
                ["%s: %s" % i for i in request.headers.iteritems()])
    try:
        if request.header_callback:
            curl.setopt(pycurl.HEADERFUNCTION, request.header_callback)
        else:
            curl.setopt(pycurl.HEADERFUNCTION,
                        functools.partial(_curl_header_callback, headers))
    except Exception:
        # Old version of curl; response will not include headers
        pass
    if request.streaming_callback:
        curl.setopt(pycurl.WRITEFUNCTION, request.streaming_callback)
    else:
        curl.setopt(pycurl.WRITEFUNCTION, buffer.write)
    curl.setopt(pycurl.FOLLOWLOCATION, request.follow_redirects)
    curl.setopt(pycurl.MAXREDIRS, request.max_redirects)
    # Both timeouts are truncated to whole numbers before being set.
    curl.setopt(pycurl.CONNECTTIMEOUT, int(request.connect_timeout))
    curl.setopt(pycurl.TIMEOUT, int(request.request_timeout))
    if request.user_agent:
        curl.setopt(pycurl.USERAGENT, request.user_agent)
    else:
        curl.setopt(pycurl.USERAGENT, "Mozilla/5.0 (compatible; pycurl)")
    if request.network_interface:
        curl.setopt(pycurl.INTERFACE, request.network_interface)
    if request.use_gzip:
        curl.setopt(pycurl.ENCODING, "gzip,deflate")
    else:
        curl.setopt(pycurl.ENCODING, "none")

    # Set the request method through curl's awkward interface, which uses
    # a differently named option for almost every single method
    curl_options = {
        "GET": pycurl.HTTPGET,
        "POST": pycurl.POST,
        "PUT": pycurl.UPLOAD,
        "HEAD": pycurl.NOBODY,
    }
    custom_methods = set(["DELETE"])
    # Clear every method flag first so nothing is left over from an
    # earlier request on the same handle.
    for o in curl_options.values():
        curl.setopt(o, False)
    if request.method in curl_options:
        curl.unsetopt(pycurl.CUSTOMREQUEST)
        curl.setopt(curl_options[request.method], True)
    elif request.method in custom_methods:
        curl.setopt(pycurl.CUSTOMREQUEST, request.method)
    else:
        raise KeyError('unknown method ' + request.method)

    # Handle curl's cryptic options for every individual HTTP method
    if request.method in ("POST", "PUT"):
        # Treat a missing body as empty so that StringIO() and len() below
        # do not fail with a TypeError when body is None.
        body = request.body or ""
        request_buffer = cStringIO.StringIO(body)
        curl.setopt(pycurl.READFUNCTION, request_buffer.read)
        if request.method == "POST":
            def ioctl(cmd):
                # Rewind the body when curl asks to restart reading it.
                if cmd == curl.IOCMD_RESTARTREAD:
                    request_buffer.seek(0)
            curl.setopt(pycurl.IOCTLFUNCTION, ioctl)
            curl.setopt(pycurl.POSTFIELDSIZE, len(body))
        else:
            curl.setopt(pycurl.INFILESIZE, len(body))

    if request.auth_username and request.auth_password:
        userpwd = "%s:%s" % (request.auth_username, request.auth_password)
        curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
        curl.setopt(pycurl.USERPWD, userpwd)
        _log.info("%s %s (username: %r)", request.method, request.url,
                  request.auth_username)
    else:
        # Drop credentials left over from an earlier request.
        curl.unsetopt(pycurl.USERPWD)
        _log.info("%s %s", request.method, request.url)
    # Give the caller a last chance to adjust the handle.
    if request.prepare_curl_callback is not None:
        request.prepare_curl_callback(curl)
430
def _curl_header_callback(headers, header_line):
431
if header_line.startswith("HTTP/"):
434
if header_line == "\r\n":
436
parts = header_line.split(": ")
438
_log.warning("Invalid HTTP response header line %r", header_line)
440
name = parts[0].strip()
441
value = parts[1].strip()
443
headers[name] = headers[name] + ',' + value
445
headers[name] = value
448
def _curl_debug(debug_type, debug_msg):
    """Forwards curl debug output to the module logger at DEBUG level.

    Type 0 messages are logged stripped, types 1 and 2 are logged line by
    line with a direction marker, and type 4 is logged as a repr.  Any
    other type is ignored.
    """
    # NOTE(review): the indexes appear to follow curl's debug info types
    # (text, header in, header out, data in, data out) -- confirm.
    debug_types = ('I', '<', '>', '<', '>')
    if debug_type == 0:
        _log.debug('%s', debug_msg.strip())
    elif debug_type in (1, 2):
        for line in debug_msg.splitlines():
            _log.debug('%s %s', debug_types[debug_type], line)
    elif debug_type == 4:
        _log.debug('%s %r', debug_types[debug_type], debug_msg)
462
if isinstance(value, unicode):
463
return value.encode("utf-8")
464
assert isinstance(value, str)