2
# Author: Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
4
# Copyright 2009 Canonical Ltd.
6
# This program is free software: you can redistribute it and/or modify it
7
# under the terms of the GNU General Public License version 3, as published
8
# by the Free Software Foundation.
10
# This program is distributed in the hope that it will be useful, but
11
# WITHOUT ANY WARRANTY; without even the implied warranties of
12
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
13
# PURPOSE. See the GNU General Public License for more details.
15
# You should have received a copy of the GNU General Public License along
16
# with this program. If not, see <http://www.gnu.org/licenses/>.
17
"""tests for the syncdaemon tools module """
20
from ubuntuone.syncdaemon import (
26
from contrib.testing.testcase import (
29
from twisted.internet import reactor
32
class TestToolsBasic(DBusTwistedTestCase):
33
""" Baic test of SyncDaemonTool """
36
def setUp(self):
    """ Run the base fixture setup and build the SyncDaemonTool under test. """
    # These statements need a method scope: at class level `self` does not
    # exist. The explicit upcall is kept exactly as the file spells it.
    DBusTwistedTestCase.setUp(self)
    # self.bus is presumably provided by the base fixture -- confirm there
    self.tool = tools.SyncDaemonTool(self.bus)
39
def create_file(self, path):
    """ Add a test share to the volume manager and create path in fsm.

    Returns the (share_id, node_id) pair that was assigned to path.
    """
    share_id, node_id = 'tools_share_id', 'node_id'
    share_dir = os.path.join(self.shares_dir, 'share_tools')
    share = volume_manager.Share(share_dir, share_id=share_id)
    self.main.vm.add_share(share)
    # register the path under the new share, then give it a node id
    self.fs_manager.create(path, share_id)
    self.fs_manager.set_node_id(path, node_id)
    return share_id, node_id
48
def test_wait_connected(self):
    """ test wait_connected """
    self.action_q.connect()
    d = self.tool.wait_connected()

    # test callback, pylint: disable-msg=C0111
    def connected(result):
        self.assertEquals(True, result)

    # Attach the check and return the deferred: otherwise the assertion
    # is never executed and the test passes without verifying anything.
    d.addCallback(connected)
    return d
58
def test_wait_no_more_events(self):
    """ test wait_no_more_events """
    d = self.tool.wait_no_more_events(last_event_interval=.1)
    event_names = event_queue.EVENTS.keys()

    # test callback, pylint: disable-msg=C0111
    def events(result):
        self.assertEquals(True, result)

    # pylint: disable-msg=C0111
    def fire_events():
        # unsubscribe the vm subscribed in FakeMain as we are going to push
        # fake events
        self.event_q.unsubscribe(self.main.vm)
        for event_name in event_names:
            # the values stored in EVENTS are pushed as the event arguments
            args = event_queue.EVENTS[event_name]
            self.event_q.push(event_name, *args)

    reactor.callLater(0, self.action_q.connect)
    reactor.callLater(0, fire_events)
    # The check goes first so it sees the tool's result; the next callback
    # replaces the result with whatever subscribe() returns.
    d.addCallback(events)
    # re-subscribe the vm that fire_events removed
    d.addCallback(lambda _: self.event_q.subscribe(self.main.vm))
    # return the deferred so the test runner waits for the callbacks
    return d
80
def test_all_downloads(self):
    """ test wait_all_downloads """
    d = self.tool.wait_all_downloads()

    # test callback, pylint: disable-msg=C0111
    def downloads(result):
        self.assertEquals(True, result)

    # Attach the check and return the deferred so the assertion is
    # evaluated before the test is reported as finished.
    d.addCallback(downloads)
    return d
90
def test_all_uploads(self):
    """ test wait_all_uploads """
    d = self.tool.wait_all_uploads()

    # test callback, pylint: disable-msg=C0111
    def uploads(result):
        self.assertEquals(True, result)

    # The assertion must live in a callback (there is no `result` in the
    # test body itself) and the deferred must be returned for it to run.
    d.addCallback(uploads)
    return d
100
def test_wait_for_nirvana(self):
    """ test wait_for_nirvana """
    self.main.state = states.IDLE
    d = self.tool.wait_for_nirvana(last_event_interval=.1)

    # test callback, pylint: disable-msg=C0111
    def callback(result):
        self.assertEquals(True, result)

    # Attached before the re-subscribe callback below so the check sees
    # the tool's own result.
    d.addCallback(callback)

    reactor.callLater(0, self.action_q.connect)
    event_names = event_queue.EVENTS.keys()

    # pylint: disable-msg=C0111
    def fire_events():
        # unsubscribe the vm subscribed in FakeMain as we are going to push
        # fake events
        self.event_q.unsubscribe(self.main.vm)
        for event_name in event_names:
            # the values stored in EVENTS are pushed as the event arguments
            args = event_queue.EVENTS[event_name]
            self.event_q.push(event_name, *args)

    # fire fake events to keep the deferred running
    reactor.callLater(0, fire_events)
    # once the tool's deferred fires, re-subscribe the vm that fire_events
    # removed
    d.addCallback(lambda _: self.event_q.subscribe(self.main.vm))
    # return the deferred so the test runner waits for the callbacks
    return d
126
def test_get_metadata(self):
127
""" check that get_metadata works as expected """
128
path = os.path.join(self.root_dir, "foo")
129
self.fs_manager.create(path, "")
130
self.fs_manager.set_node_id(path, "node_id")
131
d = self.tool.get_metadata(path)
132
# the callback, pylint: disable-msg=C0111
133
def callback(result):
134
self.assertEquals('foo', str(result['path']))
135
self.assertEquals('', str(result['share_id']))
136
self.assertEquals('node_id', result['node_id'])
137
d.addCallback(callback)