1
# -*- coding: utf-8 -*-
3
# Author: John R. Lenton <john.lenton@canonical.com>
4
# Author: Natalia B. Bidart <natalia.bidart@canonical.com>
5
# Author: Facundo Batista <facundo@canonical.com>
7
# Copyright 2009, 2010, 2011 Canonical Ltd.
9
# This program is free software: you can redistribute it and/or modify it
10
# under the terms of the GNU General Public License version 3, as published
11
# by the Free Software Foundation.
13
# This program is distributed in the hope that it will be useful, but
14
# WITHOUT ANY WARRANTY; without even the implied warranties of
15
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
16
# PURPOSE. See the GNU General Public License for more details.
18
# You should have received a copy of the GNU General Public License along
19
# with this program. If not, see <http://www.gnu.org/licenses/>.
20
"""Tests for the action queue module."""
22
from __future__ import with_statement
33
from functools import wraps
34
from StringIO import StringIO
38
from mocker import Mocker, MockerTestCase, ANY, expect
39
from oauth import oauth
40
from twisted.internet import defer, threads, reactor
41
from twisted.internet import error as twisted_error
42
from twisted.python.failure import DefaultException, Failure
43
from twisted.web import server
44
from twisted.trial.unittest import TestCase as TwistedTestCase
45
from zope.interface.verify import verifyObject, verifyClass
47
from contrib.testing.testcase import (
48
BaseTwistedTestCase, DummyClass, FakeMain, FakeActionQueue,
50
from ubuntuone.devtools import handlers
51
from ubuntuone import logger, clientdefs
52
from ubuntuone.platform import open_file, platform, path_exists
53
from ubuntuone.storageprotocol import (
60
from ubuntuone.syncdaemon import states, interfaces, config
61
from ubuntuone.syncdaemon import action_queue
62
from ubuntuone.syncdaemon.action_queue import (
63
ActionQueue, ActionQueueCommand, ChangePublicAccess, CreateUDF,
64
DeleteVolume, Download, ListVolumes, ActionQueueProtocol, ListShares,
65
RequestQueue, UploadProgressWrapper, Upload,
66
CreateShare, DeleteShare, GetPublicFiles, GetDelta, GetDeltaFromScratch,
67
TRANSFER_PROGRESS_THRESHOLD, Unlink, Move, MakeFile, MakeDir, DeltaList,
68
ZipQueue, DeferredMap, ThrottlingStorageClient, PathLockingTree,
69
InterruptibleDeferred, DeferredInterrupted, ConditionsLocker,
70
NamedTemporaryFile, PingManager,
72
from ubuntuone.syncdaemon.event_queue import EventQueue, EVENTS
73
from ubuntuone.syncdaemon.marker import MDMarker
74
from tests.platform import setup_action_queue_test
76
PATH = os.path.join(u'~', u'Documents', u'pdfs', u'moño', u'')
78
VOLUME = uuid.UUID('12345678-1234-1234-1234-123456789abc')
79
NODE = uuid.UUID('FEDCBA98-7654-3211-2345-6789ABCDEF12')
84
def fire_and_check(f, deferred, check):
85
"""Callback a deferred."""
87
def inner(*args, **kwargs):
88
"""Execute f and fire the deferred."""
89
result = f(*args, **kwargs)
92
deferred.callback(True)
94
deferred.errback(error)
99
class MementoHandler(handlers.MementoHandler):
    """Wrapper to handle custom logger levels."""

    def check_note(self, *msgs):
        """Shortcut for checking in NOTE.

        Forwards the messages to ``self.check`` using ``logger.NOTE`` as
        the level, and returns whatever ``check`` returns.
        """
        return self.check(logger.NOTE, *msgs)
107
class FakeTempFile(object):
    """Stand-in for a temporary file, placed inside a given directory."""

    def __init__(self, tmpdir):
        """Store the path of 'remove-me.zip' in tmpdir and open/close it."""
        zip_path = os.path.join(tmpdir, 'remove-me.zip')
        self.name = zip_path
        # open for writing and close right away; presumably this leaves an
        # empty file on disk (open_file is a platform helper, not seen here)
        handle = open_file(zip_path, 'w')
        handle.close()
114
class FakeCommand(object):
115
"""Yet another fake action queue command."""
119
conditions_checked = False
121
def __init__(self, share_id=None, node_id=None):
122
self.share_id = share_id
123
self.node_id = node_id
124
self.cancelled = False
125
self.log = logging.getLogger('ubuntuone.SyncDaemon')
127
run = lambda self: defer.succeed(True)
130
"""Mark as paused."""
134
def uniqueness(self):
135
"""Fake uniqueness."""
136
if self.share_id is None and self.node_id is None:
139
return (self.__class__.__name__, self.share_id, self.node_id)
143
self.cancelled = True
146
class FakedEventQueue(EventQueue):
147
"""Faked event queue."""
149
def __init__(self, fs=None):
150
"""Initialize a faked event queue."""
151
super(FakedEventQueue, self).__init__(fs=fs)
154
def push(self, event_name, **kwargs):
155
"""Faked event pushing."""
156
self.events.append((event_name, kwargs))
157
super(FakedEventQueue, self).push(event_name, **kwargs)
160
class FakedVolume(object):
167
class FakeSemaphore(object):
168
"""Fake semaphore."""
174
"""Increase the count."""
178
"""Decrease the count."""
182
class FakeRequest(object):
185
self.deferred = defer.succeed(True)
186
self.cancelled = False
189
"""Mark cancelled."""
190
self.cancelled = True
193
class FakeClient(object):
198
def put_content_request(self, *args, **kwargs):
199
"""Fake a put content request with its deferred."""
200
self.called.append(('put_content_request', args, kwargs))
203
def get_content_request(self, *args, **kwargs):
204
"""Fake a get content request with its deferred."""
205
self.called.append(('get_content_request', args, kwargs))
209
class TestingProtocol(ActionQueue.protocol):
    """ActionQueue protocol that lets the tests know when it is connected."""

    max_payload_size = 65536

    def connectionMade(self):
        """Run the parent connectionMade, then check and reset the events.

        If a ``testing_deferred`` attribute was attached to this protocol
        it is fired with True at the end.
        """
        ActionQueue.protocol.connectionMade(self)
        event_queue = self.factory.event_queue

        # by now the connection made event must be among the pushed ones
        pushed_names = [item[0] for item in event_queue.events]
        assert 'SYS_CONNECTION_MADE' in pushed_names

        # leave a clean list for whoever inspects the events next
        event_queue.events = []
        if hasattr(self, 'testing_deferred'):
            self.testing_deferred.callback(True)
227
class TestActionQueue(ActionQueue):
    """ActionQueue variant wired to the TestingProtocol."""

    # protocol class used in place of the one ActionQueue defines
    protocol = TestingProtocol
232
class BasicTestCase(BaseTwistedTestCase):
233
"""Basic test case for ActionQueue."""
237
@defer.inlineCallbacks
240
yield super(BasicTestCase, self).setUp()
242
self.root = self.mktemp('root')
243
self.home = self.mktemp('home')
244
self.data = self.mktemp('data')
245
self.shares = self.mktemp('shares')
246
self.partials = self.mktemp('partials')
248
# set up FakeMain to use the testing AQ, the port will be decided later
249
self.patch(FakeMain, '_fake_AQ_class', TestActionQueue)
250
self.patch(FakeMain, '_fake_AQ_params', ('127.0.0.1', 0, False))
252
self.main = FakeMain(root_dir=self.root, shares_dir=self.shares,
253
data_dir=self.data, partials_dir=self.partials)
254
self.addCleanup(self.main.shutdown)
256
self.action_queue = self.main.action_q
257
self.action_queue.connection_timeout = 3
258
self.action_queue.event_queue.events = []
261
"""Keep a copy of the pushed events."""
263
def recording(event_name, **kwargs):
264
"""Keep a copy of the pushed events."""
265
value = (event_name, kwargs)
266
if event_name != 'SYS_STATE_CHANGED' and \
267
not event_name.startswith('VM_'):
268
self.action_queue.event_queue.events.append(value)
269
return f(event_name, **kwargs)
272
self.main.event_q.push = keep_a_copy(self.main.event_q.push)
274
self.handler = MementoHandler()
275
self.handler.setLevel(logging.DEBUG)
276
self._logger = logging.getLogger('ubuntuone.SyncDaemon')
277
self._logger.addHandler(self.handler)
278
self.addCleanup(self._logger.removeHandler, self.handler)
280
# setup done according to the platform
281
setup_action_queue_test(self)
283
@defer.inlineCallbacks
286
for record in self.handler.records:
287
exc_info = getattr(record, 'exc_info', None)
288
if exc_info is not None:
289
raise exc_info[0], exc_info[1], exc_info[2]
291
yield super(BasicTestCase, self).tearDown()
293
def user_connect(self):
294
"""User requested to connect to server."""
295
token = {'token': 'bla', 'token_secret': 'ble',
296
'consumer_key': 'foo', 'consumer_secret': 'bar'}
297
self.action_queue.event_queue.push('SYS_USER_CONNECT',
301
class BasicTests(BasicTestCase):
302
"""Basic tests to check ActionQueue."""
304
def test_implements_interface(self):
    """The AQ instance and FakeActionQueue must satisfy IActionQueue."""
    iface = interfaces.IActionQueue
    verifyObject(iface, self.action_queue)
    verifyClass(iface, FakeActionQueue)
309
@defer.inlineCallbacks
def test_get_root_and_demark(self):
    """Receiving the root resolves the 'mdid' marker to the root node id."""
    # ask for the marker before the root arrives
    waiting = self.action_queue.uuid_map.get('mdid')

    # the root shows up
    root_info = dict(root_id='node_id', mdid='mdid')
    self.main.event_q.push('SYS_ROOT_RECEIVED', **root_info)

    # what we were waiting for is the root node id
    received = yield waiting
    self.assertEqual(received, 'node_id')
323
def test_cancelupload_calls_cancelop(self):
324
"""cancel_upload passes the correct args to the generic method."""
326
self.action_queue._cancel_op = lambda *a: called.append(a)
327
self.action_queue.cancel_upload('share', 'node')
328
self.assertEqual(called, [('share', 'node', Upload)])
330
def test_canceldownload_calls_cancelop(self):
331
"""cancel_download passes the correct args to the generic method."""
333
self.action_queue._cancel_op = lambda *a: called.append(a)
334
self.action_queue.cancel_download('share', 'node')
335
self.assertEqual(called, [('share', 'node', Download)])
337
def test_cancelop_nothing(self):
    """With an empty queue, _cancel_op logs no cancel for the node."""
    aq = self.action_queue
    assert not aq.queue.waiting
    aq._cancel_op('shr', 'node', Upload)
    logged = self.handler.check_debug('cancel', 'shr', 'node')
    self.assertFalse(logged)
343
def _set_queue(self, *waiting):
344
"""Set the content queue content."""
345
cq = self.action_queue.queue
347
cq.waiting.append(cmd)
348
cq.hashed_waiting[cmd.uniqueness] = cmd
350
def test_cancelop_different_sharenode(self):
    """Nothing is cancelled when the queued commands are for other nodes."""
    queued = [FakeCommand('sh', 'nd1'), FakeCommand('sh', 'nd2')]
    self._set_queue(*queued)
    self.action_queue._cancel_op('sh', 'nd3', FakeCommand)
    self.assertFalse(self.handler.check_debug('external cancel attempt'))
    for cmd in queued:
        self.assertFalse(cmd.cancelled)
360
def test_cancelop_different_operation(self):
    """Nothing is cancelled when asking for another command class."""
    queued = [FakeCommand('sh', 'nd'), FakeCommand('sh', 'nd')]
    self._set_queue(*queued)
    # same share and node as the queued commands, but a different class
    self.action_queue._cancel_op('sh', 'nd', Upload)
    self.assertFalse(self.handler.check_debug('external cancel attempt'))
    for cmd in queued:
        self.assertFalse(cmd.cancelled)
370
def test_cancelop_inwaiting(self):
371
"""Cancel something that is in the waiting queue."""
372
cmd = FakeCommand('sh', 'nd')
374
self.action_queue._cancel_op('sh', 'nd', FakeCommand)
375
self.assertTrue(self.handler.check_debug('external cancel attempt',
377
self.assertTrue(cmd.cancelled)
379
def test_node_is_queued_move_api(self):
380
"""Test that it calls the queue method."""
382
aq = self.action_queue
383
aq.queue.node_is_queued = lambda *a: called.append(a)
384
aq.node_is_with_queued_move('share', 'node')
385
self.assertEqual(called, [(Move, 'share', 'node')])
387
def test_node_is_queued_move_integration(self):
    """A Move put by hand in the queue is reported as a queued move."""
    aq = self.action_queue
    move = Move(aq.queue, VOLUME, 'node', 'o_p', 'n_p', 'n_name',
                "pathfrom", "pathto")
    self.assertFalse(aq.node_is_with_queued_move(VOLUME, 'node'))

    # register the command in both structures of the queue
    request_queue = aq.queue
    request_queue.waiting.append(move)
    request_queue.hashed_waiting[move.uniqueness] = move
    self.assertTrue(aq.node_is_with_queued_move(VOLUME, 'node'))
397
def test_event_listener(self):
398
"""All event listeners should define methods with correct signature."""
399
for evtname, evtargs in EVENTS.iteritems():
400
meth = getattr(ActionQueue, 'handle_' + evtname, None)
402
defined_args = inspect.getargspec(meth)[0]
403
self.assertEqual(defined_args[0], 'self')
404
self.assertEqual(set(defined_args[1:]), set(evtargs))
407
class TestLoggingStorageClient(TwistedTestCase):
408
"""Tests for ensuring the magic hash doesn't show in logs."""
410
def get_message(self):
411
"""Produce an upload message."""
412
message = protocol_pb2.Message()
413
message.type = protocol_pb2.Message.PUT_CONTENT
414
message.put_content.share = "share"
415
message.put_content.node = "node"
416
message.put_content.previous_hash = "previous hash"
417
message.put_content.magic_hash = "magic!"
418
message.put_content.hash = "hash"
421
def test_sanitize_messages(self):
    """Messages get sanitized and magic hash is removed."""
    message = self.get_message()
    result = action_queue.sanitize_message(message)
    # the first item is used as a format string for the remaining ones
    text = result[0] % result[1:]
    # assertIn/assertNotIn show both operands when the check fails
    self.assertNotIn("magic!", text)
    self.assertIn("share", text)
    self.assertIn("hash", text)
    self.assertIn("previous_hash", text)
431
def test_logging_storage_client(self):
432
"""LoggingStorageClient sanitizes messages."""
433
message = self.get_message()
435
lsc = action_queue.LoggingStorageClient()
436
lsc.log.setLevel(action_queue.TRACE)
437
lsc.log_trace = lambda *args: result.append(args)
438
lsc.log_message(message)
439
self.assertTrue(result, [action_queue.sanitize_message(message)])
442
class TestRequestQueue(TwistedTestCase):
443
"""Tests for the RequestQueue."""
448
class FakeAQ(object):
450
event_queue = self.eq = FakedEventQueue()
452
self.rq = RequestQueue(action_queue=FakeAQ())
453
self.addCleanup(self.eq.shutdown)
455
# add a Memento handler to the logger
456
self.log_handler = MementoHandler()
457
self.log_handler.setLevel(logging.DEBUG)
458
logger = logging.getLogger('ubuntuone.SyncDaemon')
459
logger.addHandler(self.log_handler)
460
self.addCleanup(logger.removeHandler, self.log_handler)
462
def _add_to_rq(self, *cmds):
463
"""Add the commands to rq.waiting and hashed_waiting."""
465
self.rq.waiting.append(cmd)
466
self.rq.hashed_waiting[cmd.uniqueness] = cmd
468
def test_len_nothing(self):
    """An untouched queue has length zero."""
    queued = len(self.rq)
    self.assertEqual(queued, 0)
472
def test_len_waiting(self):
    """One item in waiting gives a length of one."""
    waiting = self.rq.waiting
    waiting.append(1)
    self.assertEqual(len(self.rq), 1)
477
def test_len_bigger(self):
    """Two items in waiting give a length of two."""
    waiting = self.rq.waiting
    for _ in (0, 1):
        waiting.append(1)
    self.assertEqual(len(self.rq), 2)
483
def test_queue_adds_to_waiting(self):
484
"""Command queues is appended to waiting."""
488
self.rq.waiting.append(cmd1)
492
self.assertEqual(list(self.rq.waiting), [cmd1, cmd2])
494
def test_queue_sends_changed_event(self):
495
"""Alert something changed."""
498
evt = ('SYS_QUEUE_ADDED', dict(command=cmd))
499
self.assertIn(evt, self.eq.events)
501
def test_queue_waiting_if_first(self):
502
"""It should send the WAITING signal."""
508
self.assertEqual(list(self.rq.waiting), [cmd])
509
self.assertIn(('SYS_QUEUE_WAITING', {}), self.eq.events)
511
def test_queue_nowaiting_if_not_first(self):
512
"""It should not send the WAITING signal if no first cmd."""
516
self.rq.waiting.append(cmd1)
520
self.assertEqual(list(self.rq.waiting), [cmd1, cmd2])
521
self.assertNotIn(('SYS_QUEUE_WAITING', {}), self.eq.events)
523
def test_with_one_run(self):
524
"""Run will execute the command."""
527
self.assertIn(('SYS_QUEUE_WAITING', {}), self.eq.events)
528
self.assertNotIn(('SYS_QUEUE_DONE', {}), self.eq.events)
530
self.assertIn(('SYS_QUEUE_DONE', {}), self.eq.events)
532
def test_with_two_run(self):
533
"""Run will execute both commands."""
534
# first queuing, get the event
537
self.assertEqual(self.eq.events.count(('SYS_QUEUE_WAITING', {})), 1)
538
self.assertEqual(self.eq.events.count(('SYS_QUEUE_DONE', {})), 0)
540
# second queuing, don't get the event
544
self.assertEqual(self.eq.events.count(('SYS_QUEUE_WAITING', {})), 1)
545
self.assertEqual(self.eq.events.count(('SYS_QUEUE_DONE', {})), 0)
547
# first run, no new events
548
self.rq.unqueue(cmd1)
549
self.assertEqual(self.eq.events.count(('SYS_QUEUE_WAITING', {})), 1)
550
self.assertEqual(self.eq.events.count(('SYS_QUEUE_DONE', {})), 0)
552
# second run, now we're done
553
self.rq.unqueue(cmd2)
554
self.assertEqual(self.eq.events.count(('SYS_QUEUE_WAITING', {})), 1)
555
self.assertEqual(self.eq.events.count(('SYS_QUEUE_DONE', {})), 1)
557
def test_init_notactive(self):
    """A freshly created RQ is not active."""
    is_active = self.rq.active
    self.assertFalse(is_active)
561
def test_init_activedef(self):
    """Just instanced queue has the deferred to take."""
    # assertIsInstance reports the offending object if the check fails
    self.assertIsInstance(self.rq.active_deferred, defer.Deferred)
565
def test_run_goes_active(self):
566
"""Activate on run."""
568
self.assertTrue(self.rq.active)
570
def test_run_triggers_activedef(self):
571
"""Trigger the active_deferred on run."""
572
assert not self.rq.active_deferred.called
574
self.assertTrue(self.rq.active_deferred.called)
576
def test_stop_goes_inactive(self):
577
"""Deactivate on stop."""
578
self.rq.active = True
580
self.assertFalse(self.rq.active)
582
def test_stop_pauses_commands(self):
583
"""Pauses all queued commands on stop."""
587
self.rq.waiting.extend((cmd1, cmd2))
588
assert not cmd1.paused and not cmd2.paused
592
self.assertTrue(cmd1.paused)
593
self.assertTrue(cmd2.paused)
595
def test_stop_pause_useful_activedef(self):
596
"""Refresh the active_deferred before pausing."""
597
checked = defer.Deferred()
600
"""Check that RQ has a useful active_deferred."""
601
self.assertTrue(isinstance(self.rq.active_deferred,
603
self.assertFalse(self.rq.active_deferred.called)
604
checked.callback(True)
607
cmd.pause = fake_pause
608
self.rq.waiting.append(cmd)
614
def test_unqueue_remove(self):
615
"""Remove the command from queue on unqueue."""
616
# set up a couple of commands
619
self.rq.waiting.extend((cmd1, cmd2))
620
self.rq.hashed_waiting[cmd1] = cmd1
621
self.rq.hashed_waiting[cmd2] = cmd2
623
# unqueue and check that 1 was removed from both structures and
624
# that cmd2 is still there untouched
625
self.rq.unqueue(cmd1)
626
self.assertNotIn(cmd1, self.rq.waiting)
627
self.assertIn(cmd2, self.rq.waiting)
628
self.assertNotIn(cmd1, self.rq.hashed_waiting)
629
self.assertIn(cmd2, self.rq.hashed_waiting)
631
def test_unqueue_sysqueuedone_if_empty(self):
632
"""Send SYS_QUEUE_DONE if empty after unqueue."""
635
self.rq.waiting.append(cmd)
636
self.rq.hashed_waiting[cmd] = cmd
638
# unqueue it and check
640
self.assertIn(('SYS_QUEUE_DONE', {}), self.eq.events)
642
def test_unqueue_sysqueuedone_if_not_empty(self):
643
"""Do not send SYS_QUEUE_DONE if not empty after unqueue."""
644
# set up a couple of commands
647
self.rq.waiting.extend((cmd1, cmd2))
648
self.rq.hashed_waiting[cmd1] = cmd1
649
self.rq.hashed_waiting[cmd2] = cmd2
651
# unqueue only one and check
652
self.rq.unqueue(cmd1)
653
self.assertNotIn(('SYS_QUEUE_DONE', {}), self.eq.events)
655
def test_unqueue_sends_changed_event(self):
656
"""Alert something changed."""
658
self.rq.waiting.append(cmd)
660
evt = ('SYS_QUEUE_REMOVED', dict(command=cmd))
661
self.assertIn(evt, self.eq.events)
663
def test_remove_empty(self):
664
"""Don't remove if waiting is empty."""
665
assert not self.rq.waiting, "test badly set up"
668
self.assertFalse(self.rq.waiting)
669
self.assertFalse(self.rq.hashed_waiting)
671
def test_remove_other(self):
672
"""Don't remove if waiting has other command."""
673
cmd1 = FakeCommand(1, 2)
674
self._add_to_rq(cmd1)
675
cmd2 = FakeCommand(2, 3)
677
self.assertEqual(list(self.rq.waiting), [cmd1])
678
self.assertEqual(self.rq.hashed_waiting.values(), [cmd1])
680
def test_remove_command(self):
681
"""Remove for the command."""
685
self.assertFalse(self.rq.waiting)
686
self.assertFalse(self.rq.hashed_waiting)
688
def test_remove_mixed(self):
689
"""Remove ok in a mixed situation."""
690
cmd1 = FakeCommand(1, 2)
691
cmd2 = FakeCommand(2, 3)
692
cmd3 = FakeCommand(3, 4)
693
self._add_to_rq(cmd1, cmd2, cmd3)
695
self.assertEqual(list(self.rq.waiting), [cmd1, cmd3])
696
self.assertEqual(set(self.rq.hashed_waiting.values()),
699
def test_hashedwaiting_queue(self):
700
"""Queue a command and it will be added to hashed waiting."""
703
self.assertTrue(self.rq.hashed_waiting.values(), [cmd])
705
def test_node_is_queued_nothing(self):
    """With nothing queued, the node is not reported as queued."""
    found = self.rq.node_is_queued(Move, 'share', 'node')
    self.assertFalse(found)
709
def test_node_is_queued_waiting(self):
710
"""Test with a command in waiting."""
711
cmd = FakeCommand('share', 'node')
713
self.assertTrue(self.rq.node_is_queued(FakeCommand, 'share', 'node'))
715
def test_node_is_queued_different_command(self):
716
"""The node is queued, but other command on it."""
717
cmd = FakeCommand('share', 'node')
719
self.assertFalse(self.rq.node_is_queued(Move, 'share', 'node'))
721
def test_node_is_queued_different_node(self):
722
"""The command is queued, but on other node."""
723
cmd = FakeCommand('share', 'node')
725
self.assertFalse(self.rq.node_is_queued(FakeCommand, 'share', 'other'))
727
def test_len_empty(self):
    """The length of the queue is zero when it is empty."""
    size = len(self.rq)
    self.assertEqual(size, 0)
731
def test_len_with_one(self):
732
"""Counter return that it has one."""
735
self.assertEqual(len(self.rq), 1)
737
def test_len_with_two(self):
738
"""Counter return that it has two."""
742
self.assertEqual(len(self.rq), 2)
744
def test_len_run_decreases(self):
745
"""Counter behaviour when adding/running."""
749
self.assertEqual(len(self.rq), 1)
751
self.assertEqual(len(self.rq), 2)
752
self.rq.unqueue(cmd1)
753
self.assertEqual(len(self.rq), 1)
754
self.rq.unqueue(cmd2)
755
self.assertEqual(len(self.rq), 0)
757
def test_init_simult_transfers(self):
    """The transfers semaphore takes its tokens from the user config."""
    limit = 12345
    config.get_user_config().set_simult_transfers(limit)
    new_queue = RequestQueue(action_queue=None)
    self.assertEqual(new_queue.transfers_semaphore.tokens, limit)
765
class TestDeferredMap(TwistedTestCase):
766
"""Test the deferred map."""
770
self.dm = DeferredMap()
772
def test_one_get_returns_stored_deferred(self):
    """What get returns is what is kept in waiting for that key."""
    returned = self.dm.get('foo')
    expected = {'foo': [returned]}
    self.assertEqual(self.dm.waiting, expected)
777
def test_two_gets_returns_second_deferred_other_key(self):
    """Gets for two different keys are stored each under its own key."""
    for_foo = self.dm.get('foo')
    for_bar = self.dm.get('bar')
    expected = {'foo': [for_foo], 'bar': [for_bar]}
    self.assertEqual(self.dm.waiting, expected)
783
def test_two_gets_returns_second_deferred_same_key(self):
    """Two gets for the same key are stored together, in call order."""
    first = self.dm.get('foo')
    second = self.dm.get('foo')
    expected = {'foo': [first, second]}
    self.assertEqual(self.dm.waiting, expected)
789
def test_mixed_gets(self):
    """Gets for repeated and distinct keys are grouped by key."""
    keys = ('foo', 'bar', 'foo', 'baz')
    d1, d2, d3, d4 = [self.dm.get(key) for key in keys]
    expected = {'foo': [d1, d3], 'bar': [d2], 'baz': [d4]}
    self.assertEqual(self.dm.waiting, expected)
798
def test_set_to_nothing(self):
    """Setting a key nobody asked for must not raise."""
    # no assertion needed: the call itself is the check
    self.dm.set('not there', 'value')
802
@defer.inlineCallbacks
803
def test_set_fires_deferred_single(self):
804
"""The set fires the unique waiting deferred with the value."""
805
d1 = self.dm.get('foo')
806
d2 = self.dm.get('bar')
807
d3 = self.dm.get('foo')
808
self.assertEqual(self.dm.waiting, {'foo': [d1, d3], 'bar': [d2]})
810
self.dm.set('bar', 'value')
812
self.assertEqual(res, 'value')
813
self.assertEqual(self.dm.waiting, {'foo': [d1, d3]})
815
@defer.inlineCallbacks
816
def test_set_fires_deferred_multiple(self):
817
"""The set fires the multiple waiting deferreds with the value."""
818
d1 = self.dm.get('foo')
819
d2 = self.dm.get('bar')
820
d3 = self.dm.get('foo')
821
self.assertEqual(self.dm.waiting, {'foo': [d1, d3], 'bar': [d2]})
823
self.dm.set('foo', 'value')
826
self.assertEqual(res1, 'value')
827
self.assertEqual(res2, 'value')
828
self.assertEqual(self.dm.waiting, {'bar': [d2]})
830
def test_err_to_nothing(self):
    """Erring a key nobody asked for must not raise."""
    # no assertion needed: the call itself is the check
    self.dm.err('not there', 'failure')
834
@defer.inlineCallbacks
835
def test_err_fires_deferred_single(self):
836
"""The set fires the unique waiting deferred with the failure."""
837
d1 = self.dm.get('foo')
838
d2 = self.dm.get('bar')
839
d3 = self.dm.get('foo')
840
self.assertEqual(self.dm.waiting, {'foo': [d1, d3], 'bar': [d2]})
842
exc = Exception('problem!')
843
self.dm.err('bar', Failure(exc))
847
self.assertEqual(e, exc)
849
self.fail("It didn't fired the deferred with a failure!")
850
self.assertEqual(self.dm.waiting, {'foo': [d1, d3]})
853
class TestZipQueue(TwistedTestCase):
854
"""Test the zipping queue."""
860
@defer.inlineCallbacks
861
def test_zip_calls_compress_in_thread(self):
862
"""Test that self._compress is called in another thread."""
863
upload = FakeCommand()
865
fake_fileobj = mocker.mock()
867
upload.fileobj_factory = lambda: fake_fileobj
869
def fake_compress(deferred, _upload, fileobj):
870
"""Fake the _compress method."""
871
self.assertEqual(upload, _upload)
872
deferred.callback(True)
874
self.zq._compress = fake_compress
876
yield self.zq.zip(upload)
878
@defer.inlineCallbacks
def test_zip_calls_compress_with_file_object(self):
    """Zip completes using the fileobj factory and a faked _compress."""
    def build_fileobj():
        """Build what the command offers as its file object."""
        return Mocker().mock().close()

    upload = FakeCommand()
    upload.fileobj_factory = build_fileobj

    # faked _compress: just fire the deferred it receives
    self.zq._compress = lambda deferred, upload, fileobj: \
        deferred.callback(True)
    yield self.zq.zip(upload)
891
@defer.inlineCallbacks
892
def test_fileobj_factory_error_is_logged(self):
893
"""Log the error when fileobj_factory fails."""
896
raise ValueError("foo")
898
upload = FakeCommand()
899
upload.fileobj_factory = crash
902
self.handler = MementoHandler()
903
self.handler.setLevel(logging.DEBUG)
904
upload.log.addHandler(self.handler)
906
yield self.zq.zip(upload)
907
self.assertTrue(self.handler.check_warning("Unable to build fileobj",
908
"ValueError", "foo"))
910
@defer.inlineCallbacks
911
def test_fileobj_factory_error_cancels_upload(self):
912
"""Cancel the upload when fileobj_factory fails."""
913
upload = FakeCommand()
915
yield self.zq.zip(upload)
916
self.assertTrue(upload.cancelled)
918
@defer.inlineCallbacks
919
def test_fileobj_factory_error_dont_call_compress(self):
920
"""Stop the execution if fileobj_factory fails."""
921
upload = FakeCommand()
923
self.zq._compress = lambda *a: called.append(True)
924
yield self.zq.zip(upload)
925
self.assertFalse(called)
927
@defer.inlineCallbacks
928
def test_zip_acquire_lock(self):
929
"""Test that it acquires the lock."""
931
self.zq._compress = lambda deferred, upl, fobj: deferred.callback(True)
934
"""Fake the acquire method."""
937
return defer.succeed(True)
939
self.zq.acquire = fake_acquire
940
upload = FakeCommand()
941
upload.fileobj_factory = lambda: Mocker().mock().close()
942
yield self.zq.zip(upload)
943
self.assertTrue(called)
945
@defer.inlineCallbacks
946
def test_zip_release_lock_ok(self):
947
"""Test that it releases the lock when all ok."""
949
self.zq._compress = lambda deferred, upl, fobj: deferred.callback(True)
950
self.zq.release = lambda: called.append(True)
952
upload = FakeCommand()
953
upload.fileobj_factory = lambda: Mocker().mock().close()
954
yield self.zq.zip(upload)
955
self.assertTrue(called)
957
@defer.inlineCallbacks
958
def test_zip_release_lock_compression_error(self):
959
"""Test that it releases the lock even on compression error."""
961
exc = Exception('bad')
962
self.zq._compress = lambda deferred, upl, fobj: deferred.errback(exc)
963
self.zq.release = lambda: called.append(True)
964
upload = FakeCommand()
965
upload.fileobj_factory = lambda: Mocker().mock().close()
968
yield self.zq.zip(upload)
970
# need to silent the exception we're generating in the test
971
self.assertEqual(e, exc)
973
self.fail("It should have raised the exception!")
974
self.assertTrue(called)
976
@defer.inlineCallbacks
977
def test_zip_release_lock_fileobjfactory_error(self):
978
"""Test that it releases the lock even on file factory error."""
980
self.zq.release = lambda: called.append(True)
981
upload = FakeCommand()
983
yield self.zq.zip(upload)
984
self.assertTrue(called)
986
@defer.inlineCallbacks
987
def test_fileobj_closed_ok(self):
988
"""Close the fileobj after compressing ok."""
990
fake_fileobj = mocker.mock()
993
self.zq._compress = lambda deferred, upl, fobj: deferred.callback(True)
995
upload = FakeCommand()
996
upload.fileobj_factory = lambda: fake_fileobj
998
yield self.zq.zip(upload)
1000
@defer.inlineCallbacks
1001
def test_fileobj_closed_error(self):
1002
"""Close the fileobj after compressing with error."""
1004
fake_fileobj = mocker.mock()
1005
fake_fileobj.close()
1007
exc = Exception('bad')
1008
self.zq._compress = lambda deferred, upl, fobj: deferred.errback(exc)
1010
upload = FakeCommand()
1011
upload.fileobj_factory = lambda: fake_fileobj
1014
yield self.zq.zip(upload)
1015
except Exception, e:
1016
# need to silent the exception we're generating in the test
1017
self.assertEqual(e, exc)
1019
self.fail("It should have raised the exception!")
1021
@defer.inlineCallbacks
1022
def test_compress_gets_compressed_data(self):
1023
"""Compressed data is generated by _compress."""
1024
upload = FakeCommand()
1025
data = "a lot of data to compress"
1026
fileobj = StringIO(data)
1029
d = defer.Deferred()
1030
reactor.callInThread(self.zq._compress, d, upload, fileobj)
1033
compressed = open_file(upload.tempfile.name).read()
1034
self.assertEqual(compressed, data.encode('zip'))
1036
@defer.inlineCallbacks
1037
def test_compress_gets_magic_hash(self):
1038
"""The magic hash is generated by _compress."""
1039
upload = FakeCommand()
1040
data = "a lot of data to hash"
1041
fileobj = StringIO(data)
1043
# what the hash should give us
1044
mh = content_hash.magic_hash_factory()
1046
should_hash = mh.content_hash()._magic_hash
1049
d = defer.Deferred()
1050
reactor.callInThread(self.zq._compress, d, upload, fileobj)
1053
hashed = upload.magic_hash._magic_hash
1054
self.assertEqual(hashed, should_hash)
1056
def test_compress_release_deferred_cancelled_command(self):
1057
"""Release the deferred if the command is cancelled."""
1058
upload = FakeCommand()
1059
upload.cancelled = True
1062
d = defer.Deferred()
1063
reactor.callInThread(self.zq._compress, d, upload, None)
1066
@defer.inlineCallbacks
1067
def test_compress_release_deferred_on_error(self):
1068
"""Release the deferred on an error."""
1069
upload = None # _compress will fail because want to access .cancelled
1072
d = defer.Deferred()
1073
reactor.callInThread(self.zq._compress, d, upload, None)
1074
yield self.assertFailure(d, AttributeError)
1077
class FactoryBaseTestCase(BasicTestCase):
1078
"""Helper for by-pass Twisted."""
1080
def _start_sample_webserver(self):
1081
"""Start a web server serving content at its root"""
1082
# start listening on `decide yourself` port, and fix AQ with it
1083
website = server.Site(None)
1084
webport = reactor.listenTCP(0, website)
1085
self.action_queue.port = webport.getHost().port
1087
transport_class = webport.transport
1088
def save_an_instance(skt, protocol, addr, sself, s, sreactor):
1089
self.server_transport = transport_class(skt, protocol, addr, sself,
1091
return self.server_transport
1092
webport.transport = save_an_instance
1094
self.addCleanup(webport.stopListening)
1097
def _connect_factory(self):
1098
"""Connect the instance factory."""
1099
self.server = self._start_sample_webserver()
1100
orig = self.action_queue.buildProtocol
1102
d = defer.Deferred()
1103
def faked_buildProtocol(*args, **kwargs):
1104
"""Override buildProtocol to hook a deferred."""
1105
protocol = orig(*args, **kwargs)
1106
protocol.testing_deferred = d
1109
self.action_queue.buildProtocol = faked_buildProtocol
1110
self.action_queue.connect()
1113
def _disconnect_factory(self):
1114
"""Disconnect the instance factory."""
1115
if self.action_queue.client is not None:
1116
orig = self.action_queue.client.connectionLost
1118
d = defer.Deferred()
1119
def faked_connectionLost(reason):
1120
"""Receive connection lost and fire tearDown."""
1124
self.action_queue.client.connectionLost = faked_connectionLost
1126
d = defer.succeed(True)
1128
if self.action_queue.connect_in_progress:
1129
self.action_queue.disconnect()
1134
class ConnectionTestCase(FactoryBaseTestCase):
1135
"""Test TCP/SSL connection mechanism for ActionQueue."""
1137
def assert_connection_state_reset(self):
    """Check that the ActionQueue holds no connection state.

    Asserts that ``client`` and ``connector`` are both None and that
    ``connect_in_progress`` is False.
    """
    aq = self.action_queue
    # assertIdentical reports the offending object when it fails, while
    # assertTrue(x is None) would only report that False is not true
    self.assertIdentical(None, aq.client)
    self.assertIdentical(None, aq.connector)
    self.assertEqual(False, aq.connect_in_progress)
1143
def test_init(self):
    """A freshly built ActionQueue starts with a clean connection state."""
    check_reset = self.assert_connection_state_reset
    check_reset()
1147
@defer.inlineCallbacks
def test_connect_if_already_connected(self):
    """A second connect() while connected must not connect again.

    Once the factory is connected, calling connect() again has to
    return None instead of a new Deferred.
    """
    yield self._connect_factory()
    try:
        # preconditions: the first connection is really in place
        assert self.action_queue.connector is not None
        assert self.action_queue.connect_in_progress == True

        # double connect, it returns None instead of a Deferred
        result = self.action_queue.connect()
        self.assertTrue(result is None, 'not connecting again')
    finally:
        # always tear the connection down, even when a check above
        # fails, so the open connection does not outlive this test
        yield self._disconnect_factory()
1160
@defer.inlineCallbacks
def test_disconnect_if_connected(self):
    """Disconnecting a connected ActionQueue resets its connection state.

    The event list is emptied before calling disconnect(), and it must
    still be empty right after the call.
    """
    yield self._connect_factory()
    try:
        self.action_queue.event_queue.events = []  # cleanup events
        # precondition: there is a live connection to drop
        assert self.action_queue.connector.state == 'connected'
        self.action_queue.disconnect()

        self.assert_connection_state_reset()
        self.assertEqual([], self.action_queue.event_queue.events)
    finally:
        # run the factory teardown even if one of the checks failed
        yield self._disconnect_factory()
1174
@defer.inlineCallbacks
1175
def test_clientConnectionFailed(self):
1176
"""Test clientConnectionFailed.
1178
The connection will not be completed since the server will be down. So,
1179
self.action_queue.connector will never leave the 'connecting' state.
1180
When interrupting the connection attempt, twisted automatically calls
1181
self.action_queue.clientConnectionFailed.
1184
self.action_queue.event_queue.events = []
1185
orig = self.action_queue.clientConnectionFailed
1187
d = defer.Deferred()
1188
def faked_clientConnectionFailed(connector, reason):
1189
"""Receive connection failed and check."""
1190
orig(connector, reason)
1191
self.assert_connection_state_reset()
1192
self.assertEqual([('SYS_CONNECTION_FAILED', {})],
1193
self.action_queue.event_queue.events)
1194
self.action_queue.clientConnectionFailed = orig
1197
self.action_queue.clientConnectionFailed = faked_clientConnectionFailed
1198
# factory will never finish the connection, server was never started
1199
self.action_queue.connect()
1200
# stopConnecting() will be called since the connection is in progress
1201
assert self.action_queue.connector.state == 'connecting'
1202
self.action_queue.connector.disconnect()
1206
@defer.inlineCallbacks
1207
def test_clientConnectionLost(self):
1208
"""Test clientConnectionLost
1210
The connection will be completed successfully.
1211
So, self.action_queue.connector will be in the 'connected' state.
1212
When disconnecting the connector, twisted automatically calls
1213
self.action_queue.clientConnectionLost.
1216
yield self._connect_factory()
1218
self.action_queue.event_queue.events = []
1219
orig = self.action_queue.clientConnectionLost
1221
d = defer.Deferred()
1222
def faked_clientConnectionLost(connector, reason):
1223
"""Receive connection lost and check."""
1224
orig(connector, reason)
1225
self.assert_connection_state_reset()
1226
self.assertEqual([('SYS_CONNECTION_LOST', {})],
1227
self.action_queue.event_queue.events)
1228
self.action_queue.clientConnectionLost = orig
1231
self.action_queue.clientConnectionLost = faked_clientConnectionLost
1232
# loseConnection() will be called since the connection was completed
1233
assert self.action_queue.connector.state == 'connected'
1234
self.action_queue.connector.disconnect()
1237
yield self._disconnect_factory()
1239
@defer.inlineCallbacks
1240
def test_server_disconnect(self):
1241
"""Test factory's connection when the server goes down."""
1243
yield self._connect_factory()
1245
self.action_queue.event_queue.events = []
1246
orig = self.action_queue.clientConnectionLost
1248
d = defer.Deferred()
1249
def faked_connectionLost(*args, **kwargs):
1250
"""Receive connection lost and check."""
1251
orig(*args, **kwargs)
1252
self.assert_connection_state_reset()
1253
self.assertEqual([('SYS_CONNECTION_LOST', {})],
1254
self.action_queue.event_queue.events)
1255
self.action_queue.clientConnectionLost = orig
1258
self.action_queue.clientConnectionLost = faked_connectionLost
1259
# simulate a server failure!
1260
yield self.server_transport.loseConnection()
1262
yield self._disconnect_factory()
1264
def test_buildProtocol(self):
    """buildProtocol stores the protocol as client and wires it up.

    The returned protocol must be the ActionQueue's client, must have
    the ActionQueue as its factory, and must share every notification
    callback with it.
    """
    built = self.action_queue.buildProtocol(addr=None)
    self.assertTrue(built is self.action_queue.client)
    self.assertTrue(self.action_queue is self.action_queue.client.factory)

    # callbacks are connected
    # pylint: disable-msg=W0212
    aq = self.action_queue
    callback_names = (
        '_share_change_callback',
        '_share_answer_callback',
        '_free_space_callback',
        '_account_info_callback',
        '_volume_created_callback',
        '_volume_deleted_callback',
        '_volume_new_generation_callback',
    )
    for name in callback_names:
        self.assertEqual(getattr(aq.client, name), getattr(aq, name))
1288
@defer.inlineCallbacks
def test_connector_gets_assigned_on_connect(self):
    """Connecting the factory stores a connector in the ActionQueue."""
    yield self._connect_factory()
    try:
        # assertNotIdentical gives a useful message if this ever fails
        self.assertNotIdentical(None, self.action_queue.connector)
    finally:
        # disconnect even on failure so the connection is not left open
        yield self._disconnect_factory()
1297
def test_connection_started_logging(self):
1298
"""Test that the connection started logs connector info, not AQ's."""
1299
assert self.action_queue.host == '127.0.0.1'
1300
assert self.action_queue.port == 0
1302
class FakeConnector(object):
1303
"""Fake connector."""
1307
self.action_queue.startedConnecting(FakeConnector())
1308
self.assertTrue(self.handler.check_info("Connection started",
1309
"host 1.2.3.4", "port 4321"))
1312
class NetworkmanagerTestCase(FactoryBaseTestCase):
1313
"""Base test case generating a connected factory."""
1317
@defer.inlineCallbacks
1320
yield super(NetworkmanagerTestCase, self).setUp()
1321
self.action_queue.event_queue.push('SYS_NET_CONNECTED')
1324
@defer.inlineCallbacks
1325
def test_wrong_disconnect(self):
1326
"""Test factory's connection when SYS_NET_DISCONNECTED."""
1328
d1 = self.main.wait_for('SYS_CONNECTION_MADE')
1329
d2 = self.main.wait_for('SYS_CONNECTION_LOST')
1331
self.server = self._start_sample_webserver()
1335
self.action_queue.event_queue.push('SYS_NET_DISCONNECTED')
1338
@defer.inlineCallbacks
1339
def test_disconnect_twice(self):
1340
"""Test connection when SYS_NET_DISCONNECTED is received twice."""
1342
d1 = self.main.wait_for('SYS_CONNECTION_MADE')
1343
d2 = self.main.wait_for('SYS_CONNECTION_LOST')
1345
self.server = self._start_sample_webserver()
1349
self.action_queue.event_queue.push('SYS_NET_DISCONNECTED')
1352
self.action_queue.event_queue.events = []
1353
self.action_queue.event_queue.push('SYS_NET_DISCONNECTED')
1354
self.assertEqual([('SYS_NET_DISCONNECTED', {})],
1355
self.action_queue.event_queue.events,
1356
'No new events after a misplaced SYS_NET_DISCONNECTED')
1358
@defer.inlineCallbacks
1359
def test_net_connected_if_already_connected(self):
1360
"""Test connection when SYS_NET_CONNECTED is received twice."""
1362
d1 = self.main.wait_for('SYS_CONNECTION_MADE')
1364
self.server = self._start_sample_webserver()
1368
self.action_queue.event_queue.events = []
1369
self.action_queue.event_queue.push('SYS_NET_CONNECTED')
1370
self.assertEqual([('SYS_NET_CONNECTED', {})],
1371
self.action_queue.event_queue.events,
1372
'No new events after a misplaced SYS_NET_CONNECTED')
1374
@defer.inlineCallbacks
1375
def test_messy_mix(self):
1376
"""Test connection when a messy mix of events is received."""
1377
orig_waiting = states.MAX_WAITING
1378
states.MAX_WAITING = 1
1380
self.action_queue.event_queue.events = []
1381
self.server = self._start_sample_webserver()
1383
conn_made = self.main.wait_for('SYS_CONNECTION_MADE')
1387
events = ['SYS_NET_CONNECTED', 'SYS_NET_DISCONNECTED',
1388
'SYS_NET_CONNECTED', 'SYS_NET_CONNECTED',
1389
'SYS_NET_DISCONNECTED', 'SYS_NET_DISCONNECTED',
1390
'SYS_NET_CONNECTED']
1393
self.action_queue.event_queue.push(i)
1395
yield self.main.wait_for_nirvana()
1397
expected = ['SYS_NET_CONNECTED', # from the DBus fake NetworkManager
1398
'SYS_NET_CONNECTED', 'SYS_NET_DISCONNECTED',
1399
'SYS_CONNECTION_LOST', 'SYS_CONNECTION_RETRY',
1400
'SYS_NET_CONNECTED', 'SYS_NET_CONNECTED',
1401
'SYS_CONNECTION_MADE', 'SYS_NET_DISCONNECTED',
1402
'SYS_NET_DISCONNECTED']
1404
avoid = ('SYS_STATE_CHANGED', 'SYS_LOCAL_RESCAN_DONE',
1405
'SYS_PROTOCOL_VERSION_OK', 'SYS_SET_CAPABILITIES_OK',
1406
'SYS_AUTH_OK', 'SYS_SERVER_RESCAN_DONE')
1407
actual = [event for (event, kwargs) in
1408
self.action_queue.event_queue.events
1409
if event not in avoid]
1410
self.assertEqual(sorted(expected), sorted(actual))
1412
states.MAX_WAITING = orig_waiting
1415
class ConnectedBaseTestCase(FactoryBaseTestCase):
1416
"""Base test case generating a connected factory."""
1418
@defer.inlineCallbacks
1421
yield super(ConnectedBaseTestCase, self).setUp()
1422
yield self._connect_factory()
1423
assert self.action_queue.connector.state == 'connected'
1425
@defer.inlineCallbacks
1428
yield self._disconnect_factory()
1429
yield super(ConnectedBaseTestCase, self).tearDown()
1431
def silent_connection_lost(self, failure):
1432
"""Some tests will generate connection lost, support it."""
1433
if not failure.check(twisted_error.ConnectionDone,
1434
twisted_error.ConnectionLost):
1438
class VolumeManagementTestCase(ConnectedBaseTestCase):
1439
"""Test Volume managemenr for ActionQueue."""
1441
@defer.inlineCallbacks
1444
yield super(VolumeManagementTestCase, self).setUp()
1446
# silence the event to avoid propagation
1447
listener_map = self.action_queue.event_queue.listener_map
1448
del listener_map['SV_VOLUME_DELETED']
1450
def test_volume_created_push_event(self):
    """The volume-created callback pushes SV_VOLUME_CREATED."""
    new_volume = FakedVolume()
    self.action_queue._volume_created_callback(new_volume)

    expected = [('SV_VOLUME_CREATED', {'volume': new_volume})]
    pushed = self.action_queue.event_queue.events
    self.assertEqual(expected, pushed)
1457
def test_volume_deleted_push_event(self):
1458
"""Volume deleted callback push proper event."""
1460
self.action_queue._volume_deleted_callback(volume_id)
1461
self.assertEqual([('SV_VOLUME_DELETED', {'volume_id': volume_id})],
1462
self.action_queue.event_queue.events)
1464
def test_volume_new_generation_push_event_root(self):
    """The new-generation callback pushes the event for the root volume.

    Feeds request.ROOT and generation 77 to the callback and checks that
    SV_VOLUME_NEW_GENERATION with those values is among the queued events.
    """
    volume = request.ROOT
    self.action_queue._volume_new_generation_callback(volume, 77)
    event = ('SV_VOLUME_NEW_GENERATION',
             {'volume_id': volume, 'generation': 77})
    # assertIn shows the event and the list on failure; assertTrue on a
    # membership expression would only report that False is not true
    self.assertIn(event, self.action_queue.event_queue.events)
1472
def test_volume_new_generation_push_event_uuid(self):
    """The new-generation callback pushes the event for a uuid volume.

    Feeds a random uuid and generation 77 to the callback and checks that
    SV_VOLUME_NEW_GENERATION with those values is among the queued events.
    """
    volume = uuid.uuid4()
    self.action_queue._volume_new_generation_callback(volume, 77)
    event = ('SV_VOLUME_NEW_GENERATION',
             {'volume_id': volume, 'generation': 77})
    # assertIn shows the event and the list on failure; assertTrue on a
    # membership expression would only report that False is not true
    self.assertIn(event, self.action_queue.event_queue.events)
1480
def test_valid_events(self):
    """Volume events are registered in EVENTS with the right arguments."""
    # (event name, expected argument names), in the order they are checked
    expected_signatures = (
        ('SV_VOLUME_CREATED', ('volume',)),
        ('SV_VOLUME_DELETED', ('volume_id',)),
        ('AQ_CREATE_UDF_OK', ('volume_id', 'node_id', 'marker')),
        ('AQ_CREATE_UDF_ERROR', ('error', 'marker')),
        ('AQ_LIST_VOLUMES', ('volumes',)),
        ('AQ_LIST_VOLUMES_ERROR', ('error',)),
        ('AQ_DELETE_VOLUME_OK', ('volume_id',)),
        ('AQ_DELETE_VOLUME_ERROR', ('volume_id', 'error')),
        ('SV_VOLUME_NEW_GENERATION', ('volume_id', 'generation')),
    )

    # every event has to exist before any signature is compared
    for name, _ in expected_signatures:
        self.assertTrue(name in EVENTS)

    for name, arg_names in expected_signatures:
        self.assertEqual(arg_names, EVENTS[name])
1503
def test_create_udf(self):
1504
"""Test volume creation."""
1507
self.action_queue.create_udf(path, name, marker=None)
1509
def test_list_volumes(self):
    """Asking the ActionQueue to list volumes does not raise."""
    aq = self.action_queue
    aq.list_volumes()
1513
def test_delete_volume(self):
1514
"""Test volume deletion."""
1516
self.action_queue.delete_volume(volume_id, 'path')
1519
class ActionQueueCommandTestCase(ConnectedBaseTestCase):
1520
"""Test for the generic functionality of ActionQueueCommand."""
1522
@defer.inlineCallbacks
1525
yield super(ActionQueueCommandTestCase, self).setUp()
1527
class MyCommand(ActionQueueCommand):
1528
logged_attrs = ('a', 'b', 'c', 'd')
1534
return defer.succeed(True)
1537
def uniqueness(self):
1538
return (self.a, self.b, self.c)
1540
self.rq = RequestQueue(action_queue=self.action_queue)
1541
self.rq.active = True
1542
self.cmd = MyCommand(self.rq)
1543
self.cmd.make_logger()
1545
def test_runnable(self):
    """A command reports itself as runnable by default."""
    runnable = self.cmd.is_runnable
    self.assertTrue(runnable)
1549
def test_cancelled(self):
    """A command starts out not cancelled."""
    was_cancelled = self.cmd.cancelled
    self.assertFalse(was_cancelled)
1553
def test_dump_to_dict(self):
    """to_dict() dumps the a, b, c and d attributes of the command."""
    expected = {'a': 3, 'b': 'foo', 'c': u'año', 'd': None}
    dumped = self.cmd.to_dict()
    self.assertEqual(dumped, expected)
1558
@defer.inlineCallbacks
def test_demark_not_marker(self):
    """demark() leaves an attribute untouched when it is not a marker."""
    self.cmd.possible_markers = ('foo',)
    self.cmd.foo = 'not a marker'

    yield self.cmd.demark()

    current = self.cmd.foo
    self.assertEqual(current, 'not a marker')
1566
@defer.inlineCallbacks
1567
def test_demark_with_marker_future(self):
1568
"""Test demark with a marker not ready.
1570
Here, on purpose, set up everything and trigger later.
1572
marker = MDMarker('foo')
1573
self.cmd.possible_markers = 'foo',
1574
self.cmd.foo = marker
1575
d = self.cmd.demark()
1577
self.action_queue.uuid_map.set(marker, 'node_id')
1579
self.assertEqual(self.cmd.foo, 'node_id')
1580
self.assertTrue(self.handler.check_debug(
1581
"waiting for the real value of marker:foo"))
1583
@defer.inlineCallbacks
1584
def test_demark_with_marker_ready(self):
1585
"""Test demark with a marker that had data."""
1586
marker = MDMarker('foo')
1587
self.cmd.possible_markers = 'foo',
1588
self.cmd.foo = marker
1589
d = self.cmd.demark()
1590
self.action_queue.uuid_map.set(marker, 'node_id')
1592
self.assertEqual(self.cmd.foo, 'node_id')
1593
self.assertTrue(self.handler.check_debug(
1594
"waiting for the real value of marker:foo"))
1596
@defer.inlineCallbacks
1597
def test_demark_mixed_markers(self):
1598
"""Test demark with both a marker and not."""
1599
# call demark with both
1600
marker = MDMarker('foo')
1601
self.cmd.possible_markers = 'foo', 'bar'
1602
self.cmd.foo = 'notamarker'
1603
self.cmd.bar = marker
1604
d = self.cmd.demark()
1605
self.action_queue.uuid_map.set(marker, 'node_id')
1609
self.assertEqual(self.cmd.foo, 'notamarker')
1610
self.assertEqual(self.cmd.bar, 'node_id')
1611
self.assertTrue(self.handler.check_debug(
1612
"waiting for the real value of marker:foo"))
1613
self.assertFalse(self.handler.check_debug(
1614
"waiting for the real value of 'notamarker'"))
1616
@defer.inlineCallbacks
1617
def test_demark_marker_future_got_ok(self):
1618
"""Test demark getting a marker triggered ok later."""
1619
# don't have the info now
1620
marker = MDMarker('foo')
1621
self.cmd.possible_markers = 'foo',
1622
self.cmd.foo = marker
1623
d = self.cmd.demark()
1624
self.assertFalse(self.handler.check_debug("for marker:foo"))
1627
self.action_queue.uuid_map.set(marker, 'node_id')
1629
self.assertEqual(self.cmd.foo, 'node_id')
1630
self.assertTrue(self.handler.check_debug(
1631
"for marker:foo got value 'node_id'"))
1633
@defer.inlineCallbacks
1634
def test_demark_marker_future_got_failure(self):
1635
"""Test demark getting a marker triggered with failure later."""
1636
# don't have the info now
1637
marker = MDMarker('foo')
1638
self.cmd.possible_markers = 'foo',
1639
self.cmd.foo = marker
1640
d = self.cmd.demark()
1641
self.assertFalse(self.handler.check_error("failed marker:foo"))
1643
# set the marker and check
1644
self.action_queue.uuid_map.err(marker, Failure(Exception('bad')))
1646
self.assertTrue(self.handler.check_error("failed marker:foo"))
1648
yield self.cmd.markers_resolved_deferred
1649
except Exception, e:
1650
self.assertEqual(str(e), 'bad')
1652
self.fail("An exception should have been raised!")
1654
@defer.inlineCallbacks
1655
def test_demark_two_markers_ok(self):
1656
"""Test demark with two markers that finish ok."""
1657
# call demark with both
1658
marker1 = MDMarker('foo')
1659
marker2 = MDMarker('bar')
1660
self.cmd.possible_markers = 'foo', 'bar'
1661
self.cmd.foo = marker1
1662
self.cmd.bar = marker2
1663
d = self.cmd.demark()
1664
self.action_queue.uuid_map.set(marker1, 'data1')
1665
self.action_queue.uuid_map.set(marker2, 'data2')
1669
self.assertEqual(self.cmd.foo, 'data1')
1670
self.assertEqual(self.cmd.bar, 'data2')
1671
yield self.cmd.markers_resolved_deferred
1673
@defer.inlineCallbacks
1674
def test_demark_two_markers_one_fail(self):
1675
"""Test demark with two markers that one ends in failure."""
1676
# call demark with both
1677
marker1 = MDMarker('foo')
1678
marker2 = MDMarker('bar')
1679
self.cmd.possible_markers = 'foo', 'bar'
1680
self.cmd.foo = marker1
1681
self.cmd.bar = marker2
1682
d = self.cmd.demark()
1683
self.action_queue.uuid_map.set(marker1, 'data ok')
1684
self.action_queue.uuid_map.err(marker2, Failure(Exception('data bad')))
1689
yield self.cmd.markers_resolved_deferred
1690
except Exception, e:
1691
self.assertEqual(str(e), 'data bad')
1693
self.fail("An exception should have been raised!")
1695
@defer.inlineCallbacks
1696
def test_demark_fixes_hashedwaiting_active(self):
1697
"""The attribute changes: it also need to change the hashed_waiting."""
1698
marker = MDMarker('b')
1699
self.cmd.possible_markers = 'b',
1701
queue = self.cmd._queue
1702
queue.hashed_waiting[self.cmd.uniqueness] = self.cmd
1703
d = self.cmd.demark()
1705
self.action_queue.uuid_map.set(marker, 'node_id')
1707
self.assertTrue(queue.hashed_waiting[self.cmd.uniqueness], self.cmd)
1709
@defer.inlineCallbacks
1710
def test_demark_fixes_hashedwaiting_cancelled(self):
1711
"""The attribute changes: no change because cancelled."""
1712
marker = MDMarker('b')
1713
self.cmd.possible_markers = 'b',
1715
self.cmd.cancelled = True
1716
queue = self.cmd._queue
1717
queue.hashed_waiting[self.cmd.uniqueness] = self.cmd
1718
d = self.cmd.demark()
1720
self.action_queue.uuid_map.set(marker, 'node_id')
1722
self.assertFalse(self.cmd.uniqueness in queue.hashed_waiting)
1724
def test_go_makes_logger_and_demarks(self):
1725
"""Make the logger and demark."""
1727
self.cmd.make_logger = lambda: called.append(1)
1728
self.cmd.demark = lambda: called.append(2)
1731
self.assertEqual(called, [1, 2])
1733
def test_go_stop_if_shouldnt_execute_command(self):
1734
"""Don't queue and don't acquire the pathlock."""
1736
self.cmd._acquire_pathlock = lambda: called.append(True)
1737
self.rq.queue = lambda c: called.append(True)
1738
self.cmd._should_be_queued = lambda: False
1741
self.assertFalse(called)
1743
def test_go_queue_pathlock_run(self):
1744
"""Queue the command, acquire the pathlock and run."""
1746
self.rq.queue = lambda c: called.append(1)
1747
self.cmd._acquire_pathlock = lambda: called.append(2)
1748
self.cmd.run = lambda: called.append(3)
1751
self.assertEqual(called, [1, 2, 3])
1753
def test_go_stop_cancels_while_pathlocking(self):
1754
"""If the command is cancelled while locked, stop."""
1756
self.cmd.run = lambda: called.append(True)
1757
d = defer.Deferred()
1758
self.cmd._acquire_pathlock = lambda: d
1764
self.assertFalse(called)
1766
def test_go_release_cancelled_while_pathlocking(self):
1767
"""If the command is cancelled while locked, release the pathlock."""
1769
self.cmd.run = lambda: called.append(1)
1770
d = defer.Deferred()
1771
self.cmd._acquire_pathlock = lambda: d
1775
d.callback(lambda: called.append(2))
1777
self.assertEqual(called, [2])
1778
self.assertTrue(self.handler.check_debug(
1779
'releasing the pathlock because of cancelled'))
1781
def test_go_run_ok_release_pathlock(self):
1782
"""If run went ok, release the pathlock."""
1784
self.cmd.run = lambda: defer.succeed(True)
1785
self.cmd._acquire_pathlock = lambda: defer.succeed(
1786
lambda: called.append(True))
1789
self.assertTrue(called)
1791
@defer.inlineCallbacks
1792
def test_go_run_bad_release_pathlock(self):
1793
"""If run went bad, release the pathlock."""
1795
self.cmd.run = lambda: defer.fail(ValueError("error message"))
1796
self.cmd._acquire_pathlock = lambda: defer.succeed(
1797
lambda: called.append(True))
1800
self.assertTrue(called)
1802
# check exception to assure a traceback was logged, and check the
1803
# messages in ERROR (the real logging level); finally, clean the
1804
# records as if we leave them with the exception the test will fail
1805
self.assertTrue(self.handler.check_exception(ValueError))
1806
self.assertTrue(self.handler.check_error("Error running the command",
1808
self.handler.records = []
1810
def test_run_initial(self):
1811
"""Call ._start, log, and set running."""
1813
d = defer.Deferred()
1814
self.cmd._start = lambda: called.append(True) or d
1816
# run, and will lock in the _start
1818
self.assertTrue(called)
1819
self.assertTrue(self.handler.check_debug('starting'))
1821
# release the _start, check log and that it still not running
1823
self.assertTrue(self.handler.check_debug('started'))
1824
self.assertFalse(self.cmd.running)
1826
def test_run_stop_if_cancelled_while_start(self):
1827
"""Cancelled while _start."""
1828
self.rq.queue(self.cmd)
1829
assert self.rq.active
1830
assert self.cmd.is_runnable
1831
self.cmd.markers_resolved_deferred.callback(True)
1834
self.cmd._run = lambda: called.append(True)
1835
d = defer.Deferred()
1836
self.cmd._start = lambda: d
1838
# run, cancel, and unlock start
1843
self.assertFalse(called)
1844
self.assertTrue(self.handler.check_debug(
1845
'cancelled before trying to run'))
1847
def test_run_queue_not_active(self):
1848
"""Waiting cycle for queue not active."""
1849
self.rq.queue(self.cmd)
1850
assert self.cmd.is_runnable
1851
self.cmd.markers_resolved_deferred.callback(True)
1853
self.rq.active = False
1855
self.cmd._run = lambda: called.append(True) or defer.Deferred()
1859
self.assertFalse(called)
1860
self.assertTrue(self.handler.check_debug(
1861
'not running because of inactive queue'))
1862
self.assertFalse(self.handler.check_debug('unblocked: queue active'))
1866
self.assertTrue(called)
1867
self.assertTrue(self.handler.check_debug('unblocked: queue active'))
1869
def test_run_command_not_runnable(self):
1870
"""Waiting cycle for command not runnable."""
1871
self.rq.queue(self.cmd)
1872
assert self.rq.active
1873
self.cmd.markers_resolved_deferred.callback(True)
1875
self.cmd.is_runnable = False
1877
self.cmd._run = lambda: called.append(True) or defer.Deferred()
1881
self.assertFalse(called)
1882
self.assertTrue(self.handler.check_debug(
1883
'not running because of conditions'))
1884
self.assertFalse(self.handler.check_debug('unblocked: conditions ok'))
1886
# activate the command
1887
self.cmd.is_runnable = True
1888
self.action_queue.conditions_locker.check_conditions()
1889
self.assertTrue(called)
1890
self.assertTrue(self.handler.check_debug('unblocked: conditions ok'))
1892
def test_run_notrunnable_inactivequeue(self):
1893
"""Mixed behaviour between both stoppers."""
1894
self.rq.queue(self.cmd)
1895
self.cmd.markers_resolved_deferred.callback(True)
1896
assert self.cmd.is_runnable
1897
self.rq.active = False
1899
self.cmd._run = lambda: called.append(True) or defer.Deferred()
1903
self.assertFalse(called)
1905
# activate the queue but deactivate the command
1906
self.cmd.is_runnable = False
1908
self.assertFalse(called)
1910
# activate the command but deactivate the queue again!
1912
self.cmd.is_runnable = True
1913
self.action_queue.conditions_locker.check_conditions()
1914
self.assertFalse(called)
1916
# finally resume the queue
1918
self.assertTrue(called)
1920
def test_run_inactivequeue_cancel(self):
1921
"""Got cancelled while waiting the queue to resume."""
1922
self.rq.queue(self.cmd)
1923
assert self.cmd.is_runnable
1924
self.cmd.markers_resolved_deferred.callback(True)
1926
self.rq.active = False
1928
self.cmd._run = lambda: called.append(True)
1936
self.assertFalse(called)
1937
self.assertTrue(self.handler.check_debug(
1938
'cancelled before trying to run'))
1940
def test_run_notrunnable_cancel(self):
1941
"""Got cancelled while waiting the conditions to run."""
1942
self.rq.queue(self.cmd)
1943
assert self.rq.active
1944
self.cmd.markers_resolved_deferred.callback(True)
1946
self.cmd.is_runnable = False
1948
self.cmd._run = lambda: called.append(True)
1954
# activate the command
1955
self.cmd.is_runnable = True
1956
self.action_queue.conditions_locker.check_conditions()
1957
self.assertFalse(called)
1958
self.handler.debug = True
1959
self.assertTrue(self.handler.check_debug(
1960
'cancelled before trying to run'))
1962
def test_run_waits_markers_dereferencing(self):
1963
"""Don't call _run_command until have the markers."""
1964
self.rq.queue(self.cmd)
1965
assert self.cmd.is_runnable
1966
assert self.rq.active
1969
self.cmd._run = lambda: called.append(True) or defer.Deferred()
1973
self.assertFalse(called)
1975
# resolve the markers
1976
self.cmd.markers_resolved_deferred.callback(True)
1977
self.assertTrue(called)
1978
self.assertTrue(self.cmd.running)
1980
def test_run_endok_calls_finishing_stuff_not_cancelled(self):
1981
"""Call finish on end ok."""
1982
self.rq.queue(self.cmd)
1984
self.cmd.finish = lambda: called.append(2)
1985
self.cmd.handle_success = lambda a: called.append(a)
1986
self.cmd._run = lambda: defer.succeed(1)
1987
self.cmd.markers_resolved_deferred = defer.succeed(True)
1990
# check that handle_success was called *before* finish
1991
self.assertEqual(called, [1, 2])
1992
self.assertTrue(self.handler.check_debug('success'))
1994
def test_run_endok_calls_finishing_stuff_cancelled(self):
1995
"""Call finish on end ok, cancelled while running."""
1996
self.rq.queue(self.cmd)
1998
self.cmd.handle_success = lambda a: called.append(a)
1999
d = defer.Deferred()
2000
self.cmd._run = lambda: d
2001
self.cmd.markers_resolved_deferred = defer.succeed(True)
2004
# cancel and let _run finish
2007
self.assertFalse(called)
2008
self.assertTrue(self.handler.check_debug('cancelled while running'))
2010
def test_run_enderr_calls_finish(self):
2011
"""Call finish on end_errback."""
2013
self.cmd.finish = lambda: called.append(1)
2014
self.cmd.handle_failure = lambda f: called.append(f.value)
2015
self.cmd.markers_resolved_deferred = defer.succeed(True)
2016
self.cmd.suppressed_error_messages.append(ValueError)
2018
self.cmd._run = lambda: defer.fail(exc)
2021
# check that handle_failure was called *before* finish
2022
self.assertEqual(called, [exc, 1])
2024
def test_run_enderr_retry(self):
2025
"""Command retried, call the handle and retry."""
2027
self.cmd.finish = lambda: called.append('should not')
2028
self.cmd.handle_failure = lambda: called.append('should not')
2029
self.cmd.handle_retryable = lambda f: called.append('ok')
2030
self.cmd.markers_resolved_deferred = defer.succeed(True)
2031
assert self.rq.active
2034
"""Set the queue inactive to avoid retry loop and fail."""
2035
self.rq.active = False
2036
raise twisted_error.ConnectionDone()
2039
self.cmd._run = fake_run
2041
# run and check finish was not called
2043
self.assertEqual(called, ['ok'])
2045
def test_run_retry_on_commandpaused(self):
2046
"""Command retried because of pausing."""
2048
self.cmd.finish = lambda: called.append(True)
2049
self.cmd.markers_resolved_deferred = defer.succeed(True)
2050
self.rq.waiting.append(self.cmd)
2051
assert self.rq.active
2053
# deferreds: the first one gets stuck, the second one allows to continue
2054
deferreds = [defer.Deferred(), defer.succeed(True)]
2055
self.cmd._run = lambda: deferreds.pop(0)
2057
# run and check finish was not called
2059
self.assertFalse(called)
2061
# pause, still nothing called
2063
self.assertFalse(called)
2065
# resume, now it finished!
2067
self.assertTrue(called)
2069
@defer.inlineCallbacks
def test_start_default(self):
    """The result of the default _start() can be waited on without error."""
    started = self.cmd._start()
    yield started
2074
def test_possible_markers_default(self):
    """A command has an empty tuple of possible markers by default."""
    markers = self.cmd.possible_markers
    self.assertEqual(markers, ())
2078
@defer.inlineCallbacks
def test_path_locking(self):
    """The generic _acquire_pathlock() resolves to None."""
    lock_result = yield self.cmd._acquire_pathlock()
    self.assertIdentical(lock_result, None)
2084
def test_finish_running(self):
2085
"""Set running to False when finish."""
2086
self.cmd.running = True
2087
self.rq.unqueue = lambda c: None # don't do anything
2089
self.assertFalse(self.cmd.running)
2091
def test_finish_unqueue(self):
2092
"""Unqueue the command when finish."""
2094
self.rq.unqueue = lambda c: called.append(c)
2096
self.assertEqual(called, [self.cmd])
2097
self.assertFalse(self.cmd.running)
2099
@defer.inlineCallbacks
2100
def test_pause_running(self):
2101
"""Pause while running."""
2102
self.cmd.running_deferred = InterruptibleDeferred(defer.Deferred())
2104
self.cmd.cleanup = lambda: called.append(True)
2107
self.assertTrue(self.handler.check_debug("pausing"))
2108
self.assertTrue(called)
2111
yield self.cmd.running_deferred
2112
except DeferredInterrupted:
2113
pass # this is handled by run() to retry
2115
self.fail("Test should have raised an exception")
2117
def test_pause_norunning(self):
2118
"""Pause while not running."""
2119
assert self.cmd.running_deferred is None
2121
self.cmd.cleanup = lambda: called.append(True)
2124
self.assertTrue(self.handler.check_debug("pausing"))
2125
self.assertTrue(called)
2127
def test_cancel_works(self):
2128
"""Do default cleaning."""
2130
self.cmd.cleanup = lambda: called.append(1)
2131
self.cmd.finish = lambda: called.append(2)
2132
assert not self.cmd.cancelled
2133
did_cancel = self.cmd.cancel()
2134
self.assertTrue(did_cancel)
2135
self.assertEqual(called, [1, 2])
2136
self.assertTrue(self.cmd.cancelled)
2137
self.assertTrue(self.handler.check_debug('cancelled'))
2139
def test_cancel_releases_conditions(self):
2140
"""Cancel calls the conditions locker for the command."""
2141
self.cmd.finish = lambda: None # don't try to unqueue!
2142
d = self.action_queue.conditions_locker.get_lock(self.cmd)
2144
self.assertTrue(d.called)
2146
def test_cancel_cancelled(self):
2147
"""Don't do anything if command already cancelled."""
2149
self.cmd.cleanup = lambda: called.append(True)
2150
self.cmd.finish = lambda: called.append(True)
2151
self.cmd.cancelled = True
2152
did_cancel = self.cmd.cancel()
2153
self.assertFalse(did_cancel)
2154
self.assertFalse(called)
2155
self.assertTrue(self.cmd.cancelled)
2157
def test_slots(self):
    """Every command subclass must define its own __slots__.

    Walks the action_queue module and, for each strict subclass of
    ActionQueueCommand, checks that its __slots__ is not the very same
    object the base class exposes.
    """
    for attr_name in dir(action_queue):
        candidate = getattr(action_queue, attr_name)
        # only classes derived from the base command are of interest
        if not isinstance(candidate, type):
            continue
        if not issubclass(candidate, ActionQueueCommand):
            continue
        if candidate is ActionQueueCommand:
            continue
        self.assertNotIdentical(candidate.__slots__,
                                ActionQueueCommand.__slots__,
                                "class %s has no __slots__" % candidate)
2168
class CreateUDFTestCase(ConnectedBaseTestCase):
2169
"""Test for CreateUDF ActionQueueCommand."""
2171
@defer.inlineCallbacks
2174
yield super(CreateUDFTestCase, self).setUp()
2176
request_queue = RequestQueue(action_queue=self.action_queue)
2177
self.marker = VOLUME
2178
self.command = CreateUDF(request_queue, PATH, NAME, marker=self.marker)
2180
# silence the event to avoid propagation
2181
listener_map = self.action_queue.event_queue.listener_map
2182
del listener_map['AQ_CREATE_UDF_OK']
2183
del listener_map['AQ_CREATE_UDF_ERROR']
2185
def test_is_action_queue_command(self):
    """The CreateUDF command must be an ActionQueueCommand instance."""
    # assertIsInstance reports the object and the expected class when it
    # fails, where assertTrue(isinstance(...)) only says the value is false
    self.assertIsInstance(self.command, ActionQueueCommand)
2189
def test_init(self):
    """The command keeps the path, name and marker given at creation."""
    cmd = self.command
    self.assertEqual(PATH, cmd.path)
    self.assertEqual(NAME, cmd.name)
    self.assertEqual(self.marker, cmd.marker)
2195
def test_run_returns_a_deferred(self):
    """Calling _run() hands back a Deferred."""
    d = self.command._run()
    self.assertIsInstance(d, defer.Deferred)
    # route any failure through the helper errback so it is not left
    # unhandled (presumably a connection-lost error, going by its name)
    d.addErrback(self.silent_connection_lost)
2201
def test_run_calls_protocol(self):
2202
"""Test protocol's create_udf is called."""
2203
original = self.command.action_queue.client.create_udf
2206
def check(path, name):
2207
"""Take control over client's feature."""
2209
self.assertEqual(PATH, path)
2210
self.assertEqual(NAME, name)
2212
self.command.action_queue.client.create_udf = check
2216
self.assertTrue(self.called, 'command was called')
2218
self.command.action_queue.client.create_udf = original
2220
def test_handle_success_push_event(self):
2221
"""Test AQ_CREATE_UDF_OK is pushed on success."""
2222
request = client.CreateUDF(self.action_queue.client, PATH, NAME)
2223
request.volume_id = VOLUME
2224
request.node_id = NODE
2225
self.command.handle_success(success=request)
2226
events = [('AQ_CREATE_UDF_OK', {'volume_id': VOLUME,
2228
'marker': self.marker})]
2229
self.assertEqual(events, self.command.action_queue.event_queue.events)
2231
def test_handle_failure_push_event(self):
    """handle_failure() pushes AQ_CREATE_UDF_ERROR with error and marker."""
    error_text = 'Something went wrong'
    self.command.handle_failure(
        failure=Failure(DefaultException(error_text)))
    expected = [
        ('AQ_CREATE_UDF_ERROR',
         {'error': error_text, 'marker': self.marker}),
    ]
    pushed = self.command.action_queue.event_queue.events
    self.assertEqual(expected, pushed)
2240
def test_path_locking(self):
2241
"""Test that it acquires correctly the path lock."""
2243
self.patch(PathLockingTree, 'acquire',
2244
lambda s, *a, **k: t.extend((a, k)))
2245
self.command._acquire_pathlock()
2246
self.assertEqual(t, [tuple(PATH.split(os.path.sep)), {'logger': None}])
2249
class ActionQueueCommandErrorsTestCase(ConnectedBaseTestCase):
2250
"""Test the error handling in ActionQueueCommand."""
2252
@defer.inlineCallbacks
2254
yield super(ActionQueueCommandErrorsTestCase, self).setUp()
2256
self.deferred = defer.Deferred()
2258
class MyLogger(object):
2259
"""Fake logger that just stores error and warning calls."""
2263
def exception(self, *a):
2264
"""Mark that this method was called."""
2265
self.logged = "exception"
2268
"""Mark that this method was called."""
2269
self.logged = "warn"
2271
def debug(self, *a):
2274
class MyCommand(ActionQueueCommand):
2277
self.rq = RequestQueue(action_queue=self.action_queue)
2278
self.rq.unqueue = lambda c: None
2279
self.rq.active = True
2280
self.command = MyCommand(self.rq)
2281
self.command.markers_resolved_deferred = defer.succeed(True)
2282
self.command.log = MyLogger()
2284
def test_suppressed_yes_knownerrors(self):
2285
"""Check that the log is in warning for the known errors."""
2286
def send_failure_and_check(errnum, exception_class):
2287
"""Send the failure."""
2288
# prepare what to send
2289
protocol_msg = protocol_pb2.Message()
2290
protocol_msg.type = protocol_pb2.Message.ERROR
2291
protocol_msg.error.type = errnum
2292
err = exception_class("request", protocol_msg)
2295
"""Set the queue inactive to avoid retry loops and fail."""
2296
self.rq.active = False
2300
self.command.log.logged = None
2301
self.command._run = fake_run
2303
self.assertEqual(self.command.log.logged, "warn",
2304
"Bad log in exception %s" % (exception_class,))
2306
known_errors = [x for x in errors._error_mapping.items()
2307
if x[1] != errors.InternalError]
2308
for errnum, exception_class in known_errors:
2309
self.rq.active = True
2310
send_failure_and_check(errnum, exception_class)
2312
def test_suppressed_no_internalerror(self):
2313
"""Check that the log is in error for InternalError."""
2314
# prepare what to send
2315
protocol_msg = protocol_pb2.Message()
2316
protocol_msg.type = protocol_pb2.Message.ERROR
2317
protocol_msg.error.type = protocol_pb2.Error.INTERNAL_ERROR
2318
err = errors.InternalError("request", protocol_msg)
2320
self.command._run = lambda: defer.fail(err)
2322
self.assertEqual(self.command.log.logged, "exception")
2324
def test_suppressed_yes_cancelled(self):
2325
"""Check that the log is in warning for Cancelled."""
2326
err = errors.RequestCancelledError("CANCELLED")
2327
self.command._run = lambda: defer.fail(err)
2329
self.assertEqual(self.command.log.logged, "warn")
2331
def test_suppressed_yes_and_retry_when_connectiondone(self):
2332
"""Check that the log is in warning and retries for ConnectionDone."""
2333
self.handle_success = self.deferred.callback(True)
2334
err = twisted_error.ConnectionDone()
2335
runs = [defer.fail(err), defer.succeed(True)]
2336
self.command._run = lambda: runs.pop(0)
2338
self.assertEqual(self.command.log.logged, "warn")
2339
return self.deferred
2341
def test_retry_connectionlost(self):
2342
"""Check that it retries when ConnectionLost."""
2343
self.handle_success = self.deferred.callback(True)
2344
err = twisted_error.ConnectionLost()
2345
runs = [defer.fail(err), defer.succeed(True)]
2346
self.command._run = lambda: runs.pop(0)
2348
return self.deferred
2350
def test_retry_tryagain(self):
2351
"""Check that it retries when TryAgain."""
2352
self.handle_success = self.deferred.callback(True)
2353
protocol_msg = protocol_pb2.Message()
2354
protocol_msg.type = protocol_pb2.Message.ERROR
2355
protocol_msg.error.type = protocol_pb2.Error.TRY_AGAIN
2356
err = errors.TryAgainError("request", protocol_msg)
2357
runs = [defer.fail(err), defer.succeed(True)]
2358
self.command._run = lambda: runs.pop(0)
2360
return self.deferred
2363
class ListSharesTestCase(ConnectedBaseTestCase):
2364
"""Test for ListShares ActionQueueCommand."""
2366
@defer.inlineCallbacks
2369
yield super(ListSharesTestCase, self).setUp()
2370
self.rq = RequestQueue(action_queue=self.action_queue)
2372
def test_queued_mixed_types(self):
2373
"""Command gets queued if other command is waiting."""
2374
cmd1 = FakeCommand()
2376
cmd2 = ListShares(self.rq)
2377
self.assertTrue(cmd2._should_be_queued())
2379
def test_queued_two(self):
2380
"""Two queued commands is not ok."""
2381
cmd1 = ListShares(self.rq)
2383
cmd2 = ListShares(self.rq)
2384
self.assertFalse(cmd2._should_be_queued())
2386
def test_uniqueness(self):
    """A ListShares command reports 'ListShares' as its uniqueness."""
    command = ListShares(self.rq)
    self.assertEqual(command.uniqueness, 'ListShares')
2392
class ListVolumesTestCase(ConnectedBaseTestCase):
2393
"""Test for ListVolumes ActionQueueCommand."""
2395
@defer.inlineCallbacks
2398
yield super(ListVolumesTestCase, self).setUp()
2399
self.rq = RequestQueue(action_queue=self.action_queue)
2400
self.command = ListVolumes(self.rq)
2402
def test_is_action_queue_command(self):
    """The ListVolumes command must be an ActionQueueCommand instance."""
    # assertIsInstance reports the object and the expected class when it
    # fails, where assertTrue(isinstance(...)) only says the value is false
    self.assertIsInstance(self.command, ActionQueueCommand)
2406
def test_run_returns_a_deferred(self):
    """Calling _run() hands back a Deferred."""
    d = self.command._run()
    self.assertIsInstance(d, defer.Deferred)
    # route any failure through the helper errback so it is not left
    # unhandled (presumably a connection-lost error, going by its name)
    d.addErrback(self.silent_connection_lost)
2412
def test_run_calls_protocol(self):
2413
"""Test protocol's list_volumes is called."""
2414
original = self.command.action_queue.client.list_volumes
2418
"""Take control over client's feature."""
2421
self.command.action_queue.client.list_volumes = check
2425
self.assertTrue(self.called, 'command was called')
2427
self.command.action_queue.client.list_volumes = original
2429
def test_handle_success_push_event(self):
    """handle_success() pushes AQ_LIST_VOLUMES with the request volumes."""
    req = client.ListVolumes(self.action_queue.client)
    req.volumes = [FakedVolume(), FakedVolume()]
    self.command.handle_success(success=req)
    pushed = self.command.action_queue.event_queue.events
    self.assertIn(('AQ_LIST_VOLUMES', {'volumes': req.volumes}), pushed)
2437
def test_handle_failure_push_event(self):
    """handle_failure() pushes AQ_LIST_VOLUMES_ERROR with the message."""
    error_text = 'Something went wrong'
    self.command.handle_failure(
        failure=Failure(DefaultException(error_text)))
    pushed = self.command.action_queue.event_queue.events
    self.assertIn(('AQ_LIST_VOLUMES_ERROR', {'error': error_text}), pushed)
2445
def test_queued_mixed_types(self):
2446
"""Command gets queued if other command is waiting."""
2447
cmd1 = FakeCommand()
2449
cmd2 = ListVolumes(self.rq)
2450
self.assertTrue(cmd2._should_be_queued())
2452
def test_queued_two(self):
2453
"""Two queued commands is not ok."""
2454
cmd1 = ListVolumes(self.rq)
2456
cmd2 = ListVolumes(self.rq)
2457
self.assertFalse(cmd2._should_be_queued())
2459
def test_uniqueness(self):
    """A ListVolumes command reports 'ListVolumes' as its uniqueness."""
    command = ListVolumes(self.rq)
    self.assertEqual(command.uniqueness, 'ListVolumes')
2465
class DeleteVolumeTestCase(ConnectedBaseTestCase):
2466
"""Test for DeleteVolume ActionQueueCommand."""
2468
@defer.inlineCallbacks
2471
yield super(DeleteVolumeTestCase, self).setUp()
2473
request_queue = RequestQueue(action_queue=self.action_queue)
2474
self.command = DeleteVolume(request_queue, VOLUME, PATH)
2476
# silence the event to avoid propagation
2477
listener_map = self.action_queue.event_queue.listener_map
2478
del listener_map['AQ_DELETE_VOLUME_OK']
2479
del listener_map['AQ_DELETE_VOLUME_ERROR']
2481
def test_is_action_queue_command(self):
    """The DeleteVolume command must be an ActionQueueCommand instance."""
    # assertIsInstance reports the object and the expected class when it
    # fails, where assertTrue(isinstance(...)) only says the value is false
    self.assertIsInstance(self.command, ActionQueueCommand)
2485
def test_init(self):
    """The command keeps the volume id it was created with."""
    stored_volume = self.command.volume_id
    self.assertEqual(VOLUME, stored_volume)
2489
def test_run_returns_a_deferred(self):
    """Calling _run() hands back a Deferred."""
    d = self.command._run()
    self.assertIsInstance(d, defer.Deferred)
    # route any failure through the helper errback so it is not left
    # unhandled (presumably a connection-lost error, going by its name)
    d.addErrback(self.silent_connection_lost)
2495
def test_run_calls_protocol(self):
2496
"""Test protocol's delete_volume is called."""
2497
original = self.command.action_queue.client.delete_volume
2500
def check(volume_id):
2501
"""Take control over client's feature."""
2503
self.assertEqual(VOLUME, volume_id)
2505
self.command.action_queue.client.delete_volume = check
2509
self.assertTrue(self.called, 'command was called')
2511
self.command.action_queue.client.delete_volume = original
2513
def test_handle_success_push_event(self):
2514
"""Test AQ_DELETE_VOLUME_OK is pushed on success."""
2515
request = client.DeleteVolume(self.action_queue.client,
2517
self.command.handle_success(success=request)
2518
events = [('AQ_DELETE_VOLUME_OK', {'volume_id': VOLUME})]
2519
self.assertEqual(events, self.command.action_queue.event_queue.events)
2521
def test_handle_failure_push_event(self):
    """handle_failure() pushes AQ_DELETE_VOLUME_ERROR for the volume."""
    msg = 'Something went wrong'
    failure = Failure(DefaultException(msg))
    self.command.handle_failure(failure=failure)
    event = ('AQ_DELETE_VOLUME_ERROR', {'volume_id': VOLUME, 'error': msg})
    # assertIn shows both the expected event and the pushed ones when it
    # fails, and is what the other push-event tests in this file use
    self.assertIn(event, self.command.action_queue.event_queue.events)
2529
def test_path_locking(self):
2530
"""Test that it acquires correctly the path lock."""
2532
self.patch(PathLockingTree, 'acquire',
2533
lambda s, *a, **k: t.extend((a, k)))
2534
self.command._acquire_pathlock()
2535
self.assertEqual(t, [tuple(PATH.split(os.path.sep)), {'logger': None}])
2538
class FilterEventsTestCase(BasicTestCase):
2539
"""Tests for event filtering when a volume is not of our interest."""
2541
@defer.inlineCallbacks
2544
yield super(FilterEventsTestCase, self).setUp()
2545
self.vm = self.main.vm
2546
self.old_home = os.environ.get('HOME', None)
2547
os.environ['HOME'] = self.home
2549
@defer.inlineCallbacks
2552
if self.old_home is None:
2553
os.environ.pop('HOME')
2555
os.environ['HOME'] = self.old_home
2557
yield super(FilterEventsTestCase, self).tearDown()
2560
class ChangePublicAccessTests(ConnectedBaseTestCase):
2561
"""Tests for the ChangePublicAccess ActionQueueCommand."""
2563
@defer.inlineCallbacks
2565
yield super(ChangePublicAccessTests, self).setUp()
2567
request_queue = RequestQueue(action_queue=self.action_queue)
2568
self.command = ChangePublicAccess(request_queue, VOLUME, NODE, True)
2570
def test_change_public_access(self):
    """Smoke test: change_public_access() accepts volume, node and flag.

    Nothing is asserted; the call only has to complete without raising.
    """
    share, node, make_public = VOLUME, NODE, True
    self.action_queue.change_public_access(share, node, make_public)
2574
def test_is_action_queue_command(self):
    """The ChangePublicAccess command must be an ActionQueueCommand."""
    # assertIsInstance reports the object and the expected class when it
    # fails, where assertTrue(isinstance(...)) only says the value is false
    self.assertIsInstance(self.command, ActionQueueCommand)
2578
def test_init(self):
    """The command keeps the share id, node id and public flag given."""
    cmd = self.command
    self.assertEqual(VOLUME, cmd.share_id)
    self.assertEqual(NODE, cmd.node_id)
    self.assertEqual(True, cmd.is_public)
2584
def test_run_defers_work_to_thread(self):
2585
"""Test that work is deferred to a thread."""
2586
original = threads.deferToThread
2589
def check(function):
2592
self.command._change_public_access_http, function)
2593
return defer.Deferred()
2595
threads.deferToThread = check
2597
res = self.command._run()
2599
threads.deferToThread = original
2601
self.assertIsInstance(res, defer.Deferred)
2602
self.assertTrue(self.called, "deferToThread was called")
2604
def test_change_public_access_http(self):
2605
"""Test the blocking portion of the command."""
2609
url = 'https://one.ubuntu.com/files/api/set_public/%s:%s' % (
2610
base64.urlsafe_b64encode(VOLUME.bytes).strip("="),
2611
base64.urlsafe_b64encode(NODE.bytes).strip("="))
2612
self.assertEqual(url, request.get_full_url())
2613
self.assertEqual("is_public=True", request.get_data())
2615
'{"is_public": true, "public_url": "http://example.com"}')
2617
from ubuntuone.syncdaemon import action_queue
2618
action_queue.urlopen = check
2620
res = self.command._change_public_access_http()
2622
action_queue.urlopen = urllib2.urlopen
2625
{'is_public': True, 'public_url': 'http://example.com'}, res)
2627
def test_handle_success_push_event(self):
    """handle_success() pushes AQ_CHANGE_PUBLIC_ACCESS_OK with the result."""
    public_url = 'http://example.com'
    self.command.handle_success(
        success={'is_public': True, 'public_url': public_url})
    expected_args = {
        'share_id': VOLUME,
        'node_id': NODE,
        'is_public': True,
        'public_url': public_url,
    }
    pushed = self.command.action_queue.event_queue.events
    self.assertIn(('AQ_CHANGE_PUBLIC_ACCESS_OK', expected_args), pushed)
2636
def test_handle_failure_push_event(self):
    """handle_failure() pushes AQ_CHANGE_PUBLIC_ACCESS_ERROR on HTTP error."""
    error_text = 'Something went wrong'
    # the error text travels as the body of the HTTP error response
    http_error = urllib2.HTTPError(
        "http://example.com", 500, "Error", [], StringIO(error_text))
    self.command.handle_failure(failure=Failure(http_error))
    expected_args = {
        'share_id': VOLUME,
        'node_id': NODE,
        'error': error_text,
    }
    pushed = self.command.action_queue.event_queue.events
    self.assertIn(('AQ_CHANGE_PUBLIC_ACCESS_ERROR', expected_args), pushed)
2646
def test_possible_markers(self):
    """The attributes named in possible_markers resolve to the node id."""
    cmd = self.command
    resolved = []
    for attr_name in cmd.possible_markers:
        resolved.append(getattr(cmd, attr_name))
    self.assertEqual(resolved, [NODE])
2652
class GetPublicFilesTestCase(ConnectedBaseTestCase):
2653
"""Tests for GetPublicFiles ActionQueueCommand."""
2655
@defer.inlineCallbacks
2657
yield super(GetPublicFilesTestCase, self).setUp()
2659
self.rq = RequestQueue(action_queue=self.action_queue)
2660
self.command = GetPublicFiles(self.rq)
2662
def test_init(self):
    """The command URL is built from the default or the given base URL."""
    rq = RequestQueue(action_queue=self.action_queue)

    # no base_url given: the default host is expected
    plain = GetPublicFiles(rq)
    self.assertEqual(plain._url,
                     'https://one.ubuntu.com/files/api/public_files')

    # explicit base_url: the same API path on top of it is expected
    custom = GetPublicFiles(rq, base_url='http://example.com:1234')
    self.assertEqual(custom._url,
                     'http://example.com:1234/files/api/public_files')
2673
def test_change_public_access(self):
    """Smoke test: get_public_files() can be called on the action queue.

    Nothing is asserted; the call only has to complete without raising.
    """
    # NOTE(review): despite the test name, this exercises get_public_files
    list_public_files = self.action_queue.get_public_files
    list_public_files()
2677
def test_is_action_queue_command(self):
    """The GetPublicFiles command must be an ActionQueueCommand instance."""
    # assertIsInstance reports the object and the expected class when it
    # fails, where assertTrue(isinstance(...)) only says the value is false
    self.assertIsInstance(self.command, ActionQueueCommand)
2681
def test_run_defers_work_to_thread(self):
2682
"""Test that work is deferred to a thread."""
2683
original = threads.deferToThread
2686
def check(function):
2689
self.command._get_public_files_http, function)
2690
return defer.Deferred()
2692
threads.deferToThread = check
2694
res = self.command._run()
2696
threads.deferToThread = original
2698
self.assertIsInstance(res, defer.Deferred)
2699
self.assertTrue(self.called, "deferToThread was called")
2701
def test_get_public_files_http(self):
2702
"""Test the blocking portion of the command."""
2704
node_id = uuid.uuid4()
2705
nodekey = '%s' % (base64.urlsafe_b64encode(node_id.bytes).strip("="))
2706
node_id_2 = uuid.uuid4()
2707
nodekey_2 = '%s' % (base64.urlsafe_b64encode(
2708
node_id_2.bytes).strip("="))
2709
volume_id = uuid.uuid4()
2712
url = 'https://one.ubuntu.com/files/api/public_files'
2713
self.assertEqual(url, request.get_full_url())
2715
'[{"nodekey": "%s", "volume_id": null,"public_url": '
2716
'"http://example.com"}, '
2717
'{"nodekey": "%s", "volume_id": "%s", "public_url": '
2718
'"http://example.com"}]' % (nodekey, nodekey_2, volume_id))
2720
from ubuntuone.syncdaemon import action_queue
2721
action_queue.urlopen = check
2723
res = self.command._get_public_files_http()
2725
action_queue.urlopen = urllib2.urlopen
2726
self.assertEqual([{'node_id': str(node_id), 'volume_id': '',
2727
'public_url': 'http://example.com'},
2728
{'node_id': str(node_id_2),
2729
'volume_id': str(volume_id),
2730
'public_url': 'http://example.com'}], res)
2732
def test_handle_success_push_event(self):
    """handle_success() pushes AQ_PUBLIC_FILES_LIST_OK with the files."""
    public_file = {
        'node_id': uuid.uuid4(),
        'volume_id': None,
        'public_url': 'http://example.com',
    }
    files = [public_file]
    self.command.handle_success(success=files)
    pushed = self.command.action_queue.event_queue.events
    self.assertIn(('AQ_PUBLIC_FILES_LIST_OK', {'public_files': files}),
                  pushed)
2740
def test_handle_failure_push_event(self):
    """handle_failure() pushes AQ_PUBLIC_FILES_LIST_ERROR on HTTP error."""
    error_text = 'Something went wrong'
    # the error text travels as the body of the HTTP error response
    http_error = urllib2.HTTPError(
        "http://example.com", 500, "Error", [], StringIO(error_text))
    self.command.handle_failure(failure=Failure(http_error))
    pushed = self.command.action_queue.event_queue.events
    self.assertIn(('AQ_PUBLIC_FILES_LIST_ERROR', {'error': error_text}),
                  pushed)
2749
def test_queued_mixed_types(self):
2750
"""Command gets queued if other command is waiting."""
2751
cmd1 = FakeCommand()
2753
cmd2 = GetPublicFiles(self.rq)
2754
self.assertTrue(cmd2._should_be_queued())
2756
def test_queued_two(self):
2757
"""Two queued commands is not ok."""
2758
cmd1 = GetPublicFiles(self.rq)
2760
cmd2 = GetPublicFiles(self.rq)
2761
self.assertFalse(cmd2._should_be_queued())
2763
def test_uniqueness(self):
    """A GetPublicFiles command reports its class name as uniqueness."""
    command = GetPublicFiles(self.rq)
    self.assertEqual(command.uniqueness, 'GetPublicFiles')
2769
class DownloadUnconnectedTestCase(FactoryBaseTestCase):
2770
"""Test for Download ActionQueueCommand, no connection"""
2772
@defer.inlineCallbacks
2775
yield super(DownloadUnconnectedTestCase, self).setUp()
2777
self.rq = request_queue = RequestQueue(action_queue=self.action_queue)
2778
self.command = Download(request_queue, share_id='a_share_id',
2779
node_id='a_node_id', server_hash='server_hash',
2780
path='path', fileobj_factory=lambda: None)
2781
self.command.make_logger()
2783
def test_progress_information_setup(self):
2784
"""Test the setting up of the progress information in ._run()."""
2785
self.command.action_queue.connect_in_progress = False
2786
self.command.action_queue.client = FakeClient()
2788
self.assertEqual(self.command.n_bytes_read_last, 0)
2790
self.command.n_bytes_read = 20
2792
self.assertEqual(len(self.action_queue.client.called), 2)
2793
meth, args, kwargs = self.action_queue.client.called[1]
2794
self.assertEqual(meth, 'get_content_request')
2795
self.assertEqual(kwargs['offset'], 20)
2798
class DownloadTestCase(ConnectedBaseTestCase):
2799
"""Test for Download ActionQueueCommand."""
2801
@defer.inlineCallbacks
2804
yield super(DownloadTestCase, self).setUp()
2806
self.rq = RequestQueue(action_queue=self.action_queue)
2807
self.rq.transfers_semaphore = FakeSemaphore()
2809
class MyDownload(Download):
2810
"""Just to allow monkeypatching."""
2811
sync = lambda s: None
2812
self.command = MyDownload(self.rq, share_id='a_share_id',
2813
node_id='a_node_id',
2814
server_hash='server_hash',
2815
path= os.path.join(os.path.sep, 'foo', 'bar'),
2816
fileobj_factory=StringIO)
2817
self.command.make_logger()
2818
self.rq.waiting.append(self.command)
2820
def test_AQ_DOWNLOAD_FILE_PROGRESS_is_valid_event(self):
    """AQ_DOWNLOAD_FILE_PROGRESS is a known event with the right args."""
    event = 'AQ_DOWNLOAD_FILE_PROGRESS'
    # assertIn names the missing event when it fails, where
    # assertTrue(event in EVENTS) would only say the value is false
    self.assertIn(event, EVENTS)
    self.assertEqual(('share_id', 'node_id', 'n_bytes_read',
                      'deflated_size'), EVENTS[event])
2827
def test_progress(self):
2828
"""Test the progress machinery."""
2829
# would first get the node attribute including this
2830
class FakeDecompressor(object):
2831
"""Fake decompressor."""
2833
def decompress(self, data):
2838
self.command.gunzip = FakeDecompressor()
2839
self.assertEqual(self.command.n_bytes_read, 0)
2840
self.assertEqual(self.command.n_bytes_read_last, 0)
2841
self.command.node_attr_cb(
2842
deflated_size = TRANSFER_PROGRESS_THRESHOLD * 2)
2844
self.command.downloaded_cb('x' * 5)
2845
events = self.command.action_queue.event_queue.events
2846
self.assertFalse('AQ_DOWNLOAD_FILE_PROGRESS' in [x[0] for x in events])
2847
self.assertEqual(self.command.n_bytes_read, 5)
2848
self.assertEqual(self.command.n_bytes_read_last, 0)
2850
self.command.downloaded_cb('x' * (TRANSFER_PROGRESS_THRESHOLD - 10))
2851
self.assertFalse('AQ_DOWNLOAD_FILE_PROGRESS' in [x[0] for x in events])
2852
self.assertEqual(self.command.n_bytes_read,
2853
TRANSFER_PROGRESS_THRESHOLD - 5)
2854
self.assertEqual(self.command.n_bytes_read_last, 0)
2856
self.command.downloaded_cb('x' * 10)
2857
kwargs = {'share_id': 'a_share_id', 'node_id': 'a_node_id',
2858
'deflated_size': TRANSFER_PROGRESS_THRESHOLD * 2,
2859
'n_bytes_read': TRANSFER_PROGRESS_THRESHOLD + 5}
2860
expected = ('AQ_DOWNLOAD_FILE_PROGRESS', kwargs)
2861
self.assertTrue(expected in events)
2862
self.assertEqual(self.command.n_bytes_read,
2863
TRANSFER_PROGRESS_THRESHOLD + 5)
2864
self.assertEqual(self.command.n_bytes_read_last,
2865
self.command.n_bytes_read)
2867
def test_possible_markers(self):
    """The attributes named in possible_markers resolve to the node id."""
    cmd = self.command
    resolved = []
    for attr_name in cmd.possible_markers:
        resolved.append(getattr(cmd, attr_name))
    self.assertEqual(resolved, ['a_node_id'])
2872
def test_cancel_set_cancelled(self):
    """cancel() reports success and leaves the command flagged cancelled."""
    cmd = self.command
    # precondition of the fixture, not part of what is being tested
    assert not cmd.cancelled, "test badly set up"
    cancel_result = cmd.cancel()
    self.assertTrue(cancel_result)
    self.assertTrue(cmd.cancelled)
2879
def test_cancel_download_req_is_none(self):
    """Cancelling still succeeds while download_req is None."""
    cmd = self.command
    # precondition of the fixture, not part of what is being tested
    assert cmd.download_req is None, "test badly set up"
    cancel_result = cmd.cancel()
    self.assertTrue(cancel_result)
2885
def test_cancel_download_req_is_something(self):
2886
"""download_req is also cancelled."""
2894
self.command.download_req = obj
2895
self.command.cancel()
2897
def test_cancel_remove(self):
2898
"""Remove the command from the queue."""
2902
obj.unqueue(self.command)
2906
self.command._queue = obj
2907
self.command.cancel()
2909
def test_cancel_clean_up(self):
2912
self.command.cleanup = lambda: called.append(True)
2913
self.command.cancel()
2914
self.assertTrue(called)
2916
def test_uniqueness(self):
    """Uniqueness is the class name plus the share and node ids."""
    expected = ('MyDownload', 'a_share_id', 'a_node_id')
    self.assertEqual(self.command.uniqueness, expected)
2921
def test_path_locking(self):
2922
"""Test that it acquires correctly the path lock."""
2924
self.patch(PathLockingTree, 'acquire',
2925
lambda s, *a, **k: t.extend((a, k)))
2926
self.command._acquire_pathlock()
2927
self.assertEqual(t, [("", "foo", "bar"), {'logger': self.command.log}])
2929
def test_upload_download_uniqueness(self):
    """An upload replaces a waiting download for the same node."""
    aq = self.action_queue
    # the arguments are bogus on purpose: messages are only validated
    # when the command runs
    aq.download('foo', 'bar', 0, 'path', 0)
    queued_download = aq.queue.waiting[0]
    aq.upload('foo', 'bar', 0, 0, 0, 0, 'path', StringIO)
    self.assertEqual(len(aq.queue.waiting), 1)
    self.assertTrue(queued_download.cancelled)
2938
def test_uniqueness_upload(self):
    """A download replaces a waiting upload for the same node, and logs."""
    aq = self.action_queue
    # the arguments are bogus on purpose: messages are only validated
    # when the command runs
    aq.upload('foo', 'bar', 0, 0, 0, 0, 'path', StringIO)
    queued_upload = aq.queue.waiting[0]
    aq.download('foo', 'bar', 0, 'path', 0)
    self.assertEqual(len(aq.queue.waiting), 1)
    self.assertTrue(queued_upload.cancelled)
    was_logged = self.handler.check_debug(
        "Previous command cancelled", "Upload", "foo", "bar")
    self.assertTrue(was_logged)
2949
def test_uniqueness_download(self):
    """A new download replaces a waiting one for the same node, and logs."""
    aq = self.action_queue
    # the arguments are bogus on purpose: messages are only validated
    # when the command runs
    aq.download('foo', 'bar', 0, 'path0', 0)
    queued_download = aq.queue.waiting[0]
    aq.download('foo', 'bar', 1, 'path1', 1)
    self.assertEqual(len(aq.queue.waiting), 1)
    self.assertTrue(queued_download.cancelled)
    was_logged = self.handler.check_debug(
        "Previous command cancelled", "Download", "foo", "bar")
    self.assertTrue(was_logged)
2960
def test_uniqueness_even_with_markers(self):
2961
"""Only one upload/download per node, even using markers."""
2963
self.action_queue.download('share', m, 0, 'path', 0)
2964
first_cmd = self.action_queue.queue.waiting[0]
2965
self.action_queue.uuid_map.set('bar', 'bah')
2966
self.action_queue.download('share', 'bah', 0, 'path', 0)
2967
self.assertEqual(len(self.action_queue.queue.waiting), 1)
2968
self.assertTrue(first_cmd.cancelled)
2970
def test_uniqueness_tried_to_cancel_but_no(self):
    """Both commands stay queued when the previous one refuses to cancel.

    The refusal must also be reported in the debug log.
    """
    self.action_queue.download('foo', 'bar', 0, 'path0', 0)

    # the first command will refuse to cancel (patch the class because
    # the instance is not patchable)
    self.patch(Download, 'cancel', lambda instance: False)

    self.action_queue.download('foo', 'bar', 1, 'path1', 1)
    # the length check also proves the first download was queued, so the
    # bare `waiting[0]` lookup whose result was discarded is not needed
    self.assertEqual(len(self.action_queue.queue.waiting), 2)
    self.assertTrue(self.handler.check_debug("Tried to cancel", "couldn't",
                                             "Download", "foo", "bar"))
2983
def test_start_locks_on_semaphore(self):
2984
"""_start acquire the semaphore and locks."""
2985
lock = defer.Deferred()
2986
self.rq.transfers_semaphore.acquire = lambda: lock
2988
# _start and check it locked
2989
started = self.command._start()
2990
self.assertFalse(started.called)
2992
# release the lock and check it finished
2995
self.assertTrue(started.called)
2996
self.assertTrue(self.handler.check_debug('semaphore acquired'))
2997
self.assertIdentical(o, self.command.tx_semaphore)
2999
def test_start_releases_semaphore_if_cancelled(self):
3000
"""Release the semaphore if cancelled while locking."""
3001
lock = defer.Deferred()
3002
self.rq.transfers_semaphore.acquire = lambda: lock
3004
# call start and cancel the command
3005
self.command._start()
3006
self.command.cancelled = True
3015
# check it released the semaphore
3016
self.assertTrue(self.handler.check_debug('semaphore released',
3018
self.assertIdentical(self.command.tx_semaphore, None)
3020
def test_finish_releases_semaphore_if_acquired(self):
3021
"""Test semaphore is released on finish if it was acquired."""
3024
self.command.tx_semaphore = s
3027
self.command.finish()
3028
self.assertEqual(s.count, 0)
3029
self.assertTrue(self.handler.check_debug('semaphore released'))
3031
def test_finish_releases_semaphore_not_there(self):
3032
"""Test semaphore is not released on finish if it was not acquired.
3034
This tests the situation where the command is finished before the lock
3035
was acquired (cancelled even before its _start).
3037
assert self.command.tx_semaphore is None
3038
self.command.finish()
3040
def test_decompressor_restarted(self):
3041
"""Restart the decompressor on each _run (because of retries!)."""
3042
# don't use the real protocol
3043
obj = Mocker().mock()
3045
self.action_queue.client.get_content_request = lambda *a, **k: obj
3048
decompressor1 = self.command.gunzip
3050
decompressor2 = self.command.gunzip
3051
self.assertNotIdentical(decompressor1, decompressor2)
3053
def test_fileobj_in_run(self):
3054
"""Create it first time, reset after that."""
3055
# don't use the real protocol
3056
obj = Mocker().mock()
3058
self.action_queue.client.get_content_request = lambda *a, **k: obj
3060
class FakeFileObjFactory(object):
3061
"""Fake class to check behaviour."""
3064
self.truncate_count = 0
3066
def seek(self, a, b):
3068
self.seek_count += 1
3070
def truncate(self, a):
3071
"""Fake truncate."""
3072
self.truncate_count += 1
3074
cmd = Download(self.rq, 'a_share_id','a_node_id', 'server_hash',
3075
os.path.join(os.path.sep, 'foo','bar'),
3078
# first run, it is just instantiated
3080
self.assertTrue(isinstance(cmd.fileobj, FakeFileObjFactory))
3081
self.assertEqual(cmd.fileobj.seek_count, 0)
3082
self.assertEqual(cmd.fileobj.truncate_count, 0)
3084
# next times it is reset
3086
self.assertEqual(cmd.fileobj.seek_count, 1)
3087
self.assertEqual(cmd.fileobj.truncate_count, 1)
3090
self.assertEqual(cmd.fileobj.seek_count, 2)
3091
self.assertEqual(cmd.fileobj.truncate_count, 2)
3094
class UploadUnconnectedTestCase(FactoryBaseTestCase):
3095
"""Test for Upload ActionQueueCommand, no connection"""
3097
@defer.inlineCallbacks
3100
yield super(UploadUnconnectedTestCase, self).setUp()
3102
class FakeMagicHash(object):
3103
"""Fake magic hash."""
3106
self.rq = request_queue = RequestQueue(action_queue=self.action_queue)
3107
self.command = Upload(request_queue, share_id='a_share_id',
3108
node_id='a_node_id', previous_hash='prev_hash',
3109
hash='yadda', crc32=0, size=0, path='path',
3110
fileobj_factory=lambda: None)
3111
self.command.make_logger()
3112
self.command.magic_hash = FakeMagicHash()
3113
self.client = FakeClient()
3114
self.command.action_queue.client = self.client
3116
def test_upload_progress_wrapper_setup(self):
3117
"""Test the setting up of the progress wrapper in ._run()."""
3118
self.command.action_queue.connect_in_progress = False
3121
self.assertEqual(len(self.client.called), 1)
3122
meth, args, kwargs = self.client.called[0]
3123
self.assertEqual(meth, 'put_content_request')
3124
upload_wrapper = args[7]
3125
self.assertEqual(upload_wrapper.command, self.command)
3127
@defer.inlineCallbacks
3128
def test_client_request(self):
3129
"""Request the corrent operation on the client."""
3131
data = "content to be sent in the upload"
3132
self.command.tempfile = StringIO(data)
3133
self.command.deflated_size = 123
3134
yield self.command._run()
3136
self.assertEqual(len(self.client.called), 1)
3137
meth, args, kwargs = self.client.called[0]
3138
self.assertEqual(meth, 'put_content_request')
3139
self.assertEqual(args[0], 'a_share_id')
3140
self.assertEqual(args[1], 'a_node_id')
3141
self.assertEqual(args[2], 'prev_hash')
3142
self.assertEqual(args[3], 'yadda')
3143
self.assertEqual(args[4], 0)
3144
self.assertEqual(args[5], 0)
3145
self.assertEqual(args[6], 123)
3146
self.assertTrue(isinstance(args[7], UploadProgressWrapper))
3147
self.assertEqual(kwargs['magic_hash'], '666')
3150
class UploadProgressWrapperTestCase(BaseTwistedTestCase):
3151
"""Test for the UploadProgressWrapper helper class."""
3153
def test_reset(self):
3154
"""Reset the values at start."""
3155
f = StringIO("x" * 10 + "y" * 5)
3159
UploadProgressWrapper(f, cmd)
3160
self.assertEqual(cmd.n_bytes_written, 0)
3161
self.assertEqual(cmd.n_bytes_written_last, 0)
3163
# fake as it worked a little, was interrupted, and tried again
3164
cmd.n_bytes_written_last = 1234
3165
cmd.n_bytes_written = 1248
3166
UploadProgressWrapper(f, cmd)
3167
self.assertEqual(cmd.n_bytes_written, 0)
3168
self.assertEqual(cmd.n_bytes_written_last, 0)
3170
def test_read(self):
3171
"""Test the read method."""
3172
class FakeCommand(object):
3176
self.n_bytes_written = 0
3177
self._progress_hook_called = 0
3179
def progress_hook(innerself):
3180
"""Count how many times it was called."""
3181
innerself._progress_hook_called += 1
3183
f = StringIO("x" * 10 + "y" * 5)
3185
upw = UploadProgressWrapper(f, cmd)
3188
self.assertEqual(res, "x" * 10)
3189
self.assertEqual(cmd.n_bytes_written, 10)
3190
self.assertEqual(cmd._progress_hook_called, 1)
3193
self.assertEqual(res, "y" * 5)
3194
self.assertEqual(cmd.n_bytes_written, 15)
3195
self.assertEqual(cmd._progress_hook_called, 2)
3197
def test_seek(self):
3198
"""Test the seek method."""
3199
class FakeCommand(object):
3203
self.n_bytes_written = 0
3204
self.n_bytes_written_last = 0
3205
self._progress_hook_called = 0
3207
def progress_hook(innerself):
3208
"""Count how many times it was called."""
3209
innerself._progress_hook_called += 1
3211
f = StringIO("v" * 10 + "w" * 10 + "x" * 5 + "y" * 5)
3213
upw = UploadProgressWrapper(f, cmd)
3216
self.assertEqual(cmd.n_bytes_written, 10)
3217
self.assertEqual(cmd._progress_hook_called, 0)
3219
self.assertEqual(res, "w" * 10)
3220
self.assertEqual(cmd.n_bytes_written, 20)
3221
self.assertEqual(cmd._progress_hook_called, 1)
3224
self.assertEqual(cmd.n_bytes_written, 25)
3225
self.assertEqual(cmd._progress_hook_called, 1)
3227
self.assertEqual(res, "y" * 5)
3228
self.assertEqual(cmd.n_bytes_written, 30)
3229
self.assertEqual(cmd._progress_hook_called, 2)
3232
class UploadTestCase(ConnectedBaseTestCase):
3233
"""Test for Upload ActionQueueCommand."""
3235
@defer.inlineCallbacks
3238
yield super(UploadTestCase, self).setUp()
3240
self.rq = RequestQueue(action_queue=self.action_queue)
3241
self.rq.transfers_semaphore = FakeSemaphore()
3242
self.rq.unqueue = lambda c: None
3243
self.rq.active = True
3245
class MyUpload(Upload):
3246
"""Just to allow monkeypatching."""
3248
self.share_id = str(uuid.uuid4())
3249
self.command = MyUpload(self.rq, share_id=self.share_id,
3250
node_id='a_node_id', previous_hash='prev_hash',
3251
hash='yadda', crc32=0, size=0,
3252
path=os.path.join(os.path.sep, 'foo', 'bar'),
3253
fileobj_factory=StringIO)
3254
self.command.make_logger()
3256
@defer.inlineCallbacks
3257
def test_upload_in_progress(self):
3258
"""Test Upload retries on UploadInProgress."""
3259
# prepare the failure
3260
protocol_msg = protocol_pb2.Message()
3261
protocol_msg.type = protocol_pb2.Message.ERROR
3262
protocol_msg.error.type = protocol_pb2.Error.UPLOAD_IN_PROGRESS
3263
err = errors.UploadInProgressError("request", protocol_msg)
3265
# first fails with UploadInProgress, then finishes ok
3267
run_deferreds = [defer.fail(err), defer.succeed(True)]
3268
self.command._run = lambda: called.append(':)') or run_deferreds.pop(0)
3270
# wait handle_success
3271
d = defer.Deferred()
3272
self.command.handle_success = lambda _: d.callback(True)
3277
self.assertEqual(called, [':)', ':)'])
3279
def test_handle_success_push_event(self):
3280
"""Test AQ_UPLOAD_FINISHED is pushed on success."""
3281
# create a request and fill it with successful information
3282
aq_client = TestingProtocol()
3283
request = client.PutContent(aq_client, VOLUME, 'node',
3284
'prvhash', 'newhash', 'crc32', 'size',
3286
request.new_generation = 13
3287
self.command.tempfile = FakeTempFile(self.tmpdir)
3289
# trigger success in the command
3290
self.command.handle_success(request)
3292
# check for successful event
3293
kwargs = dict(share_id=self.command.share_id, node_id='a_node_id',
3294
hash='yadda', new_generation=13)
3295
events = [('AQ_UPLOAD_FINISHED', kwargs)]
3296
self.assertEqual(events, self.command.action_queue.event_queue.events)
3298
def test_handle_failure_push_event(self):
    """A failed upload pushes AQ_UPLOAD_ERROR carrying the error text."""
    error_text = 'Something went wrong'
    self.command.tempfile = FakeTempFile(self.tmpdir)
    upload_failure = Failure(DefaultException(error_text))
    self.command.handle_failure(failure=upload_failure)

    expected = [('AQ_UPLOAD_ERROR', {
        'share_id': self.command.share_id,
        'node_id': 'a_node_id',
        'hash': 'yadda',
        'error': error_text,
    })]
    pushed = self.command.action_queue.event_queue.events
    self.assertEqual(expected, pushed)
3309
def test_handle_failure_removes_temp_file(self):
3310
"""Test temp file is removed on failure."""
3311
self.command.tempfile = FakeTempFile(self.tmpdir)
3312
assert path_exists(self.command.tempfile.name)
3314
msg = 'Something went wrong'
3315
failure = Failure(DefaultException(msg))
3316
self.command.handle_failure(failure=failure)
3317
self.assertFalse(path_exists(self.command.tempfile.name))
3319
def test_retryable_failure_push_quota_exceeded_if_that_error(self):
3320
"""Test SYS_QUOTA_EXCEEDED is pushed on QuotaExceededError."""
3321
protocol_msg = protocol_pb2.Message()
3322
protocol_msg.type = protocol_pb2.Message.ERROR
3323
protocol_msg.error.type = protocol_pb2.Error.QUOTA_EXCEEDED
3324
protocol_msg.free_space_info.share_id = self.command.share_id
3325
protocol_msg.free_space_info.free_bytes = 1331564676
3326
error = errors.QuotaExceededError("request", protocol_msg)
3327
failure = Failure(error)
3329
self.command.handle_retryable(failure)
3330
event = ('SYS_QUOTA_EXCEEDED', {'volume_id': self.command.share_id,
3331
'free_bytes': 1331564676})
3332
self.assertTrue(event in self.command.action_queue.event_queue.events)
3334
def test_retryable_failure_nothing_on_other_errors(self):
3335
"""Test nothing is pushed on other errors."""
3336
failure = Failure(twisted_error.ConnectionLost())
3337
self.command.handle_retryable(failure)
3339
for x in self.command.action_queue.event_queue.events]
3340
self.assertFalse('SYS_QUOTA_EXCEEDED' in event_names)
3342
def test_AQ_UPLOAD_FILE_PROGRESS_is_valid_event(self):
    """AQ_UPLOAD_FILE_PROGRESS is in EVENTS with the expected arguments."""
    event = 'AQ_UPLOAD_FILE_PROGRESS'
    # assertIn names the missing key on failure, unlike assertTrue(x in y)
    self.assertIn(event, EVENTS)
    expected_args = ('share_id', 'node_id', 'n_bytes_written',
                     'deflated_size')
    self.assertEqual(expected_args, EVENTS[event])
3349
def test_progress_hook(self):
3350
"""Test the progress hook."""
3351
self.command.deflated_size = 2*TRANSFER_PROGRESS_THRESHOLD
3352
self.command.n_bytes_written_last = 0
3354
self.command.n_bytes_written = 5
3355
self.command.progress_hook()
3356
self.assertEqual([], self.command.action_queue.event_queue.events)
3357
self.assertEqual(self.command.n_bytes_written_last, 0)
3359
self.command.n_bytes_written = TRANSFER_PROGRESS_THRESHOLD - 5
3360
self.command.progress_hook()
3361
self.assertEqual([], self.command.action_queue.event_queue.events)
3362
self.assertEqual(self.command.n_bytes_written_last, 0)
3364
self.command.n_bytes_written = TRANSFER_PROGRESS_THRESHOLD + 5
3365
self.command.progress_hook()
3366
kwargs = {'share_id': self.command.share_id, 'node_id': 'a_node_id',
3367
'deflated_size': 2*TRANSFER_PROGRESS_THRESHOLD,
3368
'n_bytes_written': 5+TRANSFER_PROGRESS_THRESHOLD }
3369
events = [('AQ_UPLOAD_FILE_PROGRESS', kwargs)]
3370
self.assertEqual(events, self.command.action_queue.event_queue.events)
3371
self.assertEqual(self.command.n_bytes_written_last,
3372
TRANSFER_PROGRESS_THRESHOLD + 5)
3374
def test_runnable_space_ok(self):
    """With enough free space the upload command is runnable."""
    def enough_space(*args):
        """Report that there is always room for the upload."""
        return True

    self.action_queue.have_sufficient_space_for_upload = enough_space
    runnable = self.command.is_runnable
    self.assertTrue(runnable)
3379
def test_runnable_space_bad(self):
    """Without free space the upload command is not runnable."""
    def no_space(*args):
        """Report that there is never room for the upload."""
        return False

    self.action_queue.have_sufficient_space_for_upload = no_space
    runnable = self.command.is_runnable
    self.assertFalse(runnable)
3384
def test_runnable_space_bad_cancelled(self):
    """A cancelled upload is runnable even when there is no free space."""
    def no_space(*args):
        """Report that there is never room for the upload."""
        return False

    self.action_queue.have_sufficient_space_for_upload = no_space
    self.command.cancelled = True
    runnable = self.command.is_runnable
    self.assertTrue(runnable)
3390
def test_possible_markers(self):
    """The attributes named in possible_markers hold the expected values."""
    marker_values = []
    for attr_name in self.command.possible_markers:
        marker_values.append(getattr(self.command, attr_name))
    self.assertEqual(marker_values, ['a_node_id'])
3395
def test_cancel_set_cancelled(self):
    """Cancelling the command turns its cancelled flag on."""
    command = self.command
    # sanity check on the fixture before exercising cancel()
    assert not command.cancelled, "test badly set up"
    command.cancel()
    self.assertTrue(command.cancelled)
3401
def test_cancel_upload_req_is_none(self):
    """cancel() reports success even when upload_req is still None."""
    command = self.command
    # sanity check on the fixture before exercising cancel()
    assert command.upload_req is None, "test badly set up"
    cancel_result = command.cancel()
    self.assertTrue(cancel_result)
3407
def test_cancel_abort_when_producer_finished(self):
3408
"""If the producer already finished, don't really cancel."""
3410
self.patch(ActionQueueCommand, 'cancel', lambda s: called.append(1))
3412
class FakeProducer(object):
3413
"""Fake producer."""
3416
fake_request = FakeRequest()
3417
fake_request.producer = FakeProducer()
3418
self.command.upload_req = fake_request
3420
did_cancel = self.command.cancel()
3421
self.assertFalse(did_cancel)
3422
self.assertFalse(called)
3423
self.assertFalse(fake_request.cancelled)
3425
def test_cancel_cancels_when_producer_not_finished(self):
3426
"""If the producer didn't finished, really cancel."""
3428
self.patch(ActionQueueCommand, 'cancel',
3429
lambda s: called.append(True) or True)
3431
class FakeProducer(object):
3432
"""Fake producer."""
3435
fake_request = FakeRequest()
3436
fake_request.producer = FakeProducer()
3437
self.command.upload_req = fake_request
3439
did_cancel = self.command.cancel()
3440
self.assertTrue(did_cancel)
3441
self.assertTrue(called)
3442
self.assertTrue(fake_request.cancelled)
3444
def test_cancel_upload_req_is_something(self):
3445
"""upload_req is also cancelled."""
3455
self.command.upload_req = obj
3456
self.command.cancel()
3458
def test_cancel_remove(self):
3459
"""Remove the command from the queue."""
3463
obj.unqueue(self.command)
3467
self.command._queue = obj
3468
self.command.cancel()
3470
def test_cancel_clean_up(self):
3473
self.command.cleanup = lambda: called.append(True)
3474
self.command.cancel()
3475
self.assertTrue(called)
3477
def test_uniqueness(self):
    """The uniqueness key is (class name, share id, node id)."""
    actual_key = self.command.uniqueness
    expected_key = ('MyUpload', self.share_id, 'a_node_id')
    self.assertEqual(actual_key, expected_key)
3482
def test_path_locking(self):
3483
"""Test that it acquires correctly the path lock."""
3485
self.patch(PathLockingTree, 'acquire',
3486
lambda s, *a, **k: t.extend((a, k)))
3487
self.command._acquire_pathlock()
3488
self.assertEqual(t, [("", "foo", "bar"), {'logger': self.command.log}])
3490
def test_uniqueness_upload(self):
    """Queueing a second upload for a node cancels the earlier one."""
    aq = self.action_queue
    # the arguments are bogus on purpose: they are only validated on run
    aq.upload('foo', 'bar', 0, 0, 0, 0, 'path0', StringIO)
    previous = aq.queue.waiting[0]
    aq.upload('foo', 'bar', 1, 1, 1, 1, 'path1', StringIO)

    self.assertEqual(len(aq.queue.waiting), 1)
    self.assertTrue(previous.cancelled)
    logged = self.handler.check_debug("Previous command cancelled",
                                      "Upload", "foo", "bar")
    self.assertTrue(logged)
3501
def test_uniqueness_download(self):
    """An upload queued for a node cancels that node's waiting download."""
    aq = self.action_queue
    # the arguments are bogus on purpose: they are only validated on run
    aq.download('foo', 'bar', 0, 'path', StringIO)
    previous = aq.queue.waiting[0]
    aq.upload('foo', 'bar', 0, 0, 0, 0, 'path', StringIO)

    self.assertEqual(len(aq.queue.waiting), 1)
    self.assertTrue(previous.cancelled)
    logged = self.handler.check_debug("Previous command cancelled",
                                      "Download", "foo", "bar")
    self.assertTrue(logged)
3512
def test_uniqueness_even_with_markers(self):
3513
"""Only one upload/download per node, even using markers."""
3515
self.action_queue.download('share', m, 0, 'path', StringIO)
3516
first_cmd = self.action_queue.queue.waiting[0]
3517
self.action_queue.uuid_map.set('bar', 'bah')
3518
self.action_queue.upload('share', 'bah', 0, 0, 0, 0, 'path', StringIO)
3519
self.assertEqual(len(self.action_queue.queue.waiting), 1)
3520
self.assertTrue(first_cmd.cancelled)
3522
def test_uniqueness_tried_to_cancel_but_no(self):
3523
"""Previous command didn't cancel even if we tried it."""
3524
# the first command will refuse to cancel
3525
self.action_queue.upload('foo', 'bar', 0, 0, 0, 0, 'path0', StringIO)
3526
self.action_queue.queue.waiting[0]
3527
self.patch(Upload, 'cancel', lambda instance: False)
3529
self.action_queue.upload('foo', 'bar', 1, 1, 1, 1, 'path1', StringIO)
3530
self.assertEqual(len(self.action_queue.queue.waiting), 2)
3531
self.assertTrue(self.handler.check_debug("Tried to cancel", "couldn't",
3532
"Upload", "foo", "bar"))
3534
def test_start_locks_on_semaphore(self):
3535
"""_start acquire the semaphore and locks."""
3536
lock = defer.Deferred()
3537
self.rq.transfers_semaphore.acquire = lambda: lock
3538
self.action_queue.zip_queue.zip = lambda u: defer.succeed(True)
3540
# _start and check it locked
3541
started = self.command._start()
3542
self.assertFalse(started.called)
3544
# release the lock and check it finished
3547
self.assertTrue(started.called)
3548
self.assertTrue(self.handler.check_debug('semaphore acquired'))
3549
self.assertIdentical(o, self.command.tx_semaphore)
3551
def test_start_releases_semaphore_if_cancelled(self):
3552
"""Release the semaphore if cancelled while locking."""
3553
lock = defer.Deferred()
3554
self.rq.transfers_semaphore.acquire = lambda: lock
3556
# call start and cancel the command
3557
self.command._start()
3558
self.command.cancelled = True
3567
# check it released the semaphore
3568
self.assertTrue(self.handler.check_debug('semaphore released',
3570
self.assertIdentical(self.command.tx_semaphore, None)
3572
def test_finish_releases_semaphore_if_acquired(self):
3573
"""Test semaphore is released on finish if it was acquired."""
3576
self.command.tx_semaphore = s
3579
self.command.finish()
3580
self.assertEqual(s.count, 0)
3581
self.assertTrue(self.handler.check_debug('semaphore released'))
3583
def test_finish_releases_semaphore_not_there(self):
3584
"""Test semaphore is not released on finish if it was not acquired.
3586
This tests the situation where the command is finished before the lock
3587
was acquired (cancelled even before its _start).
3589
assert self.command.tx_semaphore is None
3590
self.command.finish()
3592
def test_handle_upload_id(self):
3593
"""Test the handling of upload_id."""
3594
# change the share_id of the command
3595
self.command.share_id = request.ROOT
3597
path = os.path.join(self.main.root_dir, 'foo')
3598
self.main.fs.create(path=path, share_id=self.command.share_id,
3600
self.main.fs.set_node_id(path, self.command.node_id)
3601
self.command._upload_id_cb('hola')
3602
mdobj = self.main.fs.get_by_node_id(self.command.share_id,
3603
self.command.node_id)
3604
self.assertEqual('hola', mdobj.upload_id)
3606
def test_start_paused_use_upload_id(self):
3607
"""Test that starting a paused command make use of the upload_id."""
3608
mh = content_hash.magic_hash_factory()
3609
self.command.magic_hash = mh.content_hash()
3610
# patch the client to check the args
3611
self.command.action_queue.client = FakeClient()
3612
self.command.tempfile = StringIO()
3613
# change the share_id of the command
3614
self.command.share_id = request.ROOT
3616
path = os.path.join(self.main.root_dir, 'foo')
3617
self.main.fs.create(path=path, share_id=self.command.share_id,
3619
self.main.fs.set_node_id(path, self.command.node_id)
3620
self.action_queue.queue.queue(self.command)
3622
# set the producer attribute
3623
self.command.upload_req.producer = None
3624
# upload id is None as this is the first upload
3625
upload_id = self.command.action_queue.client.called[0][2]['upload_id']
3626
self.assertEqual(upload_id, None)
3627
# set the upload id via the callback
3628
self.command._upload_id_cb('hola')
3630
self.command.pause()
3634
upload_id = self.command.action_queue.client.called[1][2]['upload_id']
3635
self.assertEqual(upload_id, 'hola')
3637
self.command.action_queue.client = None
3639
def test_uses_rb_flags_when_creating_temp_file(self):
    """Check that the 'b' flag is used for the temporary file."""
    temp_file = NamedTemporaryFile()
    # close the file even if the assertion below fails, so the test
    # does not leak an open file handle
    self.addCleanup(temp_file.close)
    self.assertEqual(temp_file.mode, 'w+b')
3645
class CreateShareTestCase(ConnectedBaseTestCase):
3646
"""Test for CreateShare ActionQueueCommand."""
3648
@defer.inlineCallbacks
3651
yield super(CreateShareTestCase, self).setUp()
3652
self.request_queue = RequestQueue(action_queue=self.action_queue)
3653
self.orig_create_share_http = CreateShare._create_share_http
3655
@defer.inlineCallbacks
3657
yield super(CreateShareTestCase, self).tearDown()
3658
CreateShare._create_share_http = self.orig_create_share_http
3660
@defer.inlineCallbacks
3661
def test_access_level_modify_http(self):
3662
"""Test proper handling of the access level in the http case."""
3663
# replace _create_share_http with a fake, just to check the args
3664
d = defer.Deferred()
3665
def check_create_http(self, node_id, user, name, read_only, deferred):
3666
"""Fire the deferred with the args."""
3667
d.callback((node_id, user, name, read_only))
3668
deferred.callback(None)
3669
CreateShare._create_share_http = check_create_http
3670
command = CreateShare(self.request_queue, 'node_id',
3671
'share_to@example.com', 'share_name',
3672
'Modify', 'marker', 'path')
3673
self.assertTrue(command.use_http, 'CreateShare should be in http mode')
3676
node_id, user, name, read_only = yield d
3677
self.assertEqual('node_id', node_id)
3678
self.assertEqual('share_to@example.com', user)
3679
self.assertEqual('share_name', name)
3680
self.assertFalse(read_only)
3682
@defer.inlineCallbacks
3683
def test_access_level_view_http(self):
3684
"""Test proper handling of the access level in the http case."""
3685
# replace _create_share_http with a fake, just to check the args
3686
d = defer.Deferred()
3687
def check_create_http(self, node_id, user, name, read_only, deferred):
3688
"""Fire the deferred with the args."""
3689
d.callback((node_id, user, name, read_only))
3690
deferred.callback(None)
3691
CreateShare._create_share_http = check_create_http
3692
command = CreateShare(self.request_queue, 'node_id',
3693
'share_to@example.com', 'share_name',
3694
'View', 'marker', 'path')
3695
self.assertTrue(command.use_http, 'CreateShare should be in http mode')
3697
node_id, user, name, read_only = yield d
3698
self.assertEqual('node_id', node_id)
3699
self.assertEqual('share_to@example.com', user)
3700
self.assertEqual('share_name', name)
3701
self.assertTrue(read_only)
3703
def test_possible_markers(self):
    """The attributes named in possible_markers hold the expected values."""
    share_cmd = CreateShare(self.request_queue, 'node_id',
                            'shareto@example.com', 'share_name', 'View',
                            'marker', 'path')
    marker_values = []
    for attr_name in share_cmd.possible_markers:
        marker_values.append(getattr(share_cmd, attr_name))
    self.assertEqual(marker_values, ['node_id'])
3710
def test_handle_success_sends_invitation(self):
3711
"""The success handler sends the AQ_SHARE_INVITATION_SENT event."""
3712
marker_id = "marker"
3713
share_name = "share name"
3714
share_to = "share_to@example.com"
3716
class MockResult(object):
3717
"""A mock result."""
3720
mock_success = MockResult()
3721
cmd = CreateShare(self.request_queue, NODE, share_to,
3722
share_name, 'View', marker_id, 'path')
3723
cmd.log = cmd.make_logger()
3725
cmd.handle_success(mock_success)
3727
event_params = { 'marker': marker_id }
3728
events = [('AQ_SHARE_INVITATION_SENT', event_params)]
3729
self.assertEqual(events, cmd.action_queue.event_queue.events)
3731
def test_path_locking(self):
3732
"""Test that it acquires correctly the path lock."""
3734
self.patch(PathLockingTree, 'acquire',
3735
lambda s, *a, **k: t.extend((a, k)))
3736
cmd = CreateShare(self.request_queue, NODE, 'share_to',
3737
'share_name', 'View', 'marker_id',
3738
os.path.join('foo', 'bar'))
3739
cmd._acquire_pathlock()
3740
self.assertEqual(t, [('foo', 'bar'), {'logger': None}])
3743
class DeleteShareTestCase(ConnectedBaseTestCase):
3744
"""Test for DeleteShare ActionQueueCommand."""
3746
@defer.inlineCallbacks
3749
yield super(DeleteShareTestCase, self).setUp()
3750
request_queue = RequestQueue(action_queue=self.action_queue)
3751
self.command = DeleteShare(request_queue, SHARE)
3753
# silence the event to avoid propagation
3754
listener_map = self.action_queue.event_queue.listener_map
3755
del listener_map['AQ_DELETE_SHARE_OK']
3756
del listener_map['AQ_DELETE_SHARE_ERROR']
3758
def test_run_returns_a_deferred(self):
    """_run() hands back a Deferred."""
    run_result = self.command._run()
    self.assertIsInstance(run_result, defer.Deferred)
    # route any failure of the returned deferred to silent_connection_lost
    run_result.addErrback(self.silent_connection_lost)
3764
def test_run_calls_protocol(self):
3765
"""Test protocol's delete_volume is called."""
3767
def check(share_id):
3768
"""Take control over client's feature."""
3770
self.assertEqual(SHARE, share_id)
3771
self.patch(self.command.action_queue.client, 'delete_share', check)
3773
self.assertTrue(self.called, "command wasn't called")
3775
def test_handle_success_push_event(self):
    """A successful share deletion pushes AQ_DELETE_SHARE_OK."""
    delete_request = client.DeleteShare(self.action_queue.client, SHARE)
    self.command.handle_success(success=delete_request)

    expected = [('AQ_DELETE_SHARE_OK', {'share_id': SHARE})]
    pushed = self.command.action_queue.event_queue.events
    self.assertEqual(expected, pushed)
3782
def test_handle_failure_push_event(self):
    """A failed share deletion pushes AQ_DELETE_SHARE_ERROR."""
    error_text = 'Something went wrong'
    delete_failure = Failure(DefaultException(error_text))
    self.command.handle_failure(failure=delete_failure)

    expected = [('AQ_DELETE_SHARE_ERROR',
                 dict(share_id=SHARE, error=error_text))]
    pushed = self.command.action_queue.event_queue.events
    self.assertEqual(expected, pushed)
3792
class SimpleAQTestCase(BasicTestCase):
3793
"""Simple tests for AQ API."""
3795
def test_aq_query_volumes(self):
3796
"""Check the API of AQ.query_volumes."""
3798
d = defer.Deferred()
3800
"""Fake list_volumes."""
3801
result = DummyClass()
3802
result.volumes = ['foo', 'bar']
3803
return defer.succeed(result)
3805
self.action_queue.client = DummyClass()
3806
self.action_queue.client.list_volumes = list_volumes
3807
d = self.action_queue.query_volumes()
3809
self.assertIn('foo', result)
3810
self.assertIn('bar', result)
3812
d.addCallback(check)
3815
def test_have_sufficient_space_for_upload_if_free_space_is_none(self):
3816
"""Check have_sufficient_space_for_upload.
3818
If free_space is None, SYS_QUOTA_EXCEEDED is not pushed.
3821
self.patch(self.action_queue.main.vm, 'get_free_space',
3822
lambda share_id: None) # free space is None
3823
volume_id = 'test share'
3824
res = self.action_queue.have_sufficient_space_for_upload(volume_id,
3826
self.assertTrue(res, "Must have enough space to upload.")
3827
events = map(operator.itemgetter(0),
3828
self.action_queue.event_queue.events)
3829
self.assertNotIn('SYS_QUOTA_EXCEEDED', events)
3831
def test_have_sufficient_space_for_upload_if_no_free_space(self):
3832
"""Check have_sufficient_space_for_upload pushes SYS_QUOTA_EXCEEDED."""
3833
self.patch(self.action_queue.main.vm, 'get_free_space',
3834
lambda share_id: 0) # no free space, always
3835
volume_id = 'test share'
3836
res = self.action_queue.have_sufficient_space_for_upload(volume_id,
3838
self.assertEqual(res, False, "Must not have enough space to upload.")
3839
msg = 'SYS_QUOTA_EXCEEDED must have been pushed to event queue.'
3840
expected = ('SYS_QUOTA_EXCEEDED',
3841
{'volume_id': volume_id, 'free_bytes': 0})
3842
self.assertTrue(expected in self.action_queue.event_queue.events, msg)
3844
def test_have_sufficient_space_for_upload_if_free_space(self):
3845
"""Check have_sufficient_space_for_upload doesn't push any event."""
3846
self.patch(self.action_queue.main.vm, 'get_free_space',
3847
lambda share_id: 1) # free space, always
3848
res = self.action_queue.have_sufficient_space_for_upload(share_id=None,
3850
self.assertEqual(res, True, "Must have enough space to upload.")
3851
msg = 'No event must have been pushed to event queue.'
3852
self.assertEqual(self.action_queue.event_queue.events, [], msg)
3854
def test_SYS_QUOTA_EXCEEDED_is_valid_event(self):
    """SYS_QUOTA_EXCEEDED is in EVENTS with the expected arguments."""
    event = 'SYS_QUOTA_EXCEEDED'
    # assertIn names the missing key on failure, unlike assertTrue(x in y)
    self.assertIn(event, EVENTS)
    self.assertEqual(('volume_id', 'free_bytes'), EVENTS[event])
3860
def test_SYS_USER_CONNECT_is_valid_event(self):
    """SYS_USER_CONNECT is in EVENTS and takes only an access token."""
    event_name = 'SYS_USER_CONNECT'
    self.assertIn(event_name, EVENTS)
    expected_args = ('access_token',)
    self.assertEqual(expected_args, EVENTS[event_name])
3866
def test_handle_SYS_USER_CONNECT(self):
3867
"""handle_SYS_USER_CONNECT stores credentials."""
3868
self.assertEqual(self.action_queue.token, None)
3869
self.assertEqual(self.action_queue.consumer, None)
3873
expected = oauth.OAuthToken('bla', 'ble')
3874
self.assertEqual(self.action_queue.token.key, expected.key)
3875
self.assertEqual(self.action_queue.token.secret, expected.secret)
3877
expected = oauth.OAuthConsumer('foo', 'bar')
3878
self.assertEqual(self.action_queue.consumer.key, expected.key)
3879
self.assertEqual(self.action_queue.consumer.secret, expected.secret)
3882
class SpecificException(Exception):
    """Exception type the tests pass as the expected request error."""
3886
class SillyClass(object):
3887
"""Silly class that accepts the set of any attribute.
3889
We can't use object() directly, since it raises AttributeError.
3894
class ErrorHandlingTestCase(BasicTestCase):
3895
"""Error handling tests for ActionQueue."""
3897
@defer.inlineCallbacks
3900
yield super(ErrorHandlingTestCase, self).setUp()
3903
self.action_queue.client = SillyClass()
3904
self.patch(self.main, 'restart', lambda: None)
3908
def fail_please(self, an_exception):
3909
"""Raise the given exception."""
3910
def inner(*args, **kwargs):
3911
"""A request to the server that fails."""
3913
return defer.fail(an_exception)
3916
def succeed_please(self, result):
3917
"""Return the given result."""
3918
def inner(*args, **kwargs):
3919
"""A request to the server that succeeds."""
3921
return defer.succeed(result)
3924
def mock_caps(self, accepted):
3925
"""Reply to query caps with False."""
3926
def gset_caps(caps):
3927
"""get/set caps helper."""
3930
req.accepted = accepted
3931
return defer.succeed(req)
3934
def test_valid_event(self):
    """SYS_SERVER_ERROR is in EVENTS and takes only an error argument."""
    event = 'SYS_SERVER_ERROR'
    # assertIn names the missing key on failure, unlike assertTrue(x in y)
    self.assertIn(event, EVENTS)
    self.assertEqual(('error',), EVENTS[event])
3940
@defer.inlineCallbacks
3941
def test_send_request_and_handle_errors_on_no_error(self):
3942
"""_send_request_and_handle_errors is correct when no error."""
3944
event = 'SYS_SPECIFIC_OK'
3945
EVENTS[event] = () # add event to the global valid events list
3946
self.addCleanup(EVENTS.pop, event)
3949
request = self.succeed_please(result)
3950
kwargs = dict(request=request, request_error=SpecificException,
3951
event_error='YADDA_YADDA', event_ok=event,
3952
args=(1, 2), kwargs={})
3953
d = self.action_queue._send_request_and_handle_errors(**kwargs)
3954
actual_result = yield d
3956
self.assertTrue(self.called, 'the request was called')
3957
self.assertEqual(actual_result, result)
3958
self.assertEqual((event, {}),
3959
self.action_queue.event_queue.events[-1])
3961
# assert over logging
3962
self.assertTrue(self.handler.check_info(request.__name__, 'OK'))
3964
@defer.inlineCallbacks
3965
def test_send_request_and_handle_errors_with_no_event_ok(self):
3966
"""_send_request_and_handle_errors does not push event if is None."""
3967
original_events = self.action_queue.event_queue.events[:]
3970
request = self.succeed_please(result)
3971
kwargs = dict(request=request, request_error=SpecificException,
3972
event_error='YADDA_YADDA', event_ok=None)
3973
d = self.action_queue._send_request_and_handle_errors(**kwargs)
3974
actual_result = yield d
3976
self.assertTrue(self.called, 'the request was called')
3977
self.assertEqual(actual_result, result)
3978
self.assertEqual(original_events,
3979
self.action_queue.event_queue.events)
3981
# assert over logging
3982
self.assertTrue(self.handler.check_info(request.__name__, 'OK'))
3984
@defer.inlineCallbacks
3985
def test_send_request_and_handle_errors_on_valid_error(self):
3986
"""_send_request_and_handle_errors is correct when expected error."""
3988
event = 'SYS_SPECIFIC_ERROR'
3989
EVENTS[event] = ('error',) # add event to the global valid events list
3990
self.addCleanup(EVENTS.pop, event)
3992
exc = SpecificException('The request failed! please be happy.')
3993
request = self.fail_please(exc)
3994
kwargs = dict(request=request, request_error=SpecificException,
3995
event_error=event, event_ok='YADDA_YADDA')
3996
d = self.action_queue._send_request_and_handle_errors(**kwargs)
3999
self.assertTrue(self.called, 'the request was called')
4000
self.assertEqual((event, {'error': str(exc)}),
4001
self.action_queue.event_queue.events[-1])
4003
# assert over logging
4004
self.assertTrue(self.handler.check_info(request.__name__,
4007
@defer.inlineCallbacks
4008
def assert_send_request_and_handle_errors_on_connection_end(self, exc):
4009
"""_send_request_and_handle_errors is ok when connection lost/done."""
4010
request = self.fail_please(exc)
4011
kwargs = dict(request=request, request_error=SpecificException,
4012
event_error='BAR', event_ok='FOO')
4013
d = self.action_queue._send_request_and_handle_errors(**kwargs)
4016
# check that SYS_UNKNOWN_ERROR wasn't sent, and that logged ok
4017
events = self.action_queue.event_queue.events
4018
self.assertNotIn('SYS_UNKNOWN_ERROR', [x[0] for x in events])
4019
self.assertTrue(self.handler.check_info(request.__name__, str(exc)))
4021
def test_send_request_and_handle_errors_on_connection_lost(self):
    """_send_request_and_handle_errors copes with ConnectionLost."""
    lost = twisted_error.ConnectionLost()
    check = self.assert_send_request_and_handle_errors_on_connection_end
    return check(lost)
4026
def test_send_request_and_handle_errors_on_connection_done(self):
    """_send_request_and_handle_errors copes with ConnectionDone."""
    done = twisted_error.ConnectionDone()
    check = self.assert_send_request_and_handle_errors_on_connection_end
    return check(done)
4031
def test_send_request_and_handle_errors_on_ssl_error(self):
    """_send_request_and_handle_errors copes with an OpenSSL error."""
    ssl_error = OpenSSL.SSL.Error()
    check = self.assert_send_request_and_handle_errors_on_connection_end
    return check(ssl_error)
@defer.inlineCallbacks
def assert_send_request_and_handle_errors_on_server_error(self, serr):
    """Check that the protocol error `serr` ends in SYS_SERVER_ERROR.

    Builds the exception that errors.error_to_exception maps to `serr`,
    makes the request fail with it, and verifies that the last pushed
    event is SYS_SERVER_ERROR carrying the error text.
    """
    # XXX: we need to replace this list with an exception list
    # once bug #557718 is resolved
    msg = protocol_pb2.Message()
    msg.type = protocol_pb2.Message.ERROR
    msg.error.type = serr
    msg.error.comment = 'Error message for %s.' % serr
    exc = errors.error_to_exception(serr)(request=None, message=msg)

    request = self.fail_please(exc)
    kwargs = dict(request=request, request_error=SpecificException,
                  event_error='BAR', event_ok='FOO')
    d = self.action_queue._send_request_and_handle_errors(**kwargs)
    # NOTE(review): this wait was absent from the damaged source text; it is
    # needed for the inlineCallbacks helper to be a generator -- confirm.
    yield d

    event = 'SYS_SERVER_ERROR'
    self.assertEqual((event, {'error': str(exc)}),
                     self.action_queue.event_queue.events[-1])

    # assert over logging
    # NOTE(review): the trailing arguments of this call were missing from
    # the source; "event, str(exc)" is presumed -- confirm.
    self.assertTrue(self.handler.check_info(request.__name__,
                                            event, str(exc)))
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_try_again(self):
    """Run the shared server-error check for TRY_AGAIN."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.TRY_AGAIN)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_internal_error(self):
    """Run the shared server-error check for INTERNAL_ERROR."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.INTERNAL_ERROR)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_protocol_error(self):
    """Run the shared server-error check for PROTOCOL_ERROR."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.PROTOCOL_ERROR)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_unsupported_version(self):
    """Run the shared server-error check for UNSUPPORTED_VERSION."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.UNSUPPORTED_VERSION)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_authetication_failed(self):
    """Run the shared server-error check for AUTHENTICATION_FAILED."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.AUTHENTICATION_FAILED)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_no_permission(self):
    """Run the shared server-error check for NO_PERMISSION."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.NO_PERMISSION)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_already_exists(self):
    """Run the shared server-error check for ALREADY_EXISTS."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.ALREADY_EXISTS)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_does_not_exist(self):
    """Run the shared server-error check for DOES_NOT_EXIST."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.DOES_NOT_EXIST)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_not_a_dir(self):
    """Run the shared server-error check for NOT_A_DIRECTORY."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.NOT_A_DIRECTORY)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_not_empty(self):
    """Run the shared server-error check for NOT_EMPTY."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.NOT_EMPTY)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_not_available(self):
    """Run the shared server-error check for NOT_AVAILABLE."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.NOT_AVAILABLE)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_upload_in_progress(self):
    """Run the shared server-error check for UPLOAD_IN_PROGRESS."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.UPLOAD_IN_PROGRESS)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_upload_corrupt(self):
    """Run the shared server-error check for UPLOAD_CORRUPT."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.UPLOAD_CORRUPT)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_upload_canceled(self):
    """Run the shared server-error check for UPLOAD_CANCELED."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.UPLOAD_CANCELED)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_conflict(self):
    """Run the shared server-error check for CONFLICT."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.CONFLICT)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_quota_exceeded(self):
    """Run the shared server-error check for QUOTA_EXCEEDED."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.QUOTA_EXCEEDED)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_invalid_filename(self):
    """Run the shared server-error check for INVALID_FILENAME."""
    check = self.assert_send_request_and_handle_errors_on_server_error
    yield check(protocol_pb2.Error.INVALID_FILENAME)
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_unknown_error(self):
    """AUTHENTICATION_REQUIRED is reported as SYS_UNKNOWN_ERROR.

    The request fails with the exception mapped to AUTHENTICATION_REQUIRED
    and the test expects a ('SYS_UNKNOWN_ERROR', {}) event to be pushed.
    """
    # XXX: we need to replace this list with an exception list
    # once bug #557718 is resolved
    serr = protocol_pb2.Error.AUTHENTICATION_REQUIRED
    msg = protocol_pb2.Message()
    msg.type = protocol_pb2.Message.ERROR
    msg.error.type = serr
    msg.error.comment = 'Error message for %s.' % serr
    exc = errors.error_to_exception(serr)(request=None, message=msg)

    request = self.fail_please(exc)
    kwargs = dict(request=request, request_error=SpecificException,
                  event_error='BAR', event_ok='FOO')
    d = self.action_queue._send_request_and_handle_errors(**kwargs)
    # NOTE(review): this wait was absent from the damaged source text; it is
    # needed for the inlineCallbacks test to be a generator -- confirm.
    yield d

    event = 'SYS_UNKNOWN_ERROR'
    self.assertIn((event, {}), self.action_queue.event_queue.events)

    # assert over logging
    # NOTE(review): the trailing arguments of this call were missing from
    # the source; "event, str(exc)" is presumed -- confirm.
    self.assertTrue(self.handler.check_info(request.__name__,
                                            event, str(exc)))
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_no_protocol_error(self):
    """A plain Exception is reported as SYS_UNKNOWN_ERROR.

    The request fails with a generic Exception (not a protocol error) and
    the test expects a ('SYS_UNKNOWN_ERROR', {}) event to be pushed.
    """
    event = 'SYS_UNKNOWN_ERROR'
    error_msg = 'Error message for any Exception.'
    exc = Exception(error_msg)
    request = self.fail_please(exc)
    kwargs = dict(request=request, request_error=SpecificException,
                  event_error='BAR', event_ok='FOO')
    d = self.action_queue._send_request_and_handle_errors(**kwargs)
    # NOTE(review): this wait was absent from the damaged source text; it is
    # needed for the inlineCallbacks test to be a generator -- confirm.
    yield d

    self.assertIn((event, {}), self.action_queue.event_queue.events)

    # assert over logging
    # NOTE(review): the trailing arguments of this call were missing from
    # the source; "event, str(exc)" is presumed -- confirm.
    self.assertTrue(self.handler.check_info(request.__name__,
                                            event, str(exc)))
@defer.inlineCallbacks
def test_send_request_and_handle_errors_on_client_mismatch(self):
    """No event is pushed if the AQ client changes during the request.

    The fake request replaces self.action_queue.client while it runs; the
    test then expects the event list to stay empty and a warning to be
    logged that mentions the request name.
    """

    def change_client(*args, **kwargs):
        """Change AQ's client while doing the request."""
        self.action_queue.client = object()

    self.action_queue.event_queue.events = []  # event cleanup
    kwargs = dict(request=change_client, request_error=SpecificException,
                  event_error='BAR', event_ok='FOO')
    d = self.action_queue._send_request_and_handle_errors(**kwargs)
    # NOTE(review): this wait was absent from the damaged source text; it is
    # needed for the inlineCallbacks test to be a generator -- confirm.
    yield d

    self.assertEqual([], self.action_queue.event_queue.events)

    # assert over logging
    # NOTE(review): the trailing argument of this call was missing from the
    # source; 'Client mismatch' is presumed -- confirm against the warning
    # text emitted by _send_request_and_handle_errors.
    self.assertTrue(self.handler.check_warning(change_client.__name__,
                                               'Client mismatch'))
@defer.inlineCallbacks
def test_check_version_when_unsupported_version_exception(self):
    """check_version pushes SYS_PROTOCOL_VERSION_ERROR on that failure."""
    # build the UnsupportedVersionError that the request will fail with
    message = protocol_pb2.Message()
    message.type = protocol_pb2.Message.ERROR
    message.error.type = protocol_pb2.Error.UNSUPPORTED_VERSION
    message.error.comment = 'This is a funny comment.'
    error = errors.UnsupportedVersionError(request=None, message=message)

    failing_request = self.fail_please(error)
    self.action_queue.client.protocol_version = failing_request
    yield self.action_queue.check_version()

    expected = ('SYS_PROTOCOL_VERSION_ERROR', {'error': str(error)})
    last_event = self.action_queue.event_queue.events[-1]
    self.assertEqual(expected, last_event)
@defer.inlineCallbacks
def test_set_capabilities_when_query_caps_not_accepted(self):
    """An error event is pushed when query_caps is not accepted."""
    # query_caps returns False
    rejecting = self.mock_caps(accepted=False)
    self.action_queue.client.query_caps = rejecting

    yield self.action_queue.set_capabilities(caps=None)

    reason = "The server doesn't have the requested capabilities"
    expected = ('SYS_SET_CAPABILITIES_ERROR', {'error': reason})
    pushed = self.action_queue.event_queue.events
    self.assertEqual(expected, pushed[-1])
    self.assertNotIn(('SYS_SET_CAPABILITIES_OK', {}), pushed)
@defer.inlineCallbacks
def test_set_capabilities_when_set_caps_not_accepted(self):
    """An error event is pushed when set_caps is not accepted."""
    # query_caps returns True and set_caps returns False
    aq_client = self.action_queue.client
    aq_client.query_caps = self.mock_caps(accepted=True)
    aq_client.set_caps = self.mock_caps(accepted=False)

    caps = 'very difficult cap'
    yield self.action_queue.set_capabilities(caps=caps)

    reason = "The server denied setting '%s' capabilities" % caps
    expected = ('SYS_SET_CAPABILITIES_ERROR', {'error': reason})
    pushed = self.action_queue.event_queue.events
    self.assertEqual(expected, pushed[-1])
    self.assertNotIn(('SYS_SET_CAPABILITIES_OK', {}), pushed)
@defer.inlineCallbacks
def test_set_capabilities_when_client_is_none(self):
    """An error event is pushed when the AQ has no client."""
    self.action_queue.client = None

    yield self.action_queue.set_capabilities(caps=None)

    reason = "'NoneType' object has no attribute 'query_caps'"
    expected = ('SYS_SET_CAPABILITIES_ERROR', {'error': reason})
    pushed = self.action_queue.event_queue.events
    self.assertEqual(expected, pushed[-1])
    self.assertNotIn(('SYS_SET_CAPABILITIES_OK', {}), pushed)
@defer.inlineCallbacks
def test_set_capabilities_when_set_caps_is_accepted(self):
    """SYS_SET_CAPABILITIES_OK is pushed when both calls are accepted."""
    # query_caps returns True and set_caps returns True
    aq_client = self.action_queue.client
    aq_client.query_caps = self.mock_caps(accepted=True)
    aq_client.set_caps = self.mock_caps(accepted=True)

    yield self.action_queue.set_capabilities(caps=None)

    last_event = self.action_queue.event_queue.events[-1]
    self.assertEqual(('SYS_SET_CAPABILITIES_OK', {}), last_event)
@defer.inlineCallbacks
def test_authenticate_when_authenticated(self):
    """SYS_AUTH_OK is pushed when authentication succeeds."""
    auth_request = client.Authenticate(self.action_queue.client,
                                       {'dummy_token': 'credentials'})
    auth_request.session_id = str(uuid.uuid4())
    fake_auth = self.succeed_please(result=auth_request)
    self.action_queue.client.oauth_authenticate = fake_auth

    yield self.action_queue.authenticate()

    last_event = self.action_queue.event_queue.events[-1]
    self.assertEqual(('SYS_AUTH_OK', {}), last_event)
@defer.inlineCallbacks
def test_authenticate_when_authentication_failed_exception(self):
    """SYS_AUTH_ERROR is pushed after an AuthenticationFailedError."""
    # build the AuthenticationFailedError that the request will fail with
    message = protocol_pb2.Message()
    message.type = protocol_pb2.Message.ERROR
    message.error.type = protocol_pb2.Error.AUTHENTICATION_FAILED
    message.error.comment = 'This is a funny comment.'
    error = errors.AuthenticationFailedError(request=None, message=message)

    failing_auth = self.fail_please(error)
    self.action_queue.client.oauth_authenticate = failing_auth
    yield self.action_queue.authenticate()

    expected = ('SYS_AUTH_ERROR', {'error': str(error)})
    last_event = self.action_queue.event_queue.events[-1]
    self.assertEqual(expected, last_event)
class GetDeltaTestCase(ConnectedBaseTestCase):
    """Test for GetDelta ActionQueueCommand."""

    # NOTE(review): lines tagged "reconstructed" were missing from the
    # damaged source text and are inferred from the surrounding code;
    # confirm each of them against version control.

    @defer.inlineCallbacks
    def setUp(self):  # reconstructed
        """Create a RequestQueue bound to the test's action queue."""
        yield super(GetDeltaTestCase, self).setUp()
        self.rq = RequestQueue(action_queue=self.action_queue)

    def test_action_queue_get_delta(self):
        """Test AQ get delta."""
        self.action_queue.get_delta(VOLUME, 0)

    def test_is_action_queue_command(self):
        """Test proper inheritance."""
        cmd = GetDelta(self.rq, VOLUME, 0)
        self.assertIsInstance(cmd, ActionQueueCommand)

    def test_run_returns_a_deferred(self):
        """Test a deferred is returned."""
        cmd = GetDelta(self.rq, VOLUME, 0)
        res = cmd._run()  # reconstructed; presumably _run() -- confirm
        self.assertIsInstance(res, defer.Deferred)
        res.addErrback(self.silent_connection_lost)

    def test_run_calls_protocol(self):
        """Test protocol's get delta is called."""
        called = []  # reconstructed
        self.patch(self.action_queue.client, 'get_delta',
                   lambda *a: called.append(a))

        cmd = GetDelta(self.rq, VOLUME, 35)
        cmd._run()  # reconstructed; presumably _run() -- confirm
        self.assertEqual(called[0], (VOLUME, 35))

    def test_handle_success_push_event(self):
        """Test AQ_DELTA_OK is pushed on success."""
        # create a request and fill it with successful information
        request = client.GetDelta(self.action_queue.client,
                                  share_id=VOLUME, from_generation=21)
        request.response = ['foo', 'bar']
        request.end_generation = 76
        request.full = True  # reconstructed
        request.free_bytes = 1231234

        # create a command and trigger its success
        cmd = GetDelta(self.rq, VOLUME, 21)
        cmd.handle_success(request)

        # check for successful event
        received = self.action_queue.event_queue.events[0]
        delta_info = dict(volume_id=VOLUME, delta_content=['foo', 'bar'],
                          end_generation=76,  # reconstructed
                          full=True, free_bytes=1231234)
        self.assertEqual(received, ('AQ_DELTA_OK', delta_info))
        self.assertIsInstance(received[1]["delta_content"], DeltaList)

    def test_handle_generic_failure_push_event(self):
        """Test AQ_DELTA_ERROR is pushed on failure."""
        # create a failure
        msg = 'Something went wrong'
        failure = Failure(DefaultException(msg))

        # create a command and trigger its failure
        cmd = GetDelta(self.rq, VOLUME, 77)
        cmd.handle_failure(failure=failure)

        # check for the error event
        received = self.action_queue.event_queue.events[0]
        self.assertEqual(received, ('AQ_DELTA_ERROR',
                                    {'volume_id': VOLUME, 'error': msg}))

    def test_handle_notpossible_failure_push_event(self):
        """Test AQ_DELTA_NOT_POSSIBLE is pushed on that failure."""
        # create a CannotProduceDelta failure
        msg = protocol_pb2.Message()
        msg.type = protocol_pb2.Message.ERROR
        msg.error.type = protocol_pb2.Error.CANNOT_PRODUCE_DELTA
        msg.error.comment = 'Something went wrong'
        failure = Failure(errors.CannotProduceDelta(self.rq, msg))

        # create a command and trigger its failure
        cmd = GetDelta(self.rq, VOLUME, 2)
        cmd.handle_failure(failure=failure)

        # check for the event
        received = self.action_queue.event_queue.events[0]
        self.assertEqual(received, ('AQ_DELTA_NOT_POSSIBLE',
                                    {'volume_id': VOLUME}))

    def test_queued_mixed_types(self):
        """Command gets queued if other command is waiting."""
        cmd1 = FakeCommand()
        self.rq.queue(cmd1)  # reconstructed
        cmd2 = GetDelta(self.rq, 'vol2', 0)
        self.assertTrue(cmd2._should_be_queued())

    def test_queued_two_different(self):
        """Two different queued commands is ok."""
        cmd1 = GetDelta(self.rq, 'vol1', 0)
        self.rq.queue(cmd1)  # reconstructed
        cmd2 = GetDelta(self.rq, 'vol2', 0)
        self.assertTrue(cmd2._should_be_queued())

    def test_queued_two_equal_second_bigger(self):
        """When two equals, only survive the one with smaller gen, first."""
        cmd1 = GetDelta(self.rq, 'vol', 3)
        self.rq.queue(cmd1)  # reconstructed
        cmd2 = GetDelta(self.rq, 'vol', 5)
        cmd2.make_logger()  # reconstructed; presumed -- confirm
        self.assertFalse(cmd2._should_be_queued())
        self.assertTrue(self.handler.check_debug("not queueing self"))

    def test_queued_two_equal_second_smaller_first_not_running(self):
        """Two equals, survive the one with smaller gen, second, not run."""
        cmd1 = GetDelta(self.rq, 'vol', 5)
        self.rq.queue(cmd1)  # reconstructed
        cmd1.running = False
        cmd2 = GetDelta(self.rq, 'vol', 3)
        cmd2.make_logger()  # reconstructed; presumed -- confirm
        self.assertTrue(cmd2._should_be_queued())
        self.assertNotIn(cmd1, self.rq.waiting)
        self.assertTrue(self.handler.check_debug("removing previous command"))

    def test_queued_two_equal_second_smaller_first_running(self):
        """Two equals, survive the one with smaller gen, second, running."""
        cmd1 = GetDelta(self.rq, 'vol', 5)
        self.rq.queue(cmd1)  # reconstructed
        cmd2 = GetDelta(self.rq, 'vol', 3)
        cmd2.make_logger()  # reconstructed; presumed -- confirm
        self.assertTrue(cmd2._should_be_queued())

    def test_queued_three_equal(self):
        """When several equals, only survive the one with smaller gen."""
        cmd1 = GetDelta(self.rq, 'vol', 5)
        self.rq.queue(cmd1)  # reconstructed
        cmd1.running = False
        cmd2 = GetDelta(self.rq, 'vol', 3)
        cmd2.make_logger()  # reconstructed; presumed -- confirm
        # precondition of the scenario, not the behaviour under test
        assert cmd2._should_be_queued()
        self.rq.queue(cmd2)  # reconstructed
        cmd3 = GetDelta(self.rq, 'vol', 7)
        cmd3.make_logger()  # reconstructed; presumed -- confirm
        self.assertFalse(cmd3._should_be_queued())
        self.assertNotIn(cmd1, self.rq.waiting)

    def test_uniqueness(self):
        """Info used for uniqueness."""
        cmd = GetDelta(self.rq, 'vol', 1)
        self.assertEqual(cmd.uniqueness, ('GetDelta', 'vol'))
class GetDeltaFromScratchTestCase(ConnectedBaseTestCase):
    """Test for GetDeltaFromScratch ActionQueueCommand."""

    # NOTE(review): lines tagged "reconstructed" were missing from the
    # damaged source text and are inferred from the surrounding code;
    # confirm each of them against version control.

    @defer.inlineCallbacks
    def setUp(self):  # reconstructed
        """Create a RequestQueue bound to the test's action queue."""
        yield super(GetDeltaFromScratchTestCase, self).setUp()
        self.rq = RequestQueue(action_queue=self.action_queue)

    def test_action_queue_get_delta(self):
        """Test AQ rescan from scratch."""
        self.action_queue.rescan_from_scratch(VOLUME)

    def test_is_action_queue_command(self):
        """Test proper inheritance."""
        cmd = GetDeltaFromScratch(self.rq, VOLUME)
        self.assertIsInstance(cmd, ActionQueueCommand)

    def test_run_returns_a_deferred(self):
        """Test a deferred is returned."""
        # the source built a plain GetDelta here, which left the command
        # this test case is about unexercised
        cmd = GetDeltaFromScratch(self.rq, VOLUME)
        res = cmd._run()  # reconstructed; presumably _run() -- confirm
        self.assertIsInstance(res, defer.Deferred)
        res.addErrback(self.silent_connection_lost)

    def test_run_calls_protocol(self):
        """Test protocol's get delta is called."""
        called = []  # reconstructed
        self.patch(self.action_queue.client, 'get_delta',
                   lambda *a, **b: called.append((a, b)))

        cmd = GetDeltaFromScratch(self.rq, VOLUME)
        cmd._run()  # reconstructed; presumably _run() -- confirm
        self.assertEqual(called[0], ((VOLUME,), {'from_scratch': True}))

    def test_handle_success_push_event(self):
        """Test AQ_RESCAN_FROM_SCRATCH_OK is pushed on success."""
        # create a request and fill it with successful information
        request = client.GetDelta(self.action_queue.client,
                                  share_id=VOLUME, from_scratch=True)
        request.response = ['foo', 'bar']
        request.end_generation = 76
        request.full = True  # reconstructed
        request.free_bytes = 1231234

        # create a command and trigger its success
        cmd = GetDeltaFromScratch(self.rq, VOLUME)
        cmd.handle_success(request)

        # check for successful event
        received = self.action_queue.event_queue.events[0]
        # NOTE(review): the tail of this dict was missing from the source;
        # the two keys below are presumed from the request fields set above.
        delta_info = dict(volume_id=VOLUME, delta_content=['foo', 'bar'],
                          end_generation=76,
                          free_bytes=1231234)
        self.assertEqual(received, ('AQ_RESCAN_FROM_SCRATCH_OK', delta_info))
        self.assertIsInstance(received[1]["delta_content"], DeltaList)

    def test_handle_generic_failure_push_event(self):
        """Test AQ_RESCAN_FROM_SCRATCH_ERROR is pushed on failure."""
        # create a failure
        msg = 'Something went wrong'
        failure = Failure(DefaultException(msg))

        # create a command and trigger its failure
        cmd = GetDeltaFromScratch(self.rq, VOLUME)
        cmd.handle_failure(failure=failure)

        # check for the error event
        received = self.action_queue.event_queue.events[0]
        self.assertEqual(received, ('AQ_RESCAN_FROM_SCRATCH_ERROR',
                                    {'volume_id': VOLUME, 'error': msg}))

    def test_queued_mixed_types(self):
        """Command gets queued if other command is waiting."""
        cmd1 = FakeCommand()
        self.rq.queue(cmd1)  # reconstructed
        cmd2 = GetDeltaFromScratch(self.rq, 'vol2')
        self.assertTrue(cmd2._should_be_queued())

    def test_queued_two_different(self):
        """Two different queued commands is ok."""
        cmd1 = GetDeltaFromScratch(self.rq, 'vol1')
        self.rq.queue(cmd1)  # reconstructed
        cmd2 = GetDeltaFromScratch(self.rq, 'vol2')
        self.assertTrue(cmd2._should_be_queued())

    def test_queued_two_equal(self):
        """When two equals, only survive the first one."""
        cmd1 = GetDeltaFromScratch(self.rq, 'vol')
        self.rq.queue(cmd1)  # reconstructed
        cmd2 = GetDeltaFromScratch(self.rq, 'vol')
        cmd2.make_logger()  # reconstructed; presumed -- confirm
        self.assertFalse(cmd2._should_be_queued())
        self.assertTrue(self.handler.check_debug("not queueing self"))

    def test_uniqueness(self):
        """Info used for uniqueness."""
        cmd = GetDeltaFromScratch(self.rq, 'vol')
        self.assertEqual(cmd.uniqueness, ('GetDeltaFromScratch', 'vol'))
class UnlinkTestCase(ConnectedBaseTestCase):
    """Test for Unlink ActionQueueCommand."""

    # NOTE(review): lines tagged "reconstructed" were missing from the
    # damaged source text and are inferred from the surrounding code;
    # confirm each of them against version control.

    @defer.inlineCallbacks
    def setUp(self):  # reconstructed
        """Create a RequestQueue bound to the test's action queue."""
        yield super(UnlinkTestCase, self).setUp()
        self.rq = RequestQueue(action_queue=self.action_queue)

    def test_handle_success_push_event_file(self):
        """Test AQ_UNLINK_OK is pushed on success for a file."""
        sample_path = "sample path"
        # create a request and fill it with successful information
        request = client.Unlink(self.action_queue.client, VOLUME, 'node_id')
        request.new_generation = 13

        # create a command and trigger its success
        cmd = Unlink(self.rq, VOLUME, 'parent_id', 'node_id', sample_path,
                     False)  # reconstructed; matches was_dir=False below
        cmd.handle_success(request)

        # check for successful event
        received = self.action_queue.event_queue.events[0]
        info = dict(share_id=VOLUME, parent_id='parent_id',
                    node_id='node_id', new_generation=13,
                    was_dir=False, old_path=sample_path)
        self.assertEqual(received, ('AQ_UNLINK_OK', info))

    def test_handle_success_push_event_directory(self):
        """Test AQ_UNLINK_OK is pushed on success for a directory."""
        # create a request and fill it with successful information
        request = client.Unlink(self.action_queue.client, VOLUME, 'node_id')
        request.new_generation = 13

        # create a command and trigger its success
        cmd = Unlink(self.rq, VOLUME, 'parent_id', 'node_id', 'test_path',
                     True)  # reconstructed; matches was_dir=True below
        cmd.handle_success(request)

        full_path = "test_path"

        # check for successful event
        received = self.action_queue.event_queue.events[0]
        info = dict(share_id=VOLUME, parent_id='parent_id',
                    node_id='node_id', new_generation=13,
                    was_dir=True, old_path=full_path)
        self.assertEqual(received, ('AQ_UNLINK_OK', info))

    def test_possible_markers(self):
        """Test that it returns the correct values."""
        cmd = Unlink(self.rq, VOLUME, 'parent_id', 'node_id', 'path', False)
        res = [getattr(cmd, x) for x in cmd.possible_markers]
        self.assertEqual(res, ['node_id', 'parent_id'])

    def test_path_locking(self):
        """Test that it acquires correctly the path lock."""
        t = []  # reconstructed
        # record the positional and keyword arguments given to acquire()
        self.patch(PathLockingTree, 'acquire',
                   lambda s, *a, **k: t.extend((a, k)))
        cmd = Unlink(self.rq, VOLUME, 'parent_id', 'node_id',
                     os.path.join('foo', 'bar'), False)
        cmd._acquire_pathlock()
        # NOTE(review): the 'logger' entry closing this dict was missing
        # from the source and is presumed -- confirm.
        self.assertEqual(t, [('foo', 'bar'), {'on_parent': True,
                                              'on_children': True,
                                              'logger': None}])
class MoveTestCase(ConnectedBaseTestCase):
    """Test for Move ActionQueueCommand."""

    # NOTE(review): lines tagged "reconstructed" were missing from the
    # damaged source text and are inferred from the surrounding code;
    # confirm each of them against version control.

    @defer.inlineCallbacks
    def setUp(self):  # reconstructed
        """Create a RequestQueue bound to the test's action queue."""
        yield super(MoveTestCase, self).setUp()
        self.rq = RequestQueue(action_queue=self.action_queue)

    def test_handle_success_push_event(self):
        """Test AQ_MOVE_OK is pushed on success."""
        # create a request and fill it with successful information
        request = client.Move(self.action_queue.client, VOLUME, 'node',
                              'new_parent', 'new_name')
        request.new_generation = 13

        # create a command and trigger its success
        cmd = Move(self.rq, VOLUME, 'node', 'o_parent', 'n_parent', 'n_name',
                   'path_from', 'path_to')
        cmd.handle_success(request)

        # check for successful event
        received = self.action_queue.event_queue.events[0]
        info = dict(share_id=VOLUME, node_id='node', new_generation=13)
        self.assertEqual(received, ('AQ_MOVE_OK', info))

    def test_possible_markers(self):
        """Test that it returns the correct values."""
        cmd = Move(self.rq, VOLUME, 'node', 'o_parent', 'n_parent', 'n_name',
                   'path_from', 'path_to')
        res = [getattr(cmd, x) for x in cmd.possible_markers]
        self.assertEqual(res, ['node', 'o_parent', 'n_parent'])

    def test_uniqueness(self):
        """Info used for uniqueness."""
        cmd = Move(self.rq, VOLUME, 'node', 'o_parent', 'n_parent', 'n_name',
                   'path_from', 'path_to')
        self.assertEqual(cmd.uniqueness, ('Move', VOLUME, 'node'))

    def test_path_locking(self):
        """Test that it acquires correctly the path lock."""
        t = []  # reconstructed
        # record the arguments of every acquire() and return a fired deferred
        fake_acquire = lambda s, *a, **k: t.extend((a, k)) or defer.succeed(None)
        self.patch(PathLockingTree, 'acquire', fake_acquire)
        cmd = Move(self.rq, VOLUME, 'node', 'o_parent', 'n_parent', 'n_name',
                   os.path.join(os.path.sep, 'path', 'from'),
                   os.path.join(os.path.sep, 'path', 'to'))
        cmd._acquire_pathlock()

        # NOTE(review): the opening/closing of this list and the 'logger'
        # entry of the first dict were missing from the source -- confirm.
        should = [
            ("", "path", "from"), {'on_parent': True, 'on_children': True,
                                   'logger': None},
            ("", "path", "to"), {'on_parent': True, 'logger': None},
        ]
        self.assertEqual(t, should)

    def test_pathlock_mergepaths(self):
        """Merge both path lockings."""
        d1 = defer.Deferred()
        d2 = defer.Deferred()
        fake_defers = [d1, d2]
        self.patch(PathLockingTree, 'acquire',
                   lambda *a, **k: fake_defers.pop())
        cmd = Move(self.rq, VOLUME, 'node', 'o_p', 'n_p', 'n_n', 'p/f', 'p/t')

        # get the path lock, and add a callback to get the release function
        dl = cmd._acquire_pathlock()
        merge_release = []  # reconstructed
        dl.addCallback(merge_release.append)

        # prepare marks to check both original releases are called
        release_called = []  # reconstructed

        # dl is triggered only when d1 and d2
        self.assertFalse(dl.called)
        d1.callback(lambda: release_called.append(1))
        self.assertFalse(dl.called)
        d2.callback(lambda: release_called.append(2))
        self.assertTrue(dl.called)

        # nothing is released until the merged release function is called
        self.assertFalse(release_called)
        merge_release[0]()  # reconstructed
        self.assertEqual(sorted(release_called), [1, 2])
class MakeFileTestCase(ConnectedBaseTestCase):
    """Test for MakeFile ActionQueueCommand."""

    # NOTE(review): lines tagged "reconstructed" were missing from the
    # damaged source text and are inferred from the surrounding code;
    # confirm each of them against version control.

    @defer.inlineCallbacks
    def setUp(self):  # reconstructed
        """Create a RequestQueue bound to the test's action queue."""
        yield super(MakeFileTestCase, self).setUp()
        self.rq = RequestQueue(action_queue=self.action_queue)

    def test_handle_success_push_event(self):
        """Test AQ_FILE_NEW_OK is pushed on success."""
        # create a request and fill it with successful information
        request = client.MakeFile(self.action_queue.client, VOLUME,
                                  'parent', 'name')  # reconstructed args
        request.new_id = 'new_id'
        request.new_generation = 13

        # create a command and trigger its success
        cmd = MakeFile(self.rq, VOLUME, 'parent', 'name', 'marker', 'path')
        cmd.handle_success(request)

        # check for successful event
        received = self.action_queue.event_queue.events[0]
        # NOTE(review): the last key of this dict was missing from the
        # source; volume_id=VOLUME is presumed -- confirm.
        info = dict(marker='marker', new_id='new_id', new_generation=13,
                    volume_id=VOLUME)
        self.assertEqual(received, ('AQ_FILE_NEW_OK', info))

    def test_handle_failure_push_event(self):
        """Test AQ_FILE_NEW_ERROR is pushed on error."""
        # create a request and fill it with successful information
        request = client.MakeFile(self.action_queue.client, VOLUME,
                                  'parent', 'name')  # reconstructed args
        request.new_id = 'new_id'
        request.new_generation = 13

        # create a command and trigger it fail
        cmd = MakeFile(self.rq, VOLUME, 'parent', 'name', 'marker', 'path')
        failure = Failure(Exception('foo'))
        cmd.handle_failure(failure)

        # check for the error event
        received = self.action_queue.event_queue.events[0]
        info = dict(marker='marker', failure=failure)
        self.assertEqual(received, ('AQ_FILE_NEW_ERROR', info))

    def test_possible_markers(self):
        """Test that it returns the correct values."""
        cmd = MakeFile(self.rq, VOLUME, 'parent', 'name', 'marker', 'path')
        res = [getattr(cmd, x) for x in cmd.possible_markers]
        self.assertEqual(res, ['parent'])

    def test_path_locking(self):
        """Test that it acquires correctly the path lock."""
        t = []  # reconstructed
        # record the positional and keyword arguments given to acquire()
        self.patch(PathLockingTree, 'acquire',
                   lambda s, *a, **k: t.extend((a, k)))
        cmd = MakeFile(self.rq, VOLUME, 'parent', 'name', 'marker',
                       os.path.join('foo', 'bar'))
        cmd._acquire_pathlock()
        # NOTE(review): the 'logger' entry closing this dict was missing
        # from the source and is presumed -- confirm.
        self.assertEqual(t, [('foo', 'bar'), {'on_parent': True,
                                              'logger': None}])
class MakeDirTestCase(ConnectedBaseTestCase):
    """Test for MakeDir ActionQueueCommand."""

    # NOTE(review): lines tagged "reconstructed" were missing from the
    # damaged source text and are inferred from the surrounding code;
    # confirm each of them against version control.

    @defer.inlineCallbacks
    def setUp(self):  # reconstructed
        """Create a RequestQueue bound to the test's action queue."""
        yield super(MakeDirTestCase, self).setUp()
        self.rq = RequestQueue(action_queue=self.action_queue)

    def test_handle_success_push_event(self):
        """Test AQ_DIR_NEW_OK is pushed on success."""
        # create a request and fill it with successful information
        request = client.MakeDir(self.action_queue.client, VOLUME,
                                 'parent', 'name')  # reconstructed args
        request.new_id = 'new_id'
        request.new_generation = 13

        # create a command and trigger its success
        cmd = MakeDir(self.rq, VOLUME, 'parent', 'name', 'marker', 'path')
        cmd.handle_success(request)

        # check for successful event
        received = self.action_queue.event_queue.events[0]
        # NOTE(review): the last key of this dict was missing from the
        # source; volume_id=VOLUME is presumed -- confirm.
        info = dict(marker='marker', new_id='new_id', new_generation=13,
                    volume_id=VOLUME)
        self.assertEqual(received, ('AQ_DIR_NEW_OK', info))

    def test_handle_failure_push_event(self):
        """Test AQ_DIR_NEW_ERROR is pushed on error."""
        # create a request and fill it with successful information
        request = client.MakeDir(self.action_queue.client, VOLUME,
                                 'parent', 'name')  # reconstructed args
        request.new_id = 'new_id'
        request.new_generation = 13

        # create a command and trigger it fail
        cmd = MakeDir(self.rq, VOLUME, 'parent', 'name', 'marker', 'path')
        failure = Failure(Exception('foo'))
        cmd.handle_failure(failure)

        # check for the error event
        received = self.action_queue.event_queue.events[0]
        info = dict(marker='marker', failure=failure)
        self.assertEqual(received, ('AQ_DIR_NEW_ERROR', info))

    def test_possible_markers(self):
        """Test that it returns the correct values."""
        cmd = MakeDir(self.rq, VOLUME, 'parent', 'name', 'marker', 'path')
        res = [getattr(cmd, x) for x in cmd.possible_markers]
        self.assertEqual(res, ['parent'])

    def test_path_locking(self):
        """Test that it acquires correctly the path lock."""
        t = []  # reconstructed
        # record the positional and keyword arguments given to acquire()
        self.patch(PathLockingTree, 'acquire',
                   lambda s, *a, **k: t.extend((a, k)))
        cmd = MakeDir(self.rq, VOLUME, 'parent', 'name', 'marker',
                      os.path.join('foo', 'bar'))
        cmd._acquire_pathlock()
        # NOTE(review): the 'logger' entry closing this dict was missing
        # from the source and is presumed -- confirm.
        self.assertEqual(t, [('foo', 'bar'), {'on_parent': True,
                                              'logger': None}])
class TestDeltaList(unittest.TestCase):
    """Tests for DeltaList."""

    # NOTE(review): the lines building the sample list and the DeltaList
    # in the first two tests were missing from the damaged source text; the
    # sample contents below are placeholders -- confirm.

    def test_is_list(self):
        """A DeltaList is a list."""
        items = [1, 2, 3]  # reconstructed
        a = DeltaList(items)  # reconstructed
        self.assertIsInstance(a, list)

    def test_is_equal_list(self):
        """A DeltaList is equal to the list it represents."""
        items = [1, 2, 3]  # reconstructed
        a = DeltaList(items)  # reconstructed
        self.assertEqual(a, items)

    def test_repr(self):
        """A DeltaList has a short representation."""
        a = DeltaList(["a" * 1000])
        # both renderings must stay well below the size of the content
        self.assertTrue(len(repr(a)) < 100)
        self.assertTrue(len(str(a)) < 100)
class AuthenticateTestCase(ConnectedBaseTestCase):
    """Tests for authenticate."""

    # NOTE(review): lines tagged "reconstructed" were missing from the
    # damaged source text and are inferred from the surrounding code;
    # confirm each of them against version control.

    @defer.inlineCallbacks
    def setUp(self):  # reconstructed
        """Create a RequestQueue bound to the test's action queue."""
        yield super(AuthenticateTestCase, self).setUp()
        self.rq = RequestQueue(action_queue=self.action_queue)

    @defer.inlineCallbacks
    def test_session_id_is_logged(self):
        """Test that session_id is logged after auth ok."""
        request = client.Authenticate(self.action_queue.client,
                                      {'dummy_token': 'credentials'})
        request.session_id = str(uuid.uuid4())
        # make authentication succeed right away with the prepared request
        self.action_queue.client.oauth_authenticate = \
            lambda *args: defer.succeed(request)

        yield self.action_queue.authenticate()

        self.assertTrue(self.handler.check_note('Session ID: %r' %
                                                str(request.session_id)))

    @defer.inlineCallbacks
    def test_send_platform_and_version(self):
        """Test that platform and version is sent to the server."""
        called = []  # reconstructed

        def fake_oauth_authenticate(*args, **kwargs):
            """Record the call and succeed with a fresh Authenticate."""
            called.append((args, kwargs))
            request = client.Authenticate(self.action_queue.client,
                                          {'dummy_token': 'credentials'})
            request.session_id = str(uuid.uuid4())
            return defer.succeed(request)

        self.action_queue.client.oauth_authenticate = fake_oauth_authenticate
        yield self.action_queue.authenticate()
        self.assertEqual(len(called), 1)
        # the metadata is read from the third positional argument
        metadata = called[0][0][2]
        expected_metadata = {'platform': platform,
                             'version': clientdefs.VERSION}
        self.assertEqual(metadata, expected_metadata)
4919
class ActionQueueProtocolTests(TwistedTestCase):
    """Test the ACQ class."""

    def setUp(self):
        """Create the protocol with a mocked factory and a log handler."""
        # create an AQP and put a factory to it
        self.aqp = ActionQueueProtocol()
        obj = Mocker().mock()
        obj.event_queue.push('SYS_CONNECTION_MADE')
        self.aqp.factory = obj

        # capture what the protocol logs, from DEBUG up
        self.handler = MementoHandler()
        self.handler.setLevel(logging.DEBUG)
        self.aqp.log.addHandler(self.handler)

    def tearDown(self):
        """Detach the log handler and stop any ping manager left around."""
        self.aqp.log.removeHandler(self.handler)
        if self.aqp.ping_manager is not None:
            self.aqp.ping_manager.stop()

    def test_connection_made(self):
        """Connection is made."""
        # NOTE(review): the two lines that built ``obj`` are absent from this
        # copy; re-created here the same way setUp builds its mock -- confirm.
        obj = Mocker().mock()
        obj.event_queue.push('SYS_CONNECTION_MADE')
        self.aqp.factory = obj
        # flags that the parent class' connectionMade was reached
        super_called = []
        self.patch(ThrottlingStorageClient, 'connectionMade',
                   lambda s: super_called.append(True))
        # NOTE(review): a short line before this call is missing in this copy
        # (possibly a mocker replay context) -- confirm against VCS.
        self.aqp.connectionMade()
        self.assertTrue(self.handler.check_info('Connection made.'))
        self.assertNotIdentical(self.aqp.ping_manager, None)
        self.assertTrue(super_called)

    def test_connection_lost(self):
        """Connection is lost."""
        # flags that the parent class' connectionLost was reached
        super_called = []
        self.patch(ThrottlingStorageClient, 'connectionLost',
                   lambda s, r: super_called.append(True))
        self.aqp.connectionLost('foo')
        self.assertTrue(self.handler.check_info(
                        'Connection lost, reason: foo.'))
        self.assertIdentical(self.aqp.ping_manager, None)
        self.assertTrue(super_called)

    def test_ping_connection_made_twice(self):
        """If connection made is called twice, don't create two tasks."""
        self.aqp.connectionMade()
        task1 = self.aqp.ping_manager
        self.aqp.connectionMade()
        task2 = self.aqp.ping_manager
        # the first manager is replaced and only the new one keeps running
        self.assertNotIdentical(task1, task2)
        self.assertFalse(task1._running)
        self.assertTrue(task2._running)

    def test_ping_connection_lost_twice(self):
        """If connection lost is called twice, don't stop None."""
        self.aqp.connectionMade()
        self.assertNotIdentical(self.aqp.ping_manager, None)
        self.aqp.connectionLost('reason')
        self.assertIdentical(self.aqp.ping_manager, None)
        # a second loss must cope with the manager already being None
        self.aqp.connectionLost('reason')
        self.assertIdentical(self.aqp.ping_manager, None)

    def test_init_max_payload_size(self):
        """Configure max_payload_size on init according to config."""
        user_config = config.get_user_config()
        user_config.set_max_payload_size(12345)
        aqp = ActionQueueProtocol()
        self.assertEqual(aqp.max_payload_size, 12345)
4995
class CommandCycleTestCase(BasicTestCase):
    """Test the command behaviour on run, retry, stop, etc.

    These tests are not exactly unit tests, but more about integration
    between the queue and the command, and how the command life cycle is.

    NOTE(review): in this copy of the file the short statements that queue,
    start, stop, disconnect or cancel the command are absent. Every such gap
    is flagged inline and must be restored from version control before the
    assertions that follow it can hold.
    """

    @defer.inlineCallbacks
    def setUp(self):
        """Create a request queue and a monkeypatchable command on it."""
        yield super(CommandCycleTestCase, self).setUp()

        class MyCommand(ActionQueueCommand):
            """Monkeypatchable AQC."""

            def _acquire_pathlock(self):
                pathlock = PathLockingTree()
                return pathlock.acquire('foo')

        self.queue = RequestQueue(action_queue=self.action_queue)
        # NOTE(review): one short line may be missing here (the "queued while
        # active" tests suggest the queue gets started) -- confirm.
        self.cmd = MyCommand(self.queue)

    def _check_finished_ok(self, cmd=None):
        """Check that command finished ok.

        When no command is given, the one created in setUp is checked.
        """
        if cmd is None:
            cmd = self.cmd
        self.assertFalse(cmd.running)
        self.assertNotIn(cmd, self.queue.waiting)
        self.assertFalse(self.action_queue.pathlock.count)

    def test_simple_start_end_ok_queue_active(self):
        """Simple normal cycle, ending ok, queued while active."""
        # check _run was called, and returned ok
        called = []
        self.cmd._run = lambda: called.append(1) or defer.succeed(2)

        # check handled success
        self.cmd.handle_success = lambda a: called.append(a)

        # let the command go
        # NOTE(review): the statement(s) starting the command are missing
        # in this copy.

        self.assertEqual(called, [1, 2])
        self._check_finished_ok()

    def test_simple_start_end_bad_queue_active(self):
        """Simple normal cycle, ending bad, queued while active.."""
        # check _run was called, and returned bad
        called = []
        exc = ValueError('foo')
        self.cmd._run = lambda: called.append(1) or defer.fail(Failure(exc))
        self.cmd.suppressed_error_messages.append(ValueError)

        # check handled failure
        self.cmd.handle_failure = lambda f: called.append(f.value)

        # let the command go
        # NOTE(review): the statement(s) starting the command are missing
        # in this copy.

        self.assertEqual(called, [1, exc])
        self._check_finished_ok()

    def test_simple_start_end_ok_queue_notactive(self):
        """Simple normal cycle, ending ok, queued while not active."""
        # check _run was called, and returned ok
        called = []
        self.cmd._run = lambda: called.append(1) or defer.succeed(2)

        # check handled success
        self.cmd.handle_success = lambda a: called.append(a)

        # stop the queue and let the command go
        # NOTE(review): these statements are missing in this copy.

        # active the queue later
        # NOTE(review): this statement is missing in this copy.

        self.assertEqual(called, [1, 2])
        self._check_finished_ok()

    def test_simple_start_end_bad_queue_notactive(self):
        """Simple normal cycle, ending bad, queued while not active.."""
        # check _run was called, and returned bad
        called = []
        exc = ValueError('foo')
        self.cmd._run = lambda: called.append(1) or defer.fail(Failure(exc))
        self.cmd.suppressed_error_messages.append(ValueError)

        # check handled failure
        self.cmd.handle_failure = lambda f: called.append(f.value)

        # stop the queue and let the command go
        # NOTE(review): these statements are missing in this copy.

        # active the queue later
        # NOTE(review): this statement is missing in this copy.

        self.assertEqual(called, [1, exc])
        self._check_finished_ok()

    def test_condition_goes_ok(self):
        """Command not run initially, but yes when conditions are ok."""
        # check _run was called, and returned ok
        called = []
        self.cmd._run = lambda: called.append('run') or defer.succeed('finish')

        # check handled success
        self.cmd.handle_success = lambda a: called.append(a)

        # command not ready to run
        self.cmd.is_runnable = False

        # let the command go, it will not run
        # NOTE(review): the statement starting the command is missing in
        # this copy.
        self.assertEqual(called, [])

        # fix conditions and check them
        self.cmd.is_runnable = True
        self.action_queue.conditions_locker.check_conditions()

        self.assertEqual(called, ['run', 'finish'])
        self._check_finished_ok()

    def test_check_conditions_while_running(self):
        """Check conditions while the command is running.

        Check conditions can be executed at any time, we need to avoid
        running the command twice.
        """
        # monkeypatch _run to flag called and test "while running"
        called = []
        d = defer.Deferred()
        self.cmd._run = lambda: called.append(1) or d

        # check handled success
        self.cmd.handle_success = lambda a: called.append(a)

        # let the command go
        # NOTE(review): the statement starting the command is missing in
        # this copy.

        # before the command finishes, all conditions are checked
        self.action_queue.conditions_locker.check_conditions()

        # NOTE(review): the lines finishing the command are missing in this
        # copy; firing ``d`` with 2 is what the expected [1, 2] implies.
        d.callback(2)

        self.assertEqual(called, [1, 2])
        self._check_finished_ok()

    def test_disconnect_connect_running_with_error(self):
        """Simulate a disconnection and connection while command running."""
        called = []

        def f1():
            """Disconnect: stop the queue and fail with connection lost."""
            # NOTE(review): the statement stopping the queue is missing in
            # this copy.
            failure = Failure(twisted_error.ConnectionLost())
            return defer.fail(failure)

        def f2():
            """Second run: finish ok."""
            # NOTE(review): a short line before this return is missing in
            # this copy (a docstring or a statement) -- confirm.
            return defer.succeed('finish')

        # first run fails through f1, the second one succeeds through f2
        run_functions = [f1, f2]
        self.cmd._run = lambda: called.append('run') or run_functions.pop(0)()

        # check handled success and cleanup
        self.cmd.handle_success = lambda a: called.append(a)
        self.cmd.cleanup = lambda: called.append('clean')

        # let the command go
        # NOTE(review): the statements that start the command and bring the
        # connection back are missing in this copy.

        self.assertEqual(called, ['run', 'clean', 'clean', 'run', 'finish'])
        self._check_finished_ok()

    def test_disconnect_connect_pathlocked(self):
        """Simulate a disconnection and connection while waiting pathlock."""
        # check it called run
        called = []
        self.cmd._run = lambda: called.append('run') or defer.succeed('finish')

        # monkeypatch to test "while waiting pathlock"
        d = defer.Deferred()
        self.cmd._acquire_pathlock = lambda: d

        # check handled success
        self.cmd.handle_success = lambda a: called.append(a)

        # let the command go
        # NOTE(review): the statement starting the command is missing in
        # this copy.

        # before the pathlock is released, we disconnect, and reconnect
        # NOTE(review): these statements are missing in this copy.

        # release the pathlock
        # NOTE(review): the statement firing ``d`` is missing in this copy.

        self.assertEqual(called, ['run', 'finish'])
        self._check_finished_ok()

    @defer.inlineCallbacks
    def test_retry_immediate(self):
        """Retry the command immediately."""
        finished = defer.Deferred()
        called = []
        exc = twisted_error.ConnectionDone()  # retryable!
        run_deferreds = [defer.fail(Failure(exc)), defer.succeed('finish')]
        self.cmd._run = lambda: called.append('run') or run_deferreds.pop(0)
        self.cmd.handle_retryable = lambda f: called.append(f.value)

        # check handle success (failure is never called because it's retried)
        self.cmd.handle_success = lambda a: called.append(a)

        # need to wait finish() called, to be sure all ended ok, because of
        # the callLater for the retry
        def fake_finish():
            """Call the real finish and signal the test."""
            ActionQueueCommand.finish(self.cmd)
            finished.callback(True)
        self.cmd.finish = fake_finish

        # let the command go
        # NOTE(review): the statement starting the command is missing in
        # this copy.

        # need to wait the callLater
        yield finished

        self.assertEqual(called, ['run', exc, 'run', 'finish'])
        self._check_finished_ok()

    def test_retry_conditions_solved(self):
        """Retry the command because conditions solved later."""
        finished = defer.Deferred()
        called = []

        def f1():
            """Fail and make conditions not ok to run."""
            self.cmd.is_runnable = False
            failure = Failure(twisted_error.ConnectionDone())  # retryable!
            return defer.fail(failure)

        def f2():
            """Second run: finish ok."""
            return defer.succeed('finish')

        run_functions = [f1, f2]
        self.cmd._run = lambda: called.append('run') or run_functions.pop(0)()

        # check handle success (failure is never called because it's retried)
        self.cmd.handle_success = lambda a: called.append(a)

        # need to wait finish() called, to be sure all ended ok, because of
        # the callLater for the retry
        def fake_finish():
            """Call the real finish and signal the test."""
            ActionQueueCommand.finish(self.cmd)
            finished.callback(True)
        self.cmd.finish = fake_finish

        # let the command go, it will fail and wait for conditions
        # NOTE(review): the statement starting the command is missing in
        # this copy.
        self.assertEqual(called, ['run'])

        self.cmd.is_runnable = True
        self.action_queue.conditions_locker.check_conditions()

        # need to wait the callLater
        # NOTE(review): the wait on ``finished`` is missing in this copy, and
        # no @defer.inlineCallbacks is visible on this test (unlike
        # test_retry_immediate) -- confirm how the wait was done.

        self.assertEqual(called, ['run', 'run', 'finish'])
        self._check_finished_ok()

    def test_cancel_while_running(self):
        """Cancel the command while running."""
        # monkeypatch _run to flag called and test "while running"
        called = []
        d = defer.Deferred()
        self.cmd._run = lambda: called.append(1) or d

        self.cmd.cleanup = lambda: called.append(2)

        def fake_finish():
            """Flag and call the real one."""
            # NOTE(review): the flag line is absent in this copy; 3 is what
            # the expected [1, 2, 3] implies.
            called.append(3)
            ActionQueueCommand.finish(self.cmd)
        self.cmd.finish = fake_finish

        # let the command go
        # NOTE(review): the statement starting the command is missing in
        # this copy.

        # before it finishes, cancel
        # NOTE(review): the cancelling statement is missing in this copy.

        self.assertTrue(self.cmd.cancelled)
        self.assertEqual(called, [1, 2, 3])
        self._check_finished_ok()

    def test_cancel_while_pathclocked(self):
        """Cancel the command while running."""
        # monkeypatch _run to flag called and test "while running"
        called = []
        self.cmd.run = lambda: called.append('should not')

        # monkeypatch to test "while waiting pathlock"
        d = defer.Deferred()
        self.cmd._acquire_pathlock = lambda: d

        # let the command go, and cancel in the middle
        # NOTE(review): these statements are missing in this copy.

        # unlock the pathlock
        d.callback(lambda: called.append(1))

        self.assertEqual(called, [1])
        self._check_finished_ok()

    def test_cancel_while_waiting_conditions(self):
        """Cancel the command while waiting for conditions."""
        # make it not runnable, and fake the pathlock to test releasing
        self.cmd.is_runnable = False
        released = []
        self.cmd._acquire_pathlock = lambda: defer.succeed(
            lambda: released.append(True))

        # let the command go (will stuck because not runnable), and
        # cancel in the middle
        # NOTE(review): these statements are missing in this copy.

        self._check_finished_ok()
        self.assertTrue(released)

    def test_cancel_while_waiting_queue(self):
        """Cancel the command while waiting for queue."""
        # stop the queue, and fake the pathlock to test releasing
        # NOTE(review): the statement stopping the queue is missing in this
        # copy.
        released = []
        self.cmd._acquire_pathlock = lambda: defer.succeed(
            lambda: released.append(True))

        # let the command go (will stuck because not runnable), and
        # cancel in the middle
        # NOTE(review): these statements are missing in this copy.

        # now, set the queue active again, it should release everything
        # even if was cancelled before
        # NOTE(review): this statement is missing in this copy.

        self._check_finished_ok()
        self.assertTrue(released)

    def test_marker_error_while_pathclocked(self):
        """The marker errbacks while the command is waiting the pathlock."""
        # monkeypatch methods to flag called and test "while running"
        called = []
        self.cmd.cleanup = lambda: called.append('cleanup')
        self.cmd.handle_failure = lambda f: called.append('handle_failure')
        self.cmd.run = lambda: called.append('should not')

        # finish is special as we need to really run it
        def fake_finish():
            """Flag and call the real one."""
            called.append('finish')
            ActionQueueCommand.finish(self.cmd)
        self.cmd.finish = fake_finish

        # do not let demark callback the marker
        self.cmd.demark = lambda: None

        # monkeypatch to test "while waiting pathlock"
        d = defer.Deferred()
        self.cmd._acquire_pathlock = lambda: d

        # let the command go, and errback the marker deferred
        # NOTE(review): the statement starting the command is missing in
        # this copy.
        self.cmd.markers_resolved_deferred.errback(ValueError('foo'))

        # unlock the pathlock
        d.callback(lambda: True)

        self.assertEqual(called, ['cleanup', 'handle_failure', 'finish'])
        self._check_finished_ok()

    def test_cancel_while_transfer_locked(self):
        """Cancel the command while waiting for transfer semaphore.

        The semaphore lock must be released! Of course, this test is on
        download/upload commands.
        """
        cmd = Upload(self.queue, share_id='a_share_id', node_id='a_node_id',
                     previous_hash='prev_hash', hash='yadda', crc32=0, size=0,
                     path='path', fileobj_factory=lambda: None)

        # patch the command to simulate a request to an already full
        # transfer semaphore in _start
        transfers_semaphore = self.queue.transfers_semaphore
        semaphores = []
        user_config = config.get_user_config()
        for i in xrange(user_config.get_simult_transfers()):
            s = transfers_semaphore.acquire()
            s.addCallback(semaphores.append)

        # let the command go, and cancel in the middle
        # NOTE(review): these statements are missing in this copy.

        # release previous semaphores
        for s in semaphores:
            # NOTE(review): the loop body is absent in this copy; presumably
            # each acquired token is released like this -- confirm.
            s.release()

        # semaphore released
        self.assertIdentical(cmd.tx_semaphore, None)
        self._check_finished_ok(cmd)

    def test_disconnect_connect_running_no_error(self):
        """Simulate a disconnection and connection while running.

        Sometimes there's no error on the command (ConnectionLost) because the
        command got into running after the network was lost :(
        """
        called = []
        d1 = defer.Deferred()
        d2 = defer.Deferred()
        run_deferreds = [d1, d2]
        self.cmd._run = lambda: called.append('run') or run_deferreds.pop(0)

        # check handled success and cleanup
        self.cmd.handle_success = lambda a: called.append(a)
        self.cmd.cleanup = lambda: called.append('clean')

        # let the command go, it will stuck in d1
        # NOTE(review): the statement starting the command is missing in
        # this copy.

        # disconnect and connect, and then trigger d2 for the command to finish
        # NOTE(review): the disconnect/connect statements are missing in this
        # copy.
        d2.callback('finish')

        self.assertEqual(called, ['run', 'clean', 'run', 'finish'])
        self._check_finished_ok()
5462
class InterruptibleDeferredTests(TwistedTestCase):
5463
"""Test the InterruptibleDeferred behaviour."""
5465
@defer.inlineCallbacks
5466
def test_original_callbacked(self):
5467
"""Original deferred is callbacked."""
5468
origdef = defer.Deferred()
5469
intrdef = InterruptibleDeferred(origdef)
5470
origdef.callback('foo')
5471
result = yield intrdef
5472
self.assertEqual(result, 'foo')
5474
# later we can interrupt, nothing happens
5476
self.assertFalse(intrdef.interrupted)
5478
@defer.inlineCallbacks
5479
def test_original_errbacked(self):
5480
"""Original deferred is errbacked."""
5481
origdef = defer.Deferred()
5482
intrdef = InterruptibleDeferred(origdef)
5483
origdef.errback(ValueError('foo'))
5486
except ValueError, e:
5487
self.assertEqual(str(e), 'foo')
5489
self.fail("Test should have raised an exception")
5491
# later we can interrupt, nothing happens
5493
self.assertFalse(intrdef.interrupted)
5496
def test_interrupt_except(self):
5498
intrdef = InterruptibleDeferred(defer.Deferred())
5503
except DeferredInterrupted:
5504
self.assertTrue(intrdef.interrupted)
5506
self.fail("Test should have raised an exception")
5508
def test_interrupt_callback_original(self):
5509
"""Interrupt silences further original callbacks."""
5510
origdef = defer.Deferred()
5511
intrdef = InterruptibleDeferred(origdef)
5516
except DeferredInterrupted:
5517
pass # just silecen the exception
5519
self.fail("Test should have raised an exception")
5521
# further callback to original deferred is harmless
5522
origdef.callback("foo")
5524
def test_interrupt_errback_original(self):
5525
"""Interrupt silences further original errbacks."""
5526
origdef = defer.Deferred()
5527
intrdef = InterruptibleDeferred(origdef)
5532
except DeferredInterrupted:
5533
pass # just silecen the exception
5535
self.fail("Test should have raised an exception")
5537
# further callback to original deferred is harmless
5538
origdef.errback(ValueError('foo'))
5541
class ConditionsLockerTests(TwistedTestCase):
    """Test the ConditionsLocker."""

    def setUp(self):
        """Create the locker under test."""
        self.cl = ConditionsLocker()

    def test_get_locking_deferred_returns_deferred(self):
        """The locking is done by a deferred."""
        d = self.cl.get_lock('command')
        # NOTE(review): the line(s) checking ``d`` are missing in this copy,
        # so nothing is asserted here yet -- restore from VCS.

    def test_get_locking_different_commands_different_deferreds(self):
        """Asked by two commands, get two deferreds."""
        d1 = self.cl.get_lock('command1')
        d2 = self.cl.get_lock('command2')
        self.assertNotIdentical(d1, d2)

    def test_get_locking_same_command_same_deferred(self):
        """If asked twice by the same command, return the same deferred.

        This is more a safe guard than a feature; if misused by the same
        command we're assuring than we will not overwrite a second deferred
        over the first one (so, never releasing the first one).
        """
        d1 = self.cl.get_lock('command')
        d2 = self.cl.get_lock('command')
        self.assertIdentical(d1, d2)

    def test_check_conditions_simple_runnable(self):
        """Release the command."""
        cmd = FakeCommand()
        locking_d = self.cl.get_lock(cmd)
        # asking for the lock holds the command without firing the deferred
        self.assertFalse(locking_d.called)
        self.assertIn(cmd, self.cl.locked)

        # a runnable command is released by the conditions check
        assert cmd.is_runnable
        self.cl.check_conditions()
        self.assertTrue(locking_d.called)
        self.assertNotIn(cmd, self.cl.locked)

    def test_check_conditions_simple_notrunnable_then_ok(self):
        """First don't release the command, then release it."""
        cmd = FakeCommand()
        locking_d = self.cl.get_lock(cmd)
        self.assertFalse(locking_d.called)

        # check for conditions, do not release
        cmd.is_runnable = False
        self.cl.check_conditions()
        self.assertFalse(locking_d.called)

        # conditions are ok now, release
        cmd.is_runnable = True
        self.cl.check_conditions()
        self.assertTrue(locking_d.called)

    def test_check_conditions_mixed(self):
        """Several commands, mixed situation."""
        cmd1 = FakeCommand()
        cmd1.is_runnable = False
        cmd2 = FakeCommand()
        assert cmd2.is_runnable

        # get lock for both, and check conditions
        locking_d1 = self.cl.get_lock(cmd1)
        locking_d2 = self.cl.get_lock(cmd2)
        self.cl.check_conditions()

        # one should be released, the other should not
        self.assertFalse(locking_d1.called)
        self.assertTrue(locking_d2.called)

    def test_cancel_command_nothold(self):
        """It's ok to cancel a command not there."""
        self.cl.cancel_command('command')

    def test_cancel_releases_cancelled_command(self):
        """It releases the cancelled command, even not runnable."""
        cmd1 = FakeCommand()
        cmd1.is_runnable = False
        cmd2 = FakeCommand()
        assert cmd2.is_runnable

        # get lock for both, and cancel only 1
        locking_d1 = self.cl.get_lock(cmd1)
        locking_d2 = self.cl.get_lock(cmd2)
        self.cl.cancel_command(cmd1)

        # 1 should be released, 2 should not (even with conditions ok)
        self.assertTrue(locking_d1.called)
        self.assertFalse(locking_d2.called)
5637
class OsIntegrationTests(BasicTestCase, MockerTestCase):
    """Ensure that the correct os_helper methods are used."""

    @defer.inlineCallbacks
    def setUp(self):
        """Set up for tests."""
        yield super(OsIntegrationTests, self).setUp()
        # every test sees os.fdopen replaced through the mocker
        self.fdopen = self.mocker.replace('os.fdopen')

    def test_fdopen(self):
        """Ensure that we are calling fdopen correctly."""
        # expect exactly one call, on any descriptor, in 'w+b' mode
        self.fdopen(ANY, 'w+b')
        self.mocker.replay()
        NamedTemporaryFile()

    def test_fdopen_real(self):
        """Do test that the NamedTeporaryFile can read and write."""
        # NOTE(review): the fixture line and the write/rewind lines are
        # absent from this copy; re-created from the docstring and the
        # read-back assertion below -- confirm against VCS.
        data = 'test'
        self.mocker.replay()
        tmp = NamedTemporaryFile()
        tmp.write(data)
        tmp.seek(0)
        self.assertEqual(data, tmp.read(len(data)))
5664
class PingManagerTestCase(TwistedTestCase):
5665
"""Test the Ping manager."""
5669
def setUp(self):
    """Create a PingManager over a fake protocol, capturing its log."""

    class FakeActionQueueProtocol(object):
        """Fake object for the tests."""
        log = logging.getLogger("ubuntuone.SyncDaemon.ActionQueue")
        log.setLevel(logger.TRACE)
        # every ping answers with a fresh, never fired deferred
        ping = lambda self: defer.Deferred()

    self.fake_aqp = FakeActionQueueProtocol()
    self.handler = MementoHandler()
    self.fake_aqp.log.addHandler(self.handler)
    self.pm = PingManager(self.fake_aqp)
5682
def tearDown(self):
    """Detach the log handler attached for the test."""
    # NOTE(review): the ``def`` line is absent from this copy and the name
    # tearDown is inferred from what this line undoes -- confirm.
    self.fake_aqp.log.removeHandler(self.handler)
    # NOTE(review): one short line after this is missing in this copy;
    # presumably the ping manager is stopped here -- confirm against VCS.
5685
def test_init(self):
    """On start values."""
    # a fresh manager is running its loop and has no timeout call pending
    self.assertTrue(self.pm._running)
    self.assertTrue(self.pm._loop.running)
    self.assertIdentical(self.pm._timeout_call, None)
5691
def test_ping_do_ping(self):
    """Ping and log the round trip time when it is answered."""
    # NOTE(review): the lines creating the mock are absent from this copy;
    # re-created following the mocker usage elsewhere in this file --
    # confirm against VCS.
    mocker = Mocker()
    req = mocker.mock()
    expect(req.rtt).result(1.123123)
    d = defer.Deferred()
    self.pm.client.ping = lambda: d

    # call and check all is started when the ping is done
    # NOTE(review): the statement that triggers the ping is missing in this
    # copy.
    self.assertTrue(self.pm._timeout_call.active())
    self.handler.debug = True
    self.assertTrue(self.handler.check(logger.TRACE, 'Sending ping'))

    # answer the ping, and check
    # NOTE(review): the statements answering the ping (firing ``d`` with
    # ``req`` while the mocker is replaying, presumably) are missing in this
    # copy.
    self.assertFalse(self.pm._timeout_call.active())
    self.assertTrue(self.handler.check_debug('Ping! rtt: 1.123 segs'))
5712
def test_stop_if_running(self):
    """Stopping a running manager reaches the real stop."""
    assert self.pm._running
    # flags calls to the inner _stop
    called = []
    self.patch(self.pm, '_stop', lambda: called.append(True))
    # NOTE(review): this call is absent from this copy; restored from the
    # test name and the assertions below -- confirm.
    self.pm.stop()
    self.assertFalse(self.pm._running)
    self.assertTrue(called)
    self.pm._running = True  # back to True so it's properly stopped later
5722
def test_stop_if_not_running(self):
    """Stopping the stopped."""
    # NOTE(review): both stop() calls are absent from this copy; restored
    # from the test name and the surrounding assertions -- confirm.
    self.pm.stop()
    assert not self.pm._running
    # flags calls to the inner _stop
    called = []
    self.pm._stop = lambda: called.append(True)
    self.pm.stop()
    self.assertFalse(self.pm._running)
    self.assertFalse(called)
5732
def test_real_stop_loop(self):
    """The loop is stopped."""
    assert self.pm._loop.running
    # NOTE(review): the stopping call is absent from this copy; presumably
    # the inner ``_stop`` that the "real stop" tests target -- confirm.
    self.pm._stop()
    assert not self.pm._loop.running
5738
def test_real_stop_timeout_call_None(self):
    """It can be stopped while not having a timeout call."""
    assert self.pm._timeout_call is None
    # NOTE(review): the stopping call is absent from this copy; presumably
    # the inner ``_stop`` that the "real stop" tests target -- confirm.
    self.pm._stop()
5743
def test_real_stop_timeout_call_active(self):
    """The timeout call is cancelled if active."""
    # NOTE(review): the statement that triggers a ping (and so creates the
    # timeout call) is missing in this copy.
    assert self.pm._timeout_call.active()
    # NOTE(review): the stopping call is absent from this copy; presumably
    # the inner ``_stop`` that the "real stop" tests target -- confirm.
    self.pm._stop()
    self.assertFalse(self.pm._timeout_call.active())
5750
def test_real_stop_timeout_call_not_active(self):
    """It can be stopped while the timeout call is not active."""
    # NOTE(review): the statement that triggers a ping (and so creates the
    # timeout call) is missing in this copy.
    self.pm._timeout_call.cancel()
    assert not self.pm._timeout_call.active()
    # NOTE(review): the stopping call is absent from this copy; presumably
    # the inner ``_stop`` that the "real stop" tests target -- confirm.
    self.pm._stop()
    self.assertFalse(self.pm._timeout_call.active())
5758
def test_disconnect(self):
5759
"""Stop machinery, log and disconnect."""
5762
# mock the transport
5763
transport = mocker.mock()
5764
expect(transport.loseConnection())
5765
self.pm.client.transport = transport
5768
stop = mocker.mock()
5770
self.patch(self.pm, 'stop', stop)
5772
# ping will be called, and req accessed, otherwise mocker will complain
5774
self.pm._disconnect()
5776
self.assertTrue(self.handler.check_info("No Pong response"))