1
# Copyright 2008-2015 Canonical
3
# This program is free software: you can redistribute it and/or modify
4
# it under the terms of the GNU Affero General Public License as
5
# published by the Free Software Foundation, either version 3 of the
6
# License, or (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU Affero General Public License for more details.
13
# You should have received a copy of the GNU Affero General Public License
14
# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
# For further info, check http://launchpad.net/filesync-server
18
"""Assorted stuff used by test_action_queue."""
26
from StringIO import StringIO
27
from functools import partial
32
from dbus.mainloop.glib import DBusGMainLoop
33
from twisted.internet import defer, reactor
34
from twisted.names import dns
35
from twisted.names.common import ResolverBase
36
from twisted.python.failure import Failure
38
from ubuntuone.storage.server.testing.testcase import (
43
from config import config
44
from backends.filesync.data import model
45
from backends.filesync.data.testing.testcase import StorageDALTestCase
46
from ubuntuone.storage.server import ssl_proxy
47
from ubuntuone.storageprotocol import request, sharersp, client
48
from ubuntuone.storageprotocol.content_hash import content_hash_factory, crc32
49
from ubuntuone.storage.server.auth import SimpleAuthProvider
50
from ubuntuone import platform
51
from ubuntuone.syncdaemon.action_queue import ActionQueue, ActionQueueCommand
52
from ubuntuone.syncdaemon import main, volume_manager, tritcask, logger
53
from ubuntuone.syncdaemon.event_queue import EventQueue
54
from ubuntuone.syncdaemon.filesystem_manager import FileSystemManager
55
from ubuntuone.syncdaemon.sync import Sync
56
from ubuntuone.syncdaemon.marker import MDMarker
58
ROOT_DIR = os.getcwd()
59
SD_CONFIG_DIR = ROOT_DIR + "/.sourcecode/ubuntuone-client/data"
60
SD_CONFIGS = [os.path.join(SD_CONFIG_DIR, 'syncdaemon.conf'),
61
os.path.join(SD_CONFIG_DIR, 'syncdaemon-dev.conf')]
65
HASH_EMPTY = model.EMPTY_CONTENT_HASH
69
TESTS_DIR = os.getcwd() + "/tmp/sync_tests"
73
"""Return current time with HH:MM:SS,xxx where xxx are msec."""
75
p1 = time.strftime("%H:%M:%S", time.localtime(t))
76
p2 = ("%.3f" % (t % 1))[2:]
77
return "%s,%s" % (p1, p2)
80
class NoCloseStringIO(StringIO):
81
"""a stringio subclass that doesnt destroy content on close."""
87
class ReallyAttentiveListener(object):
88
"""A listener that listens to everything and writes it down very tidily."""
93
def __getattr__(self, attr):
    """Synthesize a recorder for any ``handle_<EVENT>`` attribute.

    Python only calls this hook when normal attribute lookup fails.  A
    name starting with ``handle_`` yields ``write_it_down`` with the rest
    of the name pre-bound as its first argument; any other name is
    delegated to the next class in the MRO.
    """
    prefix = 'handle_'
    if not attr.startswith(prefix):
        # probably AttributeError, but just in case
        return super(ReallyAttentiveListener, self).__getattr__(attr)
    return partial(self.write_it_down, attr[len(prefix):])
99
def write_it_down(self, attr, *a, **kw):
    """Record one event in the log.

    Appends an ``(event name, keyword arguments)`` pair to ``self.q``.
    Positional arguments are accepted but are not recorded.
    """
    record = (attr, kw)
    self.q.append(record)
103
def get_svhash_for(self, share_id, node_id):
    """Return the hash of the newest matching SV_HASH_NEW event.

    The recorded events in ``self.q`` are scanned from newest to oldest;
    the first SV_HASH_NEW whose ``node_id`` and ``share_id`` keyword
    arguments equal the given ones supplies the ``hash`` value returned.

    Raises ValueError when no recorded event matches.
    """
    newest_first = reversed(self.q)
    matching = (details for name, details in newest_first
                if name == 'SV_HASH_NEW'
                and details.get('node_id') == node_id
                and details.get('share_id') == share_id)
    for details in matching:
        # the first hit is the most recent one, so stop right here
        return details.get('hash')
    raise ValueError("no hash for %s in %s" % (node_id, self.q))
114
def get_rescan_from_scratch_for(self, volume_id):
115
"""Find the last AQ_RESCAN_FROM_SCRATCH_OK and return the kwargs."""
116
for ev, kw in reversed(self.q):
117
if ev == 'AQ_RESCAN_FROM_SCRATCH_OK':
118
if kw.get('volume_id') == volume_id:
120
raise ValueError("no AQ_RESCAN_FROM_SCRATCH_OK for %s in %s" %
123
def get_id_for_marker(self, marker, default=_marker):
125
find the latest AQ_(FILE|DIR)_NEW_OK for the given marker
127
for ev, kw in reversed(self.q):
128
if ev in ('AQ_FILE_NEW_OK', 'AQ_DIR_NEW_OK', 'AQ_CREATE_SHARE_OK'):
129
if kw.get('marker') == marker:
130
if ev == 'AQ_CREATE_SHARE_OK':
131
return kw.get('share_id')
133
return kw.get('new_id')
134
if default is _marker:
135
raise ValueError("no uuid for marker %s" % marker)
140
class DumbVolumeManager(volume_manager.VolumeManager):
141
"""A real VolumeManager but with dummy refresh_* and (a few) handle_*."""
143
def refresh_shares(self):
146
def refresh_volumes(self):
149
def on_server_root(self, root):
150
"""Asociate server root and nothing more."""
151
self.log.debug('on_server_root(%s)', root)
154
def handle_AQ_LIST_VOLUMES_ERROR(self, error):
157
def handle_AQ_LIST_SHARES_ERROR(self, error):
160
def handle_AQ_ANSWER_SHARE_OK(self, share_id, answer):
164
class ReallyFakeMain(main.Main):
166
This main is so fake, it breaks nearly everything.
168
def __init__(self, port, root_dir, data_dir, partials_dir,
170
self.root_dir = root_dir
171
self.shares_dir = os.path.join(os.path.dirname(root_dir), 'shares_dir')
172
self.shares_dir_link = os.path.join(root_dir, 'shares_link')
173
self.data_dir = data_dir
174
self.fsmdir = os.path.join(data_dir, 'fsmdir')
175
self.partials_dir = partials_dir
176
self.tritcask_dir = os.path.join(self.data_dir, 'tritcask')
177
self.hash_q = type('fake hash queue', (),
178
{'empty': staticmethod(lambda: True),
179
'__len__': staticmethod(lambda: 0)})()
180
self.logger = logging.getLogger('fsyncsrvr.SyncDaemon.Main')
181
self.db = tritcask.Tritcask(self.tritcask_dir)
182
self.vm = DumbVolumeManager(self)
183
self.fs = FileSystemManager(self.fsmdir, self.partials_dir, self.vm,
185
self.event_q = EventQueue(self.fs)
186
self.event_q.subscribe(self.vm)
187
self.fs.register_eq(self.event_q)
188
self.sync = Sync(self)
189
self.action_q = ActionQueue(self.event_q, self, '127.0.0.1', port,
190
dns_srv, disable_ssl_verify=True)
191
self.state_manager = main.StateManager(self, handshake_timeout=30)
192
self.state_manager.connection.waiting_timeout = .1
195
def server_rescan(self):
    """Fake server rescan: just hand the request to the volume manager.

    Returns whatever ``self.vm.server_rescan()`` returns.
    """
    vm_rescan = self.vm.server_rescan
    return vm_rescan()
199
def local_rescan(self):
203
def failure_ignore(*failures):
204
"""A decorator to ignore the failure.
206
It marks a test method such that failures during the test will not be
207
marked as failures of the test itself, but simply ignored.
210
"""The wrapper function."""
211
func.failure_ignore = failures
216
def failure_expected(failure):
217
"""A decorator to expect a failure.
219
It marks a test method such that failures during the test will not be
220
marked as failures of the test itself, but rather the opposite: it is
221
the lack of the failure that is a failure.
224
"""The wrapper function."""
225
func.failure_expected = failure
230
class WaitingHelpingHandler(object):
231
"""An auxiliary class that helps wait for events."""
233
def __init__(self, event_queue, waiting_events, waiting_kwargs,
235
self.deferred = defer.Deferred()
236
self.event_queue = event_queue
238
self.waiting_events = waiting_events
239
self.waiting_kwargs = waiting_kwargs
240
event_queue.subscribe(self)
242
def handle_default(self, event, *args, **kwargs):
243
"""Got an event: fire if it's one we want"""
244
if event in self.waiting_events:
246
for wv in self.waiting_kwargs.values():
250
for wk, wv in self.waiting_kwargs.items():
251
if not (wk in kwargs and kwargs[wk] == wv):
256
"""start fire the callback"""
257
self.event_queue.unsubscribe(self)
258
reactor.callLater(0, lambda: self.deferred.callback(self.result))
261
# The following class is duplicated from
262
# ubuntuone-client/tests/platform/linux/test_dbus.py
263
# will be removed when bug #917285 is resolved
264
class FakeNetworkManager(dbus.service.Object):
265
"""A fake NetworkManager that only emits StatusChanged signal."""
268
path = '/org/freedesktop/NetworkManager'
270
def __init__(self, bus):
272
self.bus.request_name('org.freedesktop.NetworkManager',
273
flags=dbus.bus.NAME_FLAG_REPLACE_EXISTING |
274
dbus.bus.NAME_FLAG_DO_NOT_QUEUE |
275
dbus.bus.NAME_FLAG_ALLOW_REPLACEMENT)
276
self.busName = dbus.service.BusName('org.freedesktop.NetworkManager',
278
dbus.service.Object.__init__(self, bus_name=self.busName,
279
object_path=self.path)
282
"""Shutdown the fake NetworkManager."""
283
self.busName.get_bus().release_name(self.busName.get_name())
284
self.remove_from_connection()
286
@dbus.service.method(dbus.PROPERTIES_IFACE,
287
in_signature='ss', out_signature='v',
288
async_callbacks=('reply_handler', 'error_handler'))
289
def Get(self, interface, propname, reply_handler=None, error_handler=None):
290
"""Fake dbus's Get method to get at the State property."""
292
reply_handler(getattr(self, propname, None))
297
class TestWithDatabase(BaseProtocolTestCase, StorageDALTestCase):
298
"""Hook up Trial, ORMTestCase, and our very own s4 and storage servers.
300
Large chunks have been copy-pasted from
301
server.testing.testcase.TestWithDatabase, hence the name.
303
auth_provider_class = SimpleAuthProvider
304
_do_teardown_eq = False
305
_ignore_cancelled_downloads = False
307
ssl_proxy_heartbeat_interval = 0
309
@defer.inlineCallbacks
312
yield super(TestWithDatabase, self).setUp()
315
# Patch AQ's deferreds, to support these tests still being run
316
# in Lucid, but while code calls .cancel() on them
317
# Remove this code when filesync project is taken to Precise.
318
defer.Deferred.cancel = lambda self: None
319
defer.DeferredList.cancel = lambda self: None
321
# Set up the main loop and bus connection
322
self.loop = DBusGMainLoop(set_as_default=True)
323
bus_address = os.environ.get('DBUS_SESSION_BUS_ADDRESS', None)
324
self.bus = dbus.bus.BusConnection(address_or_type=bus_address,
327
# Monkeypatch the dbus.SessionBus/SystemBus methods, to ensure we
328
# always point at our own private bus instance.
329
self.patch(dbus, 'SessionBus', lambda: self.bus)
330
self.patch(dbus, 'SystemBus', lambda: self.bus)
332
self.nm = FakeNetworkManager(self.bus)
333
self.addCleanup(self.nm.shutdown)
335
# start the ssl proxy
336
self.ssl_service = ssl_proxy.ProxyService(self.ssl_cert, self.ssl_key,
339
"localhost", self.port,
341
self.patch(config.ssl_proxy, "heartbeat_interval",
342
self.ssl_proxy_heartbeat_interval)
343
yield self.ssl_service.startService()
345
# these tests require a "test" bucket to be available,
346
# but don't create it using the s3 api...
347
self.s4_site.resource._add_bucket("test")
348
if os.path.exists(self.tmpdir):
349
self.rmtree(self.tmpdir)
352
(u'jack', u'jackpass', u'shard0'),
353
(u'jane', u'janepass', u'shard1'),
354
(u'john', u'johnpass', u'shard2'),
356
self.access_tokens = {}
357
self.storage_users = {}
358
for username, password, shard in _user_data:
359
self.access_tokens[username] = {
360
'username': username,
361
'password': password,
363
user = create_test_user(username=username,
364
password=password, shard_id=shard)
365
self.storage_users[username] = user
368
# override and cleanup user config
369
self.old_get_config_files = main.config.get_config_files
370
main.config.get_config_files = lambda: SD_CONFIGS
371
main.config._user_config = None
372
user_config = main.config.get_user_config()
373
for section in user_config.sections():
374
user_config.remove_section(section)
375
main.config.get_user_config().set_throttling_read_limit(-1)
376
main.config.get_user_config().set_throttling_write_limit(-1)
377
main.config.get_user_config().set_autoconnect(False)
379
# logging can not be configured dynamically, touch the general logger
380
# to get one big file and be able to get the logs if failure
382
logger.set_max_bytes(0)
383
yield self.client_setup()
387
"""Override default tmpdir property."""
392
main.config.get_config_files = self.old_get_config_files
393
d = super(TestWithDatabase, self).tearDown()
394
d.addCallback(lambda _: self.ssl_service.stopService())
395
if self._do_teardown_eq:
396
d.addCallback(lambda _: self.eq.shutdown())
397
d.addCallback(lambda _: self.main.state_manager.shutdown())
398
d.addCallback(lambda _: self.main.db.shutdown())
399
test_method = getattr(self, self._testMethodName)
400
failure_expected = getattr(test_method, 'failure_expected', False)
401
if failure_expected and failure_expected != self.failed:
402
msg = "test method %r should've failed with %s and " \
403
% (self._testMethodName, failure_expected)
405
msg += 'instead failed with: %s' % (self.failed,)
408
d.addCallback(lambda _: Failure(AssertionError(msg)))
409
if self.failed and failure_expected != self.failed:
410
failure_ignore = getattr(test_method, 'failure_ignore', ())
411
if self.failed and self.failed not in failure_ignore:
412
msg = "test method %r failed with: %s" \
413
% (self._testMethodName, self.failed)
414
d.addCallback(lambda _: Failure(AssertionError(msg)))
416
def temp_dir_cleanup(_):
417
"""Clean up tmpdir."""
418
if os.path.exists(self.tmpdir):
419
self.rmtree(self.tmpdir)
421
d.addBoth(temp_dir_cleanup)
424
def mktemp(self, name='temp'):
425
""" Customized mktemp that accepts an optional name argument. """
426
tempdir = os.path.join(self.tmpdir, name)
427
if os.path.exists(tempdir):
430
self.addCleanup(self.rmtree, tempdir)
433
def rmtree(self, path):
434
"""Custom rmtree that handle ro parent(s) and childs."""
435
# change perms to rw, so we can delete the temp dir
436
if path != self.__root:
437
platform.set_dir_readwrite(os.path.dirname(path))
438
if not platform.can_write(path):
439
platform.set_dir_readwrite(path)
441
for dirpath, dirs, files in os.walk(path):
443
adir = os.path.join(dirpath, adir)
444
if not platform.can_write(adir):
445
platform.set_dir_readwrite(adir)
449
def client_setup(self):
    """Build the fake syncdaemon main and expose its parts on the test.

    Creates three scratch directories, instantiates ``ReallyFakeMain`` on
    them, publishes its state manager, event queue and action queue as
    ``self.state``, ``self.eq`` and ``self.aq``, and subscribes both a
    fresh ``ReallyAttentiveListener`` and the test case itself to the
    event queue.
    """
    # tearDown checks this flag before shutting the event queue down
    self._do_teardown_eq = True

    dir_names = ('fake_root_dir', 'fake_data_dir', 'partials_dir')
    root_dir, data_dir, partials = [self.mktemp(n) for n in dir_names]

    fake_main = ReallyFakeMain(self.port, root_dir, data_dir, partials,
                               self.dns_srv)
    self.main = fake_main
    self.state = fake_main.state_manager
    self.eq = fake_main.event_q

    # the listener goes first, then the test case itself
    self.listener = ReallyAttentiveListener()
    for subscriber in (self.listener, self):
        self.eq.subscribe(subscriber)

    self.aq = fake_main.action_q
467
return self.ssl_service.port
469
def nuke_client_method(self, method_name, callback,
470
method_retval_cb=defer.Deferred):
472
Nuke the client method, call the callback, and de-nuke it
474
old_method = getattr(self.aq.client, method_name)
475
setattr(self.aq.client, method_name,
476
lambda *_, **__: method_retval_cb())
480
if self.aq.client is not None:
481
setattr(self.aq.client, method_name, old_method)
484
def wait_for(self, *waiting_events, **waiting_kwargs):
485
"""defer until event appears"""
486
return WaitingHelpingHandler(self.main.event_q,
488
waiting_kwargs).deferred
490
def wait_for_cb(self, *waiting_events, **waiting_kwargs):
492
Returns a callable that returns a deferred that fires when an event
495
return lambda result: WaitingHelpingHandler(self.main.event_q,
500
def handle_default(self, event, *args, **kwargs):
    """Catch-all event handler that remembers reported errors.

    When the event carries an ``error`` keyword argument, its value is
    stored under the ``failed`` attribute; any other event is ignored.
    """
    try:
        error = kwargs['error']
    except KeyError:
        # nothing went wrong as far as this event is concerned
        return
    self.failed = error
508
def handle_AQ_DOWNLOAD_CANCELLED(self, *args, **kwargs):
    """Flag a cancelled download as a failure unless told to ignore it.

    Sets ``self.failed`` to ``'CANCELLED'`` when
    ``self._ignore_cancelled_downloads`` is false.
    """
    if self._ignore_cancelled_downloads:
        return
    self.failed = 'CANCELLED'
513
def wait_for_nirvana(self, last_event_interval=.5):
    """Get a deferred that fires once events and transfers have stopped.

    Thin pass-through to ``self.main.wait_for_nirvana``, forwarding
    *last_event_interval* unchanged.
    """
    nirvana = self.main.wait_for_nirvana(last_event_interval)
    return nirvana
518
def connect(self, do_connect=True):
519
"""Encourage the AQ to connect."""
520
d = self.wait_for('SYS_CONNECTION_MADE')
521
self.eq.push('SYS_INIT_DONE')
522
self.eq.push('SYS_LOCAL_RESCAN_DONE')
523
self.eq.push('SYS_USER_CONNECT',
524
access_token=self.access_tokens['jack'])
526
self.eq.push('SYS_NET_CONNECTED')
530
class _Placeholder(object):
531
"""Object you can use in eq comparison w'out knowing equality with what."""
532
def __init__(self, label):
536
return "<placeholder for %s>" % self.label
539
class _HashPlaceholder(_Placeholder):
540
"""A placeholder for a hash"""
541
def __eq__(self, other):
542
return all((isinstance(other, str),
543
other.startswith('sha1:'),
547
class _UUIDPlaceholder(_Placeholder):
548
"""A placeholder for an uuid"""
550
def __init__(self, label, exceptions=()):
    """Set up the placeholder.

    *label* is handed to the parent initializer; *exceptions* is the
    collection that ``__eq__`` tests the compared value against with
    ``in``.
    """
    super(_UUIDPlaceholder, self).__init__(label)
    self.exceptions = exceptions
554
def __eq__(self, other):
555
if other in self.exceptions:
558
str(uuid.UUID(other))
565
class _TypedPlaceholder(_Placeholder):
566
"""A placeholder for an object of a certain type"""
568
def __init__(self, label, a_type):
569
super(_TypedPlaceholder, self).__init__(label)
572
def __eq__(self, other):
    """Compare equal to any instance of the configured type."""
    expected_type = self.type
    return isinstance(other, expected_type)
576
class _ShareListPlaceholder(_Placeholder):
577
"""A placeholder for a list of shares"""
579
def __init__(self, label, shares):
580
super(_ShareListPlaceholder, self).__init__(label)
583
def __cmp__(self, other):
    """Compare two placeholders by their ``shares`` (Python 2 ``cmp``)."""
    mine, theirs = self.shares, other.shares
    return cmp(mine, theirs)
586
aHash = _HashPlaceholder('a hash')
587
anUUID = _UUIDPlaceholder('an UUID')
588
aShareUUID = _UUIDPlaceholder('a share UUID', ('',))
589
anEmptyShareList = _ShareListPlaceholder('an empty share list', [])
590
aShareInfo = _TypedPlaceholder('a share info', sharersp.NotifyShareHolder)
591
aGetContentRequest = _TypedPlaceholder('a get_content request',
593
anAQCommand = _TypedPlaceholder('an action queue command',
597
class TestBase(TestWithDatabase):
598
"""Base class for TestMeta and TestContent."""
601
@defer.inlineCallbacks
604
yield super(TestBase, self).setUp()
606
self.client = self.aq.client
607
self.assertFalse(self.client.factory.connector is None)
608
self.root = yield self.client.get_root()
609
yield self.wait_for_nirvana(.5)
614
return super(TestBase, self).tearDown()
616
def assertEvent(self, event, msg=None):
    """Assert that *event* is among the events the listener recorded."""
    recorded = self.listener.q
    self.assertIn(event, recorded, msg)
620
def assertInQ(self, deferred, containee, msg=None):
621
"""Deferredly assert that the containee is in the event queue.
623
Containee can be callable, in which case it's called before asserting.
626
"""The check itself."""
627
ce = containee() if callable(containee) else containee
628
self.assertIn(ce, self.listener.q, msg)
629
deferred.addCallback(check_queue)
631
def assertOneInQ(self, deferred, containees, msg=None):
632
"""Deferredly assert that one of the containee is in the event queue.
634
Containee can be callable, in which case it's called before asserting.
636
def check_queue(_, msg=msg):
637
"""The check itself."""
638
ce = containees() if callable(containees) else containees
640
if i in self.listener.q:
644
msg = 'None of %s were found in %s' % (ce, self.listener.q)
645
raise AssertionError(msg)
646
deferred.addCallback(check_queue)
648
def assertNotInQ(self, deferred, containee, msg=None):
649
"""Deferredly assert that the containee is not in the event queue.
651
Containee can be callable, in which case it's called before asserting.
654
"""The check itself."""
655
ce = containee() if callable(containee) else containee
656
self.assertNotIn(ce, self.listener.q, msg)
657
deferred.addCallback(check_queue)
659
@defer.inlineCallbacks
660
def _gmk(self, what, name, parent, default_id, path):
661
"""Generalized _mk* helper."""
663
path = name + str(uuid.uuid4())
666
parent_path = self.main.fs.get_by_node_id(request.ROOT, parent).path
667
mdid = self.main.fs.create(os.path.join(parent_path, path),
669
marker = MDMarker(mdid)
670
meth = getattr(self.aq, 'make_' + what)
671
meth(request.ROOT, parent, name, marker, mdid)
672
yield self.wait_for('AQ_FILE_NEW_OK', 'AQ_FILE_NEW_ERROR',
673
'AQ_DIR_NEW_OK', 'AQ_DIR_NEW_ERROR',
676
node_id = self.listener.get_id_for_marker(marker, default_id)
677
defer.returnValue((mdid, node_id))
679
def _mkdir(self, name, parent=None, default_id=_marker, path=None):
    """Create a directory node through the generic ``_gmk`` helper.

    All arguments are forwarded unchanged with ``'dir'`` as the kind;
    the return value is whatever ``_gmk`` returns.
    """
    make = self._gmk
    return make('dir', name, parent, default_id, path)
683
def _mkfile(self, name, parent=None, default_id=_marker, path=None):
    """Create a file node through the generic ``_gmk`` helper.

    All arguments are forwarded unchanged with ``'file'`` as the kind;
    the return value is whatever ``_gmk`` returns.
    """
    make = self._gmk
    return make('file', name, parent, default_id, path)
688
class TestContentBase(TestBase):
689
"""Reusable utility methods born out of TestContent."""
691
def _get_data(self, data_len=1000):
692
"""Get the hash, crc and size of a chunk of data."""
693
data = os.urandom(data_len) # not terribly compressible
694
hash_object = content_hash_factory()
695
hash_object.update(data)
696
hash_value = hash_object.content_hash()
697
crc32_value = crc32(data)
699
return NoCloseStringIO(data), data, hash_value, crc32_value, size
701
def _mk_file_w_content(self, filename='hola', data_len=1000):
702
"""Make a file and dump some content in it."""
703
fobj, data, hash_value, crc32_value, size = self._get_data(data_len)
706
@defer.inlineCallbacks
708
"""Do the upload, later."""
709
mdid, node_id = yield self._mkfile(filename, path=path)
710
wait_upload = self.wait_for('AQ_UPLOAD_FINISHED',
711
share_id=request.ROOT, hash=hash_value,
713
orig_open_file = self.main.fs.open_file
714
self.main.fs.open_file = lambda _: fobj
716
self.aq.upload(request.ROOT, node_id, NO_CONTENT_HASH,
717
hash_value, crc32_value, size, mdid)
718
self.main.fs.open_file = orig_open_file
720
defer.returnValue((mdid, node_id))
722
return hash_value, crc32_value, data, worker()
725
class FakeResolver(ResolverBase):
726
"""A fake resolver that returns two fixed hosts.
728
Those are fs-1.ubuntuone.com and fs-1.server.com both with port=443
731
def _lookup(self, name, cls, qtype, timeout):
732
""" do the fake lookup. """
733
hostname = 'fs-%s.server.com'
734
rr = dns.RRHeader(name=hostname % '0', type=qtype, cls=cls, ttl=60,
735
payload=dns.Record_SRV(target=hostname % '0',
737
rr1 = dns.RRHeader(name=hostname % '1', type=qtype, cls=cls, ttl=60,
738
payload=dns.Record_SRV(target=hostname % '1',
743
return defer.succeed((results, authority, addtional))
746
class MethodInterferer(object):
747
"""Helper to nuke a client method and restore it later."""
749
def __init__(self, obj, meth):
754
def insert_after(self, info, func):
755
"""Runs func after running the replaced method."""
756
self.old = getattr(self.obj, self.meth)
758
def middle(*args, **kwargs):
759
"""Helper/worker func."""
760
r = self.old(*args, **kwargs)
761
func(*args, **kwargs)
763
setattr(self.obj, self.meth, middle)
766
def insert_before(self, info, func):
767
"""Runs func before running the replaced method."""
768
self.old = getattr(self.obj, self.meth)
770
def middle(*args, **kwargs):
771
"""Helper/worker func."""
772
if func(*args, **kwargs):
773
return self.old(*args, **kwargs)
774
setattr(self.obj, self.meth, middle)
777
def nuke(self, info, func=None):
778
"""Nukes the method"""
779
self.old = getattr(self.obj, self.meth)
781
func = lambda *_, **__: None
782
setattr(self.obj, self.meth, func)
785
def restore(self, info=None):
786
"""Restores the original method."""
788
m = "the old method is None (hint: called restore before nuke)"
790
setattr(self.obj, self.meth, self.old)
793
def pause(self, func=None):
794
"""Pauses a method execution that can be played later."""
795
self.old = getattr(self.obj, self.meth)
796
play = defer.Deferred()
798
@defer.inlineCallbacks
800
"""Play it in the middle."""
804
setattr(self.obj, self.meth, self.old)
805
result = yield defer.maybeDeferred(self.old, *a, **k)
806
defer.returnValue(result)
808
setattr(self.obj, self.meth, middle)
809
return lambda: play.callback(True)
812
class NukeAQClient(object):
813
"""Helper to nuke a client method and restore it later."""
815
def __init__(self, aq, meth):
820
def nuke(self, info, func=None):
821
"""Nukes the method"""
822
self.old = getattr(self.aq.client, self.meth)
824
func = lambda *_, **__: defer.Deferred
825
setattr(self.aq.client, self.meth, func)
828
def restore(self, info):
829
"""Restores the original method."""
831
m = "the old method is None (hint: called restore before nuke)"
833
if self.aq.client is not None:
834
setattr(self.aq.client, self.meth, self.old)
838
class FakeGetContent(object):
839
"""Helper class that haves self.deferred"""
840
def __init__(self, deferred, share, node, hash):
842
self.deferred = deferred
843
self.share_id = share
845
self.server_hash = hash
848
class FakeFailure(object):
849
"""An object that when compared to a Failure, checks its message."""
851
def __init__(self, message):
    """Keep the message that ``__eq__`` compares error messages against."""
    self._message = message
854
def __eq__(self, other):
855
"""Checks using the message of 'other' if any."""
856
error_message_method = getattr(other, 'getErrorMessage', None)
857
if error_message_method:
858
other_message = error_message_method()
859
return other_message == self._message