14
def _urlopen_with_retry(host, *args, **kwargs):
15
# Connecting to remote hosts is flaky. Make it more robust
16
# by retrying the connection several times.
14
def _retry_thrice(func, exc, *args, **kwargs):
19
return urllib2.urlopen(host, *args, **kwargs)
20
except urllib2.URLError, last_exc:
17
return func(*args, **kwargs)
24
def _wrap_with_retry_thrice(func, exc):
25
def wrapped(*args, **kwargs):
26
return _retry_thrice(func, exc, *args, **kwargs)
29
# Connecting to remote hosts is flaky. Make it more robust by retrying
30
# the connection several times.
31
_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError)
28
34
class AuthTests(unittest.TestCase):
148
154
## self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
150
def _test_urls(self, urls, handlers, urlopen=_urlopen_with_retry):
156
def _test_urls(self, urls, handlers, retry=True):
154
160
debug = logging.getLogger("test_urllib2").debug
156
urllib2.install_opener(urllib2.build_opener(*handlers))
162
urlopen = urllib2.build_opener(*handlers).open
164
urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError)
159
167
if isinstance(url, tuple):
190
198
class TimeoutTest(unittest.TestCase):
191
199
def test_http_basic(self):
200
self.assertTrue(socket.getdefaulttimeout() is None)
192
201
u = _urlopen_with_retry("http://www.python.org")
193
202
self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
195
def test_http_NoneWithdefault(self):
196
prev = socket.getdefaulttimeout()
204
def test_http_default_timeout(self):
205
self.assertTrue(socket.getdefaulttimeout() is None)
206
socket.setdefaulttimeout(60)
208
u = _urlopen_with_retry("http://www.python.org")
210
socket.setdefaulttimeout(None)
211
self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)
213
def test_http_no_timeout(self):
214
self.assertTrue(socket.getdefaulttimeout() is None)
197
215
socket.setdefaulttimeout(60)
199
217
u = _urlopen_with_retry("http://www.python.org", timeout=None)
200
self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)
202
socket.setdefaulttimeout(prev)
219
socket.setdefaulttimeout(None)
220
self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
204
def test_http_Value(self):
222
def test_http_timeout(self):
205
223
u = _urlopen_with_retry("http://www.python.org", timeout=120)
206
224
self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)
208
def test_http_NoneNodefault(self):
209
u = _urlopen_with_retry("http://www.python.org", timeout=None)
210
self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
212
226
FTP_HOST = "ftp://ftp.mirror.nl/pub/mirror/gnu/"
214
228
def test_ftp_basic(self):
229
self.assertTrue(socket.getdefaulttimeout() is None)
215
230
u = _urlopen_with_retry(self.FTP_HOST)
216
231
self.assertTrue(u.fp.fp._sock.gettimeout() is None)
218
def test_ftp_NoneWithdefault(self):
219
prev = socket.getdefaulttimeout()
233
def test_ftp_default_timeout(self):
234
self.assertTrue(socket.getdefaulttimeout() is None)
235
socket.setdefaulttimeout(60)
237
u = _urlopen_with_retry(self.FTP_HOST)
239
socket.setdefaulttimeout(None)
240
self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
242
def test_ftp_no_timeout(self):
243
self.assertTrue(socket.getdefaulttimeout() is None)
220
244
socket.setdefaulttimeout(60)
222
246
u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
223
self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
225
socket.setdefaulttimeout(prev)
227
def test_ftp_NoneNodefault(self):
228
u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
248
socket.setdefaulttimeout(None)
229
249
self.assertTrue(u.fp.fp._sock.gettimeout() is None)
231
def test_ftp_Value(self):
251
def test_ftp_timeout(self):
232
252
u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
233
253
self.assertEqual(u.fp.fp._sock.gettimeout(), 60)