~didrocks/ubuntuone-client/use_result_var

« back to all changes in this revision

Viewing changes to tests/platform/linux/test_tools.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2011-02-11 16:18:11 UTC
  • mfrom: (1.1.44 upstream)
  • Revision ID: james.westby@ubuntu.com-20110211161811-lbelxu332e8r2dov
Tags: 1.5.4-0ubuntu1
* New upstream release.
  - Files still get downloaded from unsubscribed folder (LP: #682878)
  - Add subscription capabilities to shares (LP: #708335)
  - Nautilus offers Publish option within others' shares (LP: #712674)
  - Shares dir name may not be unique (LP: #715776)
  - Send a notification when new Volume is available (LP: #702055)
  - Add messaging menu entry for new Volumes (LP: #702075)
  - Aggregate notifications for completed operations (LP: #702128)
  - Send a notification for new Share offers (LP: #702138)
  - Add messaging menu entry for new Share offers (LP: #702144)
* Remove ubuntuone-client-tools as u1sync has been moved out.
* Simplify Python package as we don't have multiple Python packages now.

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
    states,
27
27
    volume_manager,
28
28
)
 
29
 
 
30
from ubuntuone.devtools.handlers import MementoHandler
 
31
 
29
32
from ubuntuone.platform.linux import dbus_interface, tools, get_udf_path
30
 
from tests.platform.linux.test_dbus import FakeCommand, DBusTwistedTestCase
 
33
from tests.platform.linux.test_dbus import (
 
34
    FakeCommand, DBusTwistedTestCase, AllEventsSender,
 
35
)
31
36
 
32
37
from twisted.internet import defer, reactor
33
38
 
41
46
        self.home_dir = self.mktemp('ubuntuonehacker')
42
47
        self._old_home = os.environ['HOME']
43
48
        os.environ['HOME'] = self.home_dir
 
49
        self.handler = MementoHandler()
 
50
        self.tool.log.addHandler(self.handler)
44
51
        return self.main.wait_for_nirvana()
45
52
 
46
53
    def tearDown(self):
48
55
        self.rmtree(self.home_dir)
49
56
        return DBusTwistedTestCase.tearDown(self)
50
57
 
 
58
    @defer.inlineCallbacks
51
59
    def create_file(self, path):
52
60
        """ creates a test file in fsm """
53
61
        share_path = os.path.join(self.shares_dir, 'share_tools')
54
 
        self.main.vm.add_share(volume_manager.Share(share_path,
55
 
                                                    volume_id='tools_share_id'))
 
62
        share = volume_manager.Share(share_path, volume_id='tools_share_id')
 
63
        yield self.main.vm.add_share(share)
56
64
        self.fs_manager.create(path, "tools_share_id")
57
65
        self.fs_manager.set_node_id(path, "node_id")
58
 
        return 'tools_share_id', 'node_id'
 
66
        defer.returnValue(('tools_share_id', 'node_id'))
59
67
 
60
68
 
61
69
class TestToolsBasic(TestToolsBase):
62
70
    """ Baic test of SyncDaemonTool """
63
71
 
 
72
    def _create_share(self, accepted=True, subscribed=True):
 
73
        """Return a newly created Share."""
 
74
        share_path = os.path.join(self.shares_dir, 'share_name')
 
75
        share = volume_manager.Share(path=share_path, volume_id='share_vol_id')
 
76
        return share
 
77
 
 
78
    def _create_udf(self, volume_id, node_id, suggested_path, subscribed=True):
 
79
        """Create an UDF and returns it and the volume"""
 
80
        path = get_udf_path(suggested_path)
 
81
        udf = volume_manager.UDF(str(volume_id), str(node_id), suggested_path,
 
82
                                 path, subscribed)
 
83
        return udf
 
84
 
64
85
    def test_wait_connected(self):
65
86
        """ test wait_connected """
66
87
        self.action_q.connect()
73
94
 
74
95
    def test_wait_no_more_events(self):
75
96
        """Test wait_no_more_events."""
 
97
        # subscribe the all event sender, as it's needed for this test
 
98
        self.event_q.subscribe(AllEventsSender(self.dbus_iface))
76
99
        d = self.tool.wait_no_more_events(last_event_interval=.1)
77
 
        event_names = event_queue.EVENTS.keys()
78
 
 
79
 
        def events(result):
80
 
            """Test callback."""
 
100
 
 
101
        self.event_q.push("FS_FILE_CREATE", path="somepath")
 
102
 
 
103
        def check(result):
 
104
            """Check result."""
 
105
            self.assertTrue(self.handler.check_debug("new event",
 
106
                                                     "FS_FILE_CREATE"))
 
107
            self.assertTrue(self.handler.check_debug("No more events"))
81
108
            self.assertEqual(True, result)
82
109
 
83
 
        def fire_events():
84
 
            """Fire events."""
85
 
            # unsubscribe the vm subscribed in FakeMain as we are going to push
86
 
            # fake events
87
 
            self.event_q.unsubscribe(self.main.vm)
88
 
            for event_name in event_names:
89
 
                args = event_queue.EVENTS[event_name]
90
 
                self.event_q.push(event_name, **dict((x, x) for x in args))
91
 
 
92
 
        reactor.callLater(0, self.action_q.connect)
93
 
        reactor.callLater(0, fire_events)
94
 
        d.addBoth(events)
95
 
        d.addCallback(lambda _: self.event_q.subscribe(self.main.vm))
 
110
        d.addBoth(check)
96
111
        return d
97
 
    test_wait_no_more_events.skip = "Facundo will fix this in other branch."
98
112
 
99
113
    def test_all_downloads(self):
100
114
        """ test wait_all_downloads """
177
191
        d.addBoth(check)
178
192
        return d
179
193
 
 
194
    @defer.inlineCallbacks
180
195
    def test_accept_share(self):
181
196
        """Test accept_share method"""
182
197
        share_path = os.path.join(self.main.shares_dir, 'share')
183
 
        self.main.vm.add_share(volume_manager.Share(path=share_path,
 
198
        yield self.main.vm.add_share(volume_manager.Share(path=share_path,
184
199
                                     volume_id='share_id', access_level='Read',
185
200
                                     accepted=False, node_id="node_id"))
186
201
        self.assertEquals(False, self.main.vm.shares['share_id'].accepted)
187
 
        d = self.tool.accept_share('share_id')
188
 
        def check(result):
189
 
            """do the asserts"""
190
 
            self.assertEquals('Yes', result['answer'])
191
 
            self.assertEquals('share_id', result['volume_id'])
192
 
            self.assertEquals(True, self.main.vm.shares['share_id'].accepted)
193
 
 
194
 
        d.addCallback(check)
195
 
        return d
196
 
 
 
202
        result = yield self.tool.accept_share('share_id')
 
203
 
 
204
        self.assertEquals('Yes', result['answer'])
 
205
        self.assertEquals('share_id', result['volume_id'])
 
206
        self.assertEquals(True, self.main.vm.shares['share_id'].accepted)
 
207
 
 
208
    @defer.inlineCallbacks
197
209
    def test_reject_share(self):
198
210
        """Test the reject_share method"""
199
211
        share_path = os.path.join(self.main.shares_dir, 'share')
200
 
        self.main.vm.add_share(volume_manager.Share(path=share_path,
 
212
        yield self.main.vm.add_share(volume_manager.Share(path=share_path,
201
213
                                     volume_id='share_id', access_level='Read',
202
214
                                     accepted=False))
203
215
        self.assertEquals(False, self.main.vm.shares['share_id'].accepted)
204
 
        d = self.tool.reject_share('share_id')
205
 
        def check(result):
206
 
            """do the asserts"""
207
 
            self.assertEquals('No', result['answer'])
208
 
            self.assertEquals('share_id', result['volume_id'])
209
 
            self.assertEquals(False, self.main.vm.shares['share_id'].accepted)
 
216
        result = yield self.tool.reject_share('share_id')
210
217
 
211
 
        d.addCallback(check)
212
 
        return d
 
218
        self.assertEquals('No', result['answer'])
 
219
        self.assertEquals('share_id', result['volume_id'])
 
220
        self.assertEquals(False, self.main.vm.shares['share_id'].accepted)
213
221
 
214
222
    def test_wait_for_signal(self):
215
223
        """Test wait_for_signal method"""
393
401
        d = self.tool.delete_folder("folder_id")
394
402
        yield d
395
403
 
396
 
    def _create_udf(self, id, node_id, suggested_path, subscribed=True):
397
 
        """Create an UDF and returns it and the volume"""
398
 
        path = get_udf_path(suggested_path)
399
 
        udf = volume_manager.UDF(str(id), str(node_id), suggested_path, path,
400
 
              subscribed)
401
 
        return udf
402
 
 
 
404
    @defer.inlineCallbacks
403
405
    def test_subscribe_folder(self):
404
406
        """Test for Folders.subscribe and that it fires a dbus signal."""
405
407
        suggested_path = u'~/ñoño'
406
408
        udf = self._create_udf(uuid.uuid4(), 'node_id', suggested_path,
407
409
                               subscribed=False)
408
 
        self.main.vm.add_udf(udf)
409
 
        d = self.tool.subscribe_folder(udf.id)
410
 
 
411
 
        def check(r):
412
 
            """Check that the folder is subscribed."""
413
 
            self.assertTrue(self.main.vm.udfs[udf.id].subscribed,
414
 
                            "UDF %s isn't subscribed" % udf.id)
415
 
        d.addCallback(check)
416
 
        return d
417
 
 
 
410
        yield self.main.vm.add_udf(udf)
 
411
        yield self.tool.subscribe_folder(udf.id)
 
412
 
 
413
        self.assertTrue(self.main.vm.udfs[udf.id].subscribed,
 
414
                        "UDF %s isn't subscribed" % udf.id)
 
415
 
 
416
    @defer.inlineCallbacks
418
417
    def test_unsubscribe_folder(self):
419
418
        """Test for Folders.unsubscribe."""
420
419
        suggested_path = u'~/ñoño'
421
420
        udf = self._create_udf(uuid.uuid4(), 'node_id', suggested_path,
422
421
                               subscribed=True)
423
 
        self.main.vm.add_udf(udf)
424
 
        d = self.tool.unsubscribe_folder(udf.id)
425
 
 
426
 
        def check(r):
427
 
            """Check that the folder is not subscribed."""
428
 
            self.assertFalse(self.main.vm.udfs[udf.id].subscribed,
429
 
                            "UDF %s is subscribed" % udf.id)
430
 
        d.addCallback(check)
431
 
        return d
 
422
        yield self.main.vm.add_udf(udf)
 
423
        yield self.tool.unsubscribe_folder(udf.id)
 
424
 
 
425
        self.assertFalse(self.main.vm.udfs[udf.id].subscribed,
 
426
                         "UDF %s is subscribed" % udf.id)
 
427
 
 
428
    @defer.inlineCallbacks
 
429
    def test_subscribe_share(self):
 
430
        """Test for Shares.subscribe."""
 
431
        share = self._create_share(accepted=True, subscribed=False)
 
432
        yield self.main.vm.add_share(share)
 
433
        yield self.tool.subscribe_share(share.volume_id)
 
434
 
 
435
        self.assertTrue(self.main.vm.shares[share.id].subscribed,
 
436
                        "share %s should be subscribed" % share)
 
437
 
 
438
    @defer.inlineCallbacks
 
439
    def test_unsubscribe_share(self):
 
440
        """Test for Shares.unsubscribe."""
 
441
        share = self._create_share(accepted=True, subscribed=False)
 
442
        yield self.main.vm.add_share(share)
 
443
        yield self.tool.unsubscribe_share(share.volume_id)
 
444
 
 
445
        self.assertFalse(self.main.vm.shares[share.id].subscribed,
 
446
                         "share %s should not be subscribed" % share)
432
447
 
433
448
    def test_change_public_access(self):
434
449
        """Test change_public_access."""