~thisfred/ubuntuone-client/lp-737150

« back to all changes in this revision

Viewing changes to tests/syncdaemon/test_action_queue.py

Resolved two situations that hung some commands (LP: #724058)

Show diffs side-by-side

added added

removed removed

Lines of Context:
59
59
from ubuntuone.syncdaemon.action_queue import (
60
60
    ActionQueue, ActionQueueCommand, ChangePublicAccess, CreateUDF,
61
61
    DeleteVolume, Download, ListVolumes, ActionQueueProtocol,
62
 
    RequestQueue, UploadProgressWrapper, Upload,
 
62
    RequestQueue, UploadProgressWrapper, Upload, SIMULT_TRANSFERS,
63
63
    CreateShare, DeleteShare, GetPublicFiles, GetDelta, GetDeltaFromScratch,
64
64
    TRANSFER_PROGRESS_THRESHOLD, Unlink, Move, MakeFile, MakeDir, DeltaList,
65
65
    ZipQueue, DeferredMap, ThrottlingStorageClient, PathLockingTree,
 
66
    InterruptibleDeferred, DeferredInterrupted,
66
67
)
67
68
from ubuntuone.syncdaemon.event_queue import EventQueue, EVENTS
68
69
from ubuntuone.syncdaemon.marker import MDMarker
103
104
    """Yet another fake action queue command."""
104
105
 
105
106
    is_runnable = True
106
 
    cleaned = False
 
107
    paused = False
107
108
    resumed = False
108
109
    conditions_checked = False
109
110
 
115
116
 
116
117
    run = lambda self: defer.succeed(True)
117
118
 
118
 
    def cleanup(self):
119
 
        """Mark as cleaned."""
120
 
        self.cleaned = True
 
119
    def pause(self):
 
120
        """Mark as paused."""
 
121
        self.paused = True
121
122
 
122
123
    def resume(self):
123
124
        """Mark as resumed."""
560
561
        self.rq.stop()
561
562
        self.assertFalse(self.rq.active)
562
563
 
563
 
    def test_stop_clean_commands(self):
564
 
        """Clean up all queued commands on stop."""
 
564
    def test_stop_pauses_commands(self):
 
565
        """Pauses all queued commands on stop."""
565
566
        # set up
566
567
        cmd1 = FakeCommand()
567
568
        cmd2 = FakeCommand()
568
569
        self.rq.waiting.extend((cmd1, cmd2))
569
 
        assert not cmd1.cleaned and not cmd2.cleaned
 
570
        assert not cmd1.paused and not cmd2.paused
570
571
 
571
572
        # stop and test
572
573
        self.rq.stop()
573
 
        self.assertTrue(cmd1.cleaned)
574
 
        self.assertTrue(cmd2.cleaned)
 
574
        self.assertTrue(cmd1.paused)
 
575
        self.assertTrue(cmd2.paused)
575
576
 
576
577
    def test_check_conditions(self):
577
578
        """Check all conditions on the commands."""
1778
1779
 
1779
1780
        self.rq.active = False
1780
1781
        called = []
1781
 
        self.cmd._run = lambda: called.append(True)
 
1782
        self.cmd._run = lambda: called.append(True) or defer.Deferred()
1782
1783
 
1783
1784
        # run first time
1784
1785
        self.cmd.run()
1797
1798
 
1798
1799
        self.cmd.is_runnable = False
1799
1800
        called = []
1800
 
        self.cmd._run = lambda: called.append(True)
 
1801
        self.cmd._run = lambda: called.append(True) or defer.Deferred()
1801
1802
 
1802
1803
        # run first time
1803
1804
        self.cmd.run()
1815
1816
        assert self.cmd.is_runnable
1816
1817
        self.rq.active = False
1817
1818
        called = []
1818
 
        self.cmd._run = lambda: called.append(True)
 
1819
        self.cmd._run = lambda: called.append(True) or defer.Deferred()
1819
1820
 
1820
1821
        # run first time
1821
1822
        self.cmd.run()
1963
1964
        self.cmd.run()
1964
1965
        self.assertFalse(called)
1965
1966
 
 
1967
    def test_run_retry_on_commandpaused(self):
 
1968
        """Command retried because of pausing."""
 
1969
        called = []
 
1970
        self.cmd.finish = lambda: called.append(True)
 
1971
        self.cmd.markers_resolved_deferred = defer.succeed(True)
 
1972
        assert self.rq.active
 
1973
 
 
1974
        # deferreds, first one stucks, the second allows to continue
 
1975
        deferreds = [defer.Deferred(), defer.succeed(True)]
 
1976
 
 
1977
        def fake_run():
 
1978
            """Set the queue inactive to avoid retry loop and fail."""
 
1979
            self.rq.active = False
 
1980
            return deferreds.pop(0)
 
1981
 
 
1982
        # set up and test
 
1983
        self.cmd._run = fake_run
 
1984
 
 
1985
        # run and check finish was not called
 
1986
        self.cmd.run()
 
1987
        self.assertFalse(called)
 
1988
 
 
1989
        # pause, still nothing called
 
1990
        self.cmd.pause()
 
1991
 
 
1992
        # resume, now it finished!
 
1993
        self.rq.active = True
 
1994
        self.cmd.resume()
 
1995
        self.assertTrue(called)
 
1996
 
1966
1997
    @defer.inlineCallbacks
1967
1998
    def test_start_default(self):
1968
1999
        """Default _start just returns a triggered deferred and sets done."""
1993
2024
        self.assertEqual(called, [self.cmd])
1994
2025
        self.assertFalse(self.cmd.running)
1995
2026
 
 
2027
    @defer.inlineCallbacks
 
2028
    def test_pause_running(self):
 
2029
        """Pause while running."""
 
2030
        self.cmd.running_deferred = InterruptibleDeferred(defer.Deferred())
 
2031
        called = []
 
2032
        self.cmd.cleanup = lambda: called.append(True)
 
2033
 
 
2034
        self.cmd.pause()
 
2035
        self.assertTrue(self.handler.check_debug("pausing"))
 
2036
        self.assertTrue(called)
 
2037
 
 
2038
        try:
 
2039
            yield self.cmd.running_deferred
 
2040
        except DeferredInterrupted:
 
2041
            pass   # this is handled by run() to retry
 
2042
        else:
 
2043
            self.fail("Test should have raised an exception")
 
2044
 
 
2045
    def test_pause_norunning(self):
 
2046
        """Pause while not running."""
 
2047
        assert self.cmd.running_deferred is None
 
2048
        called = []
 
2049
        self.cmd.cleanup = lambda: called.append(True)
 
2050
 
 
2051
        self.cmd.pause()
 
2052
        self.assertTrue(self.handler.check_debug("pausing"))
 
2053
        self.assertTrue(called)
 
2054
 
1996
2055
    def test_resume(self):
1997
2056
        """Trigger the deferred only if there."""
1998
2057
        # nothing called when no deferred
2767
2826
        self.assertTrue(self.handler.check_debug('semaphore acquired'))
2768
2827
        self.assertIdentical(o, self.command.tx_semaphore)
2769
2828
 
 
2829
    def test_start_releases_semaphore_if_cancelled(self):
 
2830
        """Release the semaphore if cancelled while locking."""
 
2831
        lock = defer.Deferred()
 
2832
        self.rq.transfers_semaphore.acquire = lambda: lock
 
2833
 
 
2834
        # call start and cancel the command
 
2835
        self.command._start()
 
2836
        self.command.cancelled = True
 
2837
 
 
2838
        # release the lock
 
2839
        mocker = Mocker()
 
2840
        req = mocker.mock()
 
2841
        req.release()
 
2842
        with mocker:
 
2843
            lock.callback(req)
 
2844
 
 
2845
        # check it released the semaphore
 
2846
        self.assertTrue(self.handler.check_debug('semaphore released',
 
2847
                                                 'cancelled'))
 
2848
        self.assertIdentical(self.command.tx_semaphore, None)
 
2849
 
2770
2850
    def test_finish_releases_semaphore_if_acquired(self):
2771
2851
        """Test semaphore is released on finish if it was acquired."""
2772
2852
        s = FakeSemaphore()
3181
3261
        self.assertTrue(self.handler.check_debug('semaphore acquired'))
3182
3262
        self.assertIdentical(o, self.command.tx_semaphore)
3183
3263
 
 
3264
    def test_start_releases_semaphore_if_cancelled(self):
 
3265
        """Release the semaphore if cancelled while locking."""
 
3266
        lock = defer.Deferred()
 
3267
        self.rq.transfers_semaphore.acquire = lambda: lock
 
3268
 
 
3269
        # call start and cancel the command
 
3270
        self.command._start()
 
3271
        self.command.cancelled = True
 
3272
 
 
3273
        # release the lock
 
3274
        mocker = Mocker()
 
3275
        req = mocker.mock()
 
3276
        req.release()
 
3277
        with mocker:
 
3278
            lock.callback(req)
 
3279
 
 
3280
        # check it released the semaphore
 
3281
        self.assertTrue(self.handler.check_debug('semaphore released',
 
3282
                                                 'cancelled'))
 
3283
        self.assertIdentical(self.command.tx_semaphore, None)
 
3284
 
3184
3285
    def test_finish_releases_semaphore_if_acquired(self):
3185
3286
        """Test semaphore is released on finish if it was acquired."""
3186
3287
        s = FakeSemaphore()
4537
4638
        self.cmd = MyCommand(self.queue)
4538
4639
        return res
4539
4640
 
4540
 
    def _check_finished_ok(self):
 
4641
    def _check_finished_ok(self, cmd=None):
4541
4642
        """Check that command finished ok."""
4542
 
        self.assertFalse(self.cmd.running)
4543
 
        self.assertNotIn(self.cmd, self.queue.waiting)
 
4643
        if cmd is None:
 
4644
            cmd = self.cmd
 
4645
        self.assertFalse(cmd.running)
 
4646
        self.assertNotIn(cmd, self.queue.waiting)
4544
4647
        self.assertFalse(self.action_queue.pathlock.count)
4545
4648
 
4546
4649
    def test_simple_start_end_ok_queue_active(self):
4670
4773
        self.assertEqual(called, [1, 2])
4671
4774
        self._check_finished_ok()
4672
4775
 
4673
 
    def test_disconnect_connect_running(self):
 
4776
    def test_disconnect_connect_running_with_error(self):
4674
4777
        """Simulate a disconnection and connection while command running."""
4675
4778
 
4676
4779
        def f1():
4879
4982
        # all check
4880
4983
        self.assertEqual(called, ['cleanup', 'handle_failure', 'finish'])
4881
4984
        self._check_finished_ok()
 
4985
 
 
4986
    def test_cancel_while_transfer_locked(self):
 
4987
        """Cancel the command while waiting for transfer semaphore.
 
4988
 
 
4989
        The semaphore lock must be released! Of course, this test is on
 
4990
        download/upload commands.
 
4991
        """
 
4992
        cmd = Upload(self.queue, share_id='a_share_id', node_id='a_node_id',
 
4993
                     previous_hash='prev_hash', hash='yadda', crc32=0, size=0,
 
4994
                     path='path', fileobj_factory=lambda: None,
 
4995
                     tempfile_factory=lambda: None)
 
4996
 
 
4997
        # patch the command to simulate a request to an already full
 
4998
        # transfer semaphore in _start
 
4999
        transfers_semaphore = self.queue.transfers_semaphore
 
5000
        semaphores = []
 
5001
        for i in xrange(SIMULT_TRANSFERS):
 
5002
            s = transfers_semaphore.acquire()
 
5003
            s.addCallback(semaphores.append)
 
5004
 
 
5005
        # let the command go, and cancel in the middle
 
5006
        cmd.go()
 
5007
        cmd.cancel()
 
5008
 
 
5009
        # release previous semaphores
 
5010
        for s in semaphores:
 
5011
            s.release()
 
5012
 
 
5013
        # semaphore released
 
5014
        self.assertIdentical(cmd.tx_semaphore, None)
 
5015
        self._check_finished_ok(cmd)
 
5016
 
 
5017
    def test_disconnect_connect_running_no_error(self):
 
5018
        """Simulate a disconnection and connection while running.
 
5019
 
 
5020
        Sometimes there's no error on the command (ConnectionLost) because the
 
5021
        command got into running after the network was lost :(
 
5022
        """
 
5023
        called = []
 
5024
        d1 = defer.Deferred()
 
5025
        d2 = defer.Deferred()
 
5026
        run_deferreds = [d1, d2]
 
5027
        self.cmd._run = lambda: called.append('run') or run_deferreds.pop(0)
 
5028
 
 
5029
        # check handled success and cleanup
 
5030
        self.cmd.handle_success = lambda a: called.append(a)
 
5031
        self.cmd.cleanup = lambda: called.append('clean')
 
5032
 
 
5033
        # let the command go, it will stuck in d1
 
5034
        self.cmd.go()
 
5035
 
 
5036
        # disconnect and connect, and then trigger d2 for the command to finish
 
5037
        self.queue.stop()
 
5038
        self.queue.run()
 
5039
        d2.callback('finish')
 
5040
 
 
5041
        # all check
 
5042
        self.assertEqual(called, ['run', 'clean', 'run', 'finish'])
 
5043
        self._check_finished_ok()
 
5044
 
 
5045
 
 
5046
class InterruptibleDeferredTests(TwistedTestCase):
 
5047
    """Test the InterruptibleDeferred behaviour."""
 
5048
 
 
5049
    @defer.inlineCallbacks
 
5050
    def test_original_callbacked(self):
 
5051
        """Original deferred is callbacked."""
 
5052
        origdef = defer.Deferred()
 
5053
        intrdef = InterruptibleDeferred(origdef)
 
5054
        origdef.callback('foo')
 
5055
        result = yield intrdef
 
5056
        self.assertEqual(result, 'foo')
 
5057
 
 
5058
        # later we can interrupt, nothing happens
 
5059
        intrdef.interrupt()
 
5060
        self.assertFalse(intrdef.interrupted)
 
5061
 
 
5062
    @defer.inlineCallbacks
 
5063
    def test_original_errbacked(self):
 
5064
        """Original deferred is errbacked."""
 
5065
        origdef = defer.Deferred()
 
5066
        intrdef = InterruptibleDeferred(origdef)
 
5067
        origdef.errback(ValueError('foo'))
 
5068
        try:
 
5069
            yield intrdef
 
5070
        except ValueError, e:
 
5071
            self.assertEqual(str(e), 'foo')
 
5072
        else:
 
5073
            self.fail("Test should have raised an exception")
 
5074
 
 
5075
        # later we can interrupt, nothing happens
 
5076
        intrdef.interrupt()
 
5077
        self.assertFalse(intrdef.interrupted)
 
5078
 
 
5079
 
 
5080
    def test_interrupt_except(self):
 
5081
        """Interrupt!"""
 
5082
        intrdef = InterruptibleDeferred(defer.Deferred())
 
5083
        intrdef.interrupt()
 
5084
 
 
5085
        try:
 
5086
            yield intrdef
 
5087
        except DeferredInterrupted:
 
5088
            self.assertTrue(intrdef.interrupted)
 
5089
        else:
 
5090
            self.fail("Test should have raised an exception")
 
5091
 
 
5092
    def test_interrupt_callback_original(self):
 
5093
        """Interrupt silences further original callbacks."""
 
5094
        origdef = defer.Deferred()
 
5095
        intrdef = InterruptibleDeferred(origdef)
 
5096
        intrdef.interrupt()
 
5097
 
 
5098
        try:
 
5099
            yield intrdef
 
5100
        except DeferredInterrupted:
 
5101
            pass  # just silecen the exception
 
5102
        else:
 
5103
            self.fail("Test should have raised an exception")
 
5104
 
 
5105
        # further callback to original deferred is harmless
 
5106
        origdef.callback("foo")
 
5107
 
 
5108
    def test_interrupt_errback_original(self):
 
5109
        """Interrupt silences further original errbacks."""
 
5110
        origdef = defer.Deferred()
 
5111
        intrdef = InterruptibleDeferred(origdef)
 
5112
        intrdef.interrupt()
 
5113
 
 
5114
        try:
 
5115
            yield intrdef
 
5116
        except DeferredInterrupted:
 
5117
            pass  # just silecen the exception
 
5118
        else:
 
5119
            self.fail("Test should have raised an exception")
 
5120
 
 
5121
        # further callback to original deferred is harmless
 
5122
        origdef.errback(ValueError('foo'))