~facundo/filesync-server/trunk

« back to all changes in this revision

Viewing changes to src/server/testing/aq_helpers.py

  • Committer: Facundo Batista
  • Date: 2015-08-05 13:10:02 UTC
  • Revision ID: facundo@taniquetil.com.ar-20150805131002-he7b7k704d8o7js6
First released version.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2008-2015 Canonical
 
2
#
 
3
# This program is free software: you can redistribute it and/or modify
 
4
# it under the terms of the GNU Affero General Public License as
 
5
# published by the Free Software Foundation, either version 3 of the
 
6
# License, or (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU Affero General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU Affero General Public License
 
14
# along with this program. If not, see <http://www.gnu.org/licenses/>.
 
15
#
 
16
# For further info, check  http://launchpad.net/filesync-server
 
17
 
 
18
"""Assorted stuff used by test_action_queue."""
 
19
 
 
20
import logging
 
21
import os
 
22
import shutil
 
23
import time
 
24
import uuid
 
25
 
 
26
from StringIO import StringIO
 
27
from functools import partial
 
28
 
 
29
import dbus
 
30
import dbus.service
 
31
 
 
32
from dbus.mainloop.glib import DBusGMainLoop
 
33
from twisted.internet import defer, reactor
 
34
from twisted.names import dns
 
35
from twisted.names.common import ResolverBase
 
36
from twisted.python.failure import Failure
 
37
 
 
38
from ubuntuone.storage.server.testing.testcase import (
 
39
    BaseProtocolTestCase,
 
40
    create_test_user,
 
41
)
 
42
 
 
43
from config import config
 
44
from backends.filesync.data import model
 
45
from backends.filesync.data.testing.testcase import StorageDALTestCase
 
46
from ubuntuone.storage.server import ssl_proxy
 
47
from ubuntuone.storageprotocol import request, sharersp, client
 
48
from ubuntuone.storageprotocol.content_hash import content_hash_factory, crc32
 
49
from ubuntuone.storage.server.auth import SimpleAuthProvider
 
50
from ubuntuone import platform
 
51
from ubuntuone.syncdaemon.action_queue import ActionQueue, ActionQueueCommand
 
52
from ubuntuone.syncdaemon import main, volume_manager, tritcask, logger
 
53
from ubuntuone.syncdaemon.event_queue import EventQueue
 
54
from ubuntuone.syncdaemon.filesystem_manager import FileSystemManager
 
55
from ubuntuone.syncdaemon.sync import Sync
 
56
from ubuntuone.syncdaemon.marker import MDMarker
 
57
 
 
58
ROOT_DIR = os.getcwd()
 
59
SD_CONFIG_DIR = ROOT_DIR + "/.sourcecode/ubuntuone-client/data"
 
60
SD_CONFIGS = [os.path.join(SD_CONFIG_DIR, 'syncdaemon.conf'),
 
61
              os.path.join(SD_CONFIG_DIR, 'syncdaemon-dev.conf')]
 
62
 
 
63
_marker = object()
 
64
 
 
65
HASH_EMPTY = model.EMPTY_CONTENT_HASH
 
66
NO_CONTENT_HASH = ""
 
67
 
 
68
 
 
69
TESTS_DIR = os.getcwd() + "/tmp/sync_tests"
 
70
 
 
71
 
 
72
def show_time():
 
73
    """Return current time with HH:MM:SS,xxx where xxx are msec."""
 
74
    t = time.time()
 
75
    p1 = time.strftime("%H:%M:%S", time.localtime(t))
 
76
    p2 = ("%.3f" % (t % 1))[2:]
 
77
    return "%s,%s" % (p1, p2)
 
78
 
 
79
 
 
80
class NoCloseStringIO(StringIO):
 
81
    """a stringio subclass that doesnt destroy content on close."""
 
82
    def close(self):
 
83
        """do nothing"""
 
84
        pass
 
85
 
 
86
 
 
87
class ReallyAttentiveListener(object):
 
88
    """A listener that listens to everything and writes it down very tidily."""
 
89
 
 
90
    def __init__(self):
 
91
        self.q = []
 
92
 
 
93
    def __getattr__(self, attr):
 
94
        if attr.startswith('handle_'):
 
95
            return partial(self.write_it_down, attr[7:])
 
96
        # probably AttributeError, but just in case
 
97
        return super(ReallyAttentiveListener, self).__getattr__(attr)
 
98
 
 
99
    def write_it_down(self, attr, *a, **kw):
 
100
        """Write the event down."""
 
101
        self.q.append((attr, kw))
 
102
 
 
103
    def get_svhash_for(self, share_id, node_id):
 
104
        """
 
105
        find the latest SV_HASH_NEW for the given node
 
106
        """
 
107
        for ev, kw in reversed(self.q):
 
108
            if ev == 'SV_HASH_NEW':
 
109
                if kw.get('node_id') == node_id \
 
110
                        and kw.get('share_id') == share_id:
 
111
                    return kw.get('hash')
 
112
        raise ValueError("no hash for %s in %s" % (node_id, self.q))
 
113
 
 
114
    def get_rescan_from_scratch_for(self, volume_id):
 
115
        """Find the last AQ_RESCAN_FROM_SCRATCH_OK and return the kwargs."""
 
116
        for ev, kw in reversed(self.q):
 
117
            if ev == 'AQ_RESCAN_FROM_SCRATCH_OK':
 
118
                if kw.get('volume_id') == volume_id:
 
119
                    return kw
 
120
        raise ValueError("no AQ_RESCAN_FROM_SCRATCH_OK for %s in %s" %
 
121
                         (volume_id, self.q))
 
122
 
 
123
    def get_id_for_marker(self, marker, default=_marker):
 
124
        """
 
125
        find the latest AQ_(FILE|DIR)_NEW_OK for the given marker
 
126
        """
 
127
        for ev, kw in reversed(self.q):
 
128
            if ev in ('AQ_FILE_NEW_OK', 'AQ_DIR_NEW_OK', 'AQ_CREATE_SHARE_OK'):
 
129
                if kw.get('marker') == marker:
 
130
                    if ev == 'AQ_CREATE_SHARE_OK':
 
131
                        return kw.get('share_id')
 
132
                    else:
 
133
                        return kw.get('new_id')
 
134
        if default is _marker:
 
135
            raise ValueError("no uuid for marker %s" % marker)
 
136
        else:
 
137
            return default
 
138
 
 
139
 
 
140
class DumbVolumeManager(volume_manager.VolumeManager):
 
141
    """A real VolumeManager but with dummy refresh_* and (a few) handle_*."""
 
142
 
 
143
    def refresh_shares(self):
 
144
        """Noop."""
 
145
 
 
146
    def refresh_volumes(self):
 
147
        """Noop."""
 
148
 
 
149
    def on_server_root(self, root):
 
150
        """Asociate server root and nothing more."""
 
151
        self.log.debug('on_server_root(%s)', root)
 
152
        self._set_root(root)
 
153
 
 
154
    def handle_AQ_LIST_VOLUMES_ERROR(self, error):
 
155
        """Noop."""
 
156
 
 
157
    def handle_AQ_LIST_SHARES_ERROR(self, error):
 
158
        """Noop."""
 
159
 
 
160
    def handle_AQ_ANSWER_SHARE_OK(self, share_id, answer):
 
161
        """Noop."""
 
162
 
 
163
 
 
164
class ReallyFakeMain(main.Main):
 
165
    """
 
166
    This main is so fake, it breaks nearly everything.
 
167
    """
 
168
    def __init__(self, port, root_dir, data_dir, partials_dir,
 
169
                 dns_srv=None):
 
170
        self.root_dir = root_dir
 
171
        self.shares_dir = os.path.join(os.path.dirname(root_dir), 'shares_dir')
 
172
        self.shares_dir_link = os.path.join(root_dir, 'shares_link')
 
173
        self.data_dir = data_dir
 
174
        self.fsmdir = os.path.join(data_dir, 'fsmdir')
 
175
        self.partials_dir = partials_dir
 
176
        self.tritcask_dir = os.path.join(self.data_dir, 'tritcask')
 
177
        self.hash_q = type('fake hash queue', (),
 
178
                           {'empty': staticmethod(lambda: True),
 
179
                            '__len__': staticmethod(lambda: 0)})()
 
180
        self.logger = logging.getLogger('fsyncsrvr.SyncDaemon.Main')
 
181
        self.db = tritcask.Tritcask(self.tritcask_dir)
 
182
        self.vm = DumbVolumeManager(self)
 
183
        self.fs = FileSystemManager(self.fsmdir, self.partials_dir, self.vm,
 
184
                                    self.db)
 
185
        self.event_q = EventQueue(self.fs)
 
186
        self.event_q.subscribe(self.vm)
 
187
        self.fs.register_eq(self.event_q)
 
188
        self.sync = Sync(self)
 
189
        self.action_q = ActionQueue(self.event_q, self, '127.0.0.1', port,
 
190
                                    dns_srv, disable_ssl_verify=True)
 
191
        self.state_manager = main.StateManager(self, handshake_timeout=30)
 
192
        self.state_manager.connection.waiting_timeout = .1
 
193
        self.vm.init_root()
 
194
 
 
195
    def server_rescan(self):
 
196
        """Fake server rescan that doesn't actually rescan anything."""
 
197
        return self.vm.server_rescan()
 
198
 
 
199
    def local_rescan(self):
 
200
        """Fake!"""
 
201
 
 
202
 
 
203
def failure_ignore(*failures):
 
204
    """A decorator to ignore the failure.
 
205
 
 
206
    It marks a test method such that failures during the test will not be
 
207
    marked as failures of the test itself, but simply ignored.
 
208
    """
 
209
    def wrapper(func):
 
210
        """The wrapper function."""
 
211
        func.failure_ignore = failures
 
212
        return func
 
213
    return wrapper
 
214
 
 
215
 
 
216
def failure_expected(failure):
 
217
    """A decorator to expect a failure.
 
218
 
 
219
    It marks a test method such that failures during the test will not be
 
220
    marked as failures of the test itself, but rather the opposite: it is
 
221
    the lack of the failure that is a failure.
 
222
    """
 
223
    def wrapper(func):
 
224
        """The wrapper function."""
 
225
        func.failure_expected = failure
 
226
        return func
 
227
    return wrapper
 
228
 
 
229
 
 
230
class WaitingHelpingHandler(object):
 
231
    """An auxiliary class that helps wait for events."""
 
232
 
 
233
    def __init__(self, event_queue, waiting_events, waiting_kwargs,
 
234
                 result=None):
 
235
        self.deferred = defer.Deferred()
 
236
        self.event_queue = event_queue
 
237
        self.result = result
 
238
        self.waiting_events = waiting_events
 
239
        self.waiting_kwargs = waiting_kwargs
 
240
        event_queue.subscribe(self)
 
241
 
 
242
    def handle_default(self, event, *args, **kwargs):
 
243
        """Got an event: fire if it's one we want"""
 
244
        if event in self.waiting_events:
 
245
            if args:
 
246
                for wv in self.waiting_kwargs.values():
 
247
                    if wv not in args:
 
248
                        return
 
249
            if kwargs:
 
250
                for wk, wv in self.waiting_kwargs.items():
 
251
                    if not (wk in kwargs and kwargs[wk] == wv):
 
252
                        return
 
253
            self.fire()
 
254
 
 
255
    def fire(self):
 
256
        """start fire the callback"""
 
257
        self.event_queue.unsubscribe(self)
 
258
        reactor.callLater(0, lambda: self.deferred.callback(self.result))
 
259
 
 
260
 
 
261
# The following class is a duplicated from
 
262
# ubuntuone-client/tests/platform/linux/test_dbus.py
 
263
# will be removed when bug #917285 is resolved
 
264
class FakeNetworkManager(dbus.service.Object):
 
265
    """A fake NetworkManager that only emits StatusChanged signal."""
 
266
 
 
267
    State = 3
 
268
    path = '/org/freedesktop/NetworkManager'
 
269
 
 
270
    def __init__(self, bus):
 
271
        self.bus = bus
 
272
        self.bus.request_name('org.freedesktop.NetworkManager',
 
273
                              flags=dbus.bus.NAME_FLAG_REPLACE_EXISTING |
 
274
                              dbus.bus.NAME_FLAG_DO_NOT_QUEUE |
 
275
                              dbus.bus.NAME_FLAG_ALLOW_REPLACEMENT)
 
276
        self.busName = dbus.service.BusName('org.freedesktop.NetworkManager',
 
277
                                            bus=self.bus)
 
278
        dbus.service.Object.__init__(self, bus_name=self.busName,
 
279
                                     object_path=self.path)
 
280
 
 
281
    def shutdown(self):
 
282
        """Shutdown the fake NetworkManager."""
 
283
        self.busName.get_bus().release_name(self.busName.get_name())
 
284
        self.remove_from_connection()
 
285
 
 
286
    @dbus.service.method(dbus.PROPERTIES_IFACE,
 
287
                         in_signature='ss', out_signature='v',
 
288
                         async_callbacks=('reply_handler', 'error_handler'))
 
289
    def Get(self, interface, propname, reply_handler=None, error_handler=None):
 
290
        """Fake dbus's Get method to get at the State property."""
 
291
        try:
 
292
            reply_handler(getattr(self, propname, None))
 
293
        except Exception, e:
 
294
            error_handler(e)
 
295
 
 
296
 
 
297
class TestWithDatabase(BaseProtocolTestCase, StorageDALTestCase):
 
298
    """Hook up Trial, ORMTestCase, and our very own s4 and storage servers.
 
299
 
 
300
    Large chunks have been copy-pasted from
 
301
    server.testing.testcase.TestWithDatabase, hence the name.
 
302
    """
 
303
    auth_provider_class = SimpleAuthProvider
 
304
    _do_teardown_eq = False
 
305
    _ignore_cancelled_downloads = False
 
306
    failed = False
 
307
    ssl_proxy_heartbeat_interval = 0
 
308
 
 
309
    @defer.inlineCallbacks
 
310
    def setUp(self):
 
311
        """Setup."""
 
312
        yield super(TestWithDatabase, self).setUp()
 
313
        self.__root = None
 
314
 
 
315
        # Patch AQ's deferreds, to support these tests still being run
 
316
        # in Lucid, but while code calls .cancel() on them
 
317
        # Remove this code when filesync project is taken to Precise.
 
318
        defer.Deferred.cancel = lambda self: None
 
319
        defer.DeferredList.cancel = lambda self: None
 
320
 
 
321
        # Set up the main loop and bus connection
 
322
        self.loop = DBusGMainLoop(set_as_default=True)
 
323
        bus_address = os.environ.get('DBUS_SESSION_BUS_ADDRESS', None)
 
324
        self.bus = dbus.bus.BusConnection(address_or_type=bus_address,
 
325
                                          mainloop=self.loop)
 
326
 
 
327
        # Monkeypatch the dbus.SessionBus/SystemBus methods, to ensure we
 
328
        # always point at our own private bus instance.
 
329
        self.patch(dbus, 'SessionBus', lambda: self.bus)
 
330
        self.patch(dbus, 'SystemBus', lambda: self.bus)
 
331
 
 
332
        self.nm = FakeNetworkManager(self.bus)
 
333
        self.addCleanup(self.nm.shutdown)
 
334
 
 
335
        # start the ssl proxy
 
336
        self.ssl_service = ssl_proxy.ProxyService(self.ssl_cert, self.ssl_key,
 
337
                                                  self.ssl_cert_chain,
 
338
                                                  0,  # port
 
339
                                                  "localhost", self.port,
 
340
                                                  "ssl-proxy-test", 0)
 
341
        self.patch(config.ssl_proxy, "heartbeat_interval",
 
342
                   self.ssl_proxy_heartbeat_interval)
 
343
        yield self.ssl_service.startService()
 
344
 
 
345
        # these tests require a "test" bucket to be avaialble,
 
346
        # but don't create it using the s3 api...
 
347
        self.s4_site.resource._add_bucket("test")
 
348
        if os.path.exists(self.tmpdir):
 
349
            self.rmtree(self.tmpdir)
 
350
 
 
351
        _user_data = [
 
352
            (u'jack', u'jackpass', u'shard0'),
 
353
            (u'jane', u'janepass', u'shard1'),
 
354
            (u'john', u'johnpass', u'shard2'),
 
355
        ]
 
356
        self.access_tokens = {}
 
357
        self.storage_users = {}
 
358
        for username, password, shard in _user_data:
 
359
            self.access_tokens[username] = {
 
360
                'username': username,
 
361
                'password': password,
 
362
            }
 
363
            user = create_test_user(username=username,
 
364
                                    password=password, shard_id=shard)
 
365
            self.storage_users[username] = user
 
366
        self.dns_srv = None
 
367
 
 
368
        # override and cleanup user config
 
369
        self.old_get_config_files = main.config.get_config_files
 
370
        main.config.get_config_files = lambda: SD_CONFIGS
 
371
        main.config._user_config = None
 
372
        user_config = main.config.get_user_config()
 
373
        for section in user_config.sections():
 
374
            user_config.remove_section(section)
 
375
        main.config.get_user_config().set_throttling_read_limit(-1)
 
376
        main.config.get_user_config().set_throttling_write_limit(-1)
 
377
        main.config.get_user_config().set_autoconnect(False)
 
378
 
 
379
        # logging can not be configured dinamically, touch the general logger
 
380
        # to get one big file and be able to get the logs if failure
 
381
        logger.init()
 
382
        logger.set_max_bytes(0)
 
383
        yield self.client_setup()
 
384
 
 
385
    @property
 
386
    def tmpdir(self):
 
387
        """Override default tmpdir property."""
 
388
        return TESTS_DIR
 
389
 
 
390
    def tearDown(self):
 
391
        """Tear down."""
 
392
        main.config.get_config_files = self.old_get_config_files
 
393
        d = super(TestWithDatabase, self).tearDown()
 
394
        d.addCallback(lambda _: self.ssl_service.stopService())
 
395
        if self._do_teardown_eq:
 
396
            d.addCallback(lambda _: self.eq.shutdown())
 
397
        d.addCallback(lambda _: self.main.state_manager.shutdown())
 
398
        d.addCallback(lambda _: self.main.db.shutdown())
 
399
        test_method = getattr(self, self._testMethodName)
 
400
        failure_expected = getattr(test_method, 'failure_expected', False)
 
401
        if failure_expected and failure_expected != self.failed:
 
402
            msg = "test method %r should've failed with %s and " \
 
403
                % (self._testMethodName, failure_expected)
 
404
            if self.failed:
 
405
                msg += 'instead failed with: %s' % (self.failed,)
 
406
            else:
 
407
                msg += "didn't"
 
408
            d.addCallback(lambda _: Failure(AssertionError(msg)))
 
409
        if self.failed and failure_expected != self.failed:
 
410
            failure_ignore = getattr(test_method, 'failure_ignore', ())
 
411
            if self.failed and self.failed not in failure_ignore:
 
412
                msg = "test method %r failed with: %s" \
 
413
                      % (self._testMethodName, self.failed)
 
414
                d.addCallback(lambda _: Failure(AssertionError(msg)))
 
415
 
 
416
        def temp_dir_cleanup(_):
 
417
            """Clean up tmpdir."""
 
418
            if os.path.exists(self.tmpdir):
 
419
                self.rmtree(self.tmpdir)
 
420
 
 
421
        d.addBoth(temp_dir_cleanup)
 
422
        return d
 
423
 
 
424
    def mktemp(self, name='temp'):
 
425
        """ Customized mktemp that accepts an optional name argument. """
 
426
        tempdir = os.path.join(self.tmpdir, name)
 
427
        if os.path.exists(tempdir):
 
428
            self.rmtree(tempdir)
 
429
        os.makedirs(tempdir)
 
430
        self.addCleanup(self.rmtree, tempdir)
 
431
        return tempdir
 
432
 
 
433
    def rmtree(self, path):
 
434
        """Custom rmtree that handle ro parent(s) and childs."""
 
435
        # change perms to rw, so we can delete the temp dir
 
436
        if path != self.__root:
 
437
            platform.set_dir_readwrite(os.path.dirname(path))
 
438
        if not platform.can_write(path):
 
439
            platform.set_dir_readwrite(path)
 
440
 
 
441
        for dirpath, dirs, files in os.walk(path):
 
442
            for adir in dirs:
 
443
                adir = os.path.join(dirpath, adir)
 
444
                if not platform.can_write(adir):
 
445
                    platform.set_dir_readwrite(adir)
 
446
 
 
447
        shutil.rmtree(path)
 
448
 
 
449
    def client_setup(self):
 
450
        """Create the clients needed for the tests."""
 
451
        self._do_teardown_eq = True
 
452
        root_dir = self.mktemp('fake_root_dir')
 
453
        data_dir = self.mktemp('fake_data_dir')
 
454
        partials = self.mktemp('partials_dir')
 
455
        self.main = ReallyFakeMain(self.port, root_dir,
 
456
                                   data_dir, partials, self.dns_srv)
 
457
        self.state = self.main.state_manager
 
458
        self.eq = self.main.event_q
 
459
        self.listener = ReallyAttentiveListener()
 
460
        self.eq.subscribe(self.listener)
 
461
        self.eq.subscribe(self)
 
462
        self.aq = self.main.action_q
 
463
 
 
464
    @property
 
465
    def ssl_port(self):
 
466
        """SSL port."""
 
467
        return self.ssl_service.port
 
468
 
 
469
    def nuke_client_method(self, method_name, callback,
 
470
                           method_retval_cb=defer.Deferred):
 
471
        """
 
472
        Nuke the client method, call the callback, and de-nuke it
 
473
        """
 
474
        old_method = getattr(self.aq.client, method_name)
 
475
        setattr(self.aq.client, method_name,
 
476
                lambda *_, **__: method_retval_cb())
 
477
        try:
 
478
            retval = callback()
 
479
        finally:
 
480
            if self.aq.client is not None:
 
481
                setattr(self.aq.client, method_name, old_method)
 
482
        return retval
 
483
 
 
484
    def wait_for(self, *waiting_events, **waiting_kwargs):
 
485
        """defer until event appears"""
 
486
        return WaitingHelpingHandler(self.main.event_q,
 
487
                                     waiting_events,
 
488
                                     waiting_kwargs).deferred
 
489
 
 
490
    def wait_for_cb(self, *waiting_events, **waiting_kwargs):
 
491
        """
 
492
        Returns a callable that returns a deferred that fires when an event
 
493
        happens
 
494
        """
 
495
        return lambda result: WaitingHelpingHandler(self.main.event_q,
 
496
                                                    waiting_events,
 
497
                                                    waiting_kwargs,
 
498
                                                    result).deferred
 
499
 
 
500
    def handle_default(self, event, *args, **kwargs):
 
501
        """
 
502
        Handle events. In particular, catch errors and store them
 
503
        under the 'failed' attribute
 
504
        """
 
505
        if 'error' in kwargs:
 
506
            self.failed = kwargs['error']
 
507
 
 
508
    def handle_AQ_DOWNLOAD_CANCELLED(self, *args, **kwargs):
 
509
        """handle the case of CANCEL"""
 
510
        if not self._ignore_cancelled_downloads:
 
511
            self.failed = 'CANCELLED'
 
512
 
 
513
    def wait_for_nirvana(self, last_event_interval=.5):
 
514
        """Get a deferred that will fire when there are no more
 
515
        events or transfers."""
 
516
        return self.main.wait_for_nirvana(last_event_interval)
 
517
 
 
518
    def connect(self, do_connect=True):
 
519
        """Encourage the AQ to connect."""
 
520
        d = self.wait_for('SYS_CONNECTION_MADE')
 
521
        self.eq.push('SYS_INIT_DONE')
 
522
        self.eq.push('SYS_LOCAL_RESCAN_DONE')
 
523
        self.eq.push('SYS_USER_CONNECT',
 
524
                     access_token=self.access_tokens['jack'])
 
525
        if do_connect:
 
526
            self.eq.push('SYS_NET_CONNECTED')
 
527
        return d
 
528
 
 
529
 
 
530
class _Placeholder(object):
 
531
    """Object you can use in eq comparison w'out knowing equality with what."""
 
532
    def __init__(self, label):
 
533
        self.label = label
 
534
 
 
535
    def __repr__(self):
 
536
        return "<placeholder for %s>" % self.label
 
537
 
 
538
 
 
539
class _HashPlaceholder(_Placeholder):
 
540
    """A placeholder for a hash"""
 
541
    def __eq__(self, other):
 
542
        return all((isinstance(other, str),
 
543
                    other.startswith('sha1:'),
 
544
                    len(other) == 45))
 
545
 
 
546
 
 
547
class _UUIDPlaceholder(_Placeholder):
 
548
    """A placeholder for an uuid"""
 
549
 
 
550
    def __init__(self, label, exceptions=()):
 
551
        super(_UUIDPlaceholder, self).__init__(label)
 
552
        self.exceptions = exceptions
 
553
 
 
554
    def __eq__(self, other):
 
555
        if other in self.exceptions:
 
556
            return True
 
557
        try:
 
558
            str(uuid.UUID(other))
 
559
        except ValueError:
 
560
            return False
 
561
        else:
 
562
            return True
 
563
 
 
564
 
 
565
class _TypedPlaceholder(_Placeholder):
 
566
    """A placeholder for an object of a certain type"""
 
567
 
 
568
    def __init__(self, label, a_type):
 
569
        super(_TypedPlaceholder, self).__init__(label)
 
570
        self.type = a_type
 
571
 
 
572
    def __eq__(self, other):
 
573
        return isinstance(other, self.type)
 
574
 
 
575
 
 
576
class _ShareListPlaceholder(_Placeholder):
 
577
    """A placeholder for a list of shares"""
 
578
 
 
579
    def __init__(self, label, shares):
 
580
        super(_ShareListPlaceholder, self).__init__(label)
 
581
        self.shares = shares
 
582
 
 
583
    def __cmp__(self, other):
 
584
        return cmp(self.shares, other.shares)
 
585
 
 
586
aHash = _HashPlaceholder('a hash')
 
587
anUUID = _UUIDPlaceholder('an UUID')
 
588
aShareUUID = _UUIDPlaceholder('a share UUID', ('',))
 
589
anEmptyShareList = _ShareListPlaceholder('an empty share list', [])
 
590
aShareInfo = _TypedPlaceholder('a share info', sharersp.NotifyShareHolder)
 
591
aGetContentRequest = _TypedPlaceholder('a get_content request',
 
592
                                       client.GetContent)
 
593
anAQCommand = _TypedPlaceholder('an action queue command',
 
594
                                ActionQueueCommand)
 
595
 
 
596
 
 
597
class TestBase(TestWithDatabase):
 
598
    """Base class for TestMeta and TestContent."""
 
599
    client = None
 
600
 
 
601
    @defer.inlineCallbacks
 
602
    def setUp(self):
 
603
        """Set up."""
 
604
        yield super(TestBase, self).setUp()
 
605
        yield self.connect()
 
606
        self.client = self.aq.client
 
607
        self.assertFalse(self.client.factory.connector is None)
 
608
        self.root = yield self.client.get_root()
 
609
        yield self.wait_for_nirvana(.5)
 
610
 
 
611
    def tearDown(self):
 
612
        """Tear down."""
 
613
        self.aq.disconnect()
 
614
        return super(TestBase, self).tearDown()
 
615
 
 
616
    def assertEvent(self, event, msg=None):
 
617
        """Check if an event happened."""
 
618
        self.assertIn(event, self.listener.q, msg)
 
619
 
 
620
    def assertInQ(self, deferred, containee, msg=None):
 
621
        """Deferredly assert that the containee is in the event queue.
 
622
 
 
623
        Containee can be callable, in which case it's called before asserting.
 
624
        """
 
625
        def check_queue(_):
 
626
            """The check itself."""
 
627
            ce = containee() if callable(containee) else containee
 
628
            self.assertIn(ce, self.listener.q, msg)
 
629
        deferred.addCallback(check_queue)
 
630
 
 
631
    def assertOneInQ(self, deferred, containees, msg=None):
 
632
        """Deferredly assert that one of the containee is in the event queue.
 
633
 
 
634
        Containee can be callable, in which case it's called before asserting.
 
635
        """
 
636
        def check_queue(_, msg=msg):
 
637
            """The check itself."""
 
638
            ce = containees() if callable(containees) else containees
 
639
            for i in ce:
 
640
                if i in self.listener.q:
 
641
                    break
 
642
            else:
 
643
                if msg is None:
 
644
                    msg = 'None of %s were found in %s' % (ce, self.listener.q)
 
645
                raise AssertionError(msg)
 
646
        deferred.addCallback(check_queue)
 
647
 
 
648
    def assertNotInQ(self, deferred, containee, msg=None):
 
649
        """Deferredly assert that the containee is not in the event queue.
 
650
 
 
651
        Containee can be callable, in which case it's called before asserting.
 
652
        """
 
653
        def check_queue(_):
 
654
            """The check itself."""
 
655
            ce = containee() if callable(containee) else containee
 
656
            self.assertNotIn(ce, self.listener.q, msg)
 
657
        deferred.addCallback(check_queue)
 
658
 
 
659
    @defer.inlineCallbacks
 
660
    def _gmk(self, what, name, parent, default_id, path):
 
661
        """Generalized _mk* helper."""
 
662
        if path is None:
 
663
            path = name + str(uuid.uuid4())
 
664
        if parent is None:
 
665
            parent = self.root
 
666
        parent_path = self.main.fs.get_by_node_id(request.ROOT, parent).path
 
667
        mdid = self.main.fs.create(os.path.join(parent_path, path),
 
668
                                   request.ROOT)
 
669
        marker = MDMarker(mdid)
 
670
        meth = getattr(self.aq, 'make_' + what)
 
671
        meth(request.ROOT, parent, name, marker, mdid)
 
672
        yield self.wait_for('AQ_FILE_NEW_OK', 'AQ_FILE_NEW_ERROR',
 
673
                            'AQ_DIR_NEW_OK', 'AQ_DIR_NEW_ERROR',
 
674
                            marker=marker)
 
675
 
 
676
        node_id = self.listener.get_id_for_marker(marker, default_id)
 
677
        defer.returnValue((mdid, node_id))
 
678
 
 
679
    def _mkdir(self, name, parent=None, default_id=_marker, path=None):
 
680
        """Create a dir, optionally storing the resulting uuid."""
 
681
        return self._gmk('dir', name, parent, default_id, path)
 
682
 
 
683
    def _mkfile(self, name, parent=None, default_id=_marker, path=None):
 
684
        """Create a file, optionally storing the resulting uuid."""
 
685
        return self._gmk('file', name, parent, default_id, path)
 
686
 
 
687
 
 
688
class TestContentBase(TestBase):
 
689
    """Reusable utility methods born out of TestContent."""
 
690
 
 
691
    def _get_data(self, data_len=1000):
 
692
        """Get the hash, crc and size of a chunk of data."""
 
693
        data = os.urandom(data_len)  # not terribly compressible
 
694
        hash_object = content_hash_factory()
 
695
        hash_object.update(data)
 
696
        hash_value = hash_object.content_hash()
 
697
        crc32_value = crc32(data)
 
698
        size = len(data)
 
699
        return NoCloseStringIO(data), data, hash_value, crc32_value, size
 
700
 
 
701
    def _mk_file_w_content(self, filename='hola', data_len=1000):
 
702
        """Make a file and dump some content in it."""
 
703
        fobj, data, hash_value, crc32_value, size = self._get_data(data_len)
 
704
        path = filename
 
705
 
 
706
        @defer.inlineCallbacks
 
707
        def worker():
 
708
            """Do the upload, later."""
 
709
            mdid, node_id = yield self._mkfile(filename, path=path)
 
710
            wait_upload = self.wait_for('AQ_UPLOAD_FINISHED',
 
711
                                        share_id=request.ROOT, hash=hash_value,
 
712
                                        node_id=node_id)
 
713
            orig_open_file = self.main.fs.open_file
 
714
            self.main.fs.open_file = lambda _: fobj
 
715
 
 
716
            self.aq.upload(request.ROOT, node_id, NO_CONTENT_HASH,
 
717
                           hash_value, crc32_value, size, mdid)
 
718
            self.main.fs.open_file = orig_open_file
 
719
            yield wait_upload
 
720
            defer.returnValue((mdid, node_id))
 
721
 
 
722
        return hash_value, crc32_value, data, worker()
 
723
 
 
724
 
 
725
class FakeResolver(ResolverBase):
 
726
    """A fake resolver that returns two fixed hosts.
 
727
 
 
728
    Those are fs-1.ubuntuone.com and fs-1.server.com both with port=443
 
729
    """
 
730
 
 
731
    def _lookup(self, name, cls, qtype, timeout):
 
732
        """ do the fake lookup. """
 
733
        hostname = 'fs-%s.server.com'
 
734
        rr = dns.RRHeader(name=hostname % '0', type=qtype, cls=cls, ttl=60,
 
735
                          payload=dns.Record_SRV(target=hostname % '0',
 
736
                                                 port=443))
 
737
        rr1 = dns.RRHeader(name=hostname % '1', type=qtype, cls=cls, ttl=60,
 
738
                           payload=dns.Record_SRV(target=hostname % '1',
 
739
                                                  port=443))
 
740
        results = [rr, rr1]
 
741
        authority = []
 
742
        addtional = []
 
743
        return defer.succeed((results, authority, addtional))
 
744
 
 
745
 
 
746
class MethodInterferer(object):
 
747
    """Helper to nuke a client method and restore it later."""
 
748
 
 
749
    def __init__(self, obj, meth):
 
750
        self.obj = obj
 
751
        self.meth = meth
 
752
        self.old = None
 
753
 
 
754
    def insert_after(self, info, func):
 
755
        """Runs func after running the replaced method."""
 
756
        self.old = getattr(self.obj, self.meth)
 
757
 
 
758
        def middle(*args, **kwargs):
 
759
            """Helper/worker func."""
 
760
            r = self.old(*args, **kwargs)
 
761
            func(*args, **kwargs)
 
762
            return r
 
763
        setattr(self.obj, self.meth, middle)
 
764
        return info
 
765
 
 
766
    def insert_before(self, info, func):
 
767
        """Runs func before running the replaced method."""
 
768
        self.old = getattr(self.obj, self.meth)
 
769
 
 
770
        def middle(*args, **kwargs):
 
771
            """Helper/worker func."""
 
772
            if func(*args, **kwargs):
 
773
                return self.old(*args, **kwargs)
 
774
        setattr(self.obj, self.meth, middle)
 
775
        return info
 
776
 
 
777
    def nuke(self, info, func=None):
 
778
        """Nukes the method"""
 
779
        self.old = getattr(self.obj, self.meth)
 
780
        if func is None:
 
781
            func = lambda *_, **__: None
 
782
        setattr(self.obj, self.meth, func)
 
783
        return info
 
784
 
 
785
    def restore(self, info=None):
 
786
        """Restores the original method."""
 
787
        if self.old is None:
 
788
            m = "the old method is None (hint: called restore before nuke)"
 
789
            raise ValueError(m)
 
790
        setattr(self.obj, self.meth, self.old)
 
791
        return info
 
792
 
 
793
    def pause(self, func=None):
 
794
        """Pauses a method execution that can be played later."""
 
795
        self.old = getattr(self.obj, self.meth)
 
796
        play = defer.Deferred()
 
797
 
 
798
        @defer.inlineCallbacks
 
799
        def middle(*a, **k):
 
800
            """Play it in the middle."""
 
801
            if func is not None:
 
802
                func(*a, **k)
 
803
            yield play
 
804
            setattr(self.obj, self.meth, self.old)
 
805
            result = yield defer.maybeDeferred(self.old, *a, **k)
 
806
            defer.returnValue(result)
 
807
 
 
808
        setattr(self.obj, self.meth, middle)
 
809
        return lambda: play.callback(True)
 
810
 
 
811
 
 
812
class NukeAQClient(object):
 
813
    """Helper to nuke a client method and restore it later."""
 
814
 
 
815
    def __init__(self, aq, meth):
 
816
        self.aq = aq
 
817
        self.meth = meth
 
818
        self.old = None
 
819
 
 
820
    def nuke(self, info, func=None):
 
821
        """Nukes the method"""
 
822
        self.old = getattr(self.aq.client, self.meth)
 
823
        if func is None:
 
824
            func = lambda *_, **__: defer.Deferred
 
825
        setattr(self.aq.client, self.meth, func)
 
826
        return info
 
827
 
 
828
    def restore(self, info):
 
829
        """Restores the original method."""
 
830
        if self.old is None:
 
831
            m = "the old method is None (hint: called restore before nuke)"
 
832
            raise ValueError(m)
 
833
        if self.aq.client is not None:
 
834
            setattr(self.aq.client, self.meth, self.old)
 
835
        return info
 
836
 
 
837
 
 
838
class FakeGetContent(object):
 
839
    """Helper class that haves self.deferred"""
 
840
    def __init__(self, deferred, share, node, hash):
 
841
        """initialize it"""
 
842
        self.deferred = deferred
 
843
        self.share_id = share
 
844
        self.node_id = node
 
845
        self.server_hash = hash
 
846
 
 
847
 
 
848
class FakeFailure(object):
 
849
    """An object that when compared to a Failure, checks its message."""
 
850
 
 
851
    def __init__(self, message):
 
852
        self._message = message
 
853
 
 
854
    def __eq__(self, other):
 
855
        """Checks using the message of 'other' if any."""
 
856
        error_message_method = getattr(other, 'getErrorMessage', None)
 
857
        if error_message_method:
 
858
            other_message = error_message_method()
 
859
            return other_message == self._message
 
860
        else:
 
861
            return False