267
269
setattr(module, attr, value)
268
270
for module, attr in deletes:
269
271
delattr(module, attr)
274
def fake_http_connect(*code_iter, **kwargs):
276
class FakeConn(object):
278
def __init__(self, status, etag=None, body='', timestamp='1',
281
if expect_status is None:
282
self.expect_status = self.status
284
self.expect_status = expect_status
286
self.host = '1.2.3.4'
292
self.timestamp = timestamp
294
def getresponse(self):
295
if kwargs.get('raise_exc'):
296
raise Exception('test')
297
if kwargs.get('raise_timeout_exc'):
302
if self.expect_status == -2:
303
raise HTTPException()
304
if self.expect_status == -3:
306
if self.expect_status == -4:
310
def getheaders(self):
313
if isinstance(self.body, str):
314
etag = '"' + md5(self.body).hexdigest() + '"'
316
etag = '"68b329da9893e34099c7d8ad5cb9c940"'
318
headers = {'content-length': len(self.body),
319
'content-type': 'x-application/test',
320
'x-timestamp': self.timestamp,
321
'last-modified': self.timestamp,
322
'x-object-meta-test': 'testing',
325
'x-account-container-count': kwargs.get('count', 12345)}
326
if not self.timestamp:
327
del headers['x-timestamp']
329
if container_ts_iter.next() is False:
330
headers['x-container-timestamp'] = '1'
331
except StopIteration:
334
headers['content-length'] = '4'
335
if 'headers' in kwargs:
336
headers.update(kwargs['headers'])
337
return headers.items()
339
def read(self, amt=None):
346
self.body = self.body[amt:]
349
def send(self, amt=None):
351
if self.received < 4:
355
def getheader(self, name, default=None):
356
return dict(self.getheaders()).get(name.lower(), default)
358
timestamps_iter = iter(kwargs.get('timestamps') or ['1'] * len(code_iter))
359
etag_iter = iter(kwargs.get('etags') or [None] * len(code_iter))
360
x = kwargs.get('missing_container', [False] * len(code_iter))
361
if not isinstance(x, (tuple, list)):
362
x = [x] * len(code_iter)
363
container_ts_iter = iter(x)
364
code_iter = iter(code_iter)
365
static_body = kwargs.get('body', None)
366
body_iter = kwargs.get('body_iter', None)
368
body_iter = iter(body_iter)
370
def connect(*args, **ckwargs):
371
if 'give_content_type' in kwargs:
372
if len(args) >= 7 and 'Content-Type' in args[6]:
373
kwargs['give_content_type'](args[6]['Content-Type'])
375
kwargs['give_content_type']('')
376
if 'give_connect' in kwargs:
377
kwargs['give_connect'](*args, **ckwargs)
378
status = code_iter.next()
379
if isinstance(status, tuple):
380
status, expect_status = status
382
expect_status = status
383
etag = etag_iter.next()
384
timestamp = timestamps_iter.next()
387
raise HTTPException()
388
if body_iter is None:
389
body = static_body or ''
391
body = body_iter.next()
392
return FakeConn(status, etag, body=body, timestamp=timestamp,
393
expect_status=expect_status)