22
22
from dbus.exceptions import DBusException
23
23
from ubuntuone.syncdaemon import (
28
29
from ubuntuone.platform.linux import dbus_interface, tools, get_udf_path
29
from tests.platform.linux.test_dbus import (
30
FakeCommand, EmptyCommand, DBusTwistedTestCase
30
from tests.platform.linux.test_dbus import FakeCommand, DBusTwistedTestCase
33
32
from twisted.internet import defer, reactor
75
74
def test_wait_no_more_events(self):
76
""" test wait_no_more_events """
75
"""Test wait_no_more_events."""
77
76
d = self.tool.wait_no_more_events(last_event_interval=.1)
78
# test callback, pylint: disable-msg=C0111
79
77
event_names = event_queue.EVENTS.keys()
80
79
def events(result):
81
self.assertEquals(True, result)
81
self.assertEqual(True, result)
84
85
# unsubscribe the vm subscribed in FakeMain as we are going to push
86
87
self.event_q.unsubscribe(self.main.vm)
302
304
def test_waiting_metadata(self):
303
305
"""Test SyncDaemonTool.waiting_metadata."""
304
306
# inject the fake data
305
self.action_q.meta_queue.waiting.extend([
307
self.action_q.queue.waiting.extend([
306
308
FakeCommand("node_a_foo", "node_a_bar"),
307
309
FakeCommand("node_b_foo", "node_b_bar")])
308
310
result = yield self.tool.waiting_metadata()
317
319
other='', path='')
318
320
self.assertEqual(result[1], ('FakeCommand', pl))
320
def test_waiting_content_schedule_next(self):
321
"""Test waiting_content and schedule_next"""
322
def test_waiting_content(self):
323
"""Test waiting_content."""
325
class FakeContentCommand(FakeCommand, action_queue.Upload):
326
"""Fake command that goes in content queue."""
327
def __init__(self, *args):
328
FakeCommand.__init__(self, *args)
322
330
path = os.path.join(self.root_dir, "foo")
323
331
self.fs_manager.create(path, "")
324
332
self.fs_manager.set_node_id(path, "node_id")
326
334
self.fs_manager.create(path1, "")
327
335
self.fs_manager.set_node_id(path1, "node_id_1")
328
336
# inject the fake data
329
self.action_q.content_queue.waiting.extend([
330
FakeCommand("", "node_id"),
331
FakeCommand("", "node_id_1"),
337
self.action_q.queue.waiting.extend([
338
FakeContentCommand("", "node_id"),
339
FakeContentCommand("", "node_id_1")])
334
341
d = self.tool.waiting_content()
336
342
def handler(result):
338
node_1, node, empty = result
340
node, node_1, empty = result
341
self.assertEquals(path, str(node['path']))
342
self.assertEquals(path1, str(node_1['path']))
343
self.assertEquals('', str(node['share']))
344
self.assertEquals('', str(node_1['share']))
345
self.assertEquals('node_id', str(node['node']))
346
self.assertEquals('node_id_1', str(node_1['node']))
347
self.assertEquals(EmptyCommand.__name__, str(empty['operation']))
343
node, node_1 = result
344
self.assertEqual(path, str(node['path']))
345
self.assertEqual(path1, str(node_1['path']))
346
self.assertEqual('', str(node['share']))
347
self.assertEqual('', str(node_1['share']))
348
self.assertEqual('node_id', str(node['node']))
349
self.assertEqual('node_id_1', str(node_1['node']))
348
350
self.assertTrue(result)
349
351
d.addCallbacks(handler, self.fail)
352
d.addCallback(second)
353
d.addCallback(lambda _: self.tool.schedule_next('', 'node_id_1'))
354
d.addCallbacks(lambda _: self.tool.waiting_content(), self.fail)
355
d.addCallbacks(handler, self.fail)
358
354
def test_start(self):
383
379
"""Test for Folders.delete."""
384
380
path = os.path.join(self.home_dir, u'ñoño').encode('utf-8')
385
381
# patch AQ.delete_volume
386
def delete_volume(volume_id):
382
def delete_volume(volume_id, path):
387
383
"""Fake delete_volume"""
388
384
self.main.event_q.push("AQ_DELETE_VOLUME_OK", volume_id="folder_id")
389
385
self.main.action_q.delete_volume = delete_volume