~agateau/ubuntuone-client/gir1.2-unity-5.0

« back to all changes in this revision

Viewing changes to .pc/02_value_io_error_fix.patch/tests/syncdaemon/test_action_queue.py

  • Committer: Package Import Robot
  • Author(s): Rodney Dawes
  • Date: 2011-11-21 10:24:35 UTC
  • Revision ID: package-import@ubuntu.com-20111121102435-b0t4er4bwxsgz899
Tags: 2.0.0-0ubuntu3.1
* debian/patches/series:
  - Include missing patch from previous upload

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
#
 
3
# Author: John R. Lenton <john.lenton@canonical.com>
 
4
# Author: Natalia B. Bidart <natalia.bidart@canonical.com>
 
5
# Author: Facundo Batista <facundo@canonical.com>
 
6
#
 
7
# Copyright 2009, 2010, 2011 Canonical Ltd.
 
8
#
 
9
# This program is free software: you can redistribute it and/or modify it
 
10
# under the terms of the GNU General Public License version 3, as published
 
11
# by the Free Software Foundation.
 
12
#
 
13
# This program is distributed in the hope that it will be useful, but
 
14
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
15
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
16
# PURPOSE.  See the GNU General Public License for more details.
 
17
#
 
18
# You should have received a copy of the GNU General Public License along
 
19
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
20
"""Tests for the action queue module."""
 
21
 
 
22
from __future__ import with_statement
 
23
 
 
24
import base64
 
25
import inspect
 
26
import logging
 
27
import operator
 
28
import os
 
29
import unittest
 
30
import urllib2
 
31
import uuid
 
32
 
 
33
from functools import wraps
 
34
from StringIO import StringIO
 
35
 
 
36
import OpenSSL.SSL
 
37
 
 
38
from mocker import Mocker, MockerTestCase, ANY, expect
 
39
from oauth import oauth
 
40
from twisted.internet import defer, threads, reactor
 
41
from twisted.internet import error as twisted_error
 
42
from twisted.python.failure import DefaultException, Failure
 
43
from twisted.web import server
 
44
from twisted.trial.unittest import TestCase as TwistedTestCase
 
45
from zope.interface.verify import verifyObject, verifyClass
 
46
 
 
47
from contrib.testing.testcase import (
 
48
    BaseTwistedTestCase, DummyClass, FakeMain, FakeActionQueue,
 
49
)
 
50
from ubuntuone.devtools import handlers
 
51
from ubuntuone import logger, clientdefs
 
52
from ubuntuone.platform import open_file, platform, path_exists
 
53
from ubuntuone.storageprotocol import (
 
54
    client,
 
55
    content_hash,
 
56
    errors,
 
57
    protocol_pb2,
 
58
    request,
 
59
)
 
60
from ubuntuone.syncdaemon import states, interfaces, config
 
61
from ubuntuone.syncdaemon import action_queue
 
62
from ubuntuone.syncdaemon.action_queue import (
 
63
    ActionQueue, ActionQueueCommand, ChangePublicAccess, CreateUDF,
 
64
    DeleteVolume, Download, ListVolumes, ActionQueueProtocol, ListShares,
 
65
    RequestQueue, UploadProgressWrapper, Upload,
 
66
    CreateShare, DeleteShare, GetPublicFiles, GetDelta, GetDeltaFromScratch,
 
67
    TRANSFER_PROGRESS_THRESHOLD, Unlink, Move, MakeFile, MakeDir, DeltaList,
 
68
    ZipQueue, DeferredMap, ThrottlingStorageClient, PathLockingTree,
 
69
    InterruptibleDeferred, DeferredInterrupted, ConditionsLocker,
 
70
    NamedTemporaryFile, PingManager,
 
71
)
 
72
from ubuntuone.syncdaemon.event_queue import EventQueue, EVENTS
 
73
from ubuntuone.syncdaemon.marker import MDMarker
 
74
from tests.platform import setup_action_queue_test
 
75
 
 
76
PATH = os.path.join(u'~', u'Documents', u'pdfs', u'moño', u'')
 
77
NAME = u'UDF-me'
 
78
VOLUME = uuid.UUID('12345678-1234-1234-1234-123456789abc')
 
79
NODE = uuid.UUID('FEDCBA98-7654-3211-2345-6789ABCDEF12')
 
80
USER = u'Dude'
 
81
SHARE = uuid.uuid4()
 
82
 
 
83
 
 
84
def fire_and_check(f, deferred, check):
 
85
    """Callback a deferred."""
 
86
    @wraps(f)
 
87
    def inner(*args, **kwargs):
 
88
        """Execute f and fire the deferred."""
 
89
        result = f(*args, **kwargs)
 
90
        error = check()
 
91
        if not error:
 
92
            deferred.callback(True)
 
93
        else:
 
94
            deferred.errback(error)
 
95
        return result
 
96
    return inner
 
97
 
 
98
 
 
99
class MementoHandler(handlers.MementoHandler):
 
100
    """Wrapper to handle custom logger levels."""
 
101
 
 
102
    def check_note(self, *msgs):
 
103
        """Shortcut for checking in ERROR."""
 
104
        return self.check(logger.NOTE, *msgs)
 
105
 
 
106
 
 
107
class FakeTempFile(object):
 
108
    """Fake temporary file."""
 
109
    def __init__(self, tmpdir):
 
110
        self.name = os.path.join(tmpdir, 'remove-me.zip')
 
111
        open_file(self.name, 'w').close()
 
112
 
 
113
 
 
114
class FakeCommand(object):
 
115
    """Yet another fake action queue command."""
 
116
 
 
117
    is_runnable = True
 
118
    paused = False
 
119
    conditions_checked = False
 
120
 
 
121
    def __init__(self, share_id=None, node_id=None):
 
122
        self.share_id = share_id
 
123
        self.node_id = node_id
 
124
        self.cancelled = False
 
125
        self.log = logging.getLogger('ubuntuone.SyncDaemon')
 
126
 
 
127
    run = lambda self: defer.succeed(True)
 
128
 
 
129
    def pause(self):
 
130
        """Mark as paused."""
 
131
        self.paused = True
 
132
 
 
133
    @property
 
134
    def uniqueness(self):
 
135
        """Fake uniqueness."""
 
136
        if self.share_id is None and self.node_id is None:
 
137
            return self
 
138
        else:
 
139
            return (self.__class__.__name__, self.share_id, self.node_id)
 
140
 
 
141
    def cancel(self):
 
142
        """Cancel!"""
 
143
        self.cancelled = True
 
144
 
 
145
 
 
146
class FakedEventQueue(EventQueue):
 
147
    """Faked event queue."""
 
148
 
 
149
    def __init__(self, fs=None):
 
150
        """Initialize a faked event queue."""
 
151
        super(FakedEventQueue, self).__init__(fs=fs)
 
152
        self.events = []
 
153
 
 
154
    def push(self, event_name, **kwargs):
 
155
        """Faked event pushing."""
 
156
        self.events.append((event_name, kwargs))
 
157
        super(FakedEventQueue, self).push(event_name, **kwargs)
 
158
 
 
159
 
 
160
class FakedVolume(object):
 
161
    """Faked volume."""
 
162
    volume_id = None
 
163
    generation = None
 
164
    free_bytes = None
 
165
 
 
166
 
 
167
class FakeSemaphore(object):
 
168
    """Fake semaphore."""
 
169
 
 
170
    def __init__(self):
 
171
        self.count = 0
 
172
 
 
173
    def acquire(self):
 
174
        """Increase the count."""
 
175
        self.count += 1
 
176
 
 
177
    def release(self):
 
178
        """Decrease the count."""
 
179
        self.count -= 1
 
180
 
 
181
 
 
182
class FakeRequest(object):
 
183
    """Fake Request."""
 
184
    def __init__(self):
 
185
        self.deferred = defer.succeed(True)
 
186
        self.cancelled = False
 
187
 
 
188
    def cancel(self):
 
189
        """Mark cancelled."""
 
190
        self.cancelled = True
 
191
 
 
192
 
 
193
class FakeClient(object):
 
194
    """Fake Client."""
 
195
    def __init__(self):
 
196
        self.called = []
 
197
 
 
198
    def put_content_request(self, *args, **kwargs):
 
199
        """Fake a put content request with its deferred."""
 
200
        self.called.append(('put_content_request', args, kwargs))
 
201
        return FakeRequest()
 
202
 
 
203
    def get_content_request(self, *args, **kwargs):
 
204
        """Fake a get content request with its deferred."""
 
205
        self.called.append(('get_content_request', args, kwargs))
 
206
        return FakeRequest()
 
207
 
 
208
 
 
209
class TestingProtocol(ActionQueue.protocol):
 
210
    """Protocol for testing."""
 
211
 
 
212
    max_payload_size = 65536
 
213
 
 
214
    def connectionMade(self):
 
215
        """connectionMade."""
 
216
        ActionQueue.protocol.connectionMade(self)
 
217
 
 
218
        # assure we're connected
 
219
        events = [x[0] for x in self.factory.event_queue.events]
 
220
        assert 'SYS_CONNECTION_MADE' in events
 
221
 
 
222
        self.factory.event_queue.events = [] # reset events
 
223
        if hasattr(self, 'testing_deferred'):
 
224
            self.testing_deferred.callback(True)
 
225
 
 
226
 
 
227
class TestActionQueue(ActionQueue):
 
228
    """AQ class that uses the testing protocol."""
 
229
    protocol = TestingProtocol
 
230
 
 
231
 
 
232
class BasicTestCase(BaseTwistedTestCase):
 
233
    """Basic test case for ActionQueue."""
 
234
 
 
235
    timeout = 5
 
236
 
 
237
    @defer.inlineCallbacks
 
238
    def setUp(self):
 
239
        """Init."""
 
240
        yield super(BasicTestCase, self).setUp()
 
241
 
 
242
        self.root = self.mktemp('root')
 
243
        self.home = self.mktemp('home')
 
244
        self.data = self.mktemp('data')
 
245
        self.shares = self.mktemp('shares')
 
246
        self.partials = self.mktemp('partials')
 
247
 
 
248
        # set up FakeMain to use the testing AQ, the port will be decided later
 
249
        self.patch(FakeMain, '_fake_AQ_class', TestActionQueue)
 
250
        self.patch(FakeMain, '_fake_AQ_params', ('127.0.0.1', 0, False))
 
251
 
 
252
        self.main = FakeMain(root_dir=self.root, shares_dir=self.shares,
 
253
                             data_dir=self.data, partials_dir=self.partials)
 
254
        self.addCleanup(self.main.shutdown)
 
255
 
 
256
        self.action_queue = self.main.action_q
 
257
        self.action_queue.connection_timeout = 3
 
258
        self.action_queue.event_queue.events = []
 
259
 
 
260
        def keep_a_copy(f):
 
261
            """Keep a copy of the pushed events."""
 
262
            @wraps(f)
 
263
            def recording(event_name, **kwargs):
 
264
                """Keep a copy of the pushed events."""
 
265
                value = (event_name, kwargs)
 
266
                if event_name != 'SYS_STATE_CHANGED' and \
 
267
                   not event_name.startswith('VM_'):
 
268
                    self.action_queue.event_queue.events.append(value)
 
269
                return f(event_name, **kwargs)
 
270
            return recording
 
271
 
 
272
        self.main.event_q.push = keep_a_copy(self.main.event_q.push)
 
273
 
 
274
        self.handler = MementoHandler()
 
275
        self.handler.setLevel(logging.DEBUG)
 
276
        self._logger = logging.getLogger('ubuntuone.SyncDaemon')
 
277
        self._logger.addHandler(self.handler)
 
278
        self.addCleanup(self._logger.removeHandler, self.handler)
 
279
 
 
280
        # setup done according to the platform
 
281
        setup_action_queue_test(self)
 
282
 
 
283
    @defer.inlineCallbacks
 
284
    def tearDown(self):
 
285
        """Cleanup."""
 
286
        for record in self.handler.records:
 
287
            exc_info = getattr(record, 'exc_info', None)
 
288
            if exc_info is not None:
 
289
                raise exc_info[0], exc_info[1], exc_info[2]
 
290
 
 
291
        yield super(BasicTestCase, self).tearDown()
 
292
 
 
293
    def user_connect(self):
 
294
        """User requested to connect to server."""
 
295
        token = {'token': 'bla', 'token_secret': 'ble',
 
296
                 'consumer_key': 'foo', 'consumer_secret': 'bar'}
 
297
        self.action_queue.event_queue.push('SYS_USER_CONNECT',
 
298
                                           access_token=token)
 
299
 
 
300
 
 
301
class BasicTests(BasicTestCase):
 
302
    """Basic tests to check ActionQueue."""
 
303
 
 
304
    def test_implements_interface(self):
 
305
        """Verify ActionQueue and FakeActionQueue interface."""
 
306
        verifyObject(interfaces.IActionQueue, self.action_queue)
 
307
        verifyClass(interfaces.IActionQueue, FakeActionQueue)
 
308
 
 
309
    @defer.inlineCallbacks
 
310
    def test_get_root_and_demark(self):
 
311
        """Get the received Root and demark its mdid."""
 
312
        # get the marker
 
313
        d = self.action_queue.uuid_map.get('mdid')
 
314
 
 
315
        # we received a root!
 
316
        self.main.event_q.push('SYS_ROOT_RECEIVED',
 
317
                               root_id='node_id', mdid='mdid')
 
318
 
 
319
        # it should be demarked with the root node_id
 
320
        root_node_id = yield d
 
321
        self.assertEqual(root_node_id, 'node_id')
 
322
 
 
323
    def test_cancelupload_calls_cancelop(self):
 
324
        """cancel_upload passes the correct args to the generic method."""
 
325
        called = []
 
326
        self.action_queue._cancel_op = lambda *a: called.append(a)
 
327
        self.action_queue.cancel_upload('share', 'node')
 
328
        self.assertEqual(called, [('share', 'node', Upload)])
 
329
 
 
330
    def test_canceldownload_calls_cancelop(self):
 
331
        """cancel_download passes the correct args to the generic method."""
 
332
        called = []
 
333
        self.action_queue._cancel_op = lambda *a: called.append(a)
 
334
        self.action_queue.cancel_download('share', 'node')
 
335
        self.assertEqual(called, [('share', 'node', Download)])
 
336
 
 
337
    def test_cancelop_nothing(self):
 
338
        """It does not cancel anything, because all empty."""
 
339
        assert not self.action_queue.queue.waiting
 
340
        self.action_queue._cancel_op('shr', 'node', Upload)
 
341
        self.assertFalse(self.handler.check_debug('cancel', 'shr', 'node'))
 
342
 
 
343
    def _set_queue(self, *waiting):
 
344
        """Set the content queue content."""
 
345
        cq = self.action_queue.queue
 
346
        for cmd in waiting:
 
347
            cq.waiting.append(cmd)
 
348
            cq.hashed_waiting[cmd.uniqueness] = cmd
 
349
 
 
350
    def test_cancelop_different_sharenode(self):
 
351
        """It does not cancel anything, because queue with different stuff."""
 
352
        cmd1 = FakeCommand('sh', 'nd1')
 
353
        cmd2 = FakeCommand('sh', 'nd2')
 
354
        self._set_queue(cmd1, cmd2)
 
355
        self.action_queue._cancel_op('sh', 'nd3', FakeCommand)
 
356
        self.assertFalse(self.handler.check_debug('external cancel attempt'))
 
357
        self.assertFalse(cmd1.cancelled)
 
358
        self.assertFalse(cmd2.cancelled)
 
359
 
 
360
    def test_cancelop_different_operation(self):
 
361
        """It does not cancel anything, because queue with different stuff."""
 
362
        cmd1 = FakeCommand('sh', 'nd')
 
363
        cmd2 = FakeCommand('sh', 'nd')
 
364
        self._set_queue(cmd1, cmd2)
 
365
        self.action_queue._cancel_op('sh', 'nd', Upload)
 
366
        self.assertFalse(self.handler.check_debug('external cancel attempt'))
 
367
        self.assertFalse(cmd1.cancelled)
 
368
        self.assertFalse(cmd2.cancelled)
 
369
 
 
370
    def test_cancelop_inwaiting(self):
 
371
        """Cancel something that is in the waiting queue."""
 
372
        cmd = FakeCommand('sh', 'nd')
 
373
        self._set_queue(cmd)
 
374
        self.action_queue._cancel_op('sh', 'nd', FakeCommand)
 
375
        self.assertTrue(self.handler.check_debug('external cancel attempt',
 
376
                                                 'sh', 'nd'))
 
377
        self.assertTrue(cmd.cancelled)
 
378
 
 
379
    def test_node_is_queued_move_api(self):
 
380
        """Test that it calls the queue method."""
 
381
        called = []
 
382
        aq = self.action_queue
 
383
        aq.queue.node_is_queued = lambda *a: called.append(a)
 
384
        aq.node_is_with_queued_move('share', 'node')
 
385
        self.assertEqual(called, [(Move, 'share', 'node')])
 
386
 
 
387
    def test_node_is_queued_move_integration(self):
 
388
        """Kind of integration test for this method."""
 
389
        aq = self.action_queue
 
390
        cmd = Move(aq.queue, VOLUME, 'node', 'o_p', 'n_p', 'n_name',
 
391
                   "pathfrom", "pathto")
 
392
        self.assertFalse(aq.node_is_with_queued_move(VOLUME, 'node'))
 
393
        aq.queue.waiting.append(cmd)
 
394
        aq.queue.hashed_waiting[cmd.uniqueness] = cmd
 
395
        self.assertTrue(aq.node_is_with_queued_move(VOLUME, 'node'))
 
396
 
 
397
    def test_event_listener(self):
 
398
        """All event listeners should define methods with correct signature."""
 
399
        for evtname, evtargs in EVENTS.iteritems():
 
400
            meth = getattr(ActionQueue, 'handle_' + evtname, None)
 
401
            if meth is not None:
 
402
                defined_args = inspect.getargspec(meth)[0]
 
403
                self.assertEqual(defined_args[0], 'self')
 
404
                self.assertEqual(set(defined_args[1:]), set(evtargs))
 
405
 
 
406
 
 
407
class TestLoggingStorageClient(TwistedTestCase):
 
408
    """Tests for ensuring magic hash dont show in logs."""
 
409
 
 
410
    def get_message(self):
 
411
        """Produce an upload message."""
 
412
        message = protocol_pb2.Message()
 
413
        message.type = protocol_pb2.Message.PUT_CONTENT
 
414
        message.put_content.share = "share"
 
415
        message.put_content.node = "node"
 
416
        message.put_content.previous_hash = "previous hash"
 
417
        message.put_content.magic_hash = "magic!"
 
418
        message.put_content.hash = "hash"
 
419
        return message
 
420
 
 
421
    def test_sanitize_messages(self):
 
422
        """Messages get sanitized and magic hash is removed."""
 
423
        message = self.get_message()
 
424
        result = action_queue.sanitize_message(message)
 
425
        text = result[0].__mod__(result[1:])
 
426
        self.assertTrue("magic!" not in text)
 
427
        self.assertTrue("share" in text)
 
428
        self.assertTrue("hash" in text)
 
429
        self.assertTrue("previous_hash" in text)
 
430
 
 
431
    def test_logging_storage_client(self):
 
432
        """LoggingStorageClient sanitizes messages."""
 
433
        message = self.get_message()
 
434
        result = []
 
435
        lsc = action_queue.LoggingStorageClient()
 
436
        lsc.log.setLevel(action_queue.TRACE)
 
437
        lsc.log_trace = lambda *args: result.append(args)
 
438
        lsc.log_message(message)
 
439
        self.assertTrue(result, [action_queue.sanitize_message(message)])
 
440
 
 
441
 
 
442
class TestRequestQueue(TwistedTestCase):
 
443
    """Tests for the RequestQueue."""
 
444
 
 
445
    def setUp(self):
 
446
        """Set up."""
 
447
 
 
448
        class FakeAQ(object):
 
449
            """Fake AQ."""
 
450
            event_queue = self.eq = FakedEventQueue()
 
451
 
 
452
        self.rq = RequestQueue(action_queue=FakeAQ())
 
453
        self.addCleanup(self.eq.shutdown)
 
454
 
 
455
        # add a Memento handler to the logger
 
456
        self.log_handler = MementoHandler()
 
457
        self.log_handler.setLevel(logging.DEBUG)
 
458
        logger = logging.getLogger('ubuntuone.SyncDaemon')
 
459
        logger.addHandler(self.log_handler)
 
460
        self.addCleanup(logger.removeHandler, self.log_handler)
 
461
 
 
462
    def _add_to_rq(self, *cmds):
 
463
        """Add the commands to rq.waiting and hashed_waiting."""
 
464
        for cmd in cmds:
 
465
            self.rq.waiting.append(cmd)
 
466
            self.rq.hashed_waiting[cmd.uniqueness] = cmd
 
467
 
 
468
    def test_len_nothing(self):
 
469
        """Len with nothing queued."""
 
470
        self.assertEqual(len(self.rq), 0)
 
471
 
 
472
    def test_len_waiting(self):
 
473
        """Len with something in the queue."""
 
474
        self.rq.waiting.append(1)
 
475
        self.assertEqual(len(self.rq), 1)
 
476
 
 
477
    def test_len_bigger(self):
 
478
        """Len with several commands."""
 
479
        self.rq.waiting.append(1)
 
480
        self.rq.waiting.append(1)
 
481
        self.assertEqual(len(self.rq), 2)
 
482
 
 
483
    def test_queue_adds_to_waiting(self):
 
484
        """Command queues is appended to waiting."""
 
485
        # set up
 
486
        cmd1 = FakeCommand()
 
487
        cmd2 = FakeCommand()
 
488
        self.rq.waiting.append(cmd1)
 
489
 
 
490
        # queue and test
 
491
        self.rq.queue(cmd2)
 
492
        self.assertEqual(list(self.rq.waiting), [cmd1, cmd2])
 
493
 
 
494
    def test_queue_sends_changed_event(self):
 
495
        """Alert something changed."""
 
496
        cmd = FakeCommand()
 
497
        self.rq.queue(cmd)
 
498
        evt = ('SYS_QUEUE_ADDED', dict(command=cmd))
 
499
        self.assertIn(evt, self.eq.events)
 
500
 
 
501
    def test_queue_waiting_if_first(self):
 
502
        """It should send the WAITING signal ."""
 
503
        # set up
 
504
        cmd = FakeCommand()
 
505
 
 
506
        # queue and test
 
507
        self.rq.queue(cmd)
 
508
        self.assertEqual(list(self.rq.waiting), [cmd])
 
509
        self.assertIn(('SYS_QUEUE_WAITING', {}), self.eq.events)
 
510
 
 
511
    def test_queue_nowaiting_if_not_first(self):
 
512
        """It should not send the WAITING signal if no first cmd."""
 
513
        # set up
 
514
        cmd1 = FakeCommand()
 
515
        cmd2 = FakeCommand()
 
516
        self.rq.waiting.append(cmd1)
 
517
 
 
518
        # queue and test
 
519
        self.rq.queue(cmd2)
 
520
        self.assertEqual(list(self.rq.waiting), [cmd1, cmd2])
 
521
        self.assertNotIn(('SYS_QUEUE_WAITING', {}), self.eq.events)
 
522
 
 
523
    def test_with_one_run(self):
 
524
        """Run will execute the command."""
 
525
        cmd = FakeCommand()
 
526
        self.rq.queue(cmd)
 
527
        self.assertIn(('SYS_QUEUE_WAITING', {}), self.eq.events)
 
528
        self.assertNotIn(('SYS_QUEUE_DONE', {}), self.eq.events)
 
529
        self.rq.unqueue(cmd)
 
530
        self.assertIn(('SYS_QUEUE_DONE', {}), self.eq.events)
 
531
 
 
532
    def test_with_two_run(self):
 
533
        """Run will execute both commands."""
 
534
        # first queuing, get the event
 
535
        cmd1 = FakeCommand()
 
536
        self.rq.queue(cmd1)
 
537
        self.assertEqual(self.eq.events.count(('SYS_QUEUE_WAITING', {})), 1)
 
538
        self.assertEqual(self.eq.events.count(('SYS_QUEUE_DONE', {})), 0)
 
539
 
 
540
        # second queuing, don't get the event
 
541
        cmd2 = FakeCommand()
 
542
        self.rq.queue(cmd2)
 
543
 
 
544
        self.assertEqual(self.eq.events.count(('SYS_QUEUE_WAITING', {})), 1)
 
545
        self.assertEqual(self.eq.events.count(('SYS_QUEUE_DONE', {})), 0)
 
546
 
 
547
        # first run, no new events
 
548
        self.rq.unqueue(cmd1)
 
549
        self.assertEqual(self.eq.events.count(('SYS_QUEUE_WAITING', {})), 1)
 
550
        self.assertEqual(self.eq.events.count(('SYS_QUEUE_DONE', {})), 0)
 
551
 
 
552
        # second run, now we're done
 
553
        self.rq.unqueue(cmd2)
 
554
        self.assertEqual(self.eq.events.count(('SYS_QUEUE_WAITING', {})), 1)
 
555
        self.assertEqual(self.eq.events.count(('SYS_QUEUE_DONE', {})), 1)
 
556
 
 
557
    def test_init_notactive(self):
 
558
        """RQ borns not active."""
 
559
        self.assertFalse(self.rq.active)
 
560
 
 
561
    def test_init_activedef(self):
 
562
        """Just instanced queue has the deferred to take."""
 
563
        self.assertTrue(isinstance(self.rq.active_deferred, defer.Deferred))
 
564
 
 
565
    def test_run_goes_active(self):
 
566
        """Activate on run."""
 
567
        self.rq.run()
 
568
        self.assertTrue(self.rq.active)
 
569
 
 
570
    def test_run_triggers_activedef(self):
 
571
        """Trigger the active_deferred on run."""
 
572
        assert not self.rq.active_deferred.called
 
573
        self.rq.run()
 
574
        self.assertTrue(self.rq.active_deferred.called)
 
575
 
 
576
    def test_stop_goes_inactive(self):
 
577
        """Desactivate on stop."""
 
578
        self.rq.active = True
 
579
        self.rq.stop()
 
580
        self.assertFalse(self.rq.active)
 
581
 
 
582
    def test_stop_pauses_commands(self):
 
583
        """Pauses all queued commands on stop."""
 
584
        # set up
 
585
        cmd1 = FakeCommand()
 
586
        cmd2 = FakeCommand()
 
587
        self.rq.waiting.extend((cmd1, cmd2))
 
588
        assert not cmd1.paused and not cmd2.paused
 
589
 
 
590
        # stop and test
 
591
        self.rq.stop()
 
592
        self.assertTrue(cmd1.paused)
 
593
        self.assertTrue(cmd2.paused)
 
594
 
 
595
    def test_stop_pause_useful_activedef(self):
 
596
        """Refresh the active_deferred before pausing."""
 
597
        checked = defer.Deferred()
 
598
 
 
599
        def fake_pause():
 
600
            """Check that RQ has a useful active_deferred."""
 
601
            self.assertTrue(isinstance(self.rq.active_deferred,
 
602
                                       defer.Deferred))
 
603
            self.assertFalse(self.rq.active_deferred.called)
 
604
            checked.callback(True)
 
605
 
 
606
        cmd = FakeCommand()
 
607
        cmd.pause = fake_pause
 
608
        self.rq.waiting.append(cmd)
 
609
 
 
610
        # stop and test
 
611
        self.rq.stop()
 
612
        return checked
 
613
 
 
614
    def test_unqueue_remove(self):
 
615
        """Remove the command from queue on unqueue."""
 
616
        # set up a couple of commands
 
617
        cmd1 = FakeCommand()
 
618
        cmd2 = FakeCommand()
 
619
        self.rq.waiting.extend((cmd1, cmd2))
 
620
        self.rq.hashed_waiting[cmd1] = cmd1
 
621
        self.rq.hashed_waiting[cmd2] = cmd2
 
622
 
 
623
        # unqueue and check that 1 was removed from both structures and
 
624
        # that cmd2 is still there untouched
 
625
        self.rq.unqueue(cmd1)
 
626
        self.assertNotIn(cmd1, self.rq.waiting)
 
627
        self.assertIn(cmd2, self.rq.waiting)
 
628
        self.assertNotIn(cmd1, self.rq.hashed_waiting)
 
629
        self.assertIn(cmd2, self.rq.hashed_waiting)
 
630
 
 
631
    def test_unqueue_sysqueuedone_if_empty(self):
 
632
        """Send SYS_QUEUE_DONE if empty after unqueue."""
 
633
        # set up one command
 
634
        cmd = FakeCommand()
 
635
        self.rq.waiting.append(cmd)
 
636
        self.rq.hashed_waiting[cmd] = cmd
 
637
 
 
638
        # unqueue it and check
 
639
        self.rq.unqueue(cmd)
 
640
        self.assertIn(('SYS_QUEUE_DONE', {}), self.eq.events)
 
641
 
 
642
    def test_unqueue_sysqueuedone_if_not_empty(self):
 
643
        """Do not send SYS_QUEUE_DONE if not empty after unqueue."""
 
644
        # set up a couple of commands
 
645
        cmd1 = FakeCommand()
 
646
        cmd2 = FakeCommand()
 
647
        self.rq.waiting.extend((cmd1, cmd2))
 
648
        self.rq.hashed_waiting[cmd1] = cmd1
 
649
        self.rq.hashed_waiting[cmd2] = cmd2
 
650
 
 
651
        # unqueue only one and check
 
652
        self.rq.unqueue(cmd1)
 
653
        self.assertNotIn(('SYS_QUEUE_DONE', {}), self.eq.events)
 
654
 
 
655
    def test_unqueue_sends_changed_event(self):
 
656
        """Alert something changed."""
 
657
        cmd = FakeCommand()
 
658
        self.rq.waiting.append(cmd)
 
659
        self.rq.unqueue(cmd)
 
660
        evt = ('SYS_QUEUE_REMOVED', dict(command=cmd))
 
661
        self.assertIn(evt, self.eq.events)
 
662
 
 
663
    def test_remove_empty(self):
 
664
        """Don't remove if waiting is empty."""
 
665
        assert not self.rq.waiting, "test badly set up"
 
666
        cmd = FakeCommand()
 
667
        self.rq.remove(cmd)
 
668
        self.assertFalse(self.rq.waiting)
 
669
        self.assertFalse(self.rq.hashed_waiting)
 
670
 
 
671
    def test_remove_other(self):
 
672
        """Don't remove if waiting has other command."""
 
673
        cmd1 = FakeCommand(1, 2)
 
674
        self._add_to_rq(cmd1)
 
675
        cmd2 = FakeCommand(2, 3)
 
676
        self.rq.remove(cmd2)
 
677
        self.assertEqual(list(self.rq.waiting), [cmd1])
 
678
        self.assertEqual(self.rq.hashed_waiting.values(), [cmd1])
 
679
 
 
680
    def test_remove_command(self):
 
681
        """Remove for the command."""
 
682
        cmd = FakeCommand()
 
683
        self._add_to_rq(cmd)
 
684
        self.rq.remove(cmd)
 
685
        self.assertFalse(self.rq.waiting)
 
686
        self.assertFalse(self.rq.hashed_waiting)
 
687
 
 
688
    def test_remove_mixed(self):
 
689
        """Remove ok in a mixed situation."""
 
690
        cmd1 = FakeCommand(1, 2)
 
691
        cmd2 = FakeCommand(2, 3)
 
692
        cmd3 = FakeCommand(3, 4)
 
693
        self._add_to_rq(cmd1, cmd2, cmd3)
 
694
        self.rq.remove(cmd2)
 
695
        self.assertEqual(list(self.rq.waiting), [cmd1, cmd3])
 
696
        self.assertEqual(set(self.rq.hashed_waiting.values()),
 
697
                         set([cmd1, cmd3]))
 
698
 
 
699
    def test_hashedwaiting_queue(self):
 
700
        """Queue a command and it will be added to hashed waiting."""
 
701
        cmd = FakeCommand()
 
702
        self.rq.queue(cmd)
 
703
        self.assertTrue(self.rq.hashed_waiting.values(), [cmd])
 
704
 
 
705
    def test_node_is_queued_nothing(self):
 
706
        """Test with empty queues."""
 
707
        self.assertFalse(self.rq.node_is_queued(Move, 'share', 'node'))
 
708
 
 
709
    def test_node_is_queued_waiting(self):
 
710
        """Test with a command in waiting."""
 
711
        cmd = FakeCommand('share', 'node')
 
712
        self._add_to_rq(cmd)
 
713
        self.assertTrue(self.rq.node_is_queued(FakeCommand, 'share', 'node'))
 
714
 
 
715
    def test_node_is_queued_different_command(self):
 
716
        """The node is queued, but other command on it."""
 
717
        cmd = FakeCommand('share', 'node')
 
718
        self._add_to_rq(cmd)
 
719
        self.assertFalse(self.rq.node_is_queued(Move, 'share', 'node'))
 
720
 
 
721
    def test_node_is_queued_different_node(self):
 
722
        """The command is queued, but on other node."""
 
723
        cmd = FakeCommand('share', 'node')
 
724
        self._add_to_rq(cmd)
 
725
        self.assertFalse(self.rq.node_is_queued(FakeCommand, 'share', 'other'))
 
726
 
 
727
    def test_len_empty(self):
 
728
        """Counter return that it's empty."""
 
729
        self.assertEqual(len(self.rq), 0)
 
730
 
 
731
    def test_len_with_one(self):
 
732
        """Counter return that it has one."""
 
733
        cmd = FakeCommand()
 
734
        self.rq.queue(cmd)
 
735
        self.assertEqual(len(self.rq), 1)
 
736
 
 
737
    def test_len_with_two(self):
 
738
        """Counter return that it has two."""
 
739
        cmd = FakeCommand()
 
740
        self.rq.queue(cmd)
 
741
        self.rq.queue(cmd)
 
742
        self.assertEqual(len(self.rq), 2)
 
743
 
 
744
    def test_len_run_decreases(self):
 
745
        """Counter behaviour when adding/running."""
 
746
        cmd1 = FakeCommand()
 
747
        cmd2 = FakeCommand()
 
748
        self.rq.queue(cmd1)
 
749
        self.assertEqual(len(self.rq), 1)
 
750
        self.rq.queue(cmd2)
 
751
        self.assertEqual(len(self.rq), 2)
 
752
        self.rq.unqueue(cmd1)
 
753
        self.assertEqual(len(self.rq), 1)
 
754
        self.rq.unqueue(cmd2)
 
755
        self.assertEqual(len(self.rq), 0)
 
756
 
 
757
    def test_init_simult_transfers(self):
 
758
        """Configure the transfers semaphore according to config."""
 
759
        user_config = config.get_user_config()
 
760
        user_config.set_simult_transfers(12345)
 
761
        rq = RequestQueue(action_queue=None)
 
762
        self.assertEqual(rq.transfers_semaphore.tokens, 12345)
 
763
 
 
764
 
 
765
class TestDeferredMap(TwistedTestCase):
 
766
    """Test the deferred map."""
 
767
 
 
768
    def setUp(self):
 
769
        """Set up."""
 
770
        self.dm = DeferredMap()
 
771
 
 
772
    def test_one_get_returns_stored_deferred(self):
 
773
        """Get will return the stored deferred."""
 
774
        d = self.dm.get('foo')
 
775
        self.assertEqual(self.dm.waiting, {'foo': [d]})
 
776
 
 
777
    def test_two_gets_returns_second_deferred_other_key(self):
 
778
        """A second get for other key will return other deferred."""
 
779
        d1 = self.dm.get('foo')
 
780
        d2 = self.dm.get('bar')
 
781
        self.assertEqual(self.dm.waiting, {'foo': [d1], 'bar': [d2]})
 
782
 
 
783
    def test_two_gets_returns_second_deferred_same_key(self):
 
784
        """A second get for the same key will return other deferred."""
 
785
        d1 = self.dm.get('foo')
 
786
        d2 = self.dm.get('foo')
 
787
        self.assertEqual(self.dm.waiting, {'foo': [d1, d2]})
 
788
 
 
789
    def test_mixed_gets(self):
 
790
        """Several gets with different keys."""
 
791
        d1 = self.dm.get('foo')
 
792
        d2 = self.dm.get('bar')
 
793
        d3 = self.dm.get('foo')
 
794
        d4 = self.dm.get('baz')
 
795
        self.assertEqual(self.dm.waiting,
 
796
                         {'foo': [d1, d3], 'bar': [d2], 'baz': [d4]})
 
797
 
 
798
    def test_set_to_nothing(self):
 
799
        """It's ok to set a key that is not being waited."""
 
800
        self.dm.set('not there', 'value')
 
801
 
 
802
    @defer.inlineCallbacks
 
803
    def test_set_fires_deferred_single(self):
 
804
        """The set fires the unique waiting deferred with the value."""
 
805
        d1 = self.dm.get('foo')
 
806
        d2 = self.dm.get('bar')
 
807
        d3 = self.dm.get('foo')
 
808
        self.assertEqual(self.dm.waiting, {'foo': [d1, d3], 'bar': [d2]})
 
809
 
 
810
        self.dm.set('bar', 'value')
 
811
        res = yield d2
 
812
        self.assertEqual(res, 'value')
 
813
        self.assertEqual(self.dm.waiting, {'foo': [d1, d3]})
 
814
 
 
815
    @defer.inlineCallbacks
 
816
    def test_set_fires_deferred_multiple(self):
 
817
        """The set fires the multiple waiting deferreds with the value."""
 
818
        d1 = self.dm.get('foo')
 
819
        d2 = self.dm.get('bar')
 
820
        d3 = self.dm.get('foo')
 
821
        self.assertEqual(self.dm.waiting, {'foo': [d1, d3], 'bar': [d2]})
 
822
 
 
823
        self.dm.set('foo', 'value')
 
824
        res1 = yield d1
 
825
        res2 = yield d3
 
826
        self.assertEqual(res1, 'value')
 
827
        self.assertEqual(res2, 'value')
 
828
        self.assertEqual(self.dm.waiting, {'bar': [d2]})
 
829
 
 
830
    def test_err_to_nothing(self):
 
831
        """It's ok to err a key that is not being waited."""
 
832
        self.dm.err('not there', 'failure')
 
833
 
 
834
    @defer.inlineCallbacks
 
835
    def test_err_fires_deferred_single(self):
 
836
        """The set fires the unique waiting deferred with the failure."""
 
837
        d1 = self.dm.get('foo')
 
838
        d2 = self.dm.get('bar')
 
839
        d3 = self.dm.get('foo')
 
840
        self.assertEqual(self.dm.waiting, {'foo': [d1, d3], 'bar': [d2]})
 
841
 
 
842
        exc = Exception('problem!')
 
843
        self.dm.err('bar', Failure(exc))
 
844
        try:
 
845
            yield d2
 
846
        except Exception, e:
 
847
            self.assertEqual(e, exc)
 
848
        else:
 
849
            self.fail("It didn't fired the deferred with a failure!")
 
850
        self.assertEqual(self.dm.waiting, {'foo': [d1, d3]})
 
851
 
 
852
 
 
853
class TestZipQueue(TwistedTestCase):
 
854
    """Test the zipping queue."""
 
855
 
 
856
    def setUp(self):
 
857
        """Set up."""
 
858
        self.zq = ZipQueue()
 
859
 
 
860
    @defer.inlineCallbacks
 
861
    def test_zip_calls_compress_in_thread(self):
 
862
        """Test that self._compress is called in another thread."""
 
863
        upload = FakeCommand()
 
864
        mocker = Mocker()
 
865
        fake_fileobj = mocker.mock()
 
866
        fake_fileobj.close()
 
867
        upload.fileobj_factory = lambda: fake_fileobj
 
868
 
 
869
        def fake_compress(deferred, _upload, fileobj):
 
870
            """Fake the _compress method."""
 
871
            self.assertEqual(upload, _upload)
 
872
            deferred.callback(True)
 
873
 
 
874
        self.zq._compress = fake_compress
 
875
        with mocker:
 
876
            yield self.zq.zip(upload)
 
877
 
 
878
    @defer.inlineCallbacks
 
879
    def test_zip_calls_compress_with_file_object(self):
 
880
        """Test that _compress is called with the result of fileobj factory."""
 
881
        upload = FakeCommand()
 
882
        upload.fileobj_factory = lambda: Mocker().mock().close()
 
883
 
 
884
        def fake_compress(deferred, upload, fileobj):
 
885
            """Fake the _compress method."""
 
886
            deferred.callback(True)
 
887
 
 
888
        self.zq._compress = fake_compress
 
889
        yield self.zq.zip(upload)
 
890
 
 
891
    @defer.inlineCallbacks
 
892
    def test_fileobj_factory_error_is_logged(self):
 
893
        """Log the error when fileobj_factory fails."""
 
894
        def crash():
 
895
            """Crash!"""
 
896
            raise ValueError("foo")
 
897
 
 
898
        upload = FakeCommand()
 
899
        upload.fileobj_factory = crash
 
900
 
 
901
        # set up the logger
 
902
        self.handler = MementoHandler()
 
903
        self.handler.setLevel(logging.DEBUG)
 
904
        upload.log.addHandler(self.handler)
 
905
 
 
906
        yield self.zq.zip(upload)
 
907
        self.assertTrue(self.handler.check_warning("Unable to build fileobj",
 
908
                                                   "ValueError", "foo"))
 
909
 
 
910
    @defer.inlineCallbacks
 
911
    def test_fileobj_factory_error_cancels_upload(self):
 
912
        """Cancel the upload when fileobj_factory fails."""
 
913
        upload = FakeCommand()
 
914
 
 
915
        yield self.zq.zip(upload)
 
916
        self.assertTrue(upload.cancelled)
 
917
 
 
918
    @defer.inlineCallbacks
 
919
    def test_fileobj_factory_error_dont_call_compress(self):
 
920
        """Stop the execution if fileobj_factory fails."""
 
921
        upload = FakeCommand()
 
922
        called = []
 
923
        self.zq._compress = lambda *a: called.append(True)
 
924
        yield self.zq.zip(upload)
 
925
        self.assertFalse(called)
 
926
 
 
927
    @defer.inlineCallbacks
 
928
    def test_zip_acquire_lock(self):
 
929
        """Test that it acquires the lock."""
 
930
        called = []
 
931
        self.zq._compress = lambda deferred, upl, fobj: deferred.callback(True)
 
932
 
 
933
        def fake_acquire():
 
934
            """Fake the acquire method."""
 
935
            self.zq.tokens = 1
 
936
            called.append(True)
 
937
            return defer.succeed(True)
 
938
 
 
939
        self.zq.acquire = fake_acquire
 
940
        upload = FakeCommand()
 
941
        upload.fileobj_factory = lambda: Mocker().mock().close()
 
942
        yield self.zq.zip(upload)
 
943
        self.assertTrue(called)
 
944
 
 
945
    @defer.inlineCallbacks
 
946
    def test_zip_release_lock_ok(self):
 
947
        """Test that it releases the lock when all ok."""
 
948
        called = []
 
949
        self.zq._compress = lambda deferred, upl, fobj: deferred.callback(True)
 
950
        self.zq.release = lambda: called.append(True)
 
951
 
 
952
        upload = FakeCommand()
 
953
        upload.fileobj_factory = lambda: Mocker().mock().close()
 
954
        yield self.zq.zip(upload)
 
955
        self.assertTrue(called)
 
956
 
 
957
    @defer.inlineCallbacks
 
958
    def test_zip_release_lock_compression_error(self):
 
959
        """Test that it releases the lock even on compression error."""
 
960
        called = []
 
961
        exc = Exception('bad')
 
962
        self.zq._compress = lambda deferred, upl, fobj: deferred.errback(exc)
 
963
        self.zq.release = lambda: called.append(True)
 
964
        upload = FakeCommand()
 
965
        upload.fileobj_factory = lambda: Mocker().mock().close()
 
966
 
 
967
        try:
 
968
            yield self.zq.zip(upload)
 
969
        except Exception, e:
 
970
            # need to silent the exception we're generating in the test
 
971
            self.assertEqual(e, exc)
 
972
        else:
 
973
            self.fail("It should have raised the exception!")
 
974
        self.assertTrue(called)
 
975
 
 
976
    @defer.inlineCallbacks
 
977
    def test_zip_release_lock_fileobjfactory_error(self):
 
978
        """Test that it releases the lock even on file factory error."""
 
979
        called = []
 
980
        self.zq.release = lambda: called.append(True)
 
981
        upload = FakeCommand()
 
982
 
 
983
        yield self.zq.zip(upload)
 
984
        self.assertTrue(called)
 
985
 
 
986
    @defer.inlineCallbacks
 
987
    def test_fileobj_closed_ok(self):
 
988
        """Close the fileobj after compressing ok."""
 
989
        mocker = Mocker()
 
990
        fake_fileobj = mocker.mock()
 
991
        fake_fileobj.close()
 
992
 
 
993
        self.zq._compress = lambda deferred, upl, fobj: deferred.callback(True)
 
994
 
 
995
        upload = FakeCommand()
 
996
        upload.fileobj_factory = lambda: fake_fileobj
 
997
        with mocker:
 
998
            yield self.zq.zip(upload)
 
999
 
 
1000
    @defer.inlineCallbacks
 
1001
    def test_fileobj_closed_error(self):
 
1002
        """Close the fileobj after compressing with error."""
 
1003
        mocker = Mocker()
 
1004
        fake_fileobj = mocker.mock()
 
1005
        fake_fileobj.close()
 
1006
 
 
1007
        exc = Exception('bad')
 
1008
        self.zq._compress = lambda deferred, upl, fobj: deferred.errback(exc)
 
1009
 
 
1010
        upload = FakeCommand()
 
1011
        upload.fileobj_factory = lambda: fake_fileobj
 
1012
        with mocker:
 
1013
            try:
 
1014
                yield self.zq.zip(upload)
 
1015
            except Exception, e:
 
1016
                # need to silent the exception we're generating in the test
 
1017
                self.assertEqual(e, exc)
 
1018
            else:
 
1019
                self.fail("It should have raised the exception!")
 
1020
 
 
1021
    @defer.inlineCallbacks
 
1022
    def test_compress_gets_compressed_data(self):
 
1023
        """Compressed data is generated by _compress."""
 
1024
        upload = FakeCommand()
 
1025
        data = "a lot of data to compress"
 
1026
        fileobj = StringIO(data)
 
1027
 
 
1028
        # call and wait
 
1029
        d = defer.Deferred()
 
1030
        reactor.callInThread(self.zq._compress, d, upload, fileobj)
 
1031
        yield d
 
1032
 
 
1033
        compressed = open_file(upload.tempfile.name).read()
 
1034
        self.assertEqual(compressed, data.encode('zip'))
 
1035
 
 
1036
    @defer.inlineCallbacks
 
1037
    def test_compress_gets_magic_hash(self):
 
1038
        """The magic hash is generated by _compress."""
 
1039
        upload = FakeCommand()
 
1040
        data = "a lot of data to hash"
 
1041
        fileobj = StringIO(data)
 
1042
 
 
1043
        # what the hash should give us
 
1044
        mh = content_hash.magic_hash_factory()
 
1045
        mh.update(data)
 
1046
        should_hash = mh.content_hash()._magic_hash
 
1047
 
 
1048
        # call and wait
 
1049
        d = defer.Deferred()
 
1050
        reactor.callInThread(self.zq._compress, d, upload, fileobj)
 
1051
        yield d
 
1052
 
 
1053
        hashed = upload.magic_hash._magic_hash
 
1054
        self.assertEqual(hashed, should_hash)
 
1055
 
 
1056
    def test_compress_release_deferred_cancelled_command(self):
 
1057
        """Release the deferred if the command is cancelled."""
 
1058
        upload = FakeCommand()
 
1059
        upload.cancelled = True
 
1060
 
 
1061
        # call and wait
 
1062
        d = defer.Deferred()
 
1063
        reactor.callInThread(self.zq._compress, d, upload, None)
 
1064
        return d
 
1065
 
 
1066
    @defer.inlineCallbacks
 
1067
    def test_compress_release_deferred_on_error(self):
 
1068
        """Release the deferred on an error."""
 
1069
        upload = None  # _compress will fail because want to access .cancelled
 
1070
 
 
1071
        # call and wait
 
1072
        d = defer.Deferred()
 
1073
        reactor.callInThread(self.zq._compress, d, upload, None)
 
1074
        yield self.assertFailure(d, AttributeError)
 
1075
 
 
1076
 
 
1077
class FactoryBaseTestCase(BasicTestCase):
 
1078
    """Helper for by-pass Twisted."""
 
1079
 
 
1080
    def _start_sample_webserver(self):
 
1081
        """Start a web server serving content at its root"""
 
1082
        # start listening on `decide yourself` port, and fix AQ with it
 
1083
        website = server.Site(None)
 
1084
        webport = reactor.listenTCP(0, website)
 
1085
        self.action_queue.port = webport.getHost().port
 
1086
 
 
1087
        transport_class = webport.transport
 
1088
        def save_an_instance(skt, protocol, addr, sself, s, sreactor):
 
1089
            self.server_transport = transport_class(skt, protocol, addr, sself,
 
1090
                                                    s, sreactor)
 
1091
            return self.server_transport
 
1092
        webport.transport = save_an_instance
 
1093
 
 
1094
        self.addCleanup(webport.stopListening)
 
1095
        return webport
 
1096
 
 
1097
    def _connect_factory(self):
 
1098
        """Connect the instance factory."""
 
1099
        self.server = self._start_sample_webserver()
 
1100
        orig = self.action_queue.buildProtocol
 
1101
 
 
1102
        d = defer.Deferred()
 
1103
        def faked_buildProtocol(*args, **kwargs):
 
1104
            """Override buildProtocol to hook a deferred."""
 
1105
            protocol = orig(*args, **kwargs)
 
1106
            protocol.testing_deferred = d
 
1107
            return protocol
 
1108
 
 
1109
        self.action_queue.buildProtocol = faked_buildProtocol
 
1110
        self.action_queue.connect()
 
1111
        return d
 
1112
 
 
1113
    def _disconnect_factory(self):
 
1114
        """Disconnect the instance factory."""
 
1115
        if self.action_queue.client is not None:
 
1116
            orig = self.action_queue.client.connectionLost
 
1117
 
 
1118
            d = defer.Deferred()
 
1119
            def faked_connectionLost(reason):
 
1120
                """Receive connection lost and fire tearDown."""
 
1121
                orig(reason)
 
1122
                d.callback(True)
 
1123
 
 
1124
            self.action_queue.client.connectionLost = faked_connectionLost
 
1125
        else:
 
1126
            d = defer.succeed(True)
 
1127
 
 
1128
        if self.action_queue.connect_in_progress:
 
1129
            self.action_queue.disconnect()
 
1130
 
 
1131
        return d
 
1132
 
 
1133
 
 
1134
class ConnectionTestCase(FactoryBaseTestCase):
 
1135
    """Test TCP/SSL connection mechanism for ActionQueue."""
 
1136
 
 
1137
    def assert_connection_state_reset(self):
 
1138
        """Test connection state is properly reset."""
 
1139
        self.assertTrue(self.action_queue.client is None)
 
1140
        self.assertTrue(self.action_queue.connector is None)
 
1141
        self.assertEqual(False, self.action_queue.connect_in_progress)
 
1142
 
 
1143
    def test_init(self):
 
1144
        """Test connection init state."""
 
1145
        self.assert_connection_state_reset()
 
1146
 
 
1147
    @defer.inlineCallbacks
 
1148
    def test_connect_if_already_connected(self):
 
1149
        """Test that double connections are avoided."""
 
1150
        yield self._connect_factory()
 
1151
 
 
1152
        assert self.action_queue.connector is not None
 
1153
        assert self.action_queue.connect_in_progress == True
 
1154
        # double connect, it returns None instead of a Deferred
 
1155
        result = self.action_queue.connect()
 
1156
        self.assertTrue(result is None, 'not connecting again')
 
1157
 
 
1158
        yield self._disconnect_factory()
 
1159
 
 
1160
    @defer.inlineCallbacks
 
1161
    def test_disconnect_if_connected(self):
 
1162
        """self.action_queue.connector.disconnect was called."""
 
1163
        yield self._connect_factory()
 
1164
 
 
1165
        self.action_queue.event_queue.events = [] # cleanup events
 
1166
        assert self.action_queue.connector.state == 'connected'
 
1167
        self.action_queue.disconnect()
 
1168
 
 
1169
        self.assert_connection_state_reset()
 
1170
        self.assertEqual([], self.action_queue.event_queue.events)
 
1171
 
 
1172
        yield self._disconnect_factory()
 
1173
 
 
1174
    @defer.inlineCallbacks
 
1175
    def test_clientConnectionFailed(self):
 
1176
        """Test clientConnectionFailed.
 
1177
 
 
1178
        The connection will not be completed since the server will be down. So,
 
1179
        self.action_queue.connector will never leave the 'connecting' state.
 
1180
        When interrupting the connection attempt, twisted automatically calls
 
1181
        self.action_queue.clientConnectionFailed.
 
1182
 
 
1183
        """
 
1184
        self.action_queue.event_queue.events = []
 
1185
        orig = self.action_queue.clientConnectionFailed
 
1186
 
 
1187
        d = defer.Deferred()
 
1188
        def faked_clientConnectionFailed(connector, reason):
 
1189
            """Receive connection failed and check."""
 
1190
            orig(connector, reason)
 
1191
            self.assert_connection_state_reset()
 
1192
            self.assertEqual([('SYS_CONNECTION_FAILED', {})],
 
1193
                             self.action_queue.event_queue.events)
 
1194
            self.action_queue.clientConnectionFailed = orig
 
1195
            d.callback(True)
 
1196
 
 
1197
        self.action_queue.clientConnectionFailed = faked_clientConnectionFailed
 
1198
        # factory will never finish the connection, server was never started
 
1199
        self.action_queue.connect()
 
1200
        # stopConnecting() will be called since the connection is in progress
 
1201
        assert self.action_queue.connector.state == 'connecting'
 
1202
        self.action_queue.connector.disconnect()
 
1203
 
 
1204
        yield d
 
1205
 
 
1206
    @defer.inlineCallbacks
 
1207
    def test_clientConnectionLost(self):
 
1208
        """Test clientConnectionLost
 
1209
 
 
1210
        The connection will be completed successfully.
 
1211
        So, self.action_queue.connector will be in the 'connected' state.
 
1212
        When disconnecting the connector, twisted automatically calls
 
1213
        self.action_queue.clientConnectionLost.
 
1214
 
 
1215
        """
 
1216
        yield self._connect_factory()
 
1217
 
 
1218
        self.action_queue.event_queue.events = []
 
1219
        orig = self.action_queue.clientConnectionLost
 
1220
 
 
1221
        d = defer.Deferred()
 
1222
        def faked_clientConnectionLost(connector, reason):
 
1223
            """Receive connection lost and check."""
 
1224
            orig(connector, reason)
 
1225
            self.assert_connection_state_reset()
 
1226
            self.assertEqual([('SYS_CONNECTION_LOST', {})],
 
1227
                             self.action_queue.event_queue.events)
 
1228
            self.action_queue.clientConnectionLost = orig
 
1229
            d.callback(True)
 
1230
 
 
1231
        self.action_queue.clientConnectionLost = faked_clientConnectionLost
 
1232
        # loseConnection() will be called since the connection was completed
 
1233
        assert self.action_queue.connector.state == 'connected'
 
1234
        self.action_queue.connector.disconnect()
 
1235
        yield d
 
1236
 
 
1237
        yield self._disconnect_factory()
 
1238
 
 
1239
    @defer.inlineCallbacks
 
1240
    def test_server_disconnect(self):
 
1241
        """Test factory's connection when the server goes down."""
 
1242
 
 
1243
        yield self._connect_factory()
 
1244
 
 
1245
        self.action_queue.event_queue.events = []
 
1246
        orig = self.action_queue.clientConnectionLost
 
1247
 
 
1248
        d = defer.Deferred()
 
1249
        def faked_connectionLost(*args, **kwargs):
 
1250
            """Receive connection lost and check."""
 
1251
            orig(*args, **kwargs)
 
1252
            self.assert_connection_state_reset()
 
1253
            self.assertEqual([('SYS_CONNECTION_LOST', {})],
 
1254
                             self.action_queue.event_queue.events)
 
1255
            self.action_queue.clientConnectionLost = orig
 
1256
            d.callback(True)
 
1257
 
 
1258
        self.action_queue.clientConnectionLost = faked_connectionLost
 
1259
        # simulate a server failure!
 
1260
        yield self.server_transport.loseConnection()
 
1261
        yield d
 
1262
        yield self._disconnect_factory()
 
1263
 
 
1264
    def test_buildProtocol(self):
 
1265
        """Test buildProtocol."""
 
1266
        protocol = self.action_queue.buildProtocol(addr=None)
 
1267
        self.assertTrue(protocol is self.action_queue.client)
 
1268
        self.assertTrue(self.action_queue is self.action_queue.client.factory)
 
1269
 
 
1270
        # callbacks are connected
 
1271
        # pylint: disable-msg=W0212
 
1272
        aq = self.action_queue
 
1273
        self.assertEqual(aq.client._share_change_callback,
 
1274
                         aq._share_change_callback)
 
1275
        self.assertEqual(aq.client._share_answer_callback,
 
1276
                         aq._share_answer_callback)
 
1277
        self.assertEqual(aq.client._free_space_callback,
 
1278
                         aq._free_space_callback)
 
1279
        self.assertEqual(aq.client._account_info_callback,
 
1280
                         aq._account_info_callback)
 
1281
        self.assertEqual(aq.client._volume_created_callback,
 
1282
                         aq._volume_created_callback)
 
1283
        self.assertEqual(aq.client._volume_deleted_callback,
 
1284
                         aq._volume_deleted_callback)
 
1285
        self.assertEqual(aq.client._volume_new_generation_callback,
 
1286
                         aq._volume_new_generation_callback)
 
1287
 
 
1288
    @defer.inlineCallbacks
 
1289
    def test_connector_gets_assigned_on_connect(self):
 
1290
        """Test factory's connector gets assigned on connect."""
 
1291
        yield self._connect_factory()
 
1292
 
 
1293
        self.assertTrue(self.action_queue.connector is not None)
 
1294
 
 
1295
        yield self._disconnect_factory()
 
1296
 
 
1297
    def test_connection_started_logging(self):
 
1298
        """Test that the connection started logs connector info, not AQ's."""
 
1299
        assert self.action_queue.host == '127.0.0.1'
 
1300
        assert self.action_queue.port == 0
 
1301
 
 
1302
        class FakeConnector(object):
 
1303
            """Fake connector."""
 
1304
            host = '1.2.3.4'
 
1305
            port = 4321
 
1306
 
 
1307
        self.action_queue.startedConnecting(FakeConnector())
 
1308
        self.assertTrue(self.handler.check_info("Connection started",
 
1309
                                                "host 1.2.3.4", "port 4321"))
 
1310
 
 
1311
 
 
1312
class NetworkmanagerTestCase(FactoryBaseTestCase):
 
1313
    """Base test case generating a connected factory."""
 
1314
 
 
1315
    timeout = 15
 
1316
 
 
1317
    @defer.inlineCallbacks
 
1318
    def setUp(self):
 
1319
        """Init."""
 
1320
        yield super(NetworkmanagerTestCase, self).setUp()
 
1321
        self.action_queue.event_queue.push('SYS_NET_CONNECTED')
 
1322
        self.main.start()
 
1323
 
 
1324
    @defer.inlineCallbacks
 
1325
    def test_wrong_disconnect(self):
 
1326
        """Test factory's connection when SYS_NET_DISCONNECTED."""
 
1327
 
 
1328
        d1 = self.main.wait_for('SYS_CONNECTION_MADE')
 
1329
        d2 = self.main.wait_for('SYS_CONNECTION_LOST')
 
1330
 
 
1331
        self.server = self._start_sample_webserver()
 
1332
        self.user_connect()
 
1333
        yield d1
 
1334
 
 
1335
        self.action_queue.event_queue.push('SYS_NET_DISCONNECTED')
 
1336
        yield d2
 
1337
 
 
1338
    @defer.inlineCallbacks
 
1339
    def test_disconnect_twice(self):
 
1340
        """Test connection when SYS_NET_DISCONNECTED is received twice."""
 
1341
 
 
1342
        d1 = self.main.wait_for('SYS_CONNECTION_MADE')
 
1343
        d2 = self.main.wait_for('SYS_CONNECTION_LOST')
 
1344
 
 
1345
        self.server = self._start_sample_webserver()
 
1346
        self.user_connect()
 
1347
        yield d1
 
1348
 
 
1349
        self.action_queue.event_queue.push('SYS_NET_DISCONNECTED')
 
1350
        yield d2
 
1351
 
 
1352
        self.action_queue.event_queue.events = []
 
1353
        self.action_queue.event_queue.push('SYS_NET_DISCONNECTED')
 
1354
        self.assertEqual([('SYS_NET_DISCONNECTED', {})],
 
1355
                         self.action_queue.event_queue.events,
 
1356
                        'No new events after a misplaced SYS_NET_DISCONNECTED')
 
1357
 
 
1358
    @defer.inlineCallbacks
 
1359
    def test_net_connected_if_already_connected(self):
 
1360
        """Test connection when SYS_NET_CONNECTED is received twice."""
 
1361
 
 
1362
        d1 = self.main.wait_for('SYS_CONNECTION_MADE')
 
1363
 
 
1364
        self.server = self._start_sample_webserver()
 
1365
        self.user_connect()
 
1366
        yield d1
 
1367
 
 
1368
        self.action_queue.event_queue.events = []
 
1369
        self.action_queue.event_queue.push('SYS_NET_CONNECTED')
 
1370
        self.assertEqual([('SYS_NET_CONNECTED', {})],
 
1371
                         self.action_queue.event_queue.events,
 
1372
                         'No new events after a misplaced SYS_NET_CONNECTED')
 
1373
 
 
1374
    @defer.inlineCallbacks
 
1375
    def test_messy_mix(self):
 
1376
        """Test connection when a messy mix of events is received."""
 
1377
        orig_waiting = states.MAX_WAITING
 
1378
        states.MAX_WAITING = 1
 
1379
 
 
1380
        self.action_queue.event_queue.events = []
 
1381
        self.server = self._start_sample_webserver()
 
1382
 
 
1383
        conn_made = self.main.wait_for('SYS_CONNECTION_MADE')
 
1384
        self.user_connect()
 
1385
        yield conn_made
 
1386
 
 
1387
        events = ['SYS_NET_CONNECTED', 'SYS_NET_DISCONNECTED',
 
1388
                  'SYS_NET_CONNECTED', 'SYS_NET_CONNECTED',
 
1389
                  'SYS_NET_DISCONNECTED', 'SYS_NET_DISCONNECTED',
 
1390
                  'SYS_NET_CONNECTED']
 
1391
 
 
1392
        for i in events:
 
1393
            self.action_queue.event_queue.push(i)
 
1394
 
 
1395
        yield self.main.wait_for_nirvana()
 
1396
 
 
1397
        expected = ['SYS_NET_CONNECTED', # from the DBus fake NetworkManager
 
1398
                    'SYS_NET_CONNECTED', 'SYS_NET_DISCONNECTED',
 
1399
                    'SYS_CONNECTION_LOST', 'SYS_CONNECTION_RETRY',
 
1400
                    'SYS_NET_CONNECTED', 'SYS_NET_CONNECTED',
 
1401
                    'SYS_CONNECTION_MADE', 'SYS_NET_DISCONNECTED',
 
1402
                    'SYS_NET_DISCONNECTED']
 
1403
 
 
1404
        avoid = ('SYS_STATE_CHANGED', 'SYS_LOCAL_RESCAN_DONE',
 
1405
                 'SYS_PROTOCOL_VERSION_OK', 'SYS_SET_CAPABILITIES_OK',
 
1406
                 'SYS_AUTH_OK', 'SYS_SERVER_RESCAN_DONE')
 
1407
        actual = [event for (event, kwargs) in
 
1408
                  self.action_queue.event_queue.events
 
1409
                  if event not in avoid]
 
1410
        self.assertEqual(sorted(expected), sorted(actual))
 
1411
 
 
1412
        states.MAX_WAITING = orig_waiting
 
1413
 
 
1414
 
 
1415
class ConnectedBaseTestCase(FactoryBaseTestCase):
 
1416
    """Base test case generating a connected factory."""
 
1417
 
 
1418
    @defer.inlineCallbacks
 
1419
    def setUp(self):
 
1420
        """Init."""
 
1421
        yield super(ConnectedBaseTestCase, self).setUp()
 
1422
        yield self._connect_factory()
 
1423
        assert self.action_queue.connector.state == 'connected'
 
1424
 
 
1425
    @defer.inlineCallbacks
 
1426
    def tearDown(self):
 
1427
        """Clean up."""
 
1428
        yield self._disconnect_factory()
 
1429
        yield super(ConnectedBaseTestCase, self).tearDown()
 
1430
 
 
1431
    def silent_connection_lost(self, failure):
 
1432
        """Some tests will generate connection lost, support it."""
 
1433
        if not failure.check(twisted_error.ConnectionDone,
 
1434
                             twisted_error.ConnectionLost):
 
1435
            return failure
 
1436
 
 
1437
 
 
1438
class VolumeManagementTestCase(ConnectedBaseTestCase):
 
1439
    """Test Volume managemenr for ActionQueue."""
 
1440
 
 
1441
    @defer.inlineCallbacks
 
1442
    def setUp(self):
 
1443
        """Init."""
 
1444
        yield super(VolumeManagementTestCase, self).setUp()
 
1445
 
 
1446
        # silence the event to avoid propagation
 
1447
        listener_map = self.action_queue.event_queue.listener_map
 
1448
        del listener_map['SV_VOLUME_DELETED']
 
1449
 
 
1450
    def test_volume_created_push_event(self):
 
1451
        """Volume created callback push proper event."""
 
1452
        volume = FakedVolume()
 
1453
        self.action_queue._volume_created_callback(volume)
 
1454
        self.assertEqual([('SV_VOLUME_CREATED', {'volume': volume})],
 
1455
                          self.action_queue.event_queue.events)
 
1456
 
 
1457
    def test_volume_deleted_push_event(self):
 
1458
        """Volume deleted callback push proper event."""
 
1459
        volume_id = VOLUME
 
1460
        self.action_queue._volume_deleted_callback(volume_id)
 
1461
        self.assertEqual([('SV_VOLUME_DELETED', {'volume_id': volume_id})],
 
1462
                          self.action_queue.event_queue.events)
 
1463
 
 
1464
    def test_volume_new_generation_push_event_root(self):
 
1465
        """Volume New Generation callback push proper event with root."""
 
1466
        volume = request.ROOT
 
1467
        self.action_queue._volume_new_generation_callback(volume, 77)
 
1468
        event = ('SV_VOLUME_NEW_GENERATION',
 
1469
                 {'volume_id': volume, 'generation': 77})
 
1470
        self.assertTrue(event in self.action_queue.event_queue.events)
 
1471
 
 
1472
    def test_volume_new_generation_push_event_uuid(self):
 
1473
        """Volume New Generation callback push proper event with uuid."""
 
1474
        volume = uuid.uuid4()
 
1475
        self.action_queue._volume_new_generation_callback(volume, 77)
 
1476
        event = ('SV_VOLUME_NEW_GENERATION',
 
1477
                 {'volume_id': volume, 'generation': 77})
 
1478
        self.assertTrue(event in self.action_queue.event_queue.events)
 
1479
 
 
1480
    def test_valid_events(self):
 
1481
        """Volume events are valid in EventQueue."""
 
1482
        new_events = ('SV_VOLUME_CREATED', 'SV_VOLUME_DELETED',
 
1483
                      'AQ_CREATE_UDF_OK', 'AQ_CREATE_UDF_ERROR',
 
1484
                      'AQ_LIST_VOLUMES', 'AQ_LIST_VOLUMES_ERROR',
 
1485
                      'AQ_DELETE_VOLUME_OK', 'AQ_DELETE_VOLUME_ERROR',
 
1486
                      'SV_VOLUME_NEW_GENERATION')
 
1487
        for event in new_events:
 
1488
            self.assertTrue(event in EVENTS)
 
1489
 
 
1490
        self.assertEqual(('volume',), EVENTS['SV_VOLUME_CREATED'])
 
1491
        self.assertEqual(('volume_id',), EVENTS['SV_VOLUME_DELETED'])
 
1492
        self.assertEqual(('volume_id', 'node_id', 'marker'),
 
1493
                          EVENTS['AQ_CREATE_UDF_OK'])
 
1494
        self.assertEqual(('error', 'marker'), EVENTS['AQ_CREATE_UDF_ERROR'])
 
1495
        self.assertEqual(('volumes',), EVENTS['AQ_LIST_VOLUMES'])
 
1496
        self.assertEqual(('error',), EVENTS['AQ_LIST_VOLUMES_ERROR'])
 
1497
        self.assertEqual(('volume_id',), EVENTS['AQ_DELETE_VOLUME_OK'])
 
1498
        self.assertEqual(('volume_id', 'error',),
 
1499
                         EVENTS['AQ_DELETE_VOLUME_ERROR'])
 
1500
        self.assertEqual(('volume_id', 'generation',),
 
1501
                         EVENTS['SV_VOLUME_NEW_GENERATION'])
 
1502
 
 
1503
    def test_create_udf(self):
 
1504
        """Test volume creation."""
 
1505
        path = PATH
 
1506
        name = NAME
 
1507
        self.action_queue.create_udf(path, name, marker=None)
 
1508
 
 
1509
    def test_list_volumes(self):
 
1510
        """Test volume listing."""
 
1511
        self.action_queue.list_volumes()
 
1512
 
 
1513
    def test_delete_volume(self):
 
1514
        """Test volume deletion."""
 
1515
        volume_id = VOLUME
 
1516
        self.action_queue.delete_volume(volume_id, 'path')
 
1517
 
 
1518
 
 
1519
class ActionQueueCommandTestCase(ConnectedBaseTestCase):
 
1520
    """Test for the generic functionality of ActionQueueCommand."""
 
1521
 
 
1522
    @defer.inlineCallbacks
 
1523
    def setUp(self):
 
1524
        """Set up."""
 
1525
        yield super(ActionQueueCommandTestCase, self).setUp()
 
1526
 
 
1527
        class MyCommand(ActionQueueCommand):
 
1528
            logged_attrs = ('a', 'b', 'c', 'd')
 
1529
            a = 3
 
1530
            b = 'foo'
 
1531
            c = u'año'
 
1532
 
 
1533
            def _run(self):
 
1534
                return defer.succeed(True)
 
1535
 
 
1536
            @property
 
1537
            def uniqueness(self):
 
1538
                return (self.a, self.b, self.c)
 
1539
 
 
1540
        self.rq = RequestQueue(action_queue=self.action_queue)
 
1541
        self.rq.active = True
 
1542
        self.cmd = MyCommand(self.rq)
 
1543
        self.cmd.make_logger()
 
1544
 
 
1545
    def test_runnable(self):
 
1546
        """All commands are runnable by default."""
 
1547
        self.assertTrue(self.cmd.is_runnable)
 
1548
 
 
1549
    def test_cancelled(self):
 
1550
        """All commands are not cancelled by default."""
 
1551
        self.assertFalse(self.cmd.cancelled)
 
1552
 
 
1553
    def test_dump_to_dict(self):
 
1554
        """Test to dict dumping."""
 
1555
        d = self.cmd.to_dict()
 
1556
        self.assertEqual(d, dict(a=3, b='foo', c=u'año', d=None))
 
1557
 
 
1558
    @defer.inlineCallbacks
 
1559
    def test_demark_not_marker(self):
 
1560
        """Test demark with not a marker."""
 
1561
        self.cmd.possible_markers = 'foo',
 
1562
        self.cmd.foo = 'not a marker'
 
1563
        yield self.cmd.demark()
 
1564
        self.assertEqual(self.cmd.foo, 'not a marker')
 
1565
 
 
1566
    @defer.inlineCallbacks
 
1567
    def test_demark_with_marker_future(self):
 
1568
        """Test demark with a marker not ready.
 
1569
 
 
1570
        Here, on purpose, set up everything and trigger later.
 
1571
        """
 
1572
        marker = MDMarker('foo')
 
1573
        self.cmd.possible_markers = 'foo',
 
1574
        self.cmd.foo = marker
 
1575
        d = self.cmd.demark()
 
1576
 
 
1577
        self.action_queue.uuid_map.set(marker, 'node_id')
 
1578
        yield d
 
1579
        self.assertEqual(self.cmd.foo, 'node_id')
 
1580
        self.assertTrue(self.handler.check_debug(
 
1581
                        "waiting for the real value of marker:foo"))
 
1582
 
 
1583
    @defer.inlineCallbacks
 
1584
    def test_demark_with_marker_ready(self):
 
1585
        """Test demark with a marker that had data."""
 
1586
        marker = MDMarker('foo')
 
1587
        self.cmd.possible_markers = 'foo',
 
1588
        self.cmd.foo = marker
 
1589
        d = self.cmd.demark()
 
1590
        self.action_queue.uuid_map.set(marker, 'node_id')
 
1591
        yield d
 
1592
        self.assertEqual(self.cmd.foo, 'node_id')
 
1593
        self.assertTrue(self.handler.check_debug(
 
1594
                        "waiting for the real value of marker:foo"))
 
1595
 
 
1596
    @defer.inlineCallbacks
 
1597
    def test_demark_mixed_markers(self):
 
1598
        """Test demark with both a marker and not."""
 
1599
        # call demark with both
 
1600
        marker = MDMarker('foo')
 
1601
        self.cmd.possible_markers = 'foo', 'bar'
 
1602
        self.cmd.foo = 'notamarker'
 
1603
        self.cmd.bar = marker
 
1604
        d = self.cmd.demark()
 
1605
        self.action_queue.uuid_map.set(marker, 'node_id')
 
1606
        yield d
 
1607
 
 
1608
        # check
 
1609
        self.assertEqual(self.cmd.foo, 'notamarker')
 
1610
        self.assertEqual(self.cmd.bar, 'node_id')
 
1611
        self.assertTrue(self.handler.check_debug(
 
1612
                        "waiting for the real value of marker:foo"))
 
1613
        self.assertFalse(self.handler.check_debug(
 
1614
                         "waiting for the real value of 'notamarker'"))
 
1615
 
 
1616
    @defer.inlineCallbacks
 
1617
    def test_demark_marker_future_got_ok(self):
 
1618
        """Test demark getting a marker triggered ok later."""
 
1619
        # don't have the info now
 
1620
        marker = MDMarker('foo')
 
1621
        self.cmd.possible_markers = 'foo',
 
1622
        self.cmd.foo = marker
 
1623
        d = self.cmd.demark()
 
1624
        self.assertFalse(self.handler.check_debug("for marker:foo"))
 
1625
 
 
1626
        # set and check
 
1627
        self.action_queue.uuid_map.set(marker, 'node_id')
 
1628
        yield d
 
1629
        self.assertEqual(self.cmd.foo, 'node_id')
 
1630
        self.assertTrue(self.handler.check_debug(
 
1631
                        "for marker:foo got value 'node_id'"))
 
1632
 
 
1633
    @defer.inlineCallbacks
 
1634
    def test_demark_marker_future_got_failure(self):
 
1635
        """Test demark getting a marker triggered with failure later."""
 
1636
        # don't have the info now
 
1637
        marker = MDMarker('foo')
 
1638
        self.cmd.possible_markers = 'foo',
 
1639
        self.cmd.foo = marker
 
1640
        d = self.cmd.demark()
 
1641
        self.assertFalse(self.handler.check_error("failed marker:foo"))
 
1642
 
 
1643
        # set the marker and check
 
1644
        self.action_queue.uuid_map.err(marker, Failure(Exception('bad')))
 
1645
        yield d
 
1646
        self.assertTrue(self.handler.check_error("failed marker:foo"))
 
1647
        try:
 
1648
            yield self.cmd.markers_resolved_deferred
 
1649
        except Exception, e:
 
1650
            self.assertEqual(str(e), 'bad')
 
1651
        else:
 
1652
            self.fail("An exception should have been raised!")
 
1653
 
 
1654
    @defer.inlineCallbacks
 
1655
    def test_demark_two_markers_ok(self):
 
1656
        """Test demark with two markers that finish ok."""
 
1657
        # call demark with both
 
1658
        marker1 = MDMarker('foo')
 
1659
        marker2 = MDMarker('bar')
 
1660
        self.cmd.possible_markers = 'foo', 'bar'
 
1661
        self.cmd.foo = marker1
 
1662
        self.cmd.bar = marker2
 
1663
        d = self.cmd.demark()
 
1664
        self.action_queue.uuid_map.set(marker1, 'data1')
 
1665
        self.action_queue.uuid_map.set(marker2, 'data2')
 
1666
        yield d
 
1667
 
 
1668
        # check
 
1669
        self.assertEqual(self.cmd.foo, 'data1')
 
1670
        self.assertEqual(self.cmd.bar, 'data2')
 
1671
        yield self.cmd.markers_resolved_deferred
 
1672
 
 
1673
    @defer.inlineCallbacks
 
1674
    def test_demark_two_markers_one_fail(self):
 
1675
        """Test demark with two markers that one ends in failure."""
 
1676
        # call demark with both
 
1677
        marker1 = MDMarker('foo')
 
1678
        marker2 = MDMarker('bar')
 
1679
        self.cmd.possible_markers = 'foo', 'bar'
 
1680
        self.cmd.foo = marker1
 
1681
        self.cmd.bar = marker2
 
1682
        d = self.cmd.demark()
 
1683
        self.action_queue.uuid_map.set(marker1, 'data ok')
 
1684
        self.action_queue.uuid_map.err(marker2, Failure(Exception('data bad')))
 
1685
        yield d
 
1686
 
 
1687
        # check
 
1688
        try:
 
1689
            yield self.cmd.markers_resolved_deferred
 
1690
        except Exception, e:
 
1691
            self.assertEqual(str(e), 'data bad')
 
1692
        else:
 
1693
            self.fail("An exception should have been raised!")
 
1694
 
 
1695
    @defer.inlineCallbacks
 
1696
    def test_demark_fixes_hashedwaiting_active(self):
 
1697
        """The attribute changes: it also need to change the hashed_waiting."""
 
1698
        marker = MDMarker('b')
 
1699
        self.cmd.possible_markers = 'b',
 
1700
        self.cmd.b = marker
 
1701
        queue = self.cmd._queue
 
1702
        queue.hashed_waiting[self.cmd.uniqueness] = self.cmd
 
1703
        d = self.cmd.demark()
 
1704
 
 
1705
        self.action_queue.uuid_map.set(marker, 'node_id')
 
1706
        yield d
 
1707
        self.assertTrue(queue.hashed_waiting[self.cmd.uniqueness], self.cmd)
 
1708
 
 
1709
    @defer.inlineCallbacks
 
1710
    def test_demark_fixes_hashedwaiting_cancelled(self):
 
1711
        """The attribute changes: no change because cancelled."""
 
1712
        marker = MDMarker('b')
 
1713
        self.cmd.possible_markers = 'b',
 
1714
        self.cmd.b = marker
 
1715
        self.cmd.cancelled = True
 
1716
        queue = self.cmd._queue
 
1717
        queue.hashed_waiting[self.cmd.uniqueness] = self.cmd
 
1718
        d = self.cmd.demark()
 
1719
 
 
1720
        self.action_queue.uuid_map.set(marker, 'node_id')
 
1721
        yield d
 
1722
        self.assertFalse(self.cmd.uniqueness in queue.hashed_waiting)
 
1723
 
 
1724
    def test_go_makes_logger_and_demarks(self):
 
1725
        """Make the logger and demark."""
 
1726
        called = []
 
1727
        self.cmd.make_logger = lambda: called.append(1)
 
1728
        self.cmd.demark = lambda: called.append(2)
 
1729
 
 
1730
        self.cmd.go()
 
1731
        self.assertEqual(called, [1, 2])
 
1732
 
 
1733
    def test_go_stop_if_shouldnt_execute_command(self):
 
1734
        """Don't queue and don't acquire the pathlock."""
 
1735
        called = []
 
1736
        self.cmd._acquire_pathlock = lambda: called.append(True)
 
1737
        self.rq.queue = lambda c: called.append(True)
 
1738
        self.cmd._should_be_queued = lambda: False
 
1739
 
 
1740
        self.cmd.go()
 
1741
        self.assertFalse(called)
 
1742
 
 
1743
    def test_go_queue_pathlock_run(self):
 
1744
        """Queue the command, acquire the pathlock and run."""
 
1745
        called = []
 
1746
        self.rq.queue = lambda c: called.append(1)
 
1747
        self.cmd._acquire_pathlock = lambda: called.append(2)
 
1748
        self.cmd.run = lambda: called.append(3)
 
1749
 
 
1750
        self.cmd.go()
 
1751
        self.assertEqual(called, [1, 2, 3])
 
1752
 
 
1753
    def test_go_stop_cancels_while_pathlocking(self):
 
1754
        """If the command is cancelled while locked, stop."""
 
1755
        called = []
 
1756
        self.cmd.run = lambda: called.append(True)
 
1757
        d = defer.Deferred()
 
1758
        self.cmd._acquire_pathlock = lambda: d
 
1759
 
 
1760
        self.cmd.go()
 
1761
        self.cmd.cancel()
 
1762
        d.callback(None)
 
1763
 
 
1764
        self.assertFalse(called)
 
1765
 
 
1766
    def test_go_release_cancelled_while_pathlocking(self):
 
1767
        """If the command is cancelled while locked, release the pathlock."""
 
1768
        called = []
 
1769
        self.cmd.run = lambda: called.append(1)
 
1770
        d = defer.Deferred()
 
1771
        self.cmd._acquire_pathlock = lambda: d
 
1772
 
 
1773
        self.cmd.go()
 
1774
        self.cmd.cancel()
 
1775
        d.callback(lambda: called.append(2))
 
1776
 
 
1777
        self.assertEqual(called, [2])
 
1778
        self.assertTrue(self.handler.check_debug(
 
1779
                        'releasing the pathlock because of cancelled'))
 
1780
 
 
1781
    def test_go_run_ok_release_pathlock(self):
 
1782
        """If run went ok, release the pathlock."""
 
1783
        called = []
 
1784
        self.cmd.run = lambda: defer.succeed(True)
 
1785
        self.cmd._acquire_pathlock = lambda: defer.succeed(
 
1786
                                                lambda: called.append(True))
 
1787
 
 
1788
        self.cmd.go()
 
1789
        self.assertTrue(called)
 
1790
 
 
1791
    @defer.inlineCallbacks
 
1792
    def test_go_run_bad_release_pathlock(self):
 
1793
        """If run went bad, release the pathlock."""
 
1794
        called = []
 
1795
        self.cmd.run = lambda: defer.fail(ValueError("error message"))
 
1796
        self.cmd._acquire_pathlock = lambda: defer.succeed(
 
1797
                                                lambda: called.append(True))
 
1798
 
 
1799
        yield self.cmd.go()
 
1800
        self.assertTrue(called)
 
1801
 
 
1802
        # check exception to assure a traceback was logged, and check the
 
1803
        # messages in ERROR (the real logging level); finally, clean the
 
1804
        # records as if we leave them with the exception the test will fail
 
1805
        self.assertTrue(self.handler.check_exception(ValueError))
 
1806
        self.assertTrue(self.handler.check_error("Error running the command",
 
1807
                                                 "error message"))
 
1808
        self.handler.records = []
 
1809
 
 
1810
    def test_run_initial(self):
 
1811
        """Call ._start, log, and set running."""
 
1812
        called = []
 
1813
        d = defer.Deferred()
 
1814
        self.cmd._start = lambda: called.append(True) or d
 
1815
 
 
1816
        # run, and will lock in the _start
 
1817
        self.cmd.run()
 
1818
        self.assertTrue(called)
 
1819
        self.assertTrue(self.handler.check_debug('starting'))
 
1820
 
 
1821
        # release the _start, check log and that it still not running
 
1822
        d.callback(True)
 
1823
        self.assertTrue(self.handler.check_debug('started'))
 
1824
        self.assertFalse(self.cmd.running)
 
1825
 
 
1826
    def test_run_stop_if_cancelled_while_start(self):
 
1827
        """Cancelled while _start."""
 
1828
        self.rq.queue(self.cmd)
 
1829
        assert self.rq.active
 
1830
        assert self.cmd.is_runnable
 
1831
        self.cmd.markers_resolved_deferred.callback(True)
 
1832
 
 
1833
        called = []
 
1834
        self.cmd._run = lambda: called.append(True)
 
1835
        d = defer.Deferred()
 
1836
        self.cmd._start = lambda: d
 
1837
 
 
1838
        # run, cancel, and unlock start
 
1839
        self.cmd.run()
 
1840
        self.cmd.cancel()
 
1841
        d.callback(True)
 
1842
 
 
1843
        self.assertFalse(called)
 
1844
        self.assertTrue(self.handler.check_debug(
 
1845
                        'cancelled before trying to run'))
 
1846
 
 
1847
    def test_run_queue_not_active(self):
 
1848
        """Waiting cycle for queue not active."""
 
1849
        self.rq.queue(self.cmd)
 
1850
        assert self.cmd.is_runnable
 
1851
        self.cmd.markers_resolved_deferred.callback(True)
 
1852
 
 
1853
        self.rq.active = False
 
1854
        called = []
 
1855
        self.cmd._run = lambda: called.append(True) or defer.Deferred()
 
1856
 
 
1857
        # run first time
 
1858
        self.cmd.run()
 
1859
        self.assertFalse(called)
 
1860
        self.assertTrue(self.handler.check_debug(
 
1861
                        'not running because of inactive queue'))
 
1862
        self.assertFalse(self.handler.check_debug('unblocked: queue active'))
 
1863
 
 
1864
        # active the queue
 
1865
        self.rq.run()
 
1866
        self.assertTrue(called)
 
1867
        self.assertTrue(self.handler.check_debug('unblocked: queue active'))
 
1868
 
 
1869
    def test_run_command_not_runnable(self):
 
1870
        """Waiting cycle for command not runnable."""
 
1871
        self.rq.queue(self.cmd)
 
1872
        assert self.rq.active
 
1873
        self.cmd.markers_resolved_deferred.callback(True)
 
1874
 
 
1875
        self.cmd.is_runnable = False
 
1876
        called = []
 
1877
        self.cmd._run = lambda: called.append(True) or defer.Deferred()
 
1878
 
 
1879
        # run first time
 
1880
        self.cmd.run()
 
1881
        self.assertFalse(called)
 
1882
        self.assertTrue(self.handler.check_debug(
 
1883
                        'not running because of conditions'))
 
1884
        self.assertFalse(self.handler.check_debug('unblocked: conditions ok'))
 
1885
 
 
1886
        # active the command
 
1887
        self.cmd.is_runnable = True
 
1888
        self.action_queue.conditions_locker.check_conditions()
 
1889
        self.assertTrue(called)
 
1890
        self.assertTrue(self.handler.check_debug('unblocked: conditions ok'))
 
1891
 
 
1892
    def test_run_notrunnable_inactivequeue(self):
 
1893
        """Mixed behaviour between both stoppers."""
 
1894
        self.rq.queue(self.cmd)
 
1895
        self.cmd.markers_resolved_deferred.callback(True)
 
1896
        assert self.cmd.is_runnable
 
1897
        self.rq.active = False
 
1898
        called = []
 
1899
        self.cmd._run = lambda: called.append(True) or defer.Deferred()
 
1900
 
 
1901
        # run first time
 
1902
        self.cmd.run()
 
1903
        self.assertFalse(called)
 
1904
 
 
1905
        # active the queue but inactive the command
 
1906
        self.cmd.is_runnable = False
 
1907
        self.rq.run()
 
1908
        self.assertFalse(called)
 
1909
 
 
1910
        # active the command but inactive the queue again!
 
1911
        self.rq.stop()
 
1912
        self.cmd.is_runnable = True
 
1913
        self.action_queue.conditions_locker.check_conditions()
 
1914
        self.assertFalse(called)
 
1915
 
 
1916
        # finally resume the queue
 
1917
        self.rq.run()
 
1918
        self.assertTrue(called)
 
1919
 
 
1920
    def test_run_inactivequeue_cancel(self):
 
1921
        """Got cancelled while waiting the queue to resume."""
 
1922
        self.rq.queue(self.cmd)
 
1923
        assert self.cmd.is_runnable
 
1924
        self.cmd.markers_resolved_deferred.callback(True)
 
1925
 
 
1926
        self.rq.active = False
 
1927
        called = []
 
1928
        self.cmd._run = lambda: called.append(True)
 
1929
 
 
1930
        # run and cancel
 
1931
        self.cmd.run()
 
1932
        self.cmd.cancel()
 
1933
 
 
1934
        # active the queue
 
1935
        self.rq.run()
 
1936
        self.assertFalse(called)
 
1937
        self.assertTrue(self.handler.check_debug(
 
1938
                        'cancelled before trying to run'))
 
1939
 
 
1940
    def test_run_notrunnable_cancel(self):
 
1941
        """Got cancelled while waiting the conditions to run."""
 
1942
        self.rq.queue(self.cmd)
 
1943
        assert self.rq.active
 
1944
        self.cmd.markers_resolved_deferred.callback(True)
 
1945
 
 
1946
        self.cmd.is_runnable = False
 
1947
        called = []
 
1948
        self.cmd._run = lambda: called.append(True)
 
1949
 
 
1950
        # run and cancel
 
1951
        self.cmd.run()
 
1952
        self.cmd.cancel()
 
1953
 
 
1954
        # active the command
 
1955
        self.cmd.is_runnable = True
 
1956
        self.action_queue.conditions_locker.check_conditions()
 
1957
        self.assertFalse(called)
 
1958
        self.handler.debug = True
 
1959
        self.assertTrue(self.handler.check_debug(
 
1960
                        'cancelled before trying to run'))
 
1961
 
 
1962
    def test_run_waits_markers_dereferencing(self):
 
1963
        """Don't call _run_command until have the markers."""
 
1964
        self.rq.queue(self.cmd)
 
1965
        assert self.cmd.is_runnable
 
1966
        assert self.rq.active
 
1967
 
 
1968
        called = []
 
1969
        self.cmd._run = lambda: called.append(True) or defer.Deferred()
 
1970
 
 
1971
        # run first time
 
1972
        self.cmd.run()
 
1973
        self.assertFalse(called)
 
1974
 
 
1975
        # resolve the markers
 
1976
        self.cmd.markers_resolved_deferred.callback(True)
 
1977
        self.assertTrue(called)
 
1978
        self.assertTrue(self.cmd.running)
 
1979
 
 
1980
    def test_run_endok_calls_finishing_stuff_not_cancelled(self):
 
1981
        """Call finish on end ok."""
 
1982
        self.rq.queue(self.cmd)
 
1983
        called = []
 
1984
        self.cmd.finish = lambda: called.append(2)
 
1985
        self.cmd.handle_success = lambda a: called.append(a)
 
1986
        self.cmd._run = lambda: defer.succeed(1)
 
1987
        self.cmd.markers_resolved_deferred = defer.succeed(True)
 
1988
        self.cmd.run()
 
1989
 
 
1990
        # check that handle_success was called *before* finish
 
1991
        self.assertEqual(called, [1, 2])
 
1992
        self.assertTrue(self.handler.check_debug('success'))
 
1993
 
 
1994
    def test_run_endok_calls_finishing_stuff_cancelled(self):
 
1995
        """Call finish on end ok, cancelled while running."""
 
1996
        self.rq.queue(self.cmd)
 
1997
        called = []
 
1998
        self.cmd.handle_success = lambda a: called.append(a)
 
1999
        d = defer.Deferred()
 
2000
        self.cmd._run = lambda: d
 
2001
        self.cmd.markers_resolved_deferred = defer.succeed(True)
 
2002
        self.cmd.run()
 
2003
 
 
2004
        # cancel and let _run finish
 
2005
        self.cmd.cancel()
 
2006
        d.callback(True)
 
2007
        self.assertFalse(called)
 
2008
        self.assertTrue(self.handler.check_debug('cancelled while running'))
 
2009
 
 
2010
    def test_run_enderr_calls_finish(self):
 
2011
        """Call finish on end_errback."""
 
2012
        called = []
 
2013
        self.cmd.finish = lambda: called.append(1)
 
2014
        self.cmd.handle_failure = lambda f: called.append(f.value)
 
2015
        self.cmd.markers_resolved_deferred = defer.succeed(True)
 
2016
        self.cmd.suppressed_error_messages.append(ValueError)
 
2017
        exc = ValueError()
 
2018
        self.cmd._run = lambda: defer.fail(exc)
 
2019
        self.cmd.run()
 
2020
 
 
2021
        # check that handle_failure was called *before* finish
 
2022
        self.assertEqual(called, [exc, 1])
 
2023
 
 
2024
    def test_run_enderr_retry(self):
 
2025
        """Command retried, call the handle and retry."""
 
2026
        called = []
 
2027
        self.cmd.finish = lambda: called.append('should not')
 
2028
        self.cmd.handle_failure = lambda: called.append('should not')
 
2029
        self.cmd.handle_retryable = lambda f: called.append('ok')
 
2030
        self.cmd.markers_resolved_deferred = defer.succeed(True)
 
2031
        assert self.rq.active
 
2032
 
 
2033
        def fake_run():
 
2034
            """Set the queue inactive to avoid retry loop and fail."""
 
2035
            self.rq.active = False
 
2036
            raise twisted_error.ConnectionDone()
 
2037
 
 
2038
        # set up and test
 
2039
        self.cmd._run = fake_run
 
2040
 
 
2041
        # run and check finish was not called
 
2042
        self.cmd.run()
 
2043
        self.assertEqual(called, ['ok'])
 
2044
 
 
2045
    def test_run_retry_on_commandpaused(self):
 
2046
        """Command retried because of pausing."""
 
2047
        called = []
 
2048
        self.cmd.finish = lambda: called.append(True)
 
2049
        self.cmd.markers_resolved_deferred = defer.succeed(True)
 
2050
        self.rq.waiting.append(self.cmd)
 
2051
        assert self.rq.active
 
2052
 
 
2053
        # deferreds, first one stucks, the second allows to continue
 
2054
        deferreds = [defer.Deferred(), defer.succeed(True)]
 
2055
        self.cmd._run = lambda: deferreds.pop(0)
 
2056
 
 
2057
        # run and check finish was not called
 
2058
        self.cmd.run()
 
2059
        self.assertFalse(called)
 
2060
 
 
2061
        # pause, still nothing called
 
2062
        self.rq.stop()
 
2063
        self.assertFalse(called)
 
2064
 
 
2065
        # resume, now it finished!
 
2066
        self.rq.run()
 
2067
        self.assertTrue(called)
 
2068
 
 
2069
    @defer.inlineCallbacks
 
2070
    def test_start_default(self):
 
2071
        """Default _start just returns a triggered deferred and sets done."""
 
2072
        yield self.cmd._start()
 
2073
 
 
2074
    def test_possible_markers_default(self):
 
2075
        """Default value for possible markers."""
 
2076
        self.assertEqual(self.cmd.possible_markers, ())
 
2077
 
 
2078
    @defer.inlineCallbacks
 
2079
    def test_path_locking(self):
 
2080
        """Test it has a generic _acquire_pathlock."""
 
2081
        r = yield self.cmd._acquire_pathlock()
 
2082
        self.assertIdentical(r, None)
 
2083
 
 
2084
    def test_finish_running(self):
 
2085
        """Set running to False when finish."""
 
2086
        self.cmd.running = True
 
2087
        self.rq.unqueue = lambda c: None # don't do anything
 
2088
        self.cmd.finish()
 
2089
        self.assertFalse(self.cmd.running)
 
2090
 
 
2091
    def test_finish_unqueue(self):
 
2092
        """Unqueue the command when finish."""
 
2093
        called = []
 
2094
        self.rq.unqueue = lambda c: called.append(c)
 
2095
        self.cmd.finish()
 
2096
        self.assertEqual(called, [self.cmd])
 
2097
        self.assertFalse(self.cmd.running)
 
2098
 
 
2099
    @defer.inlineCallbacks
 
2100
    def test_pause_running(self):
 
2101
        """Pause while running."""
 
2102
        self.cmd.running_deferred = InterruptibleDeferred(defer.Deferred())
 
2103
        called = []
 
2104
        self.cmd.cleanup = lambda: called.append(True)
 
2105
 
 
2106
        self.cmd.pause()
 
2107
        self.assertTrue(self.handler.check_debug("pausing"))
 
2108
        self.assertTrue(called)
 
2109
 
 
2110
        try:
 
2111
            yield self.cmd.running_deferred
 
2112
        except DeferredInterrupted:
 
2113
            pass   # this is handled by run() to retry
 
2114
        else:
 
2115
            self.fail("Test should have raised an exception")
 
2116
 
 
2117
    def test_pause_norunning(self):
 
2118
        """Pause while not running."""
 
2119
        assert self.cmd.running_deferred is None
 
2120
        called = []
 
2121
        self.cmd.cleanup = lambda: called.append(True)
 
2122
 
 
2123
        self.cmd.pause()
 
2124
        self.assertTrue(self.handler.check_debug("pausing"))
 
2125
        self.assertTrue(called)
 
2126
 
 
2127
    def test_cancel_works(self):
 
2128
        """Do default cleaning."""
 
2129
        called = []
 
2130
        self.cmd.cleanup = lambda: called.append(1)
 
2131
        self.cmd.finish = lambda: called.append(2)
 
2132
        assert not self.cmd.cancelled
 
2133
        did_cancel = self.cmd.cancel()
 
2134
        self.assertTrue(did_cancel)
 
2135
        self.assertEqual(called, [1, 2])
 
2136
        self.assertTrue(self.cmd.cancelled)
 
2137
        self.assertTrue(self.handler.check_debug('cancelled'))
 
2138
 
 
2139
    def test_cancel_releases_conditions(self):
 
2140
        """Cancel calls the conditions locker for the command."""
 
2141
        self.cmd.finish = lambda: None # don't try to unqueue!
 
2142
        d = self.action_queue.conditions_locker.get_lock(self.cmd)
 
2143
        self.cmd.cancel()
 
2144
        self.assertTrue(d.called)
 
2145
 
 
2146
    def test_cancel_cancelled(self):
 
2147
        """Don't do anything if command already cancelled."""
 
2148
        called = []
 
2149
        self.cmd.cleanup = lambda: called.append(True)
 
2150
        self.cmd.finish = lambda: called.append(True)
 
2151
        self.cmd.cancelled = True
 
2152
        did_cancel = self.cmd.cancel()
 
2153
        self.assertFalse(did_cancel)
 
2154
        self.assertFalse(called)
 
2155
        self.assertTrue(self.cmd.cancelled)
 
2156
 
 
2157
    def test_slots(self):
 
2158
        """Inherited commands must have __slot__ (that is not inherited)."""
 
2159
        for obj_name in dir(action_queue):
 
2160
            obj = getattr(action_queue, obj_name)
 
2161
            if isinstance(obj, type) and issubclass(obj, ActionQueueCommand) \
 
2162
               and obj is not ActionQueueCommand:
 
2163
                self.assertNotIdentical(obj.__slots__,
 
2164
                                        ActionQueueCommand.__slots__,
 
2165
                                        "class %s has no __slots__" % obj)
 
2166
 
 
2167
 
 
2168
class CreateUDFTestCase(ConnectedBaseTestCase):
 
2169
    """Test for CreateUDF ActionQueueCommand."""
 
2170
 
 
2171
    @defer.inlineCallbacks
 
2172
    def setUp(self):
 
2173
        """Init."""
 
2174
        yield super(CreateUDFTestCase, self).setUp()
 
2175
 
 
2176
        request_queue = RequestQueue(action_queue=self.action_queue)
 
2177
        self.marker = VOLUME
 
2178
        self.command = CreateUDF(request_queue, PATH, NAME, marker=self.marker)
 
2179
 
 
2180
        # silence the event to avoid propagation
 
2181
        listener_map = self.action_queue.event_queue.listener_map
 
2182
        del listener_map['AQ_CREATE_UDF_OK']
 
2183
        del listener_map['AQ_CREATE_UDF_ERROR']
 
2184
 
 
2185
    def test_is_action_queue_command(self):
 
2186
        """Test proper inheritance."""
 
2187
        self.assertTrue(isinstance(self.command, ActionQueueCommand))
 
2188
 
 
2189
    def test_init(self):
 
2190
        """Test creation."""
 
2191
        self.assertEqual(PATH, self.command.path)
 
2192
        self.assertEqual(NAME, self.command.name)
 
2193
        self.assertEqual(self.marker, self.command.marker)
 
2194
 
 
2195
    def test_run_returns_a_deferred(self):
 
2196
        """Test a deferred is returned."""
 
2197
        res = self.command._run()
 
2198
        self.assertIsInstance(res, defer.Deferred)
 
2199
        res.addErrback(self.silent_connection_lost)
 
2200
 
 
2201
    def test_run_calls_protocol(self):
 
2202
        """Test protocol's create_udf is called."""
 
2203
        original = self.command.action_queue.client.create_udf
 
2204
        self.called = False
 
2205
 
 
2206
        def check(path, name):
 
2207
            """Take control over client's feature."""
 
2208
            self.called = True
 
2209
            self.assertEqual(PATH, path)
 
2210
            self.assertEqual(NAME, name)
 
2211
 
 
2212
        self.command.action_queue.client.create_udf = check
 
2213
 
 
2214
        self.command._run()
 
2215
 
 
2216
        self.assertTrue(self.called, 'command was called')
 
2217
 
 
2218
        self.command.action_queue.client.create_udf = original
 
2219
 
 
2220
    def test_handle_success_push_event(self):
 
2221
        """Test AQ_CREATE_UDF_OK is pushed on success."""
 
2222
        request = client.CreateUDF(self.action_queue.client, PATH, NAME)
 
2223
        request.volume_id = VOLUME
 
2224
        request.node_id = NODE
 
2225
        self.command.handle_success(success=request)
 
2226
        events = [('AQ_CREATE_UDF_OK', {'volume_id': VOLUME,
 
2227
                                        'node_id': NODE,
 
2228
                                        'marker': self.marker})]
 
2229
        self.assertEqual(events, self.command.action_queue.event_queue.events)
 
2230
 
 
2231
    def test_handle_failure_push_event(self):
 
2232
        """Test AQ_CREATE_UDF_ERROR is pushed on failure."""
 
2233
        msg = 'Something went wrong'
 
2234
        failure = Failure(DefaultException(msg))
 
2235
        self.command.handle_failure(failure=failure)
 
2236
        events = [('AQ_CREATE_UDF_ERROR',
 
2237
                    {'error': msg, 'marker': self.marker})]
 
2238
        self.assertEqual(events, self.command.action_queue.event_queue.events)
 
2239
 
 
2240
    def test_path_locking(self):
 
2241
        """Test that it acquires correctly the path lock."""
 
2242
        t = []
 
2243
        self.patch(PathLockingTree, 'acquire',
 
2244
                   lambda s, *a, **k: t.extend((a, k)))
 
2245
        self.command._acquire_pathlock()
 
2246
        self.assertEqual(t, [tuple(PATH.split(os.path.sep)), {'logger': None}])
 
2247
 
 
2248
 
 
2249
class ActionQueueCommandErrorsTestCase(ConnectedBaseTestCase):
 
2250
    """Test the error handling in ActionQueueCommand."""
 
2251
 
 
2252
    @defer.inlineCallbacks
 
2253
    def setUp(self):
 
2254
        yield super(ActionQueueCommandErrorsTestCase, self).setUp()
 
2255
 
 
2256
        self.deferred = defer.Deferred()
 
2257
 
 
2258
        class MyLogger(object):
 
2259
            """Fake logger that just stores error and warning calls."""
 
2260
            def __init__(self):
 
2261
                self.logged = None
 
2262
 
 
2263
            def exception(self, *a):
 
2264
                """Mark that this method was called."""
 
2265
                self.logged = "exception"
 
2266
 
 
2267
            def warn(self, *a):
 
2268
                """Mark that this method was called."""
 
2269
                self.logged = "warn"
 
2270
 
 
2271
            def debug(self, *a):
 
2272
                """Nothing."""
 
2273
 
 
2274
        class MyCommand(ActionQueueCommand):
 
2275
            """Patchable."""
 
2276
 
 
2277
        self.rq = RequestQueue(action_queue=self.action_queue)
 
2278
        self.rq.unqueue = lambda c: None
 
2279
        self.rq.active = True
 
2280
        self.command = MyCommand(self.rq)
 
2281
        self.command.markers_resolved_deferred = defer.succeed(True)
 
2282
        self.command.log = MyLogger()
 
2283
 
 
2284
    def test_suppressed_yes_knownerrors(self):
 
2285
        """Check that the log is in warning for the known errors."""
 
2286
        def send_failure_and_check(errnum, exception_class):
 
2287
            """Send the failure."""
 
2288
            # prepare what to send
 
2289
            protocol_msg = protocol_pb2.Message()
 
2290
            protocol_msg.type = protocol_pb2.Message.ERROR
 
2291
            protocol_msg.error.type = errnum
 
2292
            err = exception_class("request", protocol_msg)
 
2293
 
 
2294
            def fake_run():
 
2295
                """Set the queue inactive to avoid retry loops and fail."""
 
2296
                self.rq.active = False
 
2297
                raise err
 
2298
 
 
2299
            # set up and test
 
2300
            self.command.log.logged = None
 
2301
            self.command._run = fake_run
 
2302
            self.command.run()
 
2303
            self.assertEqual(self.command.log.logged, "warn",
 
2304
                             "Bad log in exception %s" % (exception_class,))
 
2305
 
 
2306
        known_errors = [x for x in errors._error_mapping.items()
 
2307
                        if x[1] != errors.InternalError]
 
2308
        for errnum, exception_class in known_errors:
 
2309
            self.rq.active = True
 
2310
            send_failure_and_check(errnum, exception_class)
 
2311
 
 
2312
    def test_suppressed_no_internalerror(self):
 
2313
        """Check that the log is in error for InternalError."""
 
2314
        # prepare what to send
 
2315
        protocol_msg = protocol_pb2.Message()
 
2316
        protocol_msg.type = protocol_pb2.Message.ERROR
 
2317
        protocol_msg.error.type = protocol_pb2.Error.INTERNAL_ERROR
 
2318
        err = errors.InternalError("request", protocol_msg)
 
2319
 
 
2320
        self.command._run = lambda: defer.fail(err)
 
2321
        self.command.run()
 
2322
        self.assertEqual(self.command.log.logged, "exception")
 
2323
 
 
2324
    def test_suppressed_yes_cancelled(self):
 
2325
        """Check that the log is in warning for Cancelled."""
 
2326
        err = errors.RequestCancelledError("CANCELLED")
 
2327
        self.command._run = lambda: defer.fail(err)
 
2328
        self.command.run()
 
2329
        self.assertEqual(self.command.log.logged, "warn")
 
2330
 
 
2331
    def test_suppressed_yes_and_retry_when_connectiondone(self):
 
2332
        """Check that the log is in warning and retries for ConnectionDone."""
 
2333
        self.handle_success = self.deferred.callback(True)
 
2334
        err = twisted_error.ConnectionDone()
 
2335
        runs = [defer.fail(err), defer.succeed(True)]
 
2336
        self.command._run = lambda: runs.pop(0)
 
2337
        self.command.run()
 
2338
        self.assertEqual(self.command.log.logged, "warn")
 
2339
        return self.deferred
 
2340
 
 
2341
    def test_retry_connectionlost(self):
 
2342
        """Check that it retries when ConnectionLost."""
 
2343
        self.handle_success = self.deferred.callback(True)
 
2344
        err = twisted_error.ConnectionLost()
 
2345
        runs = [defer.fail(err), defer.succeed(True)]
 
2346
        self.command._run = lambda: runs.pop(0)
 
2347
        self.command.run()
 
2348
        return self.deferred
 
2349
 
 
2350
    def test_retry_tryagain(self):
 
2351
        """Check that it retries when TryAgain."""
 
2352
        self.handle_success = self.deferred.callback(True)
 
2353
        protocol_msg = protocol_pb2.Message()
 
2354
        protocol_msg.type = protocol_pb2.Message.ERROR
 
2355
        protocol_msg.error.type = protocol_pb2.Error.TRY_AGAIN
 
2356
        err = errors.TryAgainError("request", protocol_msg)
 
2357
        runs = [defer.fail(err), defer.succeed(True)]
 
2358
        self.command._run = lambda: runs.pop(0)
 
2359
        self.command.run()
 
2360
        return self.deferred
 
2361
 
 
2362
 
 
2363
class ListSharesTestCase(ConnectedBaseTestCase):
 
2364
    """Test for ListShares ActionQueueCommand."""
 
2365
 
 
2366
    @defer.inlineCallbacks
 
2367
    def setUp(self):
 
2368
        """Set up."""
 
2369
        yield super(ListSharesTestCase, self).setUp()
 
2370
        self.rq = RequestQueue(action_queue=self.action_queue)
 
2371
 
 
2372
    def test_queued_mixed_types(self):
 
2373
        """Command gets queued if other command is waiting."""
 
2374
        cmd1 = FakeCommand()
 
2375
        self.rq.queue(cmd1)
 
2376
        cmd2 = ListShares(self.rq)
 
2377
        self.assertTrue(cmd2._should_be_queued())
 
2378
 
 
2379
    def test_queued_two(self):
 
2380
        """Two queued commands is not ok."""
 
2381
        cmd1 = ListShares(self.rq)
 
2382
        self.rq.queue(cmd1)
 
2383
        cmd2 = ListShares(self.rq)
 
2384
        self.assertFalse(cmd2._should_be_queued())
 
2385
 
 
2386
    def test_uniqueness(self):
 
2387
        """Info used for uniqueness."""
 
2388
        cmd = ListShares(self.rq)
 
2389
        self.assertEqual(cmd.uniqueness, 'ListShares')
 
2390
 
 
2391
 
 
2392
class ListVolumesTestCase(ConnectedBaseTestCase):
 
2393
    """Test for ListVolumes ActionQueueCommand."""
 
2394
 
 
2395
    @defer.inlineCallbacks
 
2396
    def setUp(self):
 
2397
        """Init."""
 
2398
        yield super(ListVolumesTestCase, self).setUp()
 
2399
        self.rq = RequestQueue(action_queue=self.action_queue)
 
2400
        self.command = ListVolumes(self.rq)
 
2401
 
 
2402
    def test_is_action_queue_command(self):
 
2403
        """Test proper inheritance."""
 
2404
        self.assertTrue(isinstance(self.command, ActionQueueCommand))
 
2405
 
 
2406
    def test_run_returns_a_deferred(self):
 
2407
        """Test a deferred is returned."""
 
2408
        res = self.command._run()
 
2409
        self.assertIsInstance(res, defer.Deferred)
 
2410
        res.addErrback(self.silent_connection_lost)
 
2411
 
 
2412
    def test_run_calls_protocol(self):
 
2413
        """Test protocol's list_volumes is called."""
 
2414
        original = self.command.action_queue.client.list_volumes
 
2415
        self.called = False
 
2416
 
 
2417
        def check():
 
2418
            """Take control over client's feature."""
 
2419
            self.called = True
 
2420
 
 
2421
        self.command.action_queue.client.list_volumes = check
 
2422
 
 
2423
        self.command._run()
 
2424
 
 
2425
        self.assertTrue(self.called, 'command was called')
 
2426
 
 
2427
        self.command.action_queue.client.list_volumes = original
 
2428
 
 
2429
    def test_handle_success_push_event(self):
 
2430
        """Test AQ_LIST_VOLUMES is pushed on success."""
 
2431
        request = client.ListVolumes(self.action_queue.client)
 
2432
        request.volumes = [FakedVolume(), FakedVolume()]
 
2433
        self.command.handle_success(success=request)
 
2434
        event = ('AQ_LIST_VOLUMES', {'volumes': request.volumes})
 
2435
        self.assertIn(event, self.command.action_queue.event_queue.events)
 
2436
 
 
2437
    def test_handle_failure_push_event(self):
 
2438
        """Test AQ_LIST_VOLUMES_ERROR is pushed on failure."""
 
2439
        msg = 'Something went wrong'
 
2440
        failure = Failure(DefaultException(msg))
 
2441
        self.command.handle_failure(failure=failure)
 
2442
        event = ('AQ_LIST_VOLUMES_ERROR', {'error': msg})
 
2443
        self.assertIn(event, self.command.action_queue.event_queue.events)
 
2444
 
 
2445
    def test_queued_mixed_types(self):
 
2446
        """Command gets queued if other command is waiting."""
 
2447
        cmd1 = FakeCommand()
 
2448
        self.rq.queue(cmd1)
 
2449
        cmd2 = ListVolumes(self.rq)
 
2450
        self.assertTrue(cmd2._should_be_queued())
 
2451
 
 
2452
    def test_queued_two(self):
 
2453
        """Two queued commands is not ok."""
 
2454
        cmd1 = ListVolumes(self.rq)
 
2455
        self.rq.queue(cmd1)
 
2456
        cmd2 = ListVolumes(self.rq)
 
2457
        self.assertFalse(cmd2._should_be_queued())
 
2458
 
 
2459
    def test_uniqueness(self):
 
2460
        """Info used for uniqueness."""
 
2461
        cmd = ListVolumes(self.rq)
 
2462
        self.assertEqual(cmd.uniqueness, 'ListVolumes')
 
2463
 
 
2464
 
 
2465
class DeleteVolumeTestCase(ConnectedBaseTestCase):
 
2466
    """Test for DeleteVolume ActionQueueCommand."""
 
2467
 
 
2468
    @defer.inlineCallbacks
 
2469
    def setUp(self):
 
2470
        """Init."""
 
2471
        yield super(DeleteVolumeTestCase, self).setUp()
 
2472
 
 
2473
        request_queue = RequestQueue(action_queue=self.action_queue)
 
2474
        self.command = DeleteVolume(request_queue, VOLUME, PATH)
 
2475
 
 
2476
        # silence the event to avoid propagation
 
2477
        listener_map = self.action_queue.event_queue.listener_map
 
2478
        del listener_map['AQ_DELETE_VOLUME_OK']
 
2479
        del listener_map['AQ_DELETE_VOLUME_ERROR']
 
2480
 
 
2481
    def test_is_action_queue_command(self):
 
2482
        """Test proper inheritance."""
 
2483
        self.assertTrue(isinstance(self.command, ActionQueueCommand))
 
2484
 
 
2485
    def test_init(self):
 
2486
        """Test creation."""
 
2487
        self.assertEqual(VOLUME, self.command.volume_id)
 
2488
 
 
2489
    def test_run_returns_a_deferred(self):
 
2490
        """Test a deferred is returned."""
 
2491
        res = self.command._run()
 
2492
        self.assertIsInstance(res, defer.Deferred)
 
2493
        res.addErrback(self.silent_connection_lost)
 
2494
 
 
2495
    def test_run_calls_protocol(self):
 
2496
        """Test protocol's delete_volume is called."""
 
2497
        original = self.command.action_queue.client.delete_volume
 
2498
        self.called = False
 
2499
 
 
2500
        def check(volume_id):
 
2501
            """Take control over client's feature."""
 
2502
            self.called = True
 
2503
            self.assertEqual(VOLUME, volume_id)
 
2504
 
 
2505
        self.command.action_queue.client.delete_volume = check
 
2506
 
 
2507
        self.command._run()
 
2508
 
 
2509
        self.assertTrue(self.called, 'command was called')
 
2510
 
 
2511
        self.command.action_queue.client.delete_volume = original
 
2512
 
 
2513
    def test_handle_success_push_event(self):
 
2514
        """Test AQ_DELETE_VOLUME_OK is pushed on success."""
 
2515
        request = client.DeleteVolume(self.action_queue.client,
 
2516
                                      volume_id=VOLUME)
 
2517
        self.command.handle_success(success=request)
 
2518
        events = [('AQ_DELETE_VOLUME_OK', {'volume_id': VOLUME})]
 
2519
        self.assertEqual(events, self.command.action_queue.event_queue.events)
 
2520
 
 
2521
    def test_handle_failure_push_event(self):
 
2522
        """Test AQ_DELETE_VOLUME_ERROR is pushed on failure."""
 
2523
        msg = 'Something went wrong'
 
2524
        failure = Failure(DefaultException(msg))
 
2525
        self.command.handle_failure(failure=failure)
 
2526
        event = ('AQ_DELETE_VOLUME_ERROR', {'volume_id': VOLUME, 'error': msg})
 
2527
        self.assertTrue(event in self.command.action_queue.event_queue.events)
 
2528
 
 
2529
    def test_path_locking(self):
 
2530
        """Test that it acquires correctly the path lock."""
 
2531
        t = []
 
2532
        self.patch(PathLockingTree, 'acquire',
 
2533
                   lambda s, *a, **k: t.extend((a, k)))
 
2534
        self.command._acquire_pathlock()
 
2535
        self.assertEqual(t, [tuple(PATH.split(os.path.sep)), {'logger': None}])
 
2536
 
 
2537
 
 
2538
class FilterEventsTestCase(BasicTestCase):
 
2539
    """Tests for event filtering when a volume is not of our interest."""
 
2540
 
 
2541
    @defer.inlineCallbacks
 
2542
    def setUp(self):
 
2543
        """Init."""
 
2544
        yield super(FilterEventsTestCase, self).setUp()
 
2545
        self.vm = self.main.vm
 
2546
        self.old_home = os.environ.get('HOME', None)
 
2547
        os.environ['HOME'] = self.home
 
2548
 
 
2549
    @defer.inlineCallbacks
 
2550
    def tearDown(self):
 
2551
        """Clean up."""
 
2552
        if self.old_home is None:
 
2553
            os.environ.pop('HOME')
 
2554
        else:
 
2555
            os.environ['HOME'] = self.old_home
 
2556
 
 
2557
        yield super(FilterEventsTestCase, self).tearDown()
 
2558
 
 
2559
 
 
2560
class ChangePublicAccessTests(ConnectedBaseTestCase):
 
2561
    """Tests for the ChangePublicAccess ActionQueueCommand."""
 
2562
 
 
2563
    @defer.inlineCallbacks
 
2564
    def setUp(self):
 
2565
        yield super(ChangePublicAccessTests, self).setUp()
 
2566
        self.user_connect()
 
2567
        request_queue = RequestQueue(action_queue=self.action_queue)
 
2568
        self.command = ChangePublicAccess(request_queue, VOLUME, NODE, True)
 
2569
 
 
2570
    def test_change_public_access(self):
 
2571
        """Test the change_public_access method.."""
 
2572
        self.action_queue.change_public_access(VOLUME, NODE, True)
 
2573
 
 
2574
    def test_is_action_queue_command(self):
 
2575
        """Test proper inheritance."""
 
2576
        self.assertTrue(isinstance(self.command, ActionQueueCommand))
 
2577
 
 
2578
    def test_init(self):
 
2579
        """Test creation."""
 
2580
        self.assertEqual(VOLUME, self.command.share_id)
 
2581
        self.assertEqual(NODE, self.command.node_id)
 
2582
        self.assertEqual(True, self.command.is_public)
 
2583
 
 
2584
    def test_run_defers_work_to_thread(self):
 
2585
        """Test that work is deferred to a thread."""
 
2586
        original = threads.deferToThread
 
2587
        self.called = False
 
2588
 
 
2589
        def check(function):
 
2590
            self.called = True
 
2591
            self.assertEqual(
 
2592
                self.command._change_public_access_http, function)
 
2593
            return defer.Deferred()
 
2594
 
 
2595
        threads.deferToThread = check
 
2596
        try:
 
2597
            res = self.command._run()
 
2598
        finally:
 
2599
            threads.deferToThread = original
 
2600
 
 
2601
        self.assertIsInstance(res, defer.Deferred)
 
2602
        self.assertTrue(self.called, "deferToThread was called")
 
2603
 
 
2604
    def test_change_public_access_http(self):
 
2605
        """Test the blocking portion of the command."""
 
2606
        self.called = False
 
2607
        def check(request):
 
2608
            self.called = True
 
2609
            url = 'https://one.ubuntu.com/files/api/set_public/%s:%s' % (
 
2610
                base64.urlsafe_b64encode(VOLUME.bytes).strip("="),
 
2611
                base64.urlsafe_b64encode(NODE.bytes).strip("="))
 
2612
            self.assertEqual(url, request.get_full_url())
 
2613
            self.assertEqual("is_public=True", request.get_data())
 
2614
            return StringIO(
 
2615
                '{"is_public": true, "public_url": "http://example.com"}')
 
2616
 
 
2617
        from ubuntuone.syncdaemon import action_queue
 
2618
        action_queue.urlopen = check
 
2619
        try:
 
2620
            res = self.command._change_public_access_http()
 
2621
        finally:
 
2622
            action_queue.urlopen = urllib2.urlopen
 
2623
 
 
2624
        self.assertEqual(
 
2625
            {'is_public': True, 'public_url': 'http://example.com'}, res)
 
2626
 
 
2627
    def test_handle_success_push_event(self):
 
2628
        """Test AQ_CHANGE_PUBLIC_ACCESS_OK is pushed on success."""
 
2629
        response = {'is_public': True, 'public_url': 'http://example.com'}
 
2630
        self.command.handle_success(success=response)
 
2631
        event = ('AQ_CHANGE_PUBLIC_ACCESS_OK',
 
2632
                 {'share_id': VOLUME, 'node_id': NODE, 'is_public': True,
 
2633
                  'public_url': 'http://example.com'})
 
2634
        self.assertIn(event, self.command.action_queue.event_queue.events)
 
2635
 
 
2636
    def test_handle_failure_push_event(self):
 
2637
        """Test AQ_CHANGE_PUBLIC_ACCESS_ERROR is pushed on failure."""
 
2638
        msg = 'Something went wrong'
 
2639
        failure = Failure(urllib2.HTTPError(
 
2640
                "http://example.com", 500, "Error", [], StringIO(msg)))
 
2641
        self.command.handle_failure(failure=failure)
 
2642
        event = ('AQ_CHANGE_PUBLIC_ACCESS_ERROR',
 
2643
                 {'share_id': VOLUME, 'node_id': NODE, 'error': msg})
 
2644
        self.assertIn(event, self.command.action_queue.event_queue.events)
 
2645
 
 
2646
    def test_possible_markers(self):
 
2647
        """Test that it returns the correct values."""
 
2648
        res = [getattr(self.command, x) for x in self.command.possible_markers]
 
2649
        self.assertEqual(res, [NODE])
 
2650
 
 
2651
 
 
2652
class GetPublicFilesTestCase(ConnectedBaseTestCase):
 
2653
    """Tests for GetPublicFiles ActionQueueCommand."""
 
2654
 
 
2655
    @defer.inlineCallbacks
 
2656
    def setUp(self):
 
2657
        yield super(GetPublicFilesTestCase, self).setUp()
 
2658
        self.user_connect()
 
2659
        self.rq = RequestQueue(action_queue=self.action_queue)
 
2660
        self.command = GetPublicFiles(self.rq)
 
2661
 
 
2662
    def test_init(self):
 
2663
        """Test __init__ method."""
 
2664
        default_url = 'https://one.ubuntu.com/files/api/public_files'
 
2665
        request_queue = RequestQueue(action_queue=self.action_queue)
 
2666
        command = GetPublicFiles(request_queue)
 
2667
        self.assertEqual(command._url, default_url)
 
2668
        custom_url = 'http://example.com:1234/files/api/public_files'
 
2669
        command_2 = GetPublicFiles(request_queue,
 
2670
                                   base_url='http://example.com:1234')
 
2671
        self.assertEqual(command_2._url, custom_url)
 
2672
 
 
2673
    def test_change_public_access(self):
 
2674
        """Test the get_public_files method.."""
 
2675
        self.action_queue.get_public_files()
 
2676
 
 
2677
    def test_is_action_queue_command(self):
 
2678
        """Test proper inheritance."""
 
2679
        self.assertTrue(isinstance(self.command, ActionQueueCommand))
 
2680
 
 
2681
    def test_run_defers_work_to_thread(self):
 
2682
        """Test that work is deferred to a thread."""
 
2683
        original = threads.deferToThread
 
2684
        self.called = False
 
2685
 
 
2686
        def check(function):
 
2687
            self.called = True
 
2688
            self.assertEqual(
 
2689
                self.command._get_public_files_http, function)
 
2690
            return defer.Deferred()
 
2691
 
 
2692
        threads.deferToThread = check
 
2693
        try:
 
2694
            res = self.command._run()
 
2695
        finally:
 
2696
            threads.deferToThread = original
 
2697
 
 
2698
        self.assertIsInstance(res, defer.Deferred)
 
2699
        self.assertTrue(self.called, "deferToThread was called")
 
2700
 
 
2701
    def test_get_public_files_http(self):
 
2702
        """Test the blocking portion of the command."""
 
2703
        self.called = False
 
2704
        node_id = uuid.uuid4()
 
2705
        nodekey = '%s' % (base64.urlsafe_b64encode(node_id.bytes).strip("="))
 
2706
        node_id_2 = uuid.uuid4()
 
2707
        nodekey_2 = '%s' % (base64.urlsafe_b64encode(
 
2708
                                                node_id_2.bytes).strip("="))
 
2709
        volume_id = uuid.uuid4()
 
2710
        def check(request):
 
2711
            self.called = True
 
2712
            url = 'https://one.ubuntu.com/files/api/public_files'
 
2713
            self.assertEqual(url, request.get_full_url())
 
2714
            return StringIO(
 
2715
                '[{"nodekey": "%s", "volume_id": null,"public_url": '
 
2716
                '"http://example.com"}, '
 
2717
                '{"nodekey": "%s", "volume_id": "%s", "public_url": '
 
2718
                '"http://example.com"}]' % (nodekey, nodekey_2, volume_id))
 
2719
 
 
2720
        from ubuntuone.syncdaemon import action_queue
 
2721
        action_queue.urlopen = check
 
2722
        try:
 
2723
            res = self.command._get_public_files_http()
 
2724
        finally:
 
2725
            action_queue.urlopen = urllib2.urlopen
 
2726
        self.assertEqual([{'node_id': str(node_id), 'volume_id': '',
 
2727
                          'public_url': 'http://example.com'},
 
2728
                          {'node_id': str(node_id_2),
 
2729
                           'volume_id': str(volume_id),
 
2730
                           'public_url': 'http://example.com'}], res)
 
2731
 
 
2732
    def test_handle_success_push_event(self):
 
2733
        """Test AQ_PUBLIC_FILES_LIST_OK is pushed on success."""
 
2734
        response = [{'node_id': uuid.uuid4(), 'volume_id':None,
 
2735
                    'public_url': 'http://example.com'}]
 
2736
        self.command.handle_success(success=response)
 
2737
        event = ('AQ_PUBLIC_FILES_LIST_OK', {'public_files': response,})
 
2738
        self.assertIn(event, self.command.action_queue.event_queue.events)
 
2739
 
 
2740
    def test_handle_failure_push_event(self):
 
2741
        """Test AQ_PUBLIC_FILES_LIST_ERROR is pushed on failure."""
 
2742
        msg = 'Something went wrong'
 
2743
        failure = Failure(urllib2.HTTPError(
 
2744
                "http://example.com", 500, "Error", [], StringIO(msg)))
 
2745
        self.command.handle_failure(failure=failure)
 
2746
        event = ('AQ_PUBLIC_FILES_LIST_ERROR', {'error': msg})
 
2747
        self.assertIn(event, self.command.action_queue.event_queue.events)
 
2748
 
 
2749
    def test_queued_mixed_types(self):
 
2750
        """Command gets queued if other command is waiting."""
 
2751
        cmd1 = FakeCommand()
 
2752
        self.rq.queue(cmd1)
 
2753
        cmd2 = GetPublicFiles(self.rq)
 
2754
        self.assertTrue(cmd2._should_be_queued())
 
2755
 
 
2756
    def test_queued_two(self):
 
2757
        """Two queued commands is not ok."""
 
2758
        cmd1 = GetPublicFiles(self.rq)
 
2759
        self.rq.queue(cmd1)
 
2760
        cmd2 = GetPublicFiles(self.rq)
 
2761
        self.assertFalse(cmd2._should_be_queued())
 
2762
 
 
2763
    def test_uniqueness(self):
 
2764
        """Info used for uniqueness."""
 
2765
        cmd = GetPublicFiles(self.rq)
 
2766
        self.assertEqual(cmd.uniqueness, 'GetPublicFiles')
 
2767
 
 
2768
 
 
2769
class DownloadUnconnectedTestCase(FactoryBaseTestCase):
 
2770
    """Test for Download ActionQueueCommand, no connection"""
 
2771
 
 
2772
    @defer.inlineCallbacks
 
2773
    def setUp(self):
 
2774
        """Init."""
 
2775
        yield super(DownloadUnconnectedTestCase, self).setUp()
 
2776
 
 
2777
        self.rq = request_queue = RequestQueue(action_queue=self.action_queue)
 
2778
        self.command = Download(request_queue, share_id='a_share_id',
 
2779
                               node_id='a_node_id', server_hash='server_hash',
 
2780
                               path='path', fileobj_factory=lambda: None)
 
2781
        self.command.make_logger()
 
2782
 
 
2783
    def test_progress_information_setup(self):
 
2784
        """Test the setting up of the progress information in ._run()."""
 
2785
        self.command.action_queue.connect_in_progress = False
 
2786
        self.command.action_queue.client = FakeClient()
 
2787
        self.command._run()
 
2788
        self.assertEqual(self.command.n_bytes_read_last, 0)
 
2789
 
 
2790
        self.command.n_bytes_read = 20
 
2791
        self.command._run()
 
2792
        self.assertEqual(len(self.action_queue.client.called), 2)
 
2793
        meth, args, kwargs = self.action_queue.client.called[1]
 
2794
        self.assertEqual(meth, 'get_content_request')
 
2795
        self.assertEqual(kwargs['offset'], 20)
 
2796
 
 
2797
 
 
2798
class DownloadTestCase(ConnectedBaseTestCase):
 
2799
    """Test for Download ActionQueueCommand."""
 
2800
 
 
2801
    @defer.inlineCallbacks
 
2802
    def setUp(self):
 
2803
        """Init."""
 
2804
        yield super(DownloadTestCase, self).setUp()
 
2805
 
 
2806
        self.rq = RequestQueue(action_queue=self.action_queue)
 
2807
        self.rq.transfers_semaphore = FakeSemaphore()
 
2808
 
 
2809
        class MyDownload(Download):
 
2810
            """Just to allow monkeypatching."""
 
2811
            sync = lambda s: None
 
2812
        self.command = MyDownload(self.rq, share_id='a_share_id',
 
2813
                                  node_id='a_node_id',
 
2814
                                  server_hash='server_hash',
 
2815
                                  path= os.path.join(os.path.sep, 'foo', 'bar'),
 
2816
                                  fileobj_factory=StringIO)
 
2817
        self.command.make_logger()
 
2818
        self.rq.waiting.append(self.command)
 
2819
 
 
2820
    def test_AQ_DOWNLOAD_FILE_PROGRESS_is_valid_event(self):
 
2821
        """AQ_DOWNLOAD_FILE_PROGRESS is a valid event."""
 
2822
        event = 'AQ_DOWNLOAD_FILE_PROGRESS'
 
2823
        self.assertTrue(event in EVENTS)
 
2824
        self.assertEqual(('share_id', 'node_id', 'n_bytes_read',
 
2825
                          'deflated_size'), EVENTS[event])
 
2826
 
 
2827
    def test_progress(self):
 
2828
        """Test the progress machinery."""
 
2829
        # would first get the node attribute including this
 
2830
        class FakeDecompressor(object):
 
2831
            """Fake decompressor."""
 
2832
 
 
2833
            def decompress(self, data):
 
2834
                """Nothing!"""
 
2835
                return ""
 
2836
 
 
2837
        self.command._run()
 
2838
        self.command.gunzip = FakeDecompressor()
 
2839
        self.assertEqual(self.command.n_bytes_read, 0)
 
2840
        self.assertEqual(self.command.n_bytes_read_last, 0)
 
2841
        self.command.node_attr_cb(
 
2842
                            deflated_size = TRANSFER_PROGRESS_THRESHOLD * 2)
 
2843
 
 
2844
        self.command.downloaded_cb('x' * 5)
 
2845
        events = self.command.action_queue.event_queue.events
 
2846
        self.assertFalse('AQ_DOWNLOAD_FILE_PROGRESS' in [x[0] for x in events])
 
2847
        self.assertEqual(self.command.n_bytes_read, 5)
 
2848
        self.assertEqual(self.command.n_bytes_read_last, 0)
 
2849
 
 
2850
        self.command.downloaded_cb('x' * (TRANSFER_PROGRESS_THRESHOLD - 10))
 
2851
        self.assertFalse('AQ_DOWNLOAD_FILE_PROGRESS' in [x[0] for x in events])
 
2852
        self.assertEqual(self.command.n_bytes_read,
 
2853
                         TRANSFER_PROGRESS_THRESHOLD - 5)
 
2854
        self.assertEqual(self.command.n_bytes_read_last, 0)
 
2855
 
 
2856
        self.command.downloaded_cb('x' * 10)
 
2857
        kwargs = {'share_id': 'a_share_id', 'node_id': 'a_node_id',
 
2858
                  'deflated_size': TRANSFER_PROGRESS_THRESHOLD * 2,
 
2859
                  'n_bytes_read': TRANSFER_PROGRESS_THRESHOLD + 5}
 
2860
        expected = ('AQ_DOWNLOAD_FILE_PROGRESS', kwargs)
 
2861
        self.assertTrue(expected in events)
 
2862
        self.assertEqual(self.command.n_bytes_read,
 
2863
                         TRANSFER_PROGRESS_THRESHOLD + 5)
 
2864
        self.assertEqual(self.command.n_bytes_read_last,
 
2865
                         self.command.n_bytes_read)
 
2866
 
 
2867
    def test_possible_markers(self):
 
2868
        """Test that it returns the correct values."""
 
2869
        res = [getattr(self.command, x) for x in self.command.possible_markers]
 
2870
        self.assertEqual(res, ['a_node_id'])
 
2871
 
 
2872
    def test_cancel_set_cancelled(self):
 
2873
        """Set the command to cancelled."""
 
2874
        assert not self.command.cancelled, "test badly set up"
 
2875
        did_cancel = self.command.cancel()
 
2876
        self.assertTrue(did_cancel)
 
2877
        self.assertTrue(self.command.cancelled)
 
2878
 
 
2879
    def test_cancel_download_req_is_none(self):
 
2880
        """It's ok to have download_req in None when cancelling."""
 
2881
        assert self.command.download_req is None, "test badly set up"
 
2882
        did_cancel = self.command.cancel()
 
2883
        self.assertTrue(did_cancel)
 
2884
 
 
2885
    def test_cancel_download_req_is_something(self):
 
2886
        """download_req is also cancelled."""
 
2887
        # set up the mocker
 
2888
        mocker = Mocker()
 
2889
        obj = mocker.mock()
 
2890
        obj.cancel()
 
2891
 
 
2892
        # test
 
2893
        with mocker:
 
2894
            self.command.download_req = obj
 
2895
            self.command.cancel()
 
2896
 
 
2897
    def test_cancel_remove(self):
 
2898
        """Remove the command from the queue."""
 
2899
        # set up the mocker
 
2900
        mocker = Mocker()
 
2901
        obj = mocker.mock()
 
2902
        obj.unqueue(self.command)
 
2903
 
 
2904
        # test
 
2905
        with mocker:
 
2906
            self.command._queue = obj
 
2907
            self.command.cancel()
 
2908
 
 
2909
    def test_cancel_clean_up(self):
 
2910
        """Clean up."""
 
2911
        called = []
 
2912
        self.command.cleanup = lambda: called.append(True)
 
2913
        self.command.cancel()
 
2914
        self.assertTrue(called)
 
2915
 
 
2916
    def test_uniqueness(self):
 
2917
        """Info used for uniqueness."""
 
2918
        u = self.command.uniqueness
 
2919
        self.assertEqual(u, ('MyDownload', 'a_share_id', 'a_node_id'))
 
2920
 
 
2921
    def test_path_locking(self):
 
2922
        """Test that it acquires correctly the path lock."""
 
2923
        t = []
 
2924
        self.patch(PathLockingTree, 'acquire',
 
2925
                   lambda s, *a, **k: t.extend((a, k)))
 
2926
        self.command._acquire_pathlock()
 
2927
        self.assertEqual(t, [("", "foo", "bar"), {'logger': self.command.log}])
 
2928
 
 
2929
    def test_upload_download_uniqueness(self):
 
2930
        """There should be only one upload/download for a specific node."""
 
2931
        # totally fake, we don't care: the messages are only validated on run
 
2932
        self.action_queue.download('foo', 'bar', 0, 'path', 0)
 
2933
        first_cmd = self.action_queue.queue.waiting[0]
 
2934
        self.action_queue.upload('foo', 'bar', 0, 0, 0, 0, 'path', StringIO)
 
2935
        self.assertEqual(len(self.action_queue.queue.waiting), 1)
 
2936
        self.assertTrue(first_cmd.cancelled)
 
2937
 
 
2938
    def test_uniqueness_upload(self):
 
2939
        """There should be only one upload/download for a specific node."""
 
2940
        # totally fake, we don't care: the messages are only validated on run
 
2941
        self.action_queue.upload('foo', 'bar', 0, 0, 0, 0, 'path', StringIO)
 
2942
        first_cmd = self.action_queue.queue.waiting[0]
 
2943
        self.action_queue.download('foo', 'bar', 0, 'path', 0)
 
2944
        self.assertEqual(len(self.action_queue.queue.waiting), 1)
 
2945
        self.assertTrue(first_cmd.cancelled)
 
2946
        self.assertTrue(self.handler.check_debug("Previous command cancelled",
 
2947
                                                 "Upload", "foo", "bar"))
 
2948
 
 
2949
    def test_uniqueness_download(self):
 
2950
        """There should be only one upload/download for a specific node."""
 
2951
        # totally fake, we don't care: the messages are only validated on run
 
2952
        self.action_queue.download('foo', 'bar', 0, 'path0', 0)
 
2953
        first_cmd = self.action_queue.queue.waiting[0]
 
2954
        self.action_queue.download('foo', 'bar', 1, 'path1', 1)
 
2955
        self.assertEqual(len(self.action_queue.queue.waiting), 1)
 
2956
        self.assertTrue(first_cmd.cancelled)
 
2957
        self.assertTrue(self.handler.check_debug("Previous command cancelled",
 
2958
                                                 "Download", "foo", "bar"))
 
2959
 
 
2960
    def test_uniqueness_even_with_markers(self):
 
2961
        """Only one upload/download per node, even using markers."""
 
2962
        m = MDMarker('bar')
 
2963
        self.action_queue.download('share', m, 0, 'path', 0)
 
2964
        first_cmd = self.action_queue.queue.waiting[0]
 
2965
        self.action_queue.uuid_map.set('bar', 'bah')
 
2966
        self.action_queue.download('share', 'bah', 0, 'path', 0)
 
2967
        self.assertEqual(len(self.action_queue.queue.waiting), 1)
 
2968
        self.assertTrue(first_cmd.cancelled)
 
2969
 
 
2970
    def test_uniqueness_tried_to_cancel_but_no(self):
 
2971
        """Previous command didn't cancel even if we tried it."""
 
2972
        # the first command will refuse to cancel (patch the class because
 
2973
        # the instance is not patchable)
 
2974
        self.action_queue.download('foo', 'bar', 0, 'path0', 0)
 
2975
        self.action_queue.queue.waiting[0]
 
2976
        self.patch(Download, 'cancel', lambda instance: False)
 
2977
 
 
2978
        self.action_queue.download('foo', 'bar', 1, 'path1', 1)
 
2979
        self.assertEqual(len(self.action_queue.queue.waiting), 2)
 
2980
        self.assertTrue(self.handler.check_debug("Tried to cancel", "couldn't",
 
2981
                                                 "Download", "foo", "bar"))
 
2982
 
 
2983
    def test_start_locks_on_semaphore(self):
 
2984
        """_start acquire the semaphore and locks."""
 
2985
        lock = defer.Deferred()
 
2986
        self.rq.transfers_semaphore.acquire = lambda: lock
 
2987
 
 
2988
        # _start and check it locked
 
2989
        started = self.command._start()
 
2990
        self.assertFalse(started.called)
 
2991
 
 
2992
        # release the lock and check it finished
 
2993
        o = object()
 
2994
        lock.callback(o)
 
2995
        self.assertTrue(started.called)
 
2996
        self.assertTrue(self.handler.check_debug('semaphore acquired'))
 
2997
        self.assertIdentical(o, self.command.tx_semaphore)
 
2998
 
 
2999
    def test_start_releases_semaphore_if_cancelled(self):
 
3000
        """Release the semaphore if cancelled while locking."""
 
3001
        lock = defer.Deferred()
 
3002
        self.rq.transfers_semaphore.acquire = lambda: lock
 
3003
 
 
3004
        # call start and cancel the command
 
3005
        self.command._start()
 
3006
        self.command.cancelled = True
 
3007
 
 
3008
        # release the lock
 
3009
        mocker = Mocker()
 
3010
        req = mocker.mock()
 
3011
        req.release()
 
3012
        with mocker:
 
3013
            lock.callback(req)
 
3014
 
 
3015
        # check it released the semaphore
 
3016
        self.assertTrue(self.handler.check_debug('semaphore released',
 
3017
                                                 'cancelled'))
 
3018
        self.assertIdentical(self.command.tx_semaphore, None)
 
3019
 
 
3020
    def test_finish_releases_semaphore_if_acquired(self):
 
3021
        """Test semaphore is released on finish if it was acquired."""
 
3022
        s = FakeSemaphore()
 
3023
        s.count = 1
 
3024
        self.command.tx_semaphore = s
 
3025
 
 
3026
        # finish and check
 
3027
        self.command.finish()
 
3028
        self.assertEqual(s.count, 0)
 
3029
        self.assertTrue(self.handler.check_debug('semaphore released'))
 
3030
 
 
3031
    def test_finish_releases_semaphore_not_there(self):
 
3032
        """Test semaphore is not released on finish if it was not acquired.
 
3033
 
 
3034
        This tests the situation where the command is finished before the lock
 
3035
        was acquired (cancelled even before its _start).
 
3036
        """
 
3037
        assert self.command.tx_semaphore is None
 
3038
        self.command.finish()
 
3039
 
 
3040
    def test_decompressor_restarted(self):
 
3041
        """Restart the decompressor on each _run (because of retries!)."""
 
3042
        # don't use the real protocol
 
3043
        obj = Mocker().mock()
 
3044
        obj.deferred
 
3045
        self.action_queue.client.get_content_request = lambda *a, **k: obj
 
3046
 
 
3047
        self.command._run()
 
3048
        decompressor1 = self.command.gunzip
 
3049
        self.command._run()
 
3050
        decompressor2 = self.command.gunzip
 
3051
        self.assertNotIdentical(decompressor1, decompressor2)
 
3052
 
 
3053
    def test_fileobj_in_run(self):
 
3054
        """Create it first time, reset after that."""
 
3055
        # don't use the real protocol
 
3056
        obj = Mocker().mock()
 
3057
        obj.deferred
 
3058
        self.action_queue.client.get_content_request = lambda *a, **k: obj
 
3059
 
 
3060
        class FakeFileObjFactory(object):
 
3061
            """Fake class to check behaviour."""
 
3062
            def __init__(self):
 
3063
                self.seek_count = 0
 
3064
                self.truncate_count = 0
 
3065
 
 
3066
            def seek(self, a, b):
 
3067
                """Fake seek."""
 
3068
                self.seek_count += 1
 
3069
 
 
3070
            def truncate(self, a):
 
3071
                """Fake truncate."""
 
3072
                self.truncate_count += 1
 
3073
 
 
3074
        cmd = Download(self.rq, 'a_share_id','a_node_id', 'server_hash',
 
3075
                       os.path.join(os.path.sep, 'foo','bar'),
 
3076
                       FakeFileObjFactory)
 
3077
 
 
3078
        # first run, it is just instantiated
 
3079
        cmd._run()
 
3080
        self.assertTrue(isinstance(cmd.fileobj, FakeFileObjFactory))
 
3081
        self.assertEqual(cmd.fileobj.seek_count, 0)
 
3082
        self.assertEqual(cmd.fileobj.truncate_count, 0)
 
3083
 
 
3084
        # next times it is reset
 
3085
        cmd._run()
 
3086
        self.assertEqual(cmd.fileobj.seek_count, 1)
 
3087
        self.assertEqual(cmd.fileobj.truncate_count, 1)
 
3088
 
 
3089
        cmd._run()
 
3090
        self.assertEqual(cmd.fileobj.seek_count, 2)
 
3091
        self.assertEqual(cmd.fileobj.truncate_count, 2)
 
3092
 
 
3093
 
 
3094
class UploadUnconnectedTestCase(FactoryBaseTestCase):
 
3095
    """Test for Upload ActionQueueCommand, no connection"""
 
3096
 
 
3097
    @defer.inlineCallbacks
 
3098
    def setUp(self):
 
3099
        """Init."""
 
3100
        yield super(UploadUnconnectedTestCase, self).setUp()
 
3101
 
 
3102
        class FakeMagicHash(object):
 
3103
            """Fake magic hash."""
 
3104
            _magic_hash = '666'
 
3105
 
 
3106
        self.rq = request_queue = RequestQueue(action_queue=self.action_queue)
 
3107
        self.command = Upload(request_queue, share_id='a_share_id',
 
3108
                              node_id='a_node_id', previous_hash='prev_hash',
 
3109
                              hash='yadda', crc32=0, size=0, path='path',
 
3110
                              fileobj_factory=lambda: None)
 
3111
        self.command.make_logger()
 
3112
        self.command.magic_hash = FakeMagicHash()
 
3113
        self.client = FakeClient()
 
3114
        self.command.action_queue.client = self.client
 
3115
 
 
3116
    def test_upload_progress_wrapper_setup(self):
 
3117
        """Test the setting up of the progress wrapper in ._run()."""
 
3118
        self.command.action_queue.connect_in_progress = False
 
3119
        self.command._run()
 
3120
 
 
3121
        self.assertEqual(len(self.client.called), 1)
 
3122
        meth, args, kwargs = self.client.called[0]
 
3123
        self.assertEqual(meth, 'put_content_request')
 
3124
        upload_wrapper = args[7]
 
3125
        self.assertEqual(upload_wrapper.command, self.command)
 
3126
 
 
3127
    @defer.inlineCallbacks
 
3128
    def test_client_request(self):
 
3129
        """Request the corrent operation on the client."""
 
3130
 
 
3131
        data = "content to be sent in the upload"
 
3132
        self.command.tempfile = StringIO(data)
 
3133
        self.command.deflated_size = 123
 
3134
        yield self.command._run()
 
3135
 
 
3136
        self.assertEqual(len(self.client.called), 1)
 
3137
        meth, args, kwargs = self.client.called[0]
 
3138
        self.assertEqual(meth, 'put_content_request')
 
3139
        self.assertEqual(args[0], 'a_share_id')
 
3140
        self.assertEqual(args[1], 'a_node_id')
 
3141
        self.assertEqual(args[2], 'prev_hash')
 
3142
        self.assertEqual(args[3], 'yadda')
 
3143
        self.assertEqual(args[4], 0)
 
3144
        self.assertEqual(args[5], 0)
 
3145
        self.assertEqual(args[6], 123)
 
3146
        self.assertTrue(isinstance(args[7], UploadProgressWrapper))
 
3147
        self.assertEqual(kwargs['magic_hash'], '666')
 
3148
 
 
3149
 
 
3150
class UploadProgressWrapperTestCase(BaseTwistedTestCase):
 
3151
    """Test for the UploadProgressWrapper helper class."""
 
3152
 
 
3153
    def test_reset(self):
 
3154
        """Reset the values at start."""
 
3155
        f = StringIO("x" * 10  + "y" * 5)
 
3156
        cmd = FakeCommand()
 
3157
 
 
3158
        # first time
 
3159
        UploadProgressWrapper(f, cmd)
 
3160
        self.assertEqual(cmd.n_bytes_written, 0)
 
3161
        self.assertEqual(cmd.n_bytes_written_last, 0)
 
3162
 
 
3163
        # fake as it worked a little, was interrupted, and tried again
 
3164
        cmd.n_bytes_written_last = 1234
 
3165
        cmd.n_bytes_written = 1248
 
3166
        UploadProgressWrapper(f, cmd)
 
3167
        self.assertEqual(cmd.n_bytes_written, 0)
 
3168
        self.assertEqual(cmd.n_bytes_written_last, 0)
 
3169
 
 
3170
    def test_read(self):
 
3171
        """Test the read method."""
 
3172
        class FakeCommand(object):
 
3173
            """Fake command."""
 
3174
 
 
3175
            def __init__(self):
 
3176
                self.n_bytes_written = 0
 
3177
                self._progress_hook_called = 0
 
3178
 
 
3179
            def progress_hook(innerself):
 
3180
                """Count how many times it was called."""
 
3181
                innerself._progress_hook_called += 1
 
3182
 
 
3183
        f = StringIO("x" * 10  + "y" * 5)
 
3184
        cmd = FakeCommand()
 
3185
        upw = UploadProgressWrapper(f, cmd)
 
3186
 
 
3187
        res = upw.read(10)
 
3188
        self.assertEqual(res, "x" * 10)
 
3189
        self.assertEqual(cmd.n_bytes_written, 10)
 
3190
        self.assertEqual(cmd._progress_hook_called, 1)
 
3191
 
 
3192
        res = upw.read(10)
 
3193
        self.assertEqual(res, "y" * 5)
 
3194
        self.assertEqual(cmd.n_bytes_written, 15)
 
3195
        self.assertEqual(cmd._progress_hook_called, 2)
 
3196
 
 
3197
    def test_seek(self):
 
3198
        """Test the seek method."""
 
3199
        class FakeCommand(object):
 
3200
            """Fake command."""
 
3201
 
 
3202
            def __init__(self):
 
3203
                self.n_bytes_written = 0
 
3204
                self.n_bytes_written_last = 0
 
3205
                self._progress_hook_called = 0
 
3206
 
 
3207
            def progress_hook(innerself):
 
3208
                """Count how many times it was called."""
 
3209
                innerself._progress_hook_called += 1
 
3210
 
 
3211
        f = StringIO("v" * 10 + "w" * 10 + "x" * 5 + "y" * 5)
 
3212
        cmd = FakeCommand()
 
3213
        upw = UploadProgressWrapper(f, cmd)
 
3214
 
 
3215
        upw.seek(10)
 
3216
        self.assertEqual(cmd.n_bytes_written, 10)
 
3217
        self.assertEqual(cmd._progress_hook_called, 0)
 
3218
        res = upw.read(10)
 
3219
        self.assertEqual(res, "w" * 10)
 
3220
        self.assertEqual(cmd.n_bytes_written, 20)
 
3221
        self.assertEqual(cmd._progress_hook_called, 1)
 
3222
 
 
3223
        upw.seek(25)
 
3224
        self.assertEqual(cmd.n_bytes_written, 25)
 
3225
        self.assertEqual(cmd._progress_hook_called, 1)
 
3226
        res = upw.read(10)
 
3227
        self.assertEqual(res, "y" * 5)
 
3228
        self.assertEqual(cmd.n_bytes_written, 30)
 
3229
        self.assertEqual(cmd._progress_hook_called, 2)
 
3230
 
 
3231
 
 
3232
class UploadTestCase(ConnectedBaseTestCase):
 
3233
    """Test for Upload ActionQueueCommand."""
 
3234
 
 
3235
    @defer.inlineCallbacks
 
3236
    def setUp(self):
 
3237
        """Init."""
 
3238
        yield super(UploadTestCase, self).setUp()
 
3239
 
 
3240
        self.rq = RequestQueue(action_queue=self.action_queue)
 
3241
        self.rq.transfers_semaphore = FakeSemaphore()
 
3242
        self.rq.unqueue = lambda c: None
 
3243
        self.rq.active = True
 
3244
 
 
3245
        class MyUpload(Upload):
 
3246
            """Just to allow monkeypatching."""
 
3247
 
 
3248
        self.share_id = str(uuid.uuid4())
 
3249
        self.command = MyUpload(self.rq, share_id=self.share_id,
 
3250
                                node_id='a_node_id', previous_hash='prev_hash',
 
3251
                                hash='yadda', crc32=0, size=0,
 
3252
                                path=os.path.join(os.path.sep, 'foo', 'bar'),
 
3253
                                fileobj_factory=StringIO)
 
3254
        self.command.make_logger()
 
3255
 
 
3256
    @defer.inlineCallbacks
 
3257
    def test_upload_in_progress(self):
 
3258
        """Test Upload retries on UploadInProgress."""
 
3259
        # prepare the failure
 
3260
        protocol_msg = protocol_pb2.Message()
 
3261
        protocol_msg.type = protocol_pb2.Message.ERROR
 
3262
        protocol_msg.error.type = protocol_pb2.Error.UPLOAD_IN_PROGRESS
 
3263
        err = errors.UploadInProgressError("request", protocol_msg)
 
3264
 
 
3265
        # first fails with UploadInProgress, then finishes ok
 
3266
        called = []
 
3267
        run_deferreds = [defer.fail(err), defer.succeed(True)]
 
3268
        self.command._run = lambda: called.append(':)') or run_deferreds.pop(0)
 
3269
 
 
3270
        # wait handle_success
 
3271
        d = defer.Deferred()
 
3272
        self.command.handle_success = lambda _: d.callback(True)
 
3273
 
 
3274
        # go and check
 
3275
        self.command.go()
 
3276
        yield d
 
3277
        self.assertEqual(called, [':)', ':)'])
 
3278
 
 
3279
    def test_handle_success_push_event(self):
 
3280
        """Test AQ_UPLOAD_FINISHED is pushed on success."""
 
3281
        # create a request and fill it with succesful information
 
3282
        aq_client = TestingProtocol()
 
3283
        request = client.PutContent(aq_client, VOLUME, 'node',
 
3284
                                    'prvhash', 'newhash', 'crc32', 'size',
 
3285
                                    'deflated', 'fd')
 
3286
        request.new_generation = 13
 
3287
        self.command.tempfile = FakeTempFile(self.tmpdir)
 
3288
 
 
3289
        # trigger success in the command
 
3290
        self.command.handle_success(request)
 
3291
 
 
3292
        # check for successful event
 
3293
        kwargs = dict(share_id=self.command.share_id, node_id='a_node_id',
 
3294
                      hash='yadda', new_generation=13)
 
3295
        events = [('AQ_UPLOAD_FINISHED', kwargs)]
 
3296
        self.assertEqual(events, self.command.action_queue.event_queue.events)
 
3297
 
 
3298
    def test_handle_failure_push_event(self):
 
3299
        """Test AQ_UPLOAD_ERROR is pushed on failure."""
 
3300
        self.command.tempfile = FakeTempFile(self.tmpdir)
 
3301
        msg = 'Something went wrong'
 
3302
        failure = Failure(DefaultException(msg))
 
3303
        self.command.handle_failure(failure=failure)
 
3304
        kwargs = dict(share_id=self.command.share_id, node_id='a_node_id',
 
3305
                      hash='yadda', error=msg)
 
3306
        events = [('AQ_UPLOAD_ERROR', kwargs)]
 
3307
        self.assertEqual(events, self.command.action_queue.event_queue.events)
 
3308
 
 
3309
    def test_handle_failure_removes_temp_file(self):
 
3310
        """Test temp file is removed on failure."""
 
3311
        self.command.tempfile = FakeTempFile(self.tmpdir)
 
3312
        assert path_exists(self.command.tempfile.name)
 
3313
 
 
3314
        msg = 'Something went wrong'
 
3315
        failure = Failure(DefaultException(msg))
 
3316
        self.command.handle_failure(failure=failure)
 
3317
        self.assertFalse(path_exists(self.command.tempfile.name))
 
3318
 
 
3319
    def test_retryable_failure_push_quota_exceeded_if_that_error(self):
 
3320
        """Test SYS_QUOTA_EXCEEDED is pushed on QuotaExceededError."""
 
3321
        protocol_msg = protocol_pb2.Message()
 
3322
        protocol_msg.type = protocol_pb2.Message.ERROR
 
3323
        protocol_msg.error.type = protocol_pb2.Error.QUOTA_EXCEEDED
 
3324
        protocol_msg.free_space_info.share_id = self.command.share_id
 
3325
        protocol_msg.free_space_info.free_bytes = 1331564676
 
3326
        error = errors.QuotaExceededError("request", protocol_msg)
 
3327
        failure = Failure(error)
 
3328
 
 
3329
        self.command.handle_retryable(failure)
 
3330
        event = ('SYS_QUOTA_EXCEEDED', {'volume_id': self.command.share_id,
 
3331
                                        'free_bytes': 1331564676})
 
3332
        self.assertTrue(event in self.command.action_queue.event_queue.events)
 
3333
 
 
3334
    def test_retryable_failure_nothing_on_other_errors(self):
 
3335
        """Test nothing is pushed on other errors."""
 
3336
        failure = Failure(twisted_error.ConnectionLost())
 
3337
        self.command.handle_retryable(failure)
 
3338
        event_names = [x[0]
 
3339
                       for x in self.command.action_queue.event_queue.events]
 
3340
        self.assertFalse('SYS_QUOTA_EXCEEDED' in event_names)
 
3341
 
 
3342
    def test_AQ_UPLOAD_FILE_PROGRESS_is_valid_event(self):
 
3343
        """AQ_UPLOAD_FILE_PROGRESS is a valid event."""
 
3344
        event = 'AQ_UPLOAD_FILE_PROGRESS'
 
3345
        self.assertTrue(event in EVENTS)
 
3346
        self.assertEqual(('share_id', 'node_id', 'n_bytes_written',
 
3347
                          'deflated_size'), EVENTS[event])
 
3348
 
 
3349
    def test_progress_hook(self):
 
3350
        """Test the progress hook."""
 
3351
        self.command.deflated_size = 2*TRANSFER_PROGRESS_THRESHOLD
 
3352
        self.command.n_bytes_written_last = 0
 
3353
 
 
3354
        self.command.n_bytes_written = 5
 
3355
        self.command.progress_hook()
 
3356
        self.assertEqual([], self.command.action_queue.event_queue.events)
 
3357
        self.assertEqual(self.command.n_bytes_written_last, 0)
 
3358
 
 
3359
        self.command.n_bytes_written = TRANSFER_PROGRESS_THRESHOLD - 5
 
3360
        self.command.progress_hook()
 
3361
        self.assertEqual([], self.command.action_queue.event_queue.events)
 
3362
        self.assertEqual(self.command.n_bytes_written_last, 0)
 
3363
 
 
3364
        self.command.n_bytes_written = TRANSFER_PROGRESS_THRESHOLD + 5
 
3365
        self.command.progress_hook()
 
3366
        kwargs = {'share_id': self.command.share_id, 'node_id': 'a_node_id',
 
3367
                  'deflated_size': 2*TRANSFER_PROGRESS_THRESHOLD,
 
3368
                  'n_bytes_written': 5+TRANSFER_PROGRESS_THRESHOLD }
 
3369
        events = [('AQ_UPLOAD_FILE_PROGRESS', kwargs)]
 
3370
        self.assertEqual(events, self.command.action_queue.event_queue.events)
 
3371
        self.assertEqual(self.command.n_bytes_written_last,
 
3372
                         TRANSFER_PROGRESS_THRESHOLD + 5)
 
3373
 
 
3374
    def test_runnable_space_ok(self):
 
3375
        """The upload is runnable if space ok."""
 
3376
        self.action_queue.have_sufficient_space_for_upload = lambda *a: True
 
3377
        self.assertTrue(self.command.is_runnable)
 
3378
 
 
3379
    def test_runnable_space_bad(self):
 
3380
        """The upload is not runnable without free space."""
 
3381
        self.action_queue.have_sufficient_space_for_upload = lambda *a: False
 
3382
        self.assertFalse(self.command.is_runnable)
 
3383
 
 
3384
    def test_runnable_space_bad_cancelled(self):
 
3385
        """The upload is runnable if cancelled even with no free space."""
 
3386
        self.action_queue.have_sufficient_space_for_upload = lambda *a: False
 
3387
        self.command.cancelled = True
 
3388
        self.assertTrue(self.command.is_runnable)
 
3389
 
 
3390
    def test_possible_markers(self):
 
3391
        """Test that it returns the correct values."""
 
3392
        res = [getattr(self.command, x) for x in self.command.possible_markers]
 
3393
        self.assertEqual(res, ['a_node_id'])
 
3394
 
 
3395
    def test_cancel_set_cancelled(self):
 
3396
        """Set the command to cancelled."""
 
3397
        assert not self.command.cancelled, "test badly set up"
 
3398
        self.command.cancel()
 
3399
        self.assertTrue(self.command.cancelled)
 
3400
 
 
3401
    def test_cancel_upload_req_is_none(self):
 
3402
        """It's ok to have upload_req in None when cancelling."""
 
3403
        assert self.command.upload_req is None, "test badly set up"
 
3404
        did_cancel = self.command.cancel()
 
3405
        self.assertTrue(did_cancel)
 
3406
 
 
3407
    def test_cancel_abort_when_producer_finished(self):
 
3408
        """If the producer already finished, don't really cancel."""
 
3409
        called = []
 
3410
        self.patch(ActionQueueCommand, 'cancel', lambda s: called.append(1))
 
3411
 
 
3412
        class FakeProducer(object):
 
3413
            """Fake producer."""
 
3414
            finished = True
 
3415
 
 
3416
        fake_request = FakeRequest()
 
3417
        fake_request.producer = FakeProducer()
 
3418
        self.command.upload_req = fake_request
 
3419
 
 
3420
        did_cancel = self.command.cancel()
 
3421
        self.assertFalse(did_cancel)
 
3422
        self.assertFalse(called)
 
3423
        self.assertFalse(fake_request.cancelled)
 
3424
 
 
3425
    def test_cancel_cancels_when_producer_not_finished(self):
 
3426
        """If the producer didn't finished, really cancel."""
 
3427
        called = []
 
3428
        self.patch(ActionQueueCommand, 'cancel',
 
3429
                   lambda s: called.append(True) or True)
 
3430
 
 
3431
        class FakeProducer(object):
 
3432
            """Fake producer."""
 
3433
            finished = False
 
3434
 
 
3435
        fake_request = FakeRequest()
 
3436
        fake_request.producer = FakeProducer()
 
3437
        self.command.upload_req = fake_request
 
3438
 
 
3439
        did_cancel = self.command.cancel()
 
3440
        self.assertTrue(did_cancel)
 
3441
        self.assertTrue(called)
 
3442
        self.assertTrue(fake_request.cancelled)
 
3443
 
 
3444
    def test_cancel_upload_req_is_something(self):
 
3445
        """upload_req is also cancelled."""
 
3446
        # set up the mocker
 
3447
        mocker = Mocker()
 
3448
        obj = mocker.mock()
 
3449
        obj.cancel()
 
3450
        obj.producer
 
3451
        obj.producer
 
3452
 
 
3453
        # test
 
3454
        with mocker:
 
3455
            self.command.upload_req = obj
 
3456
            self.command.cancel()
 
3457
 
 
3458
    def test_cancel_remove(self):
 
3459
        """Remove the command from the queue."""
 
3460
        # set up the mocker
 
3461
        mocker = Mocker()
 
3462
        obj = mocker.mock()
 
3463
        obj.unqueue(self.command)
 
3464
 
 
3465
        # test
 
3466
        with mocker:
 
3467
            self.command._queue = obj
 
3468
            self.command.cancel()
 
3469
 
 
3470
    def test_cancel_clean_up(self):
 
3471
        """Clean up."""
 
3472
        called = []
 
3473
        self.command.cleanup = lambda: called.append(True)
 
3474
        self.command.cancel()
 
3475
        self.assertTrue(called)
 
3476
 
 
3477
    def test_uniqueness(self):
 
3478
        """Info used for uniqueness."""
 
3479
        u = self.command.uniqueness
 
3480
        self.assertEqual(u, ('MyUpload', self.share_id, 'a_node_id'))
 
3481
 
 
3482
    def test_path_locking(self):
 
3483
        """Test that it acquires correctly the path lock."""
 
3484
        t = []
 
3485
        self.patch(PathLockingTree, 'acquire',
 
3486
                   lambda s, *a, **k: t.extend((a, k)))
 
3487
        self.command._acquire_pathlock()
 
3488
        self.assertEqual(t, [("", "foo", "bar"), {'logger': self.command.log}])
 
3489
 
 
3490
    def test_uniqueness_upload(self):
 
3491
        """There should be only one upload/download for a specific node."""
 
3492
        # totally fake, we don't care: the messages are only validated on run
 
3493
        self.action_queue.upload('foo', 'bar', 0, 0, 0, 0, 'path0', StringIO)
 
3494
        first_cmd = self.action_queue.queue.waiting[0]
 
3495
        self.action_queue.upload('foo', 'bar', 1, 1, 1, 1, 'path1', StringIO)
 
3496
        self.assertEqual(len(self.action_queue.queue.waiting), 1)
 
3497
        self.assertTrue(first_cmd.cancelled)
 
3498
        self.assertTrue(self.handler.check_debug("Previous command cancelled",
 
3499
                                                 "Upload", "foo", "bar"))
 
3500
 
 
3501
    def test_uniqueness_download(self):
 
3502
        """There should be only one upload/download for a specific node."""
 
3503
        # totally fake, we don't care: the messages are only validated on run
 
3504
        self.action_queue.download('foo', 'bar', 0, 'path', StringIO)
 
3505
        first_cmd = self.action_queue.queue.waiting[0]
 
3506
        self.action_queue.upload('foo', 'bar', 0, 0, 0, 0, 'path', StringIO)
 
3507
        self.assertEqual(len(self.action_queue.queue.waiting), 1)
 
3508
        self.assertTrue(first_cmd.cancelled)
 
3509
        self.assertTrue(self.handler.check_debug("Previous command cancelled",
 
3510
                                                 "Download", "foo", "bar"))
 
3511
 
 
3512
    def test_uniqueness_even_with_markers(self):
 
3513
        """Only one upload/download per node, even using markers."""
 
3514
        m = MDMarker('bar')
 
3515
        self.action_queue.download('share', m, 0, 'path', StringIO)
 
3516
        first_cmd = self.action_queue.queue.waiting[0]
 
3517
        self.action_queue.uuid_map.set('bar', 'bah')
 
3518
        self.action_queue.upload('share', 'bah', 0, 0, 0, 0, 'path', StringIO)
 
3519
        self.assertEqual(len(self.action_queue.queue.waiting), 1)
 
3520
        self.assertTrue(first_cmd.cancelled)
 
3521
 
 
3522
    def test_uniqueness_tried_to_cancel_but_no(self):
 
3523
        """Previous command didn't cancel even if we tried it."""
 
3524
        # the first command will refuse to cancel
 
3525
        self.action_queue.upload('foo', 'bar', 0, 0, 0, 0, 'path0', StringIO)
 
3526
        self.action_queue.queue.waiting[0]
 
3527
        self.patch(Upload, 'cancel', lambda instance: False)
 
3528
 
 
3529
        self.action_queue.upload('foo', 'bar', 1, 1, 1, 1, 'path1', StringIO)
 
3530
        self.assertEqual(len(self.action_queue.queue.waiting), 2)
 
3531
        self.assertTrue(self.handler.check_debug("Tried to cancel", "couldn't",
 
3532
                                                 "Upload", "foo", "bar"))
 
3533
 
 
3534
    def test_start_locks_on_semaphore(self):
 
3535
        """_start acquire the semaphore and locks."""
 
3536
        lock = defer.Deferred()
 
3537
        self.rq.transfers_semaphore.acquire = lambda: lock
 
3538
        self.action_queue.zip_queue.zip = lambda u: defer.succeed(True)
 
3539
 
 
3540
        # _start and check it locked
 
3541
        started = self.command._start()
 
3542
        self.assertFalse(started.called)
 
3543
 
 
3544
        # release the lock and check it finished
 
3545
        o = object()
 
3546
        lock.callback(o)
 
3547
        self.assertTrue(started.called)
 
3548
        self.assertTrue(self.handler.check_debug('semaphore acquired'))
 
3549
        self.assertIdentical(o, self.command.tx_semaphore)
 
3550
 
 
3551
    def test_start_releases_semaphore_if_cancelled(self):
 
3552
        """Release the semaphore if cancelled while locking."""
 
3553
        lock = defer.Deferred()
 
3554
        self.rq.transfers_semaphore.acquire = lambda: lock
 
3555
 
 
3556
        # call start and cancel the command
 
3557
        self.command._start()
 
3558
        self.command.cancelled = True
 
3559
 
 
3560
        # release the lock
 
3561
        mocker = Mocker()
 
3562
        req = mocker.mock()
 
3563
        req.release()
 
3564
        with mocker:
 
3565
            lock.callback(req)
 
3566
 
 
3567
        # check it released the semaphore
 
3568
        self.assertTrue(self.handler.check_debug('semaphore released',
 
3569
                                                 'cancelled'))
 
3570
        self.assertIdentical(self.command.tx_semaphore, None)
 
3571
 
 
3572
    def test_finish_releases_semaphore_if_acquired(self):
 
3573
        """Test semaphore is released on finish if it was acquired."""
 
3574
        s = FakeSemaphore()
 
3575
        s.count = 1
 
3576
        self.command.tx_semaphore = s
 
3577
 
 
3578
        # finish and check
 
3579
        self.command.finish()
 
3580
        self.assertEqual(s.count, 0)
 
3581
        self.assertTrue(self.handler.check_debug('semaphore released'))
 
3582
 
 
3583
    def test_finish_releases_semaphore_not_there(self):
 
3584
        """Test semaphore is not released on finish if it was not acquired.
 
3585
 
 
3586
        This tests the situation where the command is finished before the lock
 
3587
        was acquired (cancelled even before its _start).
 
3588
        """
 
3589
        assert self.command.tx_semaphore is None
 
3590
        self.command.finish()
 
3591
 
 
3592
    def test_handle_upload_id(self):
 
3593
        """Test the handling of upload_id."""
 
3594
        # change the share_id of the command
 
3595
        self.command.share_id = request.ROOT
 
3596
        # create the node
 
3597
        path = os.path.join(self.main.root_dir, 'foo')
 
3598
        self.main.fs.create(path=path, share_id=self.command.share_id,
 
3599
                            is_dir=False)
 
3600
        self.main.fs.set_node_id(path, self.command.node_id)
 
3601
        self.command._upload_id_cb('hola')
 
3602
        mdobj = self.main.fs.get_by_node_id(self.command.share_id,
 
3603
                                            self.command.node_id)
 
3604
        self.assertEqual('hola', mdobj.upload_id)
 
3605
 
 
3606
    def test_start_paused_use_upload_id(self):
 
3607
        """Test that starting a paused command make use of the upload_id."""
 
3608
        mh = content_hash.magic_hash_factory()
 
3609
        self.command.magic_hash = mh.content_hash()
 
3610
        # patch the client to check the args
 
3611
        self.command.action_queue.client = FakeClient()
 
3612
        self.command.tempfile = StringIO()
 
3613
        # change the share_id of the command
 
3614
        self.command.share_id = request.ROOT
 
3615
        # create the node
 
3616
        path = os.path.join(self.main.root_dir, 'foo')
 
3617
        self.main.fs.create(path=path, share_id=self.command.share_id,
 
3618
                            is_dir=False)
 
3619
        self.main.fs.set_node_id(path, self.command.node_id)
 
3620
        self.action_queue.queue.queue(self.command)
 
3621
        self.command._run()
 
3622
        # set the producer attribute
 
3623
        self.command.upload_req.producer = None
 
3624
        # upload id is None as this is the first upload
 
3625
        upload_id = self.command.action_queue.client.called[0][2]['upload_id']
 
3626
        self.assertEqual(upload_id, None)
 
3627
        # set the upload id via the callback
 
3628
        self.command._upload_id_cb('hola')
 
3629
        # pause it
 
3630
        self.command.pause()
 
3631
        # make it run again
 
3632
        self.command._run()
 
3633
        try:
 
3634
            upload_id = self.command.action_queue.client.called[1][2]['upload_id']
 
3635
            self.assertEqual(upload_id, 'hola')
 
3636
        finally:
 
3637
            self.command.action_queue.client = None
 
3638
 
 
3639
    def test_uses_rb_flags_when_creating_temp_file(self):
 
3640
        """Check that the 'b' flag is used for the temporary file."""
 
3641
        tempfile = NamedTemporaryFile()
 
3642
        self.assertEqual(tempfile.mode, 'w+b')
 
3643
 
 
3644
 
 
3645
class CreateShareTestCase(ConnectedBaseTestCase):
 
3646
    """Test for CreateShare ActionQueueCommand."""
 
3647
 
 
3648
    @defer.inlineCallbacks
 
3649
    def setUp(self):
 
3650
        """Init."""
 
3651
        yield super(CreateShareTestCase, self).setUp()
 
3652
        self.request_queue = RequestQueue(action_queue=self.action_queue)
 
3653
        self.orig_create_share_http = CreateShare._create_share_http
 
3654
 
 
3655
    @defer.inlineCallbacks
 
3656
    def tearDown(self):
 
3657
        yield super(CreateShareTestCase, self).tearDown()
 
3658
        CreateShare._create_share_http = self.orig_create_share_http
 
3659
 
 
3660
    @defer.inlineCallbacks
 
3661
    def test_access_level_modify_http(self):
 
3662
        """Test proper handling of the access level in the http case."""
 
3663
        # replace _create_share_http with a fake, just to check the args
 
3664
        d = defer.Deferred()
 
3665
        def check_create_http(self, node_id, user, name, read_only, deferred):
 
3666
            """Fire the deferred with the args."""
 
3667
            d.callback((node_id, user, name, read_only))
 
3668
            deferred.callback(None)
 
3669
        CreateShare._create_share_http = check_create_http
 
3670
        command = CreateShare(self.request_queue, 'node_id',
 
3671
                                   'share_to@example.com', 'share_name',
 
3672
                                   'Modify', 'marker', 'path')
 
3673
        self.assertTrue(command.use_http, 'CreateShare should be in http mode')
 
3674
 
 
3675
        command._run()
 
3676
        node_id, user, name, read_only = yield d
 
3677
        self.assertEqual('node_id', node_id)
 
3678
        self.assertEqual('share_to@example.com', user)
 
3679
        self.assertEqual('share_name', name)
 
3680
        self.assertFalse(read_only)
 
3681
 
 
3682
    @defer.inlineCallbacks
 
3683
    def test_access_level_view_http(self):
 
3684
        """Test proper handling of the access level in the http case."""
 
3685
        # replace _create_share_http with a fake, just to check the args
 
3686
        d = defer.Deferred()
 
3687
        def check_create_http(self, node_id, user, name, read_only, deferred):
 
3688
            """Fire the deferred with the args."""
 
3689
            d.callback((node_id, user, name, read_only))
 
3690
            deferred.callback(None)
 
3691
        CreateShare._create_share_http = check_create_http
 
3692
        command = CreateShare(self.request_queue, 'node_id',
 
3693
                                   'share_to@example.com', 'share_name',
 
3694
                                   'View', 'marker', 'path')
 
3695
        self.assertTrue(command.use_http, 'CreateShare should be in http mode')
 
3696
        command._run()
 
3697
        node_id, user, name, read_only = yield d
 
3698
        self.assertEqual('node_id', node_id)
 
3699
        self.assertEqual('share_to@example.com', user)
 
3700
        self.assertEqual('share_name', name)
 
3701
        self.assertTrue(read_only)
 
3702
 
 
3703
    def test_possible_markers(self):
 
3704
        """Test that it returns the correct values."""
 
3705
        cmd = CreateShare(self.request_queue, 'node_id', 'shareto@example.com',
 
3706
                          'share_name', 'View', 'marker', 'path')
 
3707
        res = [getattr(cmd, x) for x in cmd.possible_markers]
 
3708
        self.assertEqual(res, ['node_id'])
 
3709
 
 
3710
    def test_handle_success_sends_invitation(self):
 
3711
        """The success handler sends the AQ_SHARE_INVITATION_SENT event."""
 
3712
        marker_id = "marker"
 
3713
        share_name = "share name"
 
3714
        share_to = "share_to@example.com"
 
3715
 
 
3716
        class MockResult(object):
 
3717
            """A mock result."""
 
3718
            share_id = SHARE
 
3719
 
 
3720
        mock_success = MockResult()
 
3721
        cmd = CreateShare(self.request_queue, NODE, share_to,
 
3722
                          share_name, 'View', marker_id, 'path')
 
3723
        cmd.log = cmd.make_logger()
 
3724
        cmd.use_http = True
 
3725
        cmd.handle_success(mock_success)
 
3726
 
 
3727
        event_params = { 'marker': marker_id }
 
3728
        events = [('AQ_SHARE_INVITATION_SENT', event_params)]
 
3729
        self.assertEqual(events, cmd.action_queue.event_queue.events)
 
3730
 
 
3731
    def test_path_locking(self):
 
3732
        """Test that it acquires correctly the path lock."""
 
3733
        t = []
 
3734
        self.patch(PathLockingTree, 'acquire',
 
3735
                   lambda s, *a, **k: t.extend((a, k)))
 
3736
        cmd = CreateShare(self.request_queue, NODE, 'share_to',
 
3737
                          'share_name', 'View', 'marker_id',
 
3738
                          os.path.join('foo', 'bar'))
 
3739
        cmd._acquire_pathlock()
 
3740
        self.assertEqual(t, [('foo', 'bar'), {'logger': None}])
 
3741
 
 
3742
 
 
3743
class DeleteShareTestCase(ConnectedBaseTestCase):
 
3744
    """Test for DeleteShare ActionQueueCommand."""
 
3745
 
 
3746
    @defer.inlineCallbacks
 
3747
    def setUp(self):
 
3748
        """Init."""
 
3749
        yield super(DeleteShareTestCase, self).setUp()
 
3750
        request_queue = RequestQueue(action_queue=self.action_queue)
 
3751
        self.command = DeleteShare(request_queue, SHARE)
 
3752
 
 
3753
        # silence the event to avoid propagation
 
3754
        listener_map = self.action_queue.event_queue.listener_map
 
3755
        del listener_map['AQ_DELETE_SHARE_OK']
 
3756
        del listener_map['AQ_DELETE_SHARE_ERROR']
 
3757
 
 
3758
    def test_run_returns_a_deferred(self):
 
3759
        """Test a deferred is returned."""
 
3760
        res = self.command._run()
 
3761
        self.assertIsInstance(res, defer.Deferred)
 
3762
        res.addErrback(self.silent_connection_lost)
 
3763
 
 
3764
    def test_run_calls_protocol(self):
 
3765
        """Test protocol's delete_volume is called."""
 
3766
        self.called = False
 
3767
        def check(share_id):
 
3768
            """Take control over client's feature."""
 
3769
            self.called = True
 
3770
            self.assertEqual(SHARE, share_id)
 
3771
        self.patch(self.command.action_queue.client, 'delete_share', check)
 
3772
        self.command._run()
 
3773
        self.assertTrue(self.called, "command wasn't called")
 
3774
 
 
3775
    def test_handle_success_push_event(self):
 
3776
        """Test AQ_DELETE_SHARE_OK is pushed on success."""
 
3777
        request = client.DeleteShare(self.action_queue.client, SHARE)
 
3778
        self.command.handle_success(success=request)
 
3779
        events = [('AQ_DELETE_SHARE_OK', {'share_id': SHARE})]
 
3780
        self.assertEqual(events, self.command.action_queue.event_queue.events)
 
3781
 
 
3782
    def test_handle_failure_push_event(self):
 
3783
        """Test AQ_DELETE_SHARE_ERROR is pushed on failure."""
 
3784
        msg = 'Something went wrong'
 
3785
        failure = Failure(DefaultException(msg))
 
3786
        self.command.handle_failure(failure=failure)
 
3787
        events = [('AQ_DELETE_SHARE_ERROR',
 
3788
                  {'share_id': SHARE, 'error': msg})]
 
3789
        self.assertEqual(events, self.command.action_queue.event_queue.events)
 
3790
 
 
3791
 
 
3792
class SimpleAQTestCase(BasicTestCase):
 
3793
    """Simple tests for AQ API."""
 
3794
 
 
3795
    def test_aq_query_volumes(self):
 
3796
        """Check the API of AQ.query_volumes."""
 
3797
        self.main.start()
 
3798
        d = defer.Deferred()
 
3799
        def list_volumes():
 
3800
            """Fake list_volumes."""
 
3801
            result = DummyClass()
 
3802
            result.volumes = ['foo', 'bar']
 
3803
            return defer.succeed(result)
 
3804
 
 
3805
        self.action_queue.client = DummyClass()
 
3806
        self.action_queue.client.list_volumes = list_volumes
 
3807
        d = self.action_queue.query_volumes()
 
3808
        def check(result):
 
3809
            self.assertIn('foo', result)
 
3810
            self.assertIn('bar', result)
 
3811
            return result
 
3812
        d.addCallback(check)
 
3813
        return d
 
3814
 
 
3815
    def test_have_sufficient_space_for_upload_if_free_space_is_none(self):
 
3816
        """Check have_sufficient_space_for_upload.
 
3817
 
 
3818
        If free_space is None, SYS_QUOTA_EXCEEDED is not pushed.
 
3819
 
 
3820
        """
 
3821
        self.patch(self.action_queue.main.vm, 'get_free_space',
 
3822
                   lambda share_id: None)  # free space is None
 
3823
        volume_id = 'test share'
 
3824
        res = self.action_queue.have_sufficient_space_for_upload(volume_id,
 
3825
                                                                 upload_size=1)
 
3826
        self.assertTrue(res, "Must have enough space to upload.")
 
3827
        events = map(operator.itemgetter(0),
 
3828
                     self.action_queue.event_queue.events)
 
3829
        self.assertNotIn('SYS_QUOTA_EXCEEDED', events)
 
3830
 
 
3831
    def test_have_sufficient_space_for_upload_if_no_free_space(self):
 
3832
        """Check have_sufficient_space_for_upload pushes SYS_QUOTA_EXCEEDED."""
 
3833
        self.patch(self.action_queue.main.vm, 'get_free_space',
 
3834
                   lambda share_id: 0) # no free space, always
 
3835
        volume_id = 'test share'
 
3836
        res = self.action_queue.have_sufficient_space_for_upload(volume_id,
 
3837
                                                                 upload_size=1)
 
3838
        self.assertEqual(res, False, "Must not have enough space to upload.")
 
3839
        msg = 'SYS_QUOTA_EXCEEDED must have been pushed to event queue.'
 
3840
        expected = ('SYS_QUOTA_EXCEEDED',
 
3841
                    {'volume_id': volume_id, 'free_bytes': 0})
 
3842
        self.assertTrue(expected in self.action_queue.event_queue.events, msg)
 
3843
 
 
3844
    def test_have_sufficient_space_for_upload_if_free_space(self):
 
3845
        """Check have_sufficient_space_for_upload doesn't push any event."""
 
3846
        self.patch(self.action_queue.main.vm, 'get_free_space',
 
3847
                   lambda share_id: 1) # free space, always
 
3848
        res = self.action_queue.have_sufficient_space_for_upload(share_id=None,
 
3849
                                                                 upload_size=0)
 
3850
        self.assertEqual(res, True, "Must have enough space to upload.")
 
3851
        msg = 'No event must have been pushed to event queue.'
 
3852
        self.assertEqual(self.action_queue.event_queue.events, [], msg)
 
3853
 
 
3854
    def test_SYS_QUOTA_EXCEEDED_is_valid_event(self):
 
3855
        """SYS_QUOTA_EXCEEDED is a valid event."""
 
3856
        event = 'SYS_QUOTA_EXCEEDED'
 
3857
        self.assertTrue(event in EVENTS)
 
3858
        self.assertEqual(('volume_id', 'free_bytes'), EVENTS[event])
 
3859
 
 
3860
    def test_SYS_USER_CONNECT_is_valid_event(self):
 
3861
        """SYS_USER_CONNECT is a valid event."""
 
3862
        event = 'SYS_USER_CONNECT'
 
3863
        self.assertIn(event, EVENTS)
 
3864
        self.assertEqual(('access_token',), EVENTS[event])
 
3865
 
 
3866
    def test_handle_SYS_USER_CONNECT(self):
 
3867
        """handle_SYS_USER_CONNECT stores credentials."""
 
3868
        self.assertEqual(self.action_queue.token, None)
 
3869
        self.assertEqual(self.action_queue.consumer, None)
 
3870
 
 
3871
        self.user_connect()
 
3872
 
 
3873
        expected = oauth.OAuthToken('bla', 'ble')
 
3874
        self.assertEqual(self.action_queue.token.key, expected.key)
 
3875
        self.assertEqual(self.action_queue.token.secret, expected.secret)
 
3876
 
 
3877
        expected = oauth.OAuthConsumer('foo', 'bar')
 
3878
        self.assertEqual(self.action_queue.consumer.key, expected.key)
 
3879
        self.assertEqual(self.action_queue.consumer.secret, expected.secret)
 
3880
 
 
3881
 
 
3882
class SpecificException(Exception):
 
3883
    """The specific exception."""
 
3884
 
 
3885
 
 
3886
class SillyClass(object):
 
3887
    """Silly class that accepts the set of any attribute.
 
3888
 
 
3889
    We can't use object() directly, since its raises AttributeError.
 
3890
 
 
3891
    """
 
3892
 
 
3893
 
 
3894
class ErrorHandlingTestCase(BasicTestCase):
 
3895
    """Error handling tests for ActionQueue."""
 
3896
 
 
3897
    @defer.inlineCallbacks
 
3898
    def setUp(self):
 
3899
        """Init."""
 
3900
        yield super(ErrorHandlingTestCase, self).setUp()
 
3901
 
 
3902
        self.called = False
 
3903
        self.action_queue.client = SillyClass()
 
3904
        self.patch(self.main, 'restart', lambda: None)
 
3905
 
 
3906
        self.main.start()
 
3907
 
 
3908
    def fail_please(self, an_exception):
 
3909
        """Raise the given exception."""
 
3910
        def inner(*args, **kwargs):
 
3911
            """A request to the server that fails."""
 
3912
            self.called = True
 
3913
            return defer.fail(an_exception)
 
3914
        return inner
 
3915
 
 
3916
    def succeed_please(self, result):
 
3917
        """Return the given result."""
 
3918
        def inner(*args, **kwargs):
 
3919
            """A request to the server that succeeds."""
 
3920
            self.called = True
 
3921
            return defer.succeed(result)
 
3922
        return inner
 
3923
 
 
3924
    def mock_caps(self, accepted):
 
3925
        """Reply to query caps with False."""
 
3926
        def gset_caps(caps):
 
3927
            """get/set caps helper."""
 
3928
            req = SillyClass()
 
3929
            req.caps = caps
 
3930
            req.accepted = accepted
 
3931
            return defer.succeed(req)
 
3932
        return gset_caps
 
3933
 
 
3934
    def test_valid_event(self):
 
3935
        """SYS_SERVER_ERROR is valid in EventQueue."""
 
3936
        event = 'SYS_SERVER_ERROR'
 
3937
        self.assertTrue(event in EVENTS)
 
3938
        self.assertEqual(('error',), EVENTS[event])
 
3939
 
 
3940
    @defer.inlineCallbacks
 
3941
    def test_send_request_and_handle_errors_on_no_error(self):
 
3942
        """_send_request_and_handle_errors is correct when no error."""
 
3943
 
 
3944
        event = 'SYS_SPECIFIC_OK'
 
3945
        EVENTS[event] = () # add event to the global valid events list
 
3946
        self.addCleanup(EVENTS.pop, event)
 
3947
 
 
3948
        result = object()
 
3949
        request = self.succeed_please(result)
 
3950
        kwargs = dict(request=request, request_error=SpecificException,
 
3951
                      event_error='YADDA_YADDA', event_ok=event,
 
3952
                      args=(1, 2), kwargs={})
 
3953
        d = self.action_queue._send_request_and_handle_errors(**kwargs)
 
3954
        actual_result = yield d
 
3955
 
 
3956
        self.assertTrue(self.called, 'the request was called')
 
3957
        self.assertEqual(actual_result, result)
 
3958
        self.assertEqual((event, {}),
 
3959
                         self.action_queue.event_queue.events[-1])
 
3960
 
 
3961
        # assert over logging
 
3962
        self.assertTrue(self.handler.check_info(request.__name__, 'OK'))
 
3963
 
 
3964
    @defer.inlineCallbacks
 
3965
    def test_send_request_and_handle_errors_with_no_event_ok(self):
 
3966
        """_send_request_and_handle_errors does not push event if is None."""
 
3967
        original_events = self.action_queue.event_queue.events[:]
 
3968
 
 
3969
        result = object()
 
3970
        request = self.succeed_please(result)
 
3971
        kwargs = dict(request=request, request_error=SpecificException,
 
3972
                      event_error='YADDA_YADDA', event_ok=None)
 
3973
        d = self.action_queue._send_request_and_handle_errors(**kwargs)
 
3974
        actual_result = yield d
 
3975
 
 
3976
        self.assertTrue(self.called, 'the request was called')
 
3977
        self.assertEqual(actual_result, result)
 
3978
        self.assertEqual(original_events,
 
3979
                         self.action_queue.event_queue.events)
 
3980
 
 
3981
        # assert over logging
 
3982
        self.assertTrue(self.handler.check_info(request.__name__, 'OK'))
 
3983
 
 
3984
    @defer.inlineCallbacks
 
3985
    def test_send_request_and_handle_errors_on_valid_error(self):
 
3986
        """_send_request_and_handle_errors is correct when expected error."""
 
3987
 
 
3988
        event = 'SYS_SPECIFIC_ERROR'
 
3989
        EVENTS[event] = ('error',) # add event to the global valid events list
 
3990
        self.addCleanup(EVENTS.pop, event)
 
3991
 
 
3992
        exc = SpecificException('The request failed! please be happy.')
 
3993
        request = self.fail_please(exc)
 
3994
        kwargs = dict(request=request, request_error=SpecificException,
 
3995
                      event_error=event, event_ok='YADDA_YADDA')
 
3996
        d = self.action_queue._send_request_and_handle_errors(**kwargs)
 
3997
        yield d
 
3998
 
 
3999
        self.assertTrue(self.called, 'the request was called')
 
4000
        self.assertEqual((event, {'error': str(exc)}),
 
4001
                         self.action_queue.event_queue.events[-1])
 
4002
 
 
4003
        # assert over logging
 
4004
        self.assertTrue(self.handler.check_info(request.__name__,
 
4005
                                                event, str(exc)))
 
4006
 
 
4007
    @defer.inlineCallbacks
 
4008
    def assert_send_request_and_handle_errors_on_connection_end(self, exc):
 
4009
        """_send_request_and_handle_errors is ok when connection lost/done."""
 
4010
        request = self.fail_please(exc)
 
4011
        kwargs = dict(request=request, request_error=SpecificException,
 
4012
                      event_error='BAR', event_ok='FOO')
 
4013
        d = self.action_queue._send_request_and_handle_errors(**kwargs)
 
4014
        yield d
 
4015
 
 
4016
        # check that SYS_UNKNOWN_ERROR wasn't sent, and that logged ok
 
4017
        events = self.action_queue.event_queue.events
 
4018
        self.assertNotIn('SYS_UNKNOWN_ERROR', [x[0] for x in events])
 
4019
        self.assertTrue(self.handler.check_info(request.__name__, str(exc)))
 
4020
 
 
4021
    def test_send_request_and_handle_errors_on_connection_lost(self):
 
4022
        """_send_request_and_handle_errors is correct when connection lost."""
 
4023
        e = twisted_error.ConnectionLost()
 
4024
        return self.assert_send_request_and_handle_errors_on_connection_end(e)
 
4025
 
 
4026
    def test_send_request_and_handle_errors_on_connection_done(self):
 
4027
        """_send_request_and_handle_errors is correct when connection lost."""
 
4028
        e = twisted_error.ConnectionDone()
 
4029
        return self.assert_send_request_and_handle_errors_on_connection_end(e)
 
4030
 
 
4031
    def test_send_request_and_handle_errors_on_ssl_error(self):
 
4032
        """_send_request_and_handle_errors is correct when get a SSL error."""
 
4033
        e = OpenSSL.SSL.Error()
 
4034
        return self.assert_send_request_and_handle_errors_on_connection_end(e)
 
4035
 
 
4036
    @defer.inlineCallbacks
 
4037
    def assert_send_request_and_handle_errors_on_server_error(self, serr):
 
4038
        """_send_request_and_handle_errors is correct when server error."""
 
4039
        # XXX: we need to replace this list with and exception list
 
4040
        # once bug #557718 is resolved
 
4041
        msg = protocol_pb2.Message()
 
4042
        msg.type = protocol_pb2.Message.ERROR
 
4043
        msg.error.type = serr
 
4044
        msg.error.comment = 'Error message for %s.' % serr
 
4045
        exc = errors.error_to_exception(serr)(request=None, message=msg)
 
4046
 
 
4047
        request = self.fail_please(exc)
 
4048
        kwargs = dict(request=request, request_error=SpecificException,
 
4049
                      event_error='BAR', event_ok='FOO')
 
4050
        d = self.action_queue._send_request_and_handle_errors(**kwargs)
 
4051
        yield d
 
4052
 
 
4053
        event = 'SYS_SERVER_ERROR'
 
4054
        self.assertEqual((event, {'error': str(exc)}),
 
4055
                         self.action_queue.event_queue.events[-1])
 
4056
 
 
4057
        # assert over logging
 
4058
        self.assertTrue(self.handler.check_info(request.__name__,
 
4059
                                                event, str(exc)))
 
4060
 
 
4061
    @defer.inlineCallbacks
 
4062
    def test_send_request_and_handle_errors_on_try_again(self):
 
4063
        """_send_request_and_handle_errors is correct when server error."""
 
4064
        serr = protocol_pb2.Error.TRY_AGAIN
 
4065
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4066
 
 
4067
    @defer.inlineCallbacks
 
4068
    def test_send_request_and_handle_errors_on_internal_error(self):
 
4069
        """_send_request_and_handle_errors is correct when server error."""
 
4070
        serr = protocol_pb2.Error.INTERNAL_ERROR
 
4071
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4072
 
 
4073
    @defer.inlineCallbacks
 
4074
    def test_send_request_and_handle_errors_on_protocol_error(self):
 
4075
        """_send_request_and_handle_errors is correct when server error."""
 
4076
        serr = protocol_pb2.Error.PROTOCOL_ERROR
 
4077
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4078
 
 
4079
    @defer.inlineCallbacks
 
4080
    def test_send_request_and_handle_errors_on_unsupported_version(self):
 
4081
        """_send_request_and_handle_errors is correct when server error."""
 
4082
        serr = protocol_pb2.Error.UNSUPPORTED_VERSION
 
4083
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4084
 
 
4085
    @defer.inlineCallbacks
 
4086
    def test_send_request_and_handle_errors_on_authetication_failed(self):
 
4087
        """_send_request_and_handle_errors is correct when server error."""
 
4088
        serr = protocol_pb2.Error.AUTHENTICATION_FAILED
 
4089
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4090
 
 
4091
    @defer.inlineCallbacks
 
4092
    def test_send_request_and_handle_errors_on_no_permission(self):
 
4093
        """_send_request_and_handle_errors is correct when server error."""
 
4094
        serr = protocol_pb2.Error.NO_PERMISSION
 
4095
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4096
 
 
4097
    @defer.inlineCallbacks
 
4098
    def test_send_request_and_handle_errors_on_already_exists(self):
 
4099
        """_send_request_and_handle_errors is correct when server error."""
 
4100
        serr = protocol_pb2.Error.ALREADY_EXISTS
 
4101
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4102
 
 
4103
    @defer.inlineCallbacks
 
4104
    def test_send_request_and_handle_errors_on_does_not_exist(self):
 
4105
        """_send_request_and_handle_errors is correct when server error."""
 
4106
        serr = protocol_pb2.Error.DOES_NOT_EXIST
 
4107
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4108
 
 
4109
    @defer.inlineCallbacks
 
4110
    def test_send_request_and_handle_errors_on_not_a_dir(self):
 
4111
        """_send_request_and_handle_errors is correct when server error."""
 
4112
        serr = protocol_pb2.Error.NOT_A_DIRECTORY
 
4113
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4114
 
 
4115
    @defer.inlineCallbacks
 
4116
    def test_send_request_and_handle_errors_on_not_empty(self):
 
4117
        """_send_request_and_handle_errors is correct when server error."""
 
4118
        serr = protocol_pb2.Error.NOT_EMPTY
 
4119
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4120
 
 
4121
    @defer.inlineCallbacks
 
4122
    def test_send_request_and_handle_errors_on_not_available(self):
 
4123
        """_send_request_and_handle_errors is correct when server error."""
 
4124
        serr = protocol_pb2.Error.NOT_AVAILABLE
 
4125
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4126
 
 
4127
    @defer.inlineCallbacks
 
4128
    def test_send_request_and_handle_errors_on_upload_in_progress(self):
 
4129
        """_send_request_and_handle_errors is correct when server error."""
 
4130
        serr = protocol_pb2.Error.UPLOAD_IN_PROGRESS
 
4131
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4132
 
 
4133
    @defer.inlineCallbacks
 
4134
    def test_send_request_and_handle_errors_on_upload_corrupt(self):
 
4135
        """_send_request_and_handle_errors is correct when server error."""
 
4136
        serr = protocol_pb2.Error.UPLOAD_CORRUPT
 
4137
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4138
 
 
4139
    @defer.inlineCallbacks
 
4140
    def test_send_request_and_handle_errors_on_upload_canceled(self):
 
4141
        """_send_request_and_handle_errors is correct when server error."""
 
4142
        serr = protocol_pb2.Error.UPLOAD_CANCELED
 
4143
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4144
 
 
4145
    @defer.inlineCallbacks
 
4146
    def test_send_request_and_handle_errors_on_conflict(self):
 
4147
        """_send_request_and_handle_errors is correct when server error."""
 
4148
        serr = protocol_pb2.Error.CONFLICT
 
4149
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4150
 
 
4151
    @defer.inlineCallbacks
 
4152
    def test_send_request_and_handle_errors_on_quota_exceeded(self):
 
4153
        """_send_request_and_handle_errors is correct when server error."""
 
4154
        serr = protocol_pb2.Error.QUOTA_EXCEEDED
 
4155
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4156
 
 
4157
    @defer.inlineCallbacks
 
4158
    def test_send_request_and_handle_errors_on_invalid_filename(self):
 
4159
        """_send_request_and_handle_errors is correct when server error."""
 
4160
        serr = protocol_pb2.Error.INVALID_FILENAME
 
4161
        yield self.assert_send_request_and_handle_errors_on_server_error(serr)
 
4162
 
 
4163
    @defer.inlineCallbacks
 
4164
    def test_send_request_and_handle_errors_on_unknown_error(self):
 
4165
        """_send_request_and_handle_errors is correct when unknown error."""
 
4166
        # XXX: we need to replace this list with and exception list
 
4167
        # once bug #557718 is resolved
 
4168
        serr = protocol_pb2.Error.AUTHENTICATION_REQUIRED
 
4169
        msg = protocol_pb2.Message()
 
4170
        msg.type = protocol_pb2.Message.ERROR
 
4171
        msg.error.type = serr
 
4172
        msg.error.comment = 'Error message for %s.' % serr
 
4173
        exc = errors.error_to_exception(serr)(request=None, message=msg)
 
4174
 
 
4175
        request = self.fail_please(exc)
 
4176
        kwargs = dict(request=request, request_error=SpecificException,
 
4177
                  event_error='BAR', event_ok='FOO')
 
4178
        d = self.action_queue._send_request_and_handle_errors(**kwargs)
 
4179
        yield d
 
4180
 
 
4181
        event = 'SYS_UNKNOWN_ERROR'
 
4182
        self.assertIn((event, {}), self.action_queue.event_queue.events)
 
4183
 
 
4184
        # assert over logging
 
4185
        self.assertTrue(self.handler.check_info(request.__name__,
 
4186
                                                event, str(exc)))
 
4187
 
 
4188
    @defer.inlineCallbacks
 
4189
    def test_send_request_and_handle_errors_on_no_protocol_error(self):
 
4190
        """_send_request_and_handle_errors is ok when no-protocol error."""
 
4191
 
 
4192
        event = 'SYS_UNKNOWN_ERROR'
 
4193
        error_msg = 'Error message for any Exception.'
 
4194
        exc = Exception(error_msg)
 
4195
        request = self.fail_please(exc)
 
4196
        kwargs = dict(request=request, request_error=SpecificException,
 
4197
                      event_error='BAR', event_ok='FOO')
 
4198
        d = self.action_queue._send_request_and_handle_errors(**kwargs)
 
4199
        yield d
 
4200
 
 
4201
        self.assertIn((event, {}), self.action_queue.event_queue.events)
 
4202
 
 
4203
        # assert over logging
 
4204
        self.assertTrue(self.handler.check_info(request.__name__,
 
4205
                                                event, str(exc)))
 
4206
 
 
4207
    @defer.inlineCallbacks
 
4208
    def test_send_request_and_handle_errors_on_client_mismatch(self):
 
4209
        """_send_request_and_handle_errors is correct when client mismatch."""
 
4210
 
 
4211
        def change_client(*args, **kwargs):
 
4212
            """Change AQ's client while doing the request."""
 
4213
            self.action_queue.client = object()
 
4214
 
 
4215
        self.action_queue.event_queue.events = [] # event cleanup
 
4216
        kwargs = dict(request=change_client, request_error=SpecificException,
 
4217
                      event_error='BAR', event_ok='FOO')
 
4218
        d = self.action_queue._send_request_and_handle_errors(**kwargs)
 
4219
        yield d
 
4220
 
 
4221
        self.assertEqual([], self.action_queue.event_queue.events)
 
4222
 
 
4223
        # assert over logging
 
4224
        self.assertTrue(self.handler.check_warning(change_client.__name__,
 
4225
                                                   'Client mismatch'))
 
4226
 
 
4227
    @defer.inlineCallbacks
 
4228
    def test_check_version_when_unsupported_version_exception(self):
 
4229
        """Test error handling after UnsupportedVersionError."""
 
4230
        # raise a UnsupportedVersionError
 
4231
        msg = protocol_pb2.Message()
 
4232
        msg.type = protocol_pb2.Message.ERROR
 
4233
        msg.error.type = protocol_pb2.Error.UNSUPPORTED_VERSION
 
4234
        msg.error.comment = 'This is a funny comment.'
 
4235
        exc = errors.UnsupportedVersionError(request=None, message=msg)
 
4236
 
 
4237
        self.action_queue.client.protocol_version = self.fail_please(exc)
 
4238
        yield self.action_queue.check_version()
 
4239
        event = ('SYS_PROTOCOL_VERSION_ERROR', {'error': str(exc)})
 
4240
        self.assertEqual(event, self.action_queue.event_queue.events[-1])
 
4241
 
 
4242
    @defer.inlineCallbacks
 
4243
    def test_set_capabilities_when_query_caps_not_accepted(self):
 
4244
        """Test error handling when the query caps are not accepeted."""
 
4245
 
 
4246
        # query_caps returns False
 
4247
        self.action_queue.client.query_caps = self.mock_caps(accepted=False)
 
4248
 
 
4249
        yield self.action_queue.set_capabilities(caps=None)
 
4250
        msg = "The server doesn't have the requested capabilities"
 
4251
        event = ('SYS_SET_CAPABILITIES_ERROR', {'error': msg})
 
4252
        self.assertEqual(event, self.action_queue.event_queue.events[-1])
 
4253
        self.assertNotIn(('SYS_SET_CAPABILITIES_OK', {}),
 
4254
                          self.action_queue.event_queue.events)
 
4255
 
 
4256
    @defer.inlineCallbacks
 
4257
    def test_set_capabilities_when_set_caps_not_accepted(self):
 
4258
        """Test error handling when the query caps are not accepted."""
 
4259
 
 
4260
        # query_caps returns True and set_caps returns False
 
4261
        self.action_queue.client.query_caps = self.mock_caps(accepted=True)
 
4262
        self.action_queue.client.set_caps = self.mock_caps(accepted=False)
 
4263
 
 
4264
        caps = 'very difficult cap'
 
4265
        yield self.action_queue.set_capabilities(caps=caps)
 
4266
        msg = "The server denied setting '%s' capabilities" % caps
 
4267
        event = ('SYS_SET_CAPABILITIES_ERROR', {'error': msg})
 
4268
        self.assertEqual(event, self.action_queue.event_queue.events[-1])
 
4269
        self.assertNotIn(('SYS_SET_CAPABILITIES_OK', {}),
 
4270
                          self.action_queue.event_queue.events)
 
4271
 
 
4272
    @defer.inlineCallbacks
 
4273
    def test_set_capabilities_when_client_is_none(self):
 
4274
        """Test error handling when the client is None."""
 
4275
 
 
4276
        self.action_queue.client = None
 
4277
 
 
4278
        yield self.action_queue.set_capabilities(caps=None)
 
4279
        msg = "'NoneType' object has no attribute 'query_caps'"
 
4280
        event = ('SYS_SET_CAPABILITIES_ERROR', {'error': msg})
 
4281
        self.assertEqual(event, self.action_queue.event_queue.events[-1])
 
4282
        self.assertNotIn(('SYS_SET_CAPABILITIES_OK', {}),
 
4283
                          self.action_queue.event_queue.events)
 
4284
 
 
4285
    @defer.inlineCallbacks
 
4286
    def test_set_capabilities_when_set_caps_is_accepted(self):
 
4287
        """Test error handling when the query caps are not accepeted."""
 
4288
 
 
4289
        # query_caps returns True and set_caps returns True
 
4290
        self.action_queue.client.query_caps = self.mock_caps(accepted=True)
 
4291
        self.action_queue.client.set_caps = self.mock_caps(accepted=True)
 
4292
 
 
4293
        yield self.action_queue.set_capabilities(caps=None)
 
4294
        event = ('SYS_SET_CAPABILITIES_OK', {})
 
4295
        self.assertEqual(event, self.action_queue.event_queue.events[-1])
 
4296
 
 
4297
    @defer.inlineCallbacks
 
4298
    def test_authenticate_when_authenticated(self):
 
4299
        """Test error handling after authenticate with no error."""
 
4300
        request = client.Authenticate(self.action_queue.client,
 
4301
                                      {'dummy_token': 'credentials'})
 
4302
        request.session_id = str(uuid.uuid4())
 
4303
        self.action_queue.client.oauth_authenticate = \
 
4304
            self.succeed_please(result=request)
 
4305
        yield self.action_queue.authenticate()
 
4306
        event = ('SYS_AUTH_OK', {})
 
4307
        self.assertEqual(event, self.action_queue.event_queue.events[-1])
 
4308
 
 
4309
    @defer.inlineCallbacks
 
4310
    def test_authenticate_when_authentication_failed_exception(self):
 
4311
        """Test error handling after AuthenticationFailedError."""
 
4312
        # raise a AuthenticationFailedError
 
4313
        msg = protocol_pb2.Message()
 
4314
        msg.type = protocol_pb2.Message.ERROR
 
4315
        msg.error.type = protocol_pb2.Error.AUTHENTICATION_FAILED
 
4316
        msg.error.comment = 'This is a funny comment.'
 
4317
        exc = errors.AuthenticationFailedError(request=None, message=msg)
 
4318
 
 
4319
        self.action_queue.client.oauth_authenticate = self.fail_please(exc)
 
4320
        yield self.action_queue.authenticate()
 
4321
        event = ('SYS_AUTH_ERROR', {'error': str(exc)})
 
4322
        self.assertEqual(event, self.action_queue.event_queue.events[-1])
 
4323
 
 
4324
 
 
4325
class GetDeltaTestCase(ConnectedBaseTestCase):
 
4326
    """Test for GetDelta ActionQueueCommand."""
 
4327
 
 
4328
    @defer.inlineCallbacks
 
4329
    def setUp(self):
 
4330
        """Init."""
 
4331
        yield super(GetDeltaTestCase, self).setUp()
 
4332
        self.rq = RequestQueue(action_queue=self.action_queue)
 
4333
 
 
4334
    def test_action_queue_get_delta(self):
 
4335
        """Test AQ get delta."""
 
4336
        self.action_queue.get_delta(VOLUME, 0)
 
4337
 
 
4338
    def test_is_action_queue_command(self):
 
4339
        """Test proper inheritance."""
 
4340
        cmd = GetDelta(self.rq, VOLUME, 0)
 
4341
        self.assertTrue(isinstance(cmd, ActionQueueCommand))
 
4342
 
 
4343
    def test_run_returns_a_deferred(self):
 
4344
        """Test a deferred is returned."""
 
4345
        cmd = GetDelta(self.rq, VOLUME, 0)
 
4346
        res = cmd._run()
 
4347
        self.assertIsInstance(res, defer.Deferred)
 
4348
        res.addErrback(self.silent_connection_lost)
 
4349
 
 
4350
    def test_run_calls_protocol(self):
 
4351
        """Test protocol's get delta is called."""
 
4352
        called = []
 
4353
        self.patch(self.action_queue.client, 'get_delta',
 
4354
                   lambda *a: called.append(a))
 
4355
 
 
4356
        cmd = GetDelta(self.rq, VOLUME, 35)
 
4357
        cmd._run()
 
4358
        self.assertEqual(called[0], (VOLUME, 35))
 
4359
 
 
4360
    def test_handle_success_push_event(self):
 
4361
        """Test AQ_DELTA_OK is pushed on success."""
 
4362
        # create a request and fill it with succesful information
 
4363
        request = client.GetDelta(self.action_queue.client,
 
4364
                                  share_id=VOLUME, from_generation=21)
 
4365
        request.response = ['foo', 'bar']
 
4366
        request.end_generation = 76
 
4367
        request.full = True
 
4368
        request.free_bytes = 1231234
 
4369
 
 
4370
        # create a command and trigger it success
 
4371
        cmd = GetDelta(self.rq, VOLUME, 21)
 
4372
        cmd.handle_success(request)
 
4373
 
 
4374
        # check for successful event
 
4375
        received = self.action_queue.event_queue.events[0]
 
4376
        delta_info = dict(volume_id=VOLUME, delta_content=['foo', 'bar'],
 
4377
                          end_generation=76,
 
4378
                          full=True, free_bytes=1231234)
 
4379
        self.assertEqual(received, ('AQ_DELTA_OK', delta_info))
 
4380
        self.assertTrue(isinstance(received[1]["delta_content"], DeltaList))
 
4381
 
 
4382
    def test_handle_generic_failure_push_event(self):
 
4383
        """Test AQ_DELTA_ERROR is pushed on failure."""
 
4384
        # create a failure
 
4385
        msg = 'Something went wrong'
 
4386
        failure = Failure(DefaultException(msg))
 
4387
 
 
4388
        # create a command and trigger it success
 
4389
        cmd = GetDelta(self.rq, VOLUME, 77)
 
4390
        cmd.handle_failure(failure=failure)
 
4391
 
 
4392
        # check for event
 
4393
        received = self.action_queue.event_queue.events[0]
 
4394
        self.assertEqual(received, ('AQ_DELTA_ERROR',
 
4395
                                    {'volume_id': VOLUME, 'error': msg}))
 
4396
 
 
4397
    def test_handle_notpossible_failure_push_event(self):
 
4398
        """Test AQ_DELTA_NOT_POSSIBLE is pushed on that failure."""
 
4399
        # create a failure
 
4400
        msg = protocol_pb2.Message()
 
4401
        msg.type = protocol_pb2.Message.ERROR
 
4402
        msg.error.type = protocol_pb2.Error.CANNOT_PRODUCE_DELTA
 
4403
        msg.error.comment = 'Something went wrong'
 
4404
        failure = Failure(errors.CannotProduceDelta(self.rq, msg))
 
4405
 
 
4406
        # create a command and trigger it success
 
4407
        cmd = GetDelta(self.rq, VOLUME, 2)
 
4408
        cmd.handle_failure(failure=failure)
 
4409
 
 
4410
        # check for event
 
4411
        received = self.action_queue.event_queue.events[0]
 
4412
        self.assertEqual(received, ('AQ_DELTA_NOT_POSSIBLE',
 
4413
                                    {'volume_id': VOLUME}))
 
4414
 
 
4415
    def test_queued_mixed_types(self):
 
4416
        """Command gets queued if other command is waiting."""
 
4417
        cmd1 = FakeCommand()
 
4418
        self.rq.queue(cmd1)
 
4419
        cmd2 = GetDelta(self.rq, 'vol2', 0)
 
4420
        self.assertTrue(cmd2._should_be_queued())
 
4421
 
 
4422
    def test_queued_two_different(self):
 
4423
        """Two different queued commands is ok."""
 
4424
        cmd1 = GetDelta(self.rq, 'vol1', 0)
 
4425
        self.rq.queue(cmd1)
 
4426
        cmd2 = GetDelta(self.rq, 'vol2', 0)
 
4427
        self.assertTrue(cmd2._should_be_queued())
 
4428
 
 
4429
    def test_queued_two_equal_second_bigger(self):
 
4430
        """When two equals, only survive the one with smaller gen, first."""
 
4431
        cmd1 = GetDelta(self.rq, 'vol', 3)
 
4432
        self.rq.queue(cmd1)
 
4433
        cmd2 = GetDelta(self.rq, 'vol', 5)
 
4434
        cmd2.make_logger()
 
4435
        self.assertFalse(cmd2._should_be_queued())
 
4436
        self.assertTrue(self.handler.check_debug("not queueing self"))
 
4437
 
 
4438
    def test_queued_two_equal_second_smaller_first_not_running(self):
 
4439
        """Two equals, survive the one with smaller gen, second, not run."""
 
4440
        cmd1 = GetDelta(self.rq, 'vol', 5)
 
4441
        self.rq.queue(cmd1)
 
4442
        cmd1.running = False
 
4443
        cmd2 = GetDelta(self.rq, 'vol', 3)
 
4444
        cmd2.make_logger()
 
4445
        self.assertTrue(cmd2._should_be_queued())
 
4446
        self.assertFalse(cmd1 in self.rq.waiting)
 
4447
        self.assertTrue(self.handler.check_debug("removing previous command"))
 
4448
 
 
4449
    def test_queued_two_equal_second_smaller_first_running(self):
 
4450
        """Two equals, survive the one with smaller gen, second, running."""
 
4451
        cmd1 = GetDelta(self.rq, 'vol', 5)
 
4452
        self.rq.queue(cmd1)
 
4453
        cmd2 = GetDelta(self.rq, 'vol', 3)
 
4454
        cmd2.make_logger()
 
4455
        self.assertTrue(cmd2._should_be_queued())
 
4456
 
 
4457
    def test_queued_three_equal(self):
 
4458
        """When several equals, only survive the one with smaller gen."""
 
4459
        cmd1 = GetDelta(self.rq, 'vol', 5)
 
4460
        self.rq.queue(cmd1)
 
4461
        cmd1.running = False
 
4462
        cmd2 = GetDelta(self.rq, 'vol', 3)
 
4463
        cmd2.make_logger()
 
4464
        assert cmd2._should_be_queued()
 
4465
        self.rq.queue(cmd2)
 
4466
        cmd3 = GetDelta(self.rq, 'vol', 7)
 
4467
        cmd3.make_logger()
 
4468
        self.assertFalse(cmd3._should_be_queued())
 
4469
        self.assertFalse(cmd1 in self.rq.waiting)
 
4470
 
 
4471
    def test_uniqueness(self):
 
4472
        """Info used for uniqueness."""
 
4473
        cmd = GetDelta(self.rq, 'vol', 1)
 
4474
        self.assertEqual(cmd.uniqueness, ('GetDelta', 'vol'))
 
4475
 
 
4476
 
 
4477
class GetDeltaFromScratchTestCase(ConnectedBaseTestCase):
 
4478
    """Test for GetDelta ActionQueueCommand."""
 
4479
 
 
4480
    @defer.inlineCallbacks
 
4481
    def setUp(self):
 
4482
        """Init."""
 
4483
        yield super(GetDeltaFromScratchTestCase, self).setUp()
 
4484
        self.rq = RequestQueue(action_queue=self.action_queue)
 
4485
 
 
4486
    def test_action_queue_get_delta(self):
 
4487
        """Test AQ get delta."""
 
4488
        self.action_queue.rescan_from_scratch(VOLUME)
 
4489
 
 
4490
    def test_is_action_queue_command(self):
 
4491
        """Test proper inheritance."""
 
4492
        cmd = GetDeltaFromScratch(self.rq, VOLUME)
 
4493
        self.assertTrue(isinstance(cmd, ActionQueueCommand))
 
4494
 
 
4495
    def test_run_returns_a_deferred(self):
 
4496
        """Test a deferred is returned."""
 
4497
        cmd = GetDelta(self.rq, VOLUME, 0)
 
4498
        res = cmd._run()
 
4499
        self.assertIsInstance(res, defer.Deferred)
 
4500
        res.addErrback(self.silent_connection_lost)
 
4501
 
 
4502
    def test_run_calls_protocol(self):
 
4503
        """Test protocol's get delta is called."""
 
4504
        called = []
 
4505
        self.patch(self.action_queue.client, 'get_delta',
 
4506
                   lambda *a, **b: called.append((a, b)))
 
4507
 
 
4508
        cmd = GetDeltaFromScratch(self.rq, VOLUME)
 
4509
        cmd._run()
 
4510
        self.assertEqual(called[0], ((VOLUME,), {'from_scratch': True}))
 
4511
 
 
4512
    def test_handle_success_push_event(self):
 
4513
        """Test AQ_DELTA_OK is pushed on success."""
 
4514
        # create a request and fill it with succesful information
 
4515
        request = client.GetDelta(self.action_queue.client,
 
4516
                                  share_id=VOLUME, from_scratch=True)
 
4517
        request.response = ['foo', 'bar']
 
4518
        request.end_generation = 76
 
4519
        request.full = True
 
4520
        request.free_bytes = 1231234
 
4521
 
 
4522
        # create a command and trigger it success
 
4523
        cmd = GetDeltaFromScratch(self.rq, VOLUME)
 
4524
        cmd.handle_success(request)
 
4525
 
 
4526
        # check for successful event
 
4527
        received = self.action_queue.event_queue.events[0]
 
4528
        delta_info = dict(volume_id=VOLUME, delta_content=['foo', 'bar'],
 
4529
                          end_generation=76,
 
4530
                          free_bytes=1231234)
 
4531
        self.assertEqual(received, ('AQ_RESCAN_FROM_SCRATCH_OK', delta_info))
 
4532
        self.assertTrue(isinstance(received[1]["delta_content"], DeltaList))
 
4533
 
 
4534
    def test_handle_generic_failure_push_event(self):
 
4535
        """Test AQ_DELTA_ERROR is pushed on failure."""
 
4536
        # create a failure
 
4537
        msg = 'Something went wrong'
 
4538
        failure = Failure(DefaultException(msg))
 
4539
 
 
4540
        # create a command and trigger it success
 
4541
        cmd = GetDeltaFromScratch(self.rq, VOLUME)
 
4542
        cmd.handle_failure(failure=failure)
 
4543
 
 
4544
        # check for event
 
4545
        received = self.action_queue.event_queue.events[0]
 
4546
        self.assertEqual(received, ('AQ_RESCAN_FROM_SCRATCH_ERROR',
 
4547
                                    {'volume_id': VOLUME, 'error': msg}))
 
4548
 
 
4549
    def test_queued_mixed_types(self):
 
4550
        """Command gets queued if other command is waiting."""
 
4551
        cmd1 = FakeCommand()
 
4552
        self.rq.queue(cmd1)
 
4553
        cmd2 = GetDeltaFromScratch(self.rq, 'vol2')
 
4554
        self.assertTrue(cmd2._should_be_queued())
 
4555
 
 
4556
    def test_queued_two_different(self):
 
4557
        """Two different queued commands is ok."""
 
4558
        cmd1 = GetDeltaFromScratch(self.rq, 'vol1')
 
4559
        self.rq.queue(cmd1)
 
4560
        cmd2 = GetDeltaFromScratch(self.rq, 'vol2')
 
4561
        self.assertTrue(cmd2._should_be_queued())
 
4562
 
 
4563
    def test_queued_two_equal(self):
 
4564
        """When two equals, only survive the first one."""
 
4565
        cmd1 = GetDeltaFromScratch(self.rq, 'vol')
 
4566
        self.rq.queue(cmd1)
 
4567
        cmd2 = GetDeltaFromScratch(self.rq, 'vol')
 
4568
        cmd2.make_logger()
 
4569
        self.assertFalse(cmd2._should_be_queued())
 
4570
        self.assertTrue(self.handler.check_debug("not queueing self"))
 
4571
 
 
4572
    def test_uniqueness(self):
 
4573
        """Info used for uniqueness."""
 
4574
        cmd = GetDeltaFromScratch(self.rq, 'vol')
 
4575
        self.assertEqual(cmd.uniqueness, ('GetDeltaFromScratch', 'vol'))
 
4576
 
 
4577
 
 
4578
class UnlinkTestCase(ConnectedBaseTestCase):
 
4579
    """Test for Unlink ActionQueueCommand."""
 
4580
 
 
4581
    @defer.inlineCallbacks
 
4582
    def setUp(self):
 
4583
        """Init."""
 
4584
        yield super(UnlinkTestCase, self).setUp()
 
4585
        self.rq = RequestQueue(action_queue=self.action_queue)
 
4586
 
 
4587
    def test_handle_success_push_event_file(self):
 
4588
        """Test AQ_UNLINK_OK is pushed on success for a file."""
 
4589
        sample_path = "sample path"
 
4590
        # create a request and fill it with succesful information
 
4591
        request = client.Unlink(self.action_queue.client, VOLUME, 'node_id')
 
4592
        request.new_generation = 13
 
4593
 
 
4594
        # create a command and trigger it success
 
4595
        cmd = Unlink(self.rq, VOLUME, 'parent_id', 'node_id', sample_path,
 
4596
                     False)
 
4597
        cmd.handle_success(request)
 
4598
 
 
4599
        # check for successful event
 
4600
        received = self.action_queue.event_queue.events[0]
 
4601
        info = dict(share_id=VOLUME, parent_id='parent_id',
 
4602
                    node_id='node_id', new_generation=13,
 
4603
                    was_dir=False, old_path=sample_path)
 
4604
        self.assertEqual(received, ('AQ_UNLINK_OK', info))
 
4605
 
 
4606
    def test_handle_success_push_event_directory(self):
 
4607
        """Test AQ_UNLINK_OK is pushed on success for a directory."""
 
4608
        # create a request and fill it with succesful information
 
4609
        request = client.Unlink(self.action_queue.client, VOLUME, 'node_id')
 
4610
        request.new_generation = 13
 
4611
 
 
4612
        # create a command and trigger it success
 
4613
        cmd = Unlink(self.rq, VOLUME, 'parent_id', 'node_id', 'test_path',
 
4614
                     True)
 
4615
        cmd.handle_success(request)
 
4616
 
 
4617
        full_path = "test_path"
 
4618
 
 
4619
        # check for successful event
 
4620
        received = self.action_queue.event_queue.events[0]
 
4621
        info = dict(share_id=VOLUME, parent_id='parent_id',
 
4622
                    node_id='node_id', new_generation=13,
 
4623
                    was_dir=True, old_path=full_path)
 
4624
        self.assertEqual(received, ('AQ_UNLINK_OK', info))
 
4625
 
 
4626
    def test_possible_markers(self):
 
4627
        """Test that it returns the correct values."""
 
4628
        cmd = Unlink(self.rq, VOLUME, 'parent_id', 'node_id', 'path', False)
 
4629
        res = [getattr(cmd, x) for x in cmd.possible_markers]
 
4630
        self.assertEqual(res, ['node_id', 'parent_id'])
 
4631
 
 
4632
    def test_path_locking(self):
 
4633
        """Test that it acquires correctly the path lock."""
 
4634
        t = []
 
4635
        self.patch(PathLockingTree, 'acquire',
 
4636
                   lambda s, *a, **k: t.extend((a, k)))
 
4637
        cmd = Unlink(self.rq, VOLUME, 'parent_id', 'node_id',
 
4638
                     os.path.join('foo','bar'), False)
 
4639
        cmd._acquire_pathlock()
 
4640
        self.assertEqual(t, [('foo', 'bar'), {'on_parent': True,
 
4641
                                              'on_children': True,
 
4642
                                              'logger': None}])
 
4643
 
 
4644
 
 
4645
class MoveTestCase(ConnectedBaseTestCase):
 
4646
    """Test for Move ActionQueueCommand."""
 
4647
 
 
4648
    @defer.inlineCallbacks
 
4649
    def setUp(self):
 
4650
        """Init."""
 
4651
        yield super(MoveTestCase, self).setUp()
 
4652
        self.rq = RequestQueue(action_queue=self.action_queue)
 
4653
 
 
4654
    def test_handle_success_push_event(self):
 
4655
        """Test AQ_MOVE_OK is pushed on success."""
 
4656
        # create a request and fill it with succesful information
 
4657
        request = client.Move(self.action_queue.client, VOLUME, 'node',
 
4658
                                'new_parent', 'new_name')
 
4659
        request.new_generation = 13
 
4660
 
 
4661
        # create a command and trigger it success
 
4662
        cmd = Move(self.rq, VOLUME, 'node', 'o_parent', 'n_parent', 'n_name',
 
4663
                   'path_from', 'path_to')
 
4664
        cmd.handle_success(request)
 
4665
 
 
4666
        # check for successful event
 
4667
        received = self.action_queue.event_queue.events[0]
 
4668
        info = dict(share_id=VOLUME, node_id='node', new_generation=13)
 
4669
        self.assertEqual(received, ('AQ_MOVE_OK', info))
 
4670
 
 
4671
    def test_possible_markers(self):
 
4672
        """Test that it returns the correct values."""
 
4673
        cmd = Move(self.rq, VOLUME, 'node', 'o_parent', 'n_parent', 'n_name',
 
4674
                   'path_from', 'path_to')
 
4675
        res = [getattr(cmd, x) for x in cmd.possible_markers]
 
4676
        self.assertEqual(res, ['node', 'o_parent', 'n_parent'])
 
4677
 
 
4678
    def test_uniqueness(self):
 
4679
        """Info used for uniqueness."""
 
4680
        cmd = Move(self.rq, VOLUME, 'node', 'o_parent', 'n_parent', 'n_name',
 
4681
                   'path_from', 'path_to')
 
4682
        self.assertEqual(cmd.uniqueness, ('Move', VOLUME, 'node'))
 
4683
 
 
4684
    def test_path_locking(self):
 
4685
        """Test that it acquires correctly the path lock."""
 
4686
        t = []
 
4687
        fake_acquire = lambda s, *a, **k: t.extend((a, k)) or defer.succeed(None)
 
4688
        self.patch(PathLockingTree, 'acquire', fake_acquire)
 
4689
        cmd = Move(self.rq, VOLUME, 'node', 'o_parent', 'n_parent', 'n_name',
 
4690
                   os.path.join(os.path.sep, 'path', 'from'),
 
4691
                   os.path.join(os.path.sep, 'path', 'to'))
 
4692
        cmd._acquire_pathlock()
 
4693
        should = [
 
4694
            ("", "path", "from"), {'on_parent': True, 'on_children': True,
 
4695
                                                            'logger': None},
 
4696
            ("", "path", "to"), {'on_parent': True, 'logger': None},
 
4697
        ]
 
4698
        self.assertEqual(t, should)
 
4699
 
 
4700
    def test_pathlock_mergepaths(self):
 
4701
        """Merge both path lockings."""
 
4702
        d1 = defer.Deferred()
 
4703
        d2 = defer.Deferred()
 
4704
        fake_defers = [d1, d2]
 
4705
        self.patch(PathLockingTree, 'acquire',
 
4706
                   lambda *a, **k: fake_defers.pop())
 
4707
        cmd = Move(self.rq, VOLUME, 'node', 'o_p', 'n_p', 'n_n', 'p/f', 'p/t')
 
4708
 
 
4709
        # get the path lock, and add a callback to get the release function
 
4710
        dl = cmd._acquire_pathlock()
 
4711
        merge_release = []
 
4712
        dl.addCallback(merge_release.append)
 
4713
 
 
4714
        # prepare marks to check both original releases are called
 
4715
        release_called = []
 
4716
 
 
4717
        # dl is triggered only when d1 and d2
 
4718
        self.assertFalse(dl.called)
 
4719
        d1.callback(lambda: release_called.append(1))
 
4720
        self.assertFalse(dl.called)
 
4721
        d2.callback(lambda: release_called.append(2))
 
4722
        self.assertTrue(dl.called)
 
4723
 
 
4724
        # release!
 
4725
        self.assertFalse(release_called)
 
4726
        merge_release[0]()
 
4727
        self.assertEqual(sorted(release_called), [1, 2])
 
4728
 
 
4729
 
 
4730
class MakeFileTestCase(ConnectedBaseTestCase):
 
4731
    """Test for MakeFile ActionQueueCommand."""
 
4732
 
 
4733
    @defer.inlineCallbacks
 
4734
    def setUp(self):
 
4735
        """Init."""
 
4736
        yield super(MakeFileTestCase, self).setUp()
 
4737
        self.rq = RequestQueue(action_queue=self.action_queue)
 
4738
 
 
4739
    def test_handle_success_push_event(self):
 
4740
        """Test AQ_FILE_NEW_OK is pushed on success."""
 
4741
        # create a request and fill it with succesful information
 
4742
        request = client.MakeFile(self.action_queue.client, VOLUME,
 
4743
                                  'parent', 'name')
 
4744
        request.new_id = 'new_id'
 
4745
        request.new_generation = 13
 
4746
 
 
4747
        # create a command and trigger it success
 
4748
        cmd = MakeFile(self.rq, VOLUME, 'parent', 'name', 'marker', 'path')
 
4749
        cmd.handle_success(request)
 
4750
 
 
4751
        # check for successful event
 
4752
        received = self.action_queue.event_queue.events[0]
 
4753
        info = dict(marker='marker', new_id='new_id', new_generation=13,
 
4754
                    volume_id=VOLUME)
 
4755
        self.assertEqual(received, ('AQ_FILE_NEW_OK', info))
 
4756
 
 
4757
    def test_handle_failure_push_event(self):
 
4758
        """Test AQ_FILE_NEW_ERROR is pushed on error."""
 
4759
        # create a request and fill it with succesful information
 
4760
        request = client.MakeFile(self.action_queue.client, VOLUME,
 
4761
                                  'parent', 'name')
 
4762
        request.new_id = 'new_id'
 
4763
        request.new_generation = 13
 
4764
 
 
4765
        # create a command and trigger it fail
 
4766
        cmd = MakeFile(self.rq, VOLUME, 'parent', 'name', 'marker', 'path')
 
4767
        failure = Failure(Exception('foo'))
 
4768
        cmd.handle_failure(failure)
 
4769
 
 
4770
        # check for successful event
 
4771
        received = self.action_queue.event_queue.events[0]
 
4772
        info = dict(marker='marker', failure=failure)
 
4773
        self.assertEqual(received, ('AQ_FILE_NEW_ERROR', info))
 
4774
 
 
4775
    def test_possible_markers(self):
 
4776
        """Test that it returns the correct values."""
 
4777
        cmd = MakeFile(self.rq, VOLUME, 'parent', 'name', 'marker', 'path')
 
4778
        res = [getattr(cmd, x) for x in cmd.possible_markers]
 
4779
        self.assertEqual(res, [ 'parent'])
 
4780
 
 
4781
    def test_path_locking(self):
 
4782
        """Test that it acquires correctly the path lock."""
 
4783
        t = []
 
4784
        self.patch(PathLockingTree, 'acquire',
 
4785
                   lambda s, *a, **k: t.extend((a, k)))
 
4786
        cmd = MakeFile(self.rq, VOLUME, 'parent', 'name', 'marker',
 
4787
                       os.path.join('foo','bar'))
 
4788
        cmd._acquire_pathlock()
 
4789
        self.assertEqual(t, [('foo', 'bar'), {'on_parent': True,
 
4790
                                              'logger': None}])
 
4791
 
 
4792
 
 
4793
class MakeDirTestCase(ConnectedBaseTestCase):
 
4794
    """Test for MakeDir ActionQueueCommand."""
 
4795
 
 
4796
    @defer.inlineCallbacks
 
4797
    def setUp(self):
 
4798
        """Init."""
 
4799
        yield super(MakeDirTestCase, self).setUp()
 
4800
        self.rq = RequestQueue(action_queue=self.action_queue)
 
4801
 
 
4802
    def test_handle_success_push_event(self):
 
4803
        """Test AQ_DIR_NEW_OK is pushed on success."""
 
4804
        # create a request and fill it with succesful information
 
4805
        request = client.MakeDir(self.action_queue.client, VOLUME,
 
4806
                                 'parent', 'name')
 
4807
        request.new_id = 'new_id'
 
4808
        request.new_generation = 13
 
4809
 
 
4810
        # create a command and trigger it success
 
4811
        cmd = MakeDir(self.rq, VOLUME, 'parent', 'name', 'marker', 'path')
 
4812
        cmd.handle_success(request)
 
4813
 
 
4814
        # check for successful event
 
4815
        received = self.action_queue.event_queue.events[0]
 
4816
        info = dict(marker='marker', new_id='new_id', new_generation=13,
 
4817
                    volume_id=VOLUME)
 
4818
        self.assertEqual(received, ('AQ_DIR_NEW_OK', info))
 
4819
 
 
4820
    def test_handle_failure_push_event(self):
 
4821
        """Test AQ_DIR_NEW_ERROR is pushed on error."""
 
4822
        # create a request and fill it with succesful information
 
4823
        request = client.MakeDir(self.action_queue.client, VOLUME,
 
4824
                                 'parent', 'name')
 
4825
        request.new_id = 'new_id'
 
4826
        request.new_generation = 13
 
4827
 
 
4828
        # create a command and trigger it fail
 
4829
        cmd = MakeDir(self.rq, VOLUME, 'parent', 'name', 'marker', 'path')
 
4830
        failure = Failure(Exception('foo'))
 
4831
        cmd.handle_failure(failure)
 
4832
 
 
4833
        # check for successful event
 
4834
        received = self.action_queue.event_queue.events[0]
 
4835
        info = dict(marker='marker', failure=failure)
 
4836
        self.assertEqual(received, ('AQ_DIR_NEW_ERROR', info))
 
4837
 
 
4838
    def test_possible_markers(self):
 
4839
        """Test that it returns the correct values."""
 
4840
        cmd = MakeDir(self.rq, VOLUME, 'parent', 'name', 'marker', 'path')
 
4841
        res = [getattr(cmd, x) for x in cmd.possible_markers]
 
4842
        self.assertEqual(res, ['parent'])
 
4843
 
 
4844
    def test_path_locking(self):
 
4845
        """Test that it acquires correctly the path lock."""
 
4846
        t = []
 
4847
        self.patch(PathLockingTree, 'acquire',
 
4848
                   lambda s, *a, **k: t.extend((a, k)))
 
4849
        cmd = MakeDir(self.rq, VOLUME, 'parent', 'name', 'marker',
 
4850
                      os.path.join('foo','bar'))
 
4851
        cmd._acquire_pathlock()
 
4852
        self.assertEqual(t, [('foo', 'bar'), {'on_parent': True,
 
4853
                                              'logger': None}])
 
4854
 
 
4855
 
 
4856
class TestDeltaList(unittest.TestCase):
 
4857
    """Tests for DeltaList."""
 
4858
 
 
4859
    def test_is_list(self):
 
4860
        """A DeltaList is a list."""
 
4861
        l = [1, 2, 3]
 
4862
        a = DeltaList(l)
 
4863
        self.assertTrue(isinstance(a, list))
 
4864
 
 
4865
    def test_is_equal_list(self):
 
4866
        """A DeltaList is equal to the list it represents."""
 
4867
        l = [1, 2, 3]
 
4868
        a = DeltaList(l)
 
4869
        self.assertEqual(a, l)
 
4870
 
 
4871
    def test_repr(self):
 
4872
        """A DeltaList has a short representation."""
 
4873
        a = DeltaList(["a"*1000])
 
4874
        self.assertTrue(len(repr(a)) < 100)
 
4875
        self.assertTrue(len(str(a)) < 100)
 
4876
 
 
4877
 
 
4878
class AuthenticateTestCase(ConnectedBaseTestCase):
 
4879
    """Tests for authenticate."""
 
4880
 
 
4881
    @defer.inlineCallbacks
 
4882
    def setUp(self):
 
4883
        """Init."""
 
4884
        yield super(AuthenticateTestCase, self).setUp()
 
4885
        self.rq = RequestQueue(action_queue=self.action_queue)
 
4886
 
 
4887
    @defer.inlineCallbacks
 
4888
    def test_session_id_is_logged(self):
 
4889
        """Test that session_id is logged after auth ok."""
 
4890
        request = client.Authenticate(self.action_queue.client,
 
4891
                                      {'dummy_token': 'credentials'})
 
4892
        request.session_id = str(uuid.uuid4())
 
4893
        self.action_queue.client.oauth_authenticate = \
 
4894
                lambda *args: defer.succeed(request)
 
4895
 
 
4896
        yield self.action_queue.authenticate()
 
4897
 
 
4898
        self.assertTrue(self.handler.check_note('Session ID: %r' %
 
4899
                                                str(request.session_id)))
 
4900
 
 
4901
    @defer.inlineCallbacks
 
4902
    def test_send_platform_and_version(self):
 
4903
        """Test that platform and version is sent to the server."""
 
4904
        called = []
 
4905
        def fake_oauth_authenticate(*args, **kwargs):
 
4906
            called.append((args, kwargs))
 
4907
            request = client.Authenticate(self.action_queue.client,
 
4908
                                          {'dummy_token': 'credentials'})
 
4909
            request.session_id = str(uuid.uuid4())
 
4910
            return defer.succeed(request)
 
4911
        self.action_queue.client.oauth_authenticate = fake_oauth_authenticate
 
4912
        yield self.action_queue.authenticate()
 
4913
        self.assertEqual(len(called), 1)
 
4914
        metadata = called[0][0][2]
 
4915
        expected_metadata = {'platform':platform, 'version':clientdefs.VERSION}
 
4916
        self.assertEqual(metadata, expected_metadata)
 
4917
 
 
4918
 
 
4919
class ActionQueueProtocolTests(TwistedTestCase):
 
4920
    """Test the ACQ class."""
 
4921
 
 
4922
    def setUp(self):
 
4923
        """Set up."""
 
4924
        # create an AQP and put a factory to it
 
4925
        self.aqp = ActionQueueProtocol()
 
4926
        obj = Mocker().mock()
 
4927
        obj.event_queue.push('SYS_CONNECTION_MADE')
 
4928
        self.aqp.factory = obj
 
4929
 
 
4930
        # set up the logger
 
4931
        self.handler = MementoHandler()
 
4932
        self.handler.setLevel(logging.DEBUG)
 
4933
        self.aqp.log.addHandler(self.handler)
 
4934
 
 
4935
    def tearDown(self):
 
4936
        """Tear down."""
 
4937
        self.aqp.log.removeHandler(self.handler)
 
4938
        if self.aqp.ping_manager is not None:
 
4939
            self.aqp.ping_manager.stop()
 
4940
 
 
4941
    def test_connection_made(self):
 
4942
        """Connection is made."""
 
4943
        mocker = Mocker()
 
4944
        obj = mocker.mock()
 
4945
        obj.event_queue.push('SYS_CONNECTION_MADE')
 
4946
        self.aqp.factory = obj
 
4947
        super_called = []
 
4948
        self.patch(ThrottlingStorageClient, 'connectionMade',
 
4949
                   lambda s: super_called.append(True))
 
4950
        # test
 
4951
        with mocker:
 
4952
            self.aqp.connectionMade()
 
4953
        self.assertTrue(self.handler.check_info('Connection made.'))
 
4954
        self.assertNotIdentical(self.aqp.ping_manager, None)
 
4955
        self.assertTrue(super_called)
 
4956
 
 
4957
    def test_connection_lost(self):
 
4958
        """Connection is lost."""
 
4959
        super_called = []
 
4960
        self.patch(ThrottlingStorageClient, 'connectionLost',
 
4961
                   lambda s, r: super_called.append(True))
 
4962
        self.aqp.connectionLost('foo')
 
4963
        self.assertTrue(self.handler.check_info(
 
4964
                        'Connection lost, reason: foo.'))
 
4965
        self.assertIdentical(self.aqp.ping_manager, None)
 
4966
        self.assertTrue(super_called)
 
4967
 
 
4968
    def test_ping_connection_made_twice(self):
 
4969
        """If connection made is called twice, don't create two tasks."""
 
4970
        self.aqp.connectionMade()
 
4971
        task1 = self.aqp.ping_manager
 
4972
        self.aqp.connectionMade()
 
4973
        task2 = self.aqp.ping_manager
 
4974
        self.assertNotIdentical(task1, task2)
 
4975
        self.assertFalse(task1._running)
 
4976
        self.assertTrue(task2._running)
 
4977
 
 
4978
    def test_ping_connection_lost_twice(self):
 
4979
        """If connection lost is called twice, don't stop None."""
 
4980
        self.aqp.connectionMade()
 
4981
        self.assertNotIdentical(self.aqp.ping_manager, None)
 
4982
        self.aqp.connectionLost('reason')
 
4983
        self.assertIdentical(self.aqp.ping_manager, None)
 
4984
        self.aqp.connectionLost('reason')
 
4985
        self.assertIdentical(self.aqp.ping_manager, None)
 
4986
 
 
4987
    def test_init_max_payload_size(self):
 
4988
        """Configure max_payload_size on init according to config."""
 
4989
        user_config = config.get_user_config()
 
4990
        user_config.set_max_payload_size(12345)
 
4991
        aqp = ActionQueueProtocol()
 
4992
        self.assertEqual(aqp.max_payload_size, 12345)
 
4993
 
 
4994
 
 
4995
class CommandCycleTestCase(BasicTestCase):
 
4996
    """Test the command behaviour on run, retry, stop, etc.
 
4997
 
 
4998
    These tests are not exactly unit tests, but more about integration
 
4999
    between the queue and the command, and how the command life cycle is.
 
5000
    """
 
5001
 
 
5002
    @defer.inlineCallbacks
 
5003
    def setUp(self):
 
5004
        """Set up."""
 
5005
        yield super(CommandCycleTestCase, self).setUp()
 
5006
 
 
5007
        class MyCommand(ActionQueueCommand):
 
5008
            """Monkeypatchable AQC."""
 
5009
 
 
5010
            def _acquire_pathlock(self):
 
5011
                pathlock = PathLockingTree()
 
5012
                return pathlock.acquire('foo')
 
5013
 
 
5014
        self.queue = RequestQueue(action_queue=self.action_queue)
 
5015
        self.queue.run()
 
5016
        self.cmd = MyCommand(self.queue)
 
5017
 
 
5018
    def _check_finished_ok(self, cmd=None):
 
5019
        """Check that command finished ok."""
 
5020
        if cmd is None:
 
5021
            cmd = self.cmd
 
5022
        self.assertFalse(cmd.running)
 
5023
        self.assertNotIn(cmd, self.queue.waiting)
 
5024
        self.assertFalse(self.action_queue.pathlock.count)
 
5025
 
 
5026
    def test_simple_start_end_ok_queue_active(self):
 
5027
        """Simple normal cycle, ending ok, queued while active."""
 
5028
        # check _run was called, and returned ok
 
5029
        called = []
 
5030
        self.cmd._run = lambda: called.append(1) or defer.succeed(2)
 
5031
 
 
5032
        # check handled success
 
5033
        self.cmd.handle_success = lambda a: called.append(a)
 
5034
 
 
5035
        # let the command go
 
5036
        self.cmd.go()
 
5037
 
 
5038
        # all check
 
5039
        self.assertEqual(called, [1, 2])
 
5040
        self._check_finished_ok()
 
5041
 
 
5042
    def test_simple_start_end_bad_queue_active(self):
 
5043
        """Simple normal cycle, ending bad, queued while active.."""
 
5044
        # check _run was called, and returned bad
 
5045
        called = []
 
5046
        exc = ValueError('foo')
 
5047
        self.cmd._run = lambda: called.append(1) or defer.fail(Failure(exc))
 
5048
        self.cmd.suppressed_error_messages.append(ValueError)
 
5049
 
 
5050
        # check handled failure
 
5051
        self.cmd.handle_failure = lambda f: called.append(f.value)
 
5052
 
 
5053
        # let the command go
 
5054
        self.cmd.go()
 
5055
 
 
5056
        # all check
 
5057
        self.assertEqual(called, [1, exc])
 
5058
        self._check_finished_ok()
 
5059
 
 
5060
    def test_simple_start_end_ok_queue_notactive(self):
 
5061
        """Simple normal cycle, ending ok, queued while not active."""
 
5062
        # check _run was called, and returned ok
 
5063
        called = []
 
5064
        self.cmd._run = lambda: called.append(1) or defer.succeed(2)
 
5065
 
 
5066
        # check handled success
 
5067
        self.cmd.handle_success = lambda a: called.append(a)
 
5068
 
 
5069
        # stop the queue and let the command go
 
5070
        self.queue.stop()
 
5071
        self.cmd.go()
 
5072
 
 
5073
        # active the queue later
 
5074
        self.queue.run()
 
5075
 
 
5076
        # all check
 
5077
        self.assertEqual(called, [1, 2])
 
5078
        self._check_finished_ok()
 
5079
 
 
5080
    def test_simple_start_end_bad_queue_notactive(self):
 
5081
        """Simple normal cycle, ending bad, queued while not active.."""
 
5082
        # check _run was called, and returned bad
 
5083
        called = []
 
5084
        exc = ValueError('foo')
 
5085
        self.cmd._run = lambda: called.append(1) or defer.fail(Failure(exc))
 
5086
        self.cmd.suppressed_error_messages.append(ValueError)
 
5087
 
 
5088
        # check handled failure
 
5089
        self.cmd.handle_failure = lambda f: called.append(f.value)
 
5090
 
 
5091
        # stop the queue and let the command go
 
5092
        self.queue.stop()
 
5093
        self.cmd.go()
 
5094
 
 
5095
        # active the queue later
 
5096
        self.queue.run()
 
5097
 
 
5098
        # all check
 
5099
        self.assertEqual(called, [1, exc])
 
5100
        self._check_finished_ok()
 
5101
 
 
5102
    def test_condition_goes_ok(self):
 
5103
        """Command not run initially, but yes when conditions are ok."""
 
5104
        # check _run was called, and returned ok
 
5105
        called = []
 
5106
        self.cmd._run = lambda: called.append('run') or defer.succeed('finish')
 
5107
 
 
5108
        # check handled success
 
5109
        self.cmd.handle_success = lambda a: called.append(a)
 
5110
 
 
5111
        # command not ready to run
 
5112
        self.cmd.is_runnable = False
 
5113
 
 
5114
        # let the command go, it will not run
 
5115
        self.cmd.go()
 
5116
        self.assertEqual(called, [])
 
5117
 
 
5118
        # fix conditions and check them
 
5119
        self.cmd.is_runnable = True
 
5120
        self.action_queue.conditions_locker.check_conditions()
 
5121
 
 
5122
        # all check
 
5123
        self.assertEqual(called, ['run', 'finish'])
 
5124
        self._check_finished_ok()
 
5125
 
 
5126
    def test_check_conditions_while_running(self):
 
5127
        """Check conditions while the command is running.
 
5128
 
 
5129
        Check conditions can be executed at any time, we need to avoid
 
5130
        running the command twice.
 
5131
        """
 
5132
        # monkeypatch _run to flag called and test "while running"
 
5133
        called = []
 
5134
        d = defer.Deferred()
 
5135
        self.cmd._run = lambda: called.append(1) or d
 
5136
 
 
5137
        # check handled success
 
5138
        self.cmd.handle_success = lambda a: called.append(a)
 
5139
 
 
5140
        # let the command go
 
5141
        self.cmd.go()
 
5142
 
 
5143
        # before the command finishes, all conditions are checked
 
5144
        self.action_queue.conditions_locker.check_conditions()
 
5145
 
 
5146
        # command finished
 
5147
        d.callback(2)
 
5148
 
 
5149
        # all check
 
5150
        self.assertEqual(called, [1, 2])
 
5151
        self._check_finished_ok()
 
5152
 
 
5153
    def test_disconnect_connect_running_with_error(self):
 
5154
        """Simulate a disconnection and connection while command running."""
 
5155
 
 
5156
        def f1():
 
5157
            """Disconnect: stop the queue and fail with connection lost."""
 
5158
            self.queue.stop()
 
5159
            failure = Failure(twisted_error.ConnectionLost())
 
5160
            return defer.fail(failure)
 
5161
 
 
5162
        def f2():
 
5163
            """Run ok."""
 
5164
            return defer.succeed('finish')
 
5165
 
 
5166
        called = []
 
5167
        run_functions = [f1, f2]
 
5168
        self.cmd._run = lambda: called.append('run') or run_functions.pop(0)()
 
5169
 
 
5170
        # check handled success and cleanup
 
5171
        self.cmd.handle_success = lambda a: called.append(a)
 
5172
        self.cmd.cleanup = lambda: called.append('clean')
 
5173
 
 
5174
        # let the command go
 
5175
        self.cmd.go()
 
5176
 
 
5177
        # reconnect
 
5178
        self.queue.run()
 
5179
 
 
5180
        # all check
 
5181
        self.assertEqual(called, ['run', 'clean', 'clean', 'run', 'finish'])
 
5182
        self._check_finished_ok()
 
5183
 
 
5184
    def test_disconnect_connect_pathlocked(self):
 
5185
        """Simulate a disconnection and connection while waiting pathlock."""
 
5186
        # check it called run
 
5187
        called = []
 
5188
        self.cmd._run = lambda: called.append('run') or defer.succeed('finish')
 
5189
 
 
5190
        # monkeypatch to test "while waiting pathlock"
 
5191
        d = defer.Deferred()
 
5192
        self.cmd._acquire_pathlock = lambda: d
 
5193
 
 
5194
        # check handled success
 
5195
        self.cmd.handle_success = lambda a: called.append(a)
 
5196
 
 
5197
        # let the command go
 
5198
        self.cmd.go()
 
5199
 
 
5200
        # before the pathlock is released, we disconnect, and reconnect
 
5201
        self.queue.stop()
 
5202
        self.queue.run()
 
5203
 
 
5204
        # release the pathlock
 
5205
        d.callback(None)
 
5206
 
 
5207
        # all check
 
5208
        self.assertEqual(called, ['run', 'finish'])
 
5209
        self._check_finished_ok()
 
5210
 
 
5211
    @defer.inlineCallbacks
 
5212
    def test_retry_immediate(self):
 
5213
        """Retry the command immediately."""
 
5214
        finished = defer.Deferred()
 
5215
        called = []
 
5216
        exc = twisted_error.ConnectionDone() # retryable!
 
5217
        run_deferreds = [defer.fail(Failure(exc)), defer.succeed('finish')]
 
5218
        self.cmd._run = lambda: called.append('run') or run_deferreds.pop(0)
 
5219
        self.cmd.handle_retryable = lambda f: called.append(f.value)
 
5220
 
 
5221
        # check handle success (failure is never called because it's retried)
 
5222
        self.cmd.handle_success = lambda a: called.append(a)
 
5223
 
 
5224
        # need to wait finish() called, to be sure all ended ok, because of
 
5225
        # the callLater for the retry
 
5226
        def fake_finish():
 
5227
            ActionQueueCommand.finish(self.cmd)
 
5228
            finished.callback(True)
 
5229
        self.cmd.finish = fake_finish
 
5230
 
 
5231
        # let the command go
 
5232
        self.cmd.go()
 
5233
 
 
5234
        # need to wait the callLater
 
5235
        yield finished
 
5236
 
 
5237
        # all check
 
5238
        self.assertEqual(called, ['run', exc, 'run', 'finish'])
 
5239
        self._check_finished_ok()
 
5240
 
 
5241
    def test_retry_conditions_solved(self):
 
5242
        """Retry the command because conditions solved later."""
 
5243
        finished = defer.Deferred()
 
5244
        called = []
 
5245
 
 
5246
        def f1():
 
5247
            """Fail and make conditions not ok to run."""
 
5248
            self.cmd.is_runnable = False
 
5249
            failure = Failure(twisted_error.ConnectionDone()) # retryable!
 
5250
            return defer.fail(failure)
 
5251
 
 
5252
        def f2():
 
5253
            """Run ok."""
 
5254
            return defer.succeed('finish')
 
5255
 
 
5256
        run_functions = [f1, f2]
 
5257
        self.cmd._run = lambda: called.append('run') or run_functions.pop(0)()
 
5258
 
 
5259
        # check handle success (failure is never called because it's retried)
 
5260
        self.cmd.handle_success = lambda a: called.append(a)
 
5261
 
 
5262
        # need to wait finish() called, to be sure all ended ok, because of
 
5263
        # the callLater for the retry
 
5264
        def fake_finish():
 
5265
            ActionQueueCommand.finish(self.cmd)
 
5266
            finished.callback(True)
 
5267
        self.cmd.finish = fake_finish
 
5268
 
 
5269
        # let the command go, it will fail and wait for conditions
 
5270
        self.cmd.go()
 
5271
        self.assertEqual(called, ['run'])
 
5272
 
 
5273
        # fix conditions
 
5274
        self.cmd.is_runnable = True
 
5275
        self.action_queue.conditions_locker.check_conditions()
 
5276
 
 
5277
        # need to wait the callLater
 
5278
        yield finished
 
5279
 
 
5280
        # all check
 
5281
        self.assertEqual(called, ['run', 'run', 'finish'])
 
5282
        self._check_finished_ok()
 
5283
 
 
5284
    def test_cancel_while_running(self):
 
5285
        """Cancel the command while running."""
 
5286
        # monkeypatch _run to flag called and test "while running"
 
5287
        called = []
 
5288
        d = defer.Deferred()
 
5289
        self.cmd._run = lambda: called.append(1) or d
 
5290
 
 
5291
        # check cleanup
 
5292
        self.cmd.cleanup = lambda: called.append(2)
 
5293
        def fake_finish():
 
5294
            """Flag and call the real one."""
 
5295
            called.append(3)
 
5296
            ActionQueueCommand.finish(self.cmd)
 
5297
        self.cmd.finish = fake_finish
 
5298
 
 
5299
        # let the command go
 
5300
        self.cmd.go()
 
5301
 
 
5302
        # before it finishes, cancel
 
5303
        self.cmd.cancel()
 
5304
 
 
5305
        # all check
 
5306
        self.assertTrue(self.cmd.cancelled)
 
5307
        self.assertEqual(called, [1, 2, 3])
 
5308
        self._check_finished_ok()
 
5309
 
 
5310
    def test_cancel_while_pathclocked(self):
 
5311
        """Cancel the command while running."""
 
5312
        # monkeypatch _run to flag called and test "while running"
 
5313
        called = []
 
5314
        self.cmd.run = lambda: called.append('should not')
 
5315
 
 
5316
        # monkeypatch to test "while waiting pathlock"
 
5317
        d = defer.Deferred()
 
5318
        self.cmd._acquire_pathlock = lambda: d
 
5319
 
 
5320
        # let the command go, and cancel in the middle
 
5321
        self.cmd.go()
 
5322
        self.cmd.cancel()
 
5323
 
 
5324
        # unlock the pathlock
 
5325
        d.callback(lambda: called.append(1))
 
5326
 
 
5327
        # all check
 
5328
        self.assertEqual(called, [1])
 
5329
        self._check_finished_ok()
 
5330
 
 
5331
    def test_cancel_while_waiting_conditions(self):
 
5332
        """Cancel the command while waiting for conditions."""
 
5333
        # make it not runnable, and fake the pathlock to test releasing
 
5334
        self.cmd.is_runnable = False
 
5335
        released = []
 
5336
        self.cmd._acquire_pathlock = lambda: defer.succeed(
 
5337
                                                lambda: released.append(True))
 
5338
 
 
5339
        # let the command go (will stuck because not runnable), and
 
5340
        # cancel in the middle
 
5341
        self.cmd.go()
 
5342
        self.cmd.cancel()
 
5343
 
 
5344
        # all check
 
5345
        self._check_finished_ok()
 
5346
        self.assertTrue(released)
 
5347
 
 
5348
    def test_cancel_while_waiting_queue(self):
 
5349
        """Cancel the command while waiting for queue."""
 
5350
        # stop the queue, and fake the pathlock to test releasing
 
5351
        self.queue.stop()
 
5352
        released = []
 
5353
        self.cmd._acquire_pathlock = lambda: defer.succeed(
 
5354
                                                lambda: released.append(True))
 
5355
 
 
5356
        # let the command go (will stuck because not runnable), and
 
5357
        # cancel in the middle
 
5358
        self.cmd.go()
 
5359
        self.cmd.cancel()
 
5360
 
 
5361
        # now, set the queue active again, it should release everything
 
5362
        # even if was cancelled before
 
5363
        self.queue.run()
 
5364
 
 
5365
        # all check
 
5366
        self._check_finished_ok()
 
5367
        self.assertTrue(released)
 
5368
 
 
5369
    def test_marker_error_while_pathclocked(self):
 
5370
        """The marker errbacks while the command is waiting the pathlock."""
 
5371
        # monkeypatch methods to flag called and test "while running"
 
5372
        called = []
 
5373
        self.cmd.cleanup = lambda: called.append('cleanup')
 
5374
        self.cmd.handle_failure = lambda f: called.append('handle_failure')
 
5375
        self.cmd.run = lambda: called.append('should not')
 
5376
 
 
5377
        # finish is special as we need to really run it
 
5378
        def fake_finish():
 
5379
            """Flag and call the real one."""
 
5380
            called.append('finish')
 
5381
            ActionQueueCommand.finish(self.cmd)
 
5382
        self.cmd.finish = fake_finish
 
5383
 
 
5384
        # do not let demark callback the marker
 
5385
        self.cmd.demark = lambda: None
 
5386
 
 
5387
        # monkeypatch to test "while waiting pathlock"
 
5388
        d = defer.Deferred()
 
5389
        self.cmd._acquire_pathlock = lambda: d
 
5390
 
 
5391
        # let the command go, and errback the marker deferred
 
5392
        self.cmd.go()
 
5393
        self.cmd.markers_resolved_deferred.errback(ValueError('foo'))
 
5394
 
 
5395
        # unlock the pathlock
 
5396
        d.callback(lambda: True)
 
5397
 
 
5398
        # all check
 
5399
        self.assertEqual(called, ['cleanup', 'handle_failure', 'finish'])
 
5400
        self._check_finished_ok()
 
5401
 
 
5402
    def test_cancel_while_transfer_locked(self):
 
5403
        """Cancel the command while waiting for transfer semaphore.
 
5404
 
 
5405
        The semaphore lock must be released! Of course, this test is on
 
5406
        download/upload commands.
 
5407
        """
 
5408
        cmd = Upload(self.queue, share_id='a_share_id', node_id='a_node_id',
 
5409
                     previous_hash='prev_hash', hash='yadda', crc32=0, size=0,
 
5410
                     path='path', fileobj_factory=lambda: None)
 
5411
 
 
5412
        # patch the command to simulate a request to an already full
 
5413
        # transfer semaphore in _start
 
5414
        transfers_semaphore = self.queue.transfers_semaphore
 
5415
        semaphores = []
 
5416
        user_config = config.get_user_config()
 
5417
        for i in xrange(user_config.get_simult_transfers()):
 
5418
            s = transfers_semaphore.acquire()
 
5419
            s.addCallback(semaphores.append)
 
5420
 
 
5421
        # let the command go, and cancel in the middle
 
5422
        cmd.go()
 
5423
        cmd.cancel()
 
5424
 
 
5425
        # release previous semaphores
 
5426
        for s in semaphores:
 
5427
            s.release()
 
5428
 
 
5429
        # semaphore released
 
5430
        self.assertIdentical(cmd.tx_semaphore, None)
 
5431
        self._check_finished_ok(cmd)
 
5432
 
 
5433
    def test_disconnect_connect_running_no_error(self):
 
5434
        """Simulate a disconnection and connection while running.
 
5435
 
 
5436
        Sometimes there's no error on the command (ConnectionLost) because the
 
5437
        command got into running after the network was lost :(
 
5438
        """
 
5439
        called = []
 
5440
        d1 = defer.Deferred()
 
5441
        d2 = defer.Deferred()
 
5442
        run_deferreds = [d1, d2]
 
5443
        self.cmd._run = lambda: called.append('run') or run_deferreds.pop(0)
 
5444
 
 
5445
        # check handled success and cleanup
 
5446
        self.cmd.handle_success = lambda a: called.append(a)
 
5447
        self.cmd.cleanup = lambda: called.append('clean')
 
5448
 
 
5449
        # let the command go, it will stuck in d1
 
5450
        self.cmd.go()
 
5451
 
 
5452
        # disconnect and connect, and then trigger d2 for the command to finish
 
5453
        self.queue.stop()
 
5454
        self.queue.run()
 
5455
        d2.callback('finish')
 
5456
 
 
5457
        # all check
 
5458
        self.assertEqual(called, ['run', 'clean', 'run', 'finish'])
 
5459
        self._check_finished_ok()
 
5460
 
 
5461
 
 
5462
class InterruptibleDeferredTests(TwistedTestCase):
 
5463
    """Test the InterruptibleDeferred behaviour."""
 
5464
 
 
5465
    @defer.inlineCallbacks
 
5466
    def test_original_callbacked(self):
 
5467
        """Original deferred is callbacked."""
 
5468
        origdef = defer.Deferred()
 
5469
        intrdef = InterruptibleDeferred(origdef)
 
5470
        origdef.callback('foo')
 
5471
        result = yield intrdef
 
5472
        self.assertEqual(result, 'foo')
 
5473
 
 
5474
        # later we can interrupt, nothing happens
 
5475
        intrdef.interrupt()
 
5476
        self.assertFalse(intrdef.interrupted)
 
5477
 
 
5478
    @defer.inlineCallbacks
 
5479
    def test_original_errbacked(self):
 
5480
        """Original deferred is errbacked."""
 
5481
        origdef = defer.Deferred()
 
5482
        intrdef = InterruptibleDeferred(origdef)
 
5483
        origdef.errback(ValueError('foo'))
 
5484
        try:
 
5485
            yield intrdef
 
5486
        except ValueError, e:
 
5487
            self.assertEqual(str(e), 'foo')
 
5488
        else:
 
5489
            self.fail("Test should have raised an exception")
 
5490
 
 
5491
        # later we can interrupt, nothing happens
 
5492
        intrdef.interrupt()
 
5493
        self.assertFalse(intrdef.interrupted)
 
5494
 
 
5495
 
 
5496
    def test_interrupt_except(self):
 
5497
        """Interrupt!"""
 
5498
        intrdef = InterruptibleDeferred(defer.Deferred())
 
5499
        intrdef.interrupt()
 
5500
 
 
5501
        try:
 
5502
            yield intrdef
 
5503
        except DeferredInterrupted:
 
5504
            self.assertTrue(intrdef.interrupted)
 
5505
        else:
 
5506
            self.fail("Test should have raised an exception")
 
5507
 
 
5508
    def test_interrupt_callback_original(self):
 
5509
        """Interrupt silences further original callbacks."""
 
5510
        origdef = defer.Deferred()
 
5511
        intrdef = InterruptibleDeferred(origdef)
 
5512
        intrdef.interrupt()
 
5513
 
 
5514
        try:
 
5515
            yield intrdef
 
5516
        except DeferredInterrupted:
 
5517
            pass  # just silecen the exception
 
5518
        else:
 
5519
            self.fail("Test should have raised an exception")
 
5520
 
 
5521
        # further callback to original deferred is harmless
 
5522
        origdef.callback("foo")
 
5523
 
 
5524
    def test_interrupt_errback_original(self):
 
5525
        """Interrupt silences further original errbacks."""
 
5526
        origdef = defer.Deferred()
 
5527
        intrdef = InterruptibleDeferred(origdef)
 
5528
        intrdef.interrupt()
 
5529
 
 
5530
        try:
 
5531
            yield intrdef
 
5532
        except DeferredInterrupted:
 
5533
            pass  # just silecen the exception
 
5534
        else:
 
5535
            self.fail("Test should have raised an exception")
 
5536
 
 
5537
        # further callback to original deferred is harmless
 
5538
        origdef.errback(ValueError('foo'))
 
5539
 
 
5540
 
 
5541
class ConditionsLockerTests(TwistedTestCase):
 
5542
    """Test the ConditionsLocker."""
 
5543
 
 
5544
    def setUp(self):
 
5545
        """Set up."""
 
5546
        self.cl = ConditionsLocker()
 
5547
 
 
5548
    def test_get_locking_deferred_returns_deferred(self):
 
5549
        """The locking is done by a deferred."""
 
5550
        d = self.cl.get_lock('command')
 
5551
        d.callback(True)
 
5552
        return d
 
5553
 
 
5554
    def test_get_locking_different_commands_different_deferreds(self):
 
5555
        """Asked by two commands, get two deferreds."""
 
5556
        d1 = self.cl.get_lock('command1')
 
5557
        d2 = self.cl.get_lock('command2')
 
5558
        self.assertNotIdentical(d1, d2)
 
5559
 
 
5560
    def test_get_locking_same_command_same_deferred(self):
 
5561
        """If asked twice by the same command, return the same deferred.
 
5562
 
 
5563
        This is more a safe guard than a feature; if misused by the same
 
5564
        command we're assuring than we will not overwrite a second deferred
 
5565
        over the first one (so, never releasing the first one).
 
5566
        """
 
5567
        d1 = self.cl.get_lock('command')
 
5568
        d2 = self.cl.get_lock('command')
 
5569
        self.assertIdentical(d1, d2)
 
5570
 
 
5571
    def test_check_conditions_simple_runnable(self):
 
5572
        """Release the command."""
 
5573
        cmd = FakeCommand()
 
5574
        locking_d = self.cl.get_lock(cmd)
 
5575
        self.assertFalse(locking_d.called)
 
5576
        self.assertIn(cmd, self.cl.locked)
 
5577
 
 
5578
        # release it!
 
5579
        assert cmd.is_runnable
 
5580
        self.cl.check_conditions()
 
5581
        self.assertTrue(locking_d.called)
 
5582
        self.assertNotIn(cmd, self.cl.locked)
 
5583
 
 
5584
    def test_check_conditions_simple_notrunnable_then_ok(self):
 
5585
        """First don't release the command, then release it."""
 
5586
        cmd = FakeCommand()
 
5587
        locking_d = self.cl.get_lock(cmd)
 
5588
        self.assertFalse(locking_d.called)
 
5589
 
 
5590
        # check for conditions, do not release
 
5591
        cmd.is_runnable = False
 
5592
        self.cl.check_conditions()
 
5593
        self.assertFalse(locking_d.called)
 
5594
 
 
5595
        # conditions are ok now, release
 
5596
        cmd.is_runnable = True
 
5597
        self.cl.check_conditions()
 
5598
        self.assertTrue(locking_d.called)
 
5599
 
 
5600
    def test_check_conditions_mixed(self):
 
5601
        """Several commands, mixed situation."""
 
5602
        cmd1 = FakeCommand()
 
5603
        cmd1.is_runnable = False
 
5604
        cmd2 = FakeCommand()
 
5605
        assert cmd2.is_runnable
 
5606
 
 
5607
        # get lock for both, and check conditions
 
5608
        locking_d1 = self.cl.get_lock(cmd1)
 
5609
        locking_d2 = self.cl.get_lock(cmd2)
 
5610
        self.cl.check_conditions()
 
5611
 
 
5612
        # one should be released, the other should not
 
5613
        self.assertFalse(locking_d1.called)
 
5614
        self.assertTrue(locking_d2.called)
 
5615
 
 
5616
    def test_cancel_command_nothold(self):
 
5617
        """It's ok to cancel a command not there."""
 
5618
        self.cl.cancel_command('command')
 
5619
 
 
5620
    def test_cancel_releases_cancelled_command(self):
 
5621
        """It releases the cancelled command, even not runnable."""
 
5622
        cmd1 = FakeCommand()
 
5623
        cmd1.is_runnable = False
 
5624
        cmd2 = FakeCommand()
 
5625
        assert cmd2.is_runnable
 
5626
 
 
5627
        # get lock for both, and cancel only 1
 
5628
        locking_d1 = self.cl.get_lock(cmd1)
 
5629
        locking_d2 = self.cl.get_lock(cmd2)
 
5630
        self.cl.cancel_command(cmd1)
 
5631
 
 
5632
        # 1 should be released, 2 should not (even with conditions ok)
 
5633
        self.assertTrue(locking_d1.called)
 
5634
        self.assertFalse(locking_d2.called)
 
5635
 
 
5636
 
 
5637
class OsIntegrationTests(BasicTestCase, MockerTestCase):
 
5638
    """Ensure that the correct os_helper methods are used."""
 
5639
 
 
5640
    @defer.inlineCallbacks
 
5641
    def setUp(self):
 
5642
        """Set up for tests."""
 
5643
        yield super(OsIntegrationTests, self).setUp()
 
5644
        self.fdopen = self.mocker.replace('os.fdopen')
 
5645
 
 
5646
    def test_fdopen(self):
 
5647
        """Ensure that we are calling fdopen correctly."""
 
5648
        # set expectations
 
5649
        self.fdopen(ANY, 'w+b')
 
5650
        self.mocker.replay()
 
5651
        NamedTemporaryFile()
 
5652
 
 
5653
    def test_fdopen_real(self):
 
5654
        """Do test that the NamedTeporaryFile can read and write."""
 
5655
        data = 'test'
 
5656
        self.mocker.replay()
 
5657
        tmp = NamedTemporaryFile()
 
5658
        tmp.write(data)
 
5659
        tmp.seek(0)
 
5660
        self.assertEqual(data, tmp.read(len(data)))
 
5661
        tmp.close()
 
5662
 
 
5663
 
 
5664
class PingManagerTestCase(TwistedTestCase):
 
5665
    """Test the Ping manager."""
 
5666
 
 
5667
    def setUp(self):
 
5668
        """Set up."""
 
5669
        class FakeActionQueueProtocol(object):
 
5670
            """Fake object for the tests."""
 
5671
            log = logging.getLogger("ubuntuone.SyncDaemon.ActionQueue")
 
5672
            log.setLevel(logger.TRACE)
 
5673
            ping = lambda self: defer.Deferred()
 
5674
 
 
5675
        self.fake_aqp = FakeActionQueueProtocol()
 
5676
        self.handler = MementoHandler()
 
5677
        self.fake_aqp.log.addHandler(self.handler)
 
5678
        self.pm = PingManager(self.fake_aqp)
 
5679
 
 
5680
    def tearDown(self):
 
5681
        """Tear down."""
 
5682
        self.fake_aqp.log.removeHandler(self.handler)
 
5683
        self.pm.stop()
 
5684
 
 
5685
    def test_init(self):
 
5686
        """On start values."""
 
5687
        self.assertTrue(self.pm._running)
 
5688
        self.assertTrue(self.pm._loop.running)
 
5689
        self.assertIdentical(self.pm._timeout_call, None)
 
5690
 
 
5691
    def test_ping_do_ping(self):
 
5692
        """Ping and log."""
 
5693
        # mock the request
 
5694
        mocker = Mocker()
 
5695
        req = mocker.mock()
 
5696
        expect(req.rtt).result(1.123123)
 
5697
        d = defer.Deferred()
 
5698
        self.pm.client.ping = lambda: d
 
5699
 
 
5700
        # call and check all is started when the ping is done
 
5701
        self.pm._do_ping()
 
5702
        self.assertTrue(self.pm._timeout_call.active())
 
5703
        self.handler.debug  = True
 
5704
        self.assertTrue(self.handler.check(logger.TRACE, 'Sending ping'))
 
5705
 
 
5706
        # answer the ping, and check
 
5707
        with mocker:
 
5708
            d.callback(req)
 
5709
        self.assertFalse(self.pm._timeout_call.active())
 
5710
        self.assertTrue(self.handler.check_debug('Ping! rtt: 1.123 segs'))
 
5711
 
 
5712
    def test_stop_if_running(self):
 
5713
        """Normal stop."""
 
5714
        assert self.pm._running
 
5715
        called = []
 
5716
        self.patch(self.pm, '_stop', lambda: called.append(True))
 
5717
        self.pm.stop()
 
5718
        self.assertFalse(self.pm._running)
 
5719
        self.assertTrue(called)
 
5720
        self.pm._running = True  # back to True so it's properly stopped later
 
5721
 
 
5722
    def test_stop_if_not_running(self):
 
5723
        """Stopping the stopped."""
 
5724
        self.pm.stop()
 
5725
        assert not self.pm._running
 
5726
        called = []
 
5727
        self.pm._stop = lambda: called.append(True)
 
5728
        self.pm.stop()
 
5729
        self.assertFalse(self.pm._running)
 
5730
        self.assertFalse(called)
 
5731
 
 
5732
    def test_real_stop_loop(self):
 
5733
        """The loop is stopped."""
 
5734
        assert self.pm._loop.running
 
5735
        self.pm.stop()
 
5736
        assert not self.pm._loop.running
 
5737
 
 
5738
    def test_real_stop_timeout_call_None(self):
 
5739
        """It can be stopped while not having a timeout call."""
 
5740
        assert self.pm._timeout_call is None
 
5741
        self.pm.stop()
 
5742
 
 
5743
    def test_real_stop_timeout_call_active(self):
 
5744
        """The timeout call is cancelled if active."""
 
5745
        self.pm._do_ping()
 
5746
        assert self.pm._timeout_call.active()
 
5747
        self.pm.stop()
 
5748
        self.assertFalse(self.pm._timeout_call.active())
 
5749
 
 
5750
    def test_real_stop_timeout_call_not_active(self):
 
5751
        """It can be stopped while the timeout call is not active."""
 
5752
        self.pm._do_ping()
 
5753
        self.pm._timeout_call.cancel()
 
5754
        assert not self.pm._timeout_call.active()
 
5755
        self.pm.stop()
 
5756
        self.assertFalse(self.pm._timeout_call.active())
 
5757
 
 
5758
    def test_disconnect(self):
 
5759
        """Stop machinery, log and disconnect."""
 
5760
        mocker = Mocker()
 
5761
 
 
5762
        # mock the transport
 
5763
        transport = mocker.mock()
 
5764
        expect(transport.loseConnection())
 
5765
        self.pm.client.transport = transport
 
5766
 
 
5767
        # mock the stop
 
5768
        stop = mocker.mock()
 
5769
        expect(stop())
 
5770
        self.patch(self.pm, 'stop', stop)
 
5771
 
 
5772
        # ping will be called, and req accessed, otherwise mocker will complain
 
5773
        with mocker:
 
5774
            self.pm._disconnect()
 
5775
 
 
5776
        self.assertTrue(self.handler.check_info("No Pong response"))