59
59
from ubuntuone.syncdaemon.action_queue import (
60
60
ActionQueue, ActionQueueCommand, ChangePublicAccess, CreateUDF,
61
61
DeleteVolume, Download, ListVolumes, ActionQueueProtocol,
62
RequestQueue, UploadProgressWrapper, Upload,
62
RequestQueue, UploadProgressWrapper, Upload, SIMULT_TRANSFERS,
63
63
CreateShare, DeleteShare, GetPublicFiles, GetDelta, GetDeltaFromScratch,
64
64
TRANSFER_PROGRESS_THRESHOLD, Unlink, Move, MakeFile, MakeDir, DeltaList,
65
65
ZipQueue, DeferredMap, ThrottlingStorageClient, PathLockingTree,
66
InterruptibleDeferred, DeferredInterrupted,
67
68
from ubuntuone.syncdaemon.event_queue import EventQueue, EVENTS
68
69
from ubuntuone.syncdaemon.marker import MDMarker
561
562
self.assertFalse(self.rq.active)
563
def test_stop_clean_commands(self):
564
"""Clean up all queued commands on stop."""
564
def test_stop_pauses_commands(self):
565
"""Pauses all queued commands on stop."""
566
567
cmd1 = FakeCommand()
567
568
cmd2 = FakeCommand()
568
569
self.rq.waiting.extend((cmd1, cmd2))
569
assert not cmd1.cleaned and not cmd2.cleaned
570
assert not cmd1.paused and not cmd2.paused
573
self.assertTrue(cmd1.cleaned)
574
self.assertTrue(cmd2.cleaned)
574
self.assertTrue(cmd1.paused)
575
self.assertTrue(cmd2.paused)
576
577
def test_check_conditions(self):
577
578
"""Check all conditions on the commands."""
1964
1965
self.assertFalse(called)
1967
def test_run_retry_on_commandpaused(self):
1968
"""Command retried because of pausing."""
1970
self.cmd.finish = lambda: called.append(True)
1971
self.cmd.markers_resolved_deferred = defer.succeed(True)
1972
assert self.rq.active
1974
# deferreds: the first one gets stuck, the second one allows to continue
1975
deferreds = [defer.Deferred(), defer.succeed(True)]
1978
"""Set the queue inactive to avoid retry loop and fail."""
1979
self.rq.active = False
1980
return deferreds.pop(0)
1983
self.cmd._run = fake_run
1985
# run and check finish was not called
1987
self.assertFalse(called)
1989
# pause, still nothing called
1992
# resume, now it finished!
1993
self.rq.active = True
1995
self.assertTrue(called)
1966
1997
@defer.inlineCallbacks
1967
1998
def test_start_default(self):
1968
1999
"""Default _start just returns a triggered deferred and sets done."""
1993
2024
self.assertEqual(called, [self.cmd])
1994
2025
self.assertFalse(self.cmd.running)
2027
@defer.inlineCallbacks
2028
def test_pause_running(self):
2029
"""Pause while running."""
2030
self.cmd.running_deferred = InterruptibleDeferred(defer.Deferred())
2032
self.cmd.cleanup = lambda: called.append(True)
2035
self.assertTrue(self.handler.check_debug("pausing"))
2036
self.assertTrue(called)
2039
yield self.cmd.running_deferred
2040
except DeferredInterrupted:
2041
pass # this is handled by run() to retry
2043
self.fail("Test should have raised an exception")
2045
def test_pause_norunning(self):
2046
"""Pause while not running."""
2047
assert self.cmd.running_deferred is None
2049
self.cmd.cleanup = lambda: called.append(True)
2052
self.assertTrue(self.handler.check_debug("pausing"))
2053
self.assertTrue(called)
1996
2055
def test_resume(self):
1997
2056
"""Trigger the deferred only if there."""
1998
2057
# nothing called when no deferred
2767
2826
self.assertTrue(self.handler.check_debug('semaphore acquired'))
2768
2827
self.assertIdentical(o, self.command.tx_semaphore)
2829
def test_start_releases_semaphore_if_cancelled(self):
2830
"""Release the semaphore if cancelled while locking."""
2831
lock = defer.Deferred()
2832
self.rq.transfers_semaphore.acquire = lambda: lock
2834
# call start and cancel the command
2835
self.command._start()
2836
self.command.cancelled = True
2845
# check it released the semaphore
2846
self.assertTrue(self.handler.check_debug('semaphore released',
2848
self.assertIdentical(self.command.tx_semaphore, None)
2770
2850
def test_finish_releases_semaphore_if_acquired(self):
2771
2851
"""Test semaphore is released on finish if it was acquired."""
2772
2852
s = FakeSemaphore()
3181
3261
self.assertTrue(self.handler.check_debug('semaphore acquired'))
3182
3262
self.assertIdentical(o, self.command.tx_semaphore)
3264
def test_start_releases_semaphore_if_cancelled(self):
3265
"""Release the semaphore if cancelled while locking."""
3266
lock = defer.Deferred()
3267
self.rq.transfers_semaphore.acquire = lambda: lock
3269
# call start and cancel the command
3270
self.command._start()
3271
self.command.cancelled = True
3280
# check it released the semaphore
3281
self.assertTrue(self.handler.check_debug('semaphore released',
3283
self.assertIdentical(self.command.tx_semaphore, None)
3184
3285
def test_finish_releases_semaphore_if_acquired(self):
3185
3286
"""Test semaphore is released on finish if it was acquired."""
3186
3287
s = FakeSemaphore()
4880
4983
self.assertEqual(called, ['cleanup', 'handle_failure', 'finish'])
4881
4984
self._check_finished_ok()
4986
def test_cancel_while_transfer_locked(self):
4987
"""Cancel the command while waiting for transfer semaphore.
4989
The semaphore lock must be released! Of course, this test is on
4990
download/upload commands.
4992
cmd = Upload(self.queue, share_id='a_share_id', node_id='a_node_id',
4993
previous_hash='prev_hash', hash='yadda', crc32=0, size=0,
4994
path='path', fileobj_factory=lambda: None,
4995
tempfile_factory=lambda: None)
4997
# patch the command to simulate a request to an already full
4998
# transfer semaphore in _start
4999
transfers_semaphore = self.queue.transfers_semaphore
5001
for i in xrange(SIMULT_TRANSFERS):
5002
s = transfers_semaphore.acquire()
5003
s.addCallback(semaphores.append)
5005
# let the command go, and cancel in the middle
5009
# release previous semaphores
5010
for s in semaphores:
5013
# semaphore released
5014
self.assertIdentical(cmd.tx_semaphore, None)
5015
self._check_finished_ok(cmd)
5017
def test_disconnect_connect_running_no_error(self):
5018
"""Simulate a disconnection and connection while running.
5020
Sometimes there's no error on the command (ConnectionLost) because the
5021
command got into running after the network was lost :(
5024
d1 = defer.Deferred()
5025
d2 = defer.Deferred()
5026
run_deferreds = [d1, d2]
5027
self.cmd._run = lambda: called.append('run') or run_deferreds.pop(0)
5029
# check handled success and cleanup
5030
self.cmd.handle_success = lambda a: called.append(a)
5031
self.cmd.cleanup = lambda: called.append('clean')
5033
# let the command go, it will get stuck in d1
5036
# disconnect and connect, and then trigger d2 for the command to finish
5039
d2.callback('finish')
5042
self.assertEqual(called, ['run', 'clean', 'run', 'finish'])
5043
self._check_finished_ok()
5046
class InterruptibleDeferredTests(TwistedTestCase):
5047
"""Test the InterruptibleDeferred behaviour."""
5049
@defer.inlineCallbacks
5050
def test_original_callbacked(self):
5051
"""Original deferred is callbacked."""
5052
origdef = defer.Deferred()
5053
intrdef = InterruptibleDeferred(origdef)
5054
origdef.callback('foo')
5055
result = yield intrdef
5056
self.assertEqual(result, 'foo')
5058
# later we can interrupt, nothing happens
5060
self.assertFalse(intrdef.interrupted)
5062
@defer.inlineCallbacks
5063
def test_original_errbacked(self):
5064
"""Original deferred is errbacked."""
5065
origdef = defer.Deferred()
5066
intrdef = InterruptibleDeferred(origdef)
5067
origdef.errback(ValueError('foo'))
5070
except ValueError, e:
5071
self.assertEqual(str(e), 'foo')
5073
self.fail("Test should have raised an exception")
5075
# later we can interrupt, nothing happens
5077
self.assertFalse(intrdef.interrupted)
5080
def test_interrupt_except(self):
5082
intrdef = InterruptibleDeferred(defer.Deferred())
5087
except DeferredInterrupted:
5088
self.assertTrue(intrdef.interrupted)
5090
self.fail("Test should have raised an exception")
5092
def test_interrupt_callback_original(self):
5093
"""Interrupt silences further original callbacks."""
5094
origdef = defer.Deferred()
5095
intrdef = InterruptibleDeferred(origdef)
5100
except DeferredInterrupted:
5101
pass # just silence the exception
5103
self.fail("Test should have raised an exception")
5105
# further callback to original deferred is harmless
5106
origdef.callback("foo")
5108
def test_interrupt_errback_original(self):
5109
"""Interrupt silences further original errbacks."""
5110
origdef = defer.Deferred()
5111
intrdef = InterruptibleDeferred(origdef)
5116
except DeferredInterrupted:
5117
pass # just silence the exception
5119
self.fail("Test should have raised an exception")
5121
# further errback to original deferred is harmless
5122
origdef.errback(ValueError('foo'))