~didrocks/ubuntuone-client/dont-suffer-zg-crash

« back to all changes in this revision

Viewing changes to tests/platform/linux/test_tools.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2011-01-25 16:42:52 UTC
  • mto: This revision was merged to the branch mainline in revision 64.
  • Revision ID: james.westby@ubuntu.com-20110125164252-rl1pybasx1nsqgoy
Tags: upstream-1.5.3
Import upstream version 1.5.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
 
22
22
from dbus.exceptions import DBusException
23
23
from ubuntuone.syncdaemon import (
 
24
    action_queue,
24
25
    event_queue,
 
26
    states,
25
27
    volume_manager,
26
 
    states,
27
28
)
28
29
from ubuntuone.platform.linux import dbus_interface, tools, get_udf_path
29
 
from tests.platform.linux.test_dbus import (
30
 
    FakeCommand, EmptyCommand, DBusTwistedTestCase
31
 
)
 
30
from tests.platform.linux.test_dbus import FakeCommand, DBusTwistedTestCase
32
31
 
33
32
from twisted.internet import defer, reactor
34
33
 
73
72
        return d
74
73
 
75
74
    def test_wait_no_more_events(self):
76
 
        """ test wait_no_more_events """
 
75
        """Test wait_no_more_events."""
77
76
        d = self.tool.wait_no_more_events(last_event_interval=.1)
78
 
        # test callback, pylint: disable-msg=C0111
79
77
        event_names = event_queue.EVENTS.keys()
 
78
 
80
79
        def events(result):
81
 
            self.assertEquals(True, result)
 
80
            """Test callback."""
 
81
            self.assertEqual(True, result)
82
82
 
83
83
        def fire_events():
 
84
            """Fire events."""
84
85
            # unsubscribe the vm subscribed in FakeMain as we are going to push
85
86
            # fake events
86
87
            self.event_q.unsubscribe(self.main.vm)
93
94
        d.addBoth(events)
94
95
        d.addCallback(lambda _: self.event_q.subscribe(self.main.vm))
95
96
        return d
 
97
    test_wait_no_more_events.skip = "Facundo will fix this in other branch."
96
98
 
97
99
    def test_all_downloads(self):
98
100
        """ test wait_all_downloads """
302
304
    def test_waiting_metadata(self):
303
305
        """Test SyncDaemonTool.waiting_metadata."""
304
306
        # inject the fake data
305
 
        self.action_q.meta_queue.waiting.extend([
 
307
        self.action_q.queue.waiting.extend([
306
308
                FakeCommand("node_a_foo", "node_a_bar"),
307
309
                FakeCommand("node_b_foo", "node_b_bar")])
308
310
        result = yield self.tool.waiting_metadata()
317
319
                  other='', path='')
318
320
        self.assertEqual(result[1], ('FakeCommand', pl))
319
321
 
320
 
    def test_waiting_content_schedule_next(self):
321
 
        """Test waiting_content and schedule_next"""
 
322
    def test_waiting_content(self):
 
323
        """Test waiting_content."""
 
324
 
 
325
        class FakeContentCommand(FakeCommand, action_queue.Upload):
 
326
            """Fake command that goes in content queue."""
 
327
            def __init__(self, *args):
 
328
                FakeCommand.__init__(self, *args)
 
329
 
322
330
        path = os.path.join(self.root_dir, "foo")
323
331
        self.fs_manager.create(path, "")
324
332
        self.fs_manager.set_node_id(path, "node_id")
326
334
        self.fs_manager.create(path1, "")
327
335
        self.fs_manager.set_node_id(path1, "node_id_1")
328
336
        # inject the fake data
329
 
        self.action_q.content_queue.waiting.extend([
330
 
                FakeCommand("", "node_id"),
331
 
                FakeCommand("", "node_id_1"),
332
 
                EmptyCommand()])
 
337
        self.action_q.queue.waiting.extend([
 
338
                FakeContentCommand("", "node_id"),
 
339
                FakeContentCommand("", "node_id_1")])
333
340
 
334
341
        d = self.tool.waiting_content()
335
 
        self.second = False
336
342
        def handler(result):
337
 
            if self.second:
338
 
                node_1, node, empty = result
339
 
            else:
340
 
                node, node_1, empty = result
341
 
            self.assertEquals(path, str(node['path']))
342
 
            self.assertEquals(path1, str(node_1['path']))
343
 
            self.assertEquals('', str(node['share']))
344
 
            self.assertEquals('', str(node_1['share']))
345
 
            self.assertEquals('node_id', str(node['node']))
346
 
            self.assertEquals('node_id_1', str(node_1['node']))
347
 
            self.assertEquals(EmptyCommand.__name__, str(empty['operation']))
 
343
            node, node_1 = result
 
344
            self.assertEqual(path, str(node['path']))
 
345
            self.assertEqual(path1, str(node_1['path']))
 
346
            self.assertEqual('', str(node['share']))
 
347
            self.assertEqual('', str(node_1['share']))
 
348
            self.assertEqual('node_id', str(node['node']))
 
349
            self.assertEqual('node_id_1', str(node_1['node']))
348
350
            self.assertTrue(result)
349
351
        d.addCallbacks(handler, self.fail)
350
 
        def second(_):
351
 
            self.second = True
352
 
        d.addCallback(second)
353
 
        d.addCallback(lambda _: self.tool.schedule_next('', 'node_id_1'))
354
 
        d.addCallbacks(lambda _: self.tool.waiting_content(), self.fail)
355
 
        d.addCallbacks(handler, self.fail)
356
352
        return d
357
353
 
358
354
    def test_start(self):
383
379
        """Test for Folders.delete."""
384
380
        path = os.path.join(self.home_dir, u'ñoño').encode('utf-8')
385
381
        # patch AQ.delete_volume
386
 
        def delete_volume(volume_id):
 
382
        def delete_volume(volume_id, path):
387
383
            """Fake delete_volume"""
388
384
            self.main.event_q.push("AQ_DELETE_VOLUME_OK", volume_id="folder_id")
389
385
        self.main.action_q.delete_volume = delete_volume