30
from ubuntuone.devtools.handlers import MementoHandler
29
32
from ubuntuone.platform.linux import dbus_interface, tools, get_udf_path
30
from tests.platform.linux.test_dbus import FakeCommand, DBusTwistedTestCase
33
from tests.platform.linux.test_dbus import (
34
FakeCommand, DBusTwistedTestCase, AllEventsSender,
32
37
from twisted.internet import defer, reactor
48
55
self.rmtree(self.home_dir)
49
56
return DBusTwistedTestCase.tearDown(self)
58
@defer.inlineCallbacks
def create_file(self, path):
    """Create a test file in fsm, inside a freshly added share.

    The share is added to the volume manager and waited on before any
    metadata is created for `path`.  The deferred produced by
    inlineCallbacks fires with the (share_id, node_id) tuple.
    """
    share_id = 'tools_share_id'
    node_id = 'node_id'
    share_path = os.path.join(self.shares_dir, 'share_tools')
    share = volume_manager.Share(share_path, volume_id=share_id)
    # the add_share result is yielded on, so the share is added exactly
    # once and is in place before metadata pointing at it is created
    yield self.main.vm.add_share(share)
    self.fs_manager.create(path, share_id)
    self.fs_manager.set_node_id(path, node_id)
    # generators can't `return` a value in Python 2; hand the result
    # back through returnValue instead
    defer.returnValue((share_id, node_id))
61
69
class TestToolsBasic(TestToolsBase):
62
70
""" Basic test of SyncDaemonTool """
72
def _create_share(self, accepted=True, subscribed=True):
73
"""Return a newly created Share."""
74
share_path = os.path.join(self.shares_dir, 'share_name')
75
share = volume_manager.Share(path=share_path, volume_id='share_vol_id')
78
def _create_udf(self, volume_id, node_id, suggested_path, subscribed=True):
79
"""Create an UDF and returns it and the volume"""
80
path = get_udf_path(suggested_path)
81
udf = volume_manager.UDF(str(volume_id), str(node_id), suggested_path,
64
85
def test_wait_connected(self):
65
86
""" test wait_connected """
66
87
self.action_q.connect()
74
95
def test_wait_no_more_events(self):
75
96
"""Test wait_no_more_events."""
97
# subscribe the all event sender, as it's needed for this test
98
self.event_q.subscribe(AllEventsSender(self.dbus_iface))
76
99
d = self.tool.wait_no_more_events(last_event_interval=.1)
77
event_names = event_queue.EVENTS.keys()
101
self.event_q.push("FS_FILE_CREATE", path="somepath")
105
self.assertTrue(self.handler.check_debug("new event",
107
self.assertTrue(self.handler.check_debug("No more events"))
81
108
self.assertEqual(True, result)
85
# unsubscribe the vm subscribed in FakeMain as we are going to push
87
self.event_q.unsubscribe(self.main.vm)
88
for event_name in event_names:
89
args = event_queue.EVENTS[event_name]
90
self.event_q.push(event_name, **dict((x, x) for x in args))
92
reactor.callLater(0, self.action_q.connect)
93
reactor.callLater(0, fire_events)
95
d.addCallback(lambda _: self.event_q.subscribe(self.main.vm))
97
test_wait_no_more_events.skip = "Facundo will fix this in other branch."
99
113
def test_all_downloads(self):
100
114
""" test wait_all_downloads """
194
@defer.inlineCallbacks
def test_accept_share(self):
    """Test accept_share method.

    A not-yet-accepted share is added to the volume manager, then
    accepted through the tool; both the answer returned by the tool and
    the share's state in the volume manager are checked.
    """
    share_path = os.path.join(self.main.shares_dir, 'share')
    share = volume_manager.Share(path=share_path, volume_id='share_id',
                                 access_level='Read', accepted=False,
                                 node_id="node_id")
    # wait for the share to be added before inspecting vm.shares
    yield self.main.vm.add_share(share)
    self.assertEquals(False, self.main.vm.shares['share_id'].accepted)

    result = yield self.tool.accept_share('share_id')

    self.assertEquals('Yes', result['answer'])
    self.assertEquals('share_id', result['volume_id'])
    self.assertEquals(True, self.main.vm.shares['share_id'].accepted)
208
@defer.inlineCallbacks
197
209
def test_reject_share(self):
198
210
"""Test the reject_share method"""
199
211
share_path = os.path.join(self.main.shares_dir, 'share')
200
self.main.vm.add_share(volume_manager.Share(path=share_path,
212
yield self.main.vm.add_share(volume_manager.Share(path=share_path,
201
213
volume_id='share_id', access_level='Read',
203
215
self.assertEquals(False, self.main.vm.shares['share_id'].accepted)
204
d = self.tool.reject_share('share_id')
207
self.assertEquals('No', result['answer'])
208
self.assertEquals('share_id', result['volume_id'])
209
self.assertEquals(False, self.main.vm.shares['share_id'].accepted)
216
result = yield self.tool.reject_share('share_id')
218
self.assertEquals('No', result['answer'])
219
self.assertEquals('share_id', result['volume_id'])
220
self.assertEquals(False, self.main.vm.shares['share_id'].accepted)
214
222
def test_wait_for_signal(self):
215
223
"""Test wait_for_signal method"""
393
401
d = self.tool.delete_folder("folder_id")
396
def _create_udf(self, id, node_id, suggested_path, subscribed=True):
397
"""Create an UDF and returns it and the volume"""
398
path = get_udf_path(suggested_path)
399
udf = volume_manager.UDF(str(id), str(node_id), suggested_path, path,
404
@defer.inlineCallbacks
def test_subscribe_folder(self):
    """Test that subscribe_folder leaves the UDF subscribed.

    The UDF is created unsubscribed (with a non-ascii suggested path),
    added to the volume manager, and then subscribed through the tool.
    """
    suggested_path = u'~/ñoño'
    udf = self._create_udf(uuid.uuid4(), 'node_id', suggested_path,
                           subscribed=False)
    # both steps are yielded on, so the assertion below only runs once
    # the tool call has completed
    yield self.main.vm.add_udf(udf)
    yield self.tool.subscribe_folder(udf.id)

    self.assertTrue(self.main.vm.udfs[udf.id].subscribed,
                    "UDF %s isn't subscribed" % udf.id)
416
@defer.inlineCallbacks
418
417
def test_unsubscribe_folder(self):
419
418
"""Test for Folders.unsubscribe."""
420
419
suggested_path = u'~/ñoño'
421
420
udf = self._create_udf(uuid.uuid4(), 'node_id', suggested_path,
423
self.main.vm.add_udf(udf)
424
d = self.tool.unsubscribe_folder(udf.id)
427
"""Check that the folder is not subscribed."""
428
self.assertFalse(self.main.vm.udfs[udf.id].subscribed,
429
"UDF %s is subscribed" % udf.id)
422
yield self.main.vm.add_udf(udf)
423
yield self.tool.unsubscribe_folder(udf.id)
425
self.assertFalse(self.main.vm.udfs[udf.id].subscribed,
426
"UDF %s is subscribed" % udf.id)
428
@defer.inlineCallbacks
def test_subscribe_share(self):
    """Check that subscribe_share leaves the share subscribed."""
    # start from an accepted share that is not subscribed yet
    new_share = self._create_share(accepted=True, subscribed=False)
    yield self.main.vm.add_share(new_share)
    yield self.tool.subscribe_share(new_share.volume_id)

    stored = self.main.vm.shares[new_share.id]
    self.assertTrue(stored.subscribed,
                    "share %s should be subscribed" % new_share)
438
@defer.inlineCallbacks
def test_unsubscribe_share(self):
    """Test for Shares.unsubscribe.

    The share is added to the volume manager already subscribed, then
    unsubscribed through the tool, and its stored state is checked.
    """
    # the share has to start out subscribed: with subscribed=False the
    # final assertion would hold even if unsubscribe_share did nothing
    share = self._create_share(accepted=True, subscribed=True)
    yield self.main.vm.add_share(share)
    yield self.tool.unsubscribe_share(share.volume_id)

    self.assertFalse(self.main.vm.shares[share.id].subscribed,
                     "share %s should not be subscribed" % share)
433
448
def test_change_public_access(self):
434
449
"""Test change_public_access."""