1
# -*- coding: utf-8 -*-
3
# Author: John R. Lenton <john.lenton@canonical.com>
5
# Copyright 2009 Canonical Ltd.
7
# This program is free software: you can redistribute it and/or modify it
8
# under the terms of the GNU General Public License version 3, as published
9
# by the Free Software Foundation.
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
14
# PURPOSE. See the GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License along
17
# with this program. If not, see <http://www.gnu.org/licenses/>.
18
""" Tests for the action queue """
19
from __future__ import with_statement
26
from dbus.mainloop.glib import DBusGMainLoop
27
from twisted.internet import defer
29
from contrib.testing.testcase import (
34
from ubuntuone.syncdaemon.dbus_interface import DBusInterface
35
from ubuntuone.syncdaemon.main import Main
36
from ubuntuone.syncdaemon.action_queue import NoisyRequestQueue
38
# Set the class-level ``test`` flag on DBusInterface at import time, i.e.
# before any test case in this module runs.
# NOTE(review): what the flag changes is defined in
# ubuntuone.syncdaemon.dbus_interface, not here -- confirm there.
DBusInterface.test = True
41
class AcionQueueTests(BaseTwistedTestCase):
42
""" Basic tests to check ActionQueue """
46
prepare to run the test
48
BaseTwistedTestCase.setUp(self)
49
self.root = self.mktemp('root')
50
self.shares = self.mktemp('shares')
51
self.data = self.mktemp('data')
52
self.handler = MementoHandler()
53
self.handler.setLevel(logging.ERROR)
54
self.main = Main(root_dir=self.root,
55
shares_dir=self.shares,
57
host='localhost', port=0,
58
dns_srv=False, ssl=False,
59
disable_ssl_verify=True,
63
glib_loop=DBusGMainLoop(set_as_default=True))
64
logging.getLogger('ubuntuone.SyncDaemon').addHandler(self.handler)
65
dbus.service.BusName.__del__ = lambda _: None
69
cleanup after the test
72
shutil.rmtree(self.root)
73
shutil.rmtree(self.shares)
74
shutil.rmtree(self.data)
75
for record in self.handler.records:
76
exc_info = getattr(record, 'exc_info', None)
77
if exc_info is not None:
78
raise exc_info[0], exc_info[1], exc_info[2]
79
BaseTwistedTestCase.tearDown(self)
81
@defer.inlineCallbacks
82
def test_content_queue_has_only_one_op_per_node(self):
84
Check that the content queue uniquifies operations per node.
86
yield self.main.start()
87
# totally fake, we don't care: the messages are only validated on run
88
self.main.action_q.download('foo', 'bar', 0, 0)
89
self.main.action_q.upload('foo', 'bar', 0, 0, 0, 0, 0)
90
self.assertEqual(len(self.main.action_q.content_queue.waiting), 1)
92
@defer.inlineCallbacks
93
def test_content_queue_has_only_one_op_per_node_even_counting_markers(self):
95
Check that the content queue uniquifies operations per node
96
even when some of the operations were added using markers.
98
yield self.main.start()
99
self.main.action_q.download('foo', 'bar', 0, 0)
100
self.main.action_q.uuid_map.set('foo', 'feh')
101
self.main.action_q.uuid_map.set('bar', 'bah')
102
self.main.action_q.upload('feh', 'bah', 0, 0, 0, 0, 0)
103
self.assertEqual(len(self.main.action_q.content_queue.waiting), 1)
105
@defer.inlineCallbacks
106
def test_aq_resolve_uuid_maybe(self):
108
Check action_q.resolve_uuid_maybe does what it's supposed to
110
yield self.main.start()
111
self.assertEqual(self.main.action_q.resolve_uuid_maybe('foo'), 'foo')
112
self.main.action_q.uuid_map.set('foo', 'feh')
113
self.assertEqual(self.main.action_q.resolve_uuid_maybe('foo'), 'feh')
116
class TestNoisyRQ(unittest.TestCase):
118
Tests for NoisyRequestQueue
121
def test_noisy_rq_blurts_about_head(self):
123
Test NRQ calls its callback when head is set
125
rq = NoisyRequestQueue('name', None,
127
setattr(self, 'result', (h, tuple(w))))
129
self.assertEqual(self.result, ('blah', ()))
131
def test_noisy_rq_blurts_about_waiting(self):
133
Test NRQ calls its callback when the waiting queue is altered.
135
class BlackHole(object):
136
'''The universal tool.'''
137
__call__ = __getattr__ = lambda *_, **__: BlackHole()
138
class FakeCommand(object):
139
'''Yet another fake action queue command'''
141
'''run that just succeeds'''
142
return defer.succeed(None)
143
def cb(head, waiting):
144
'''NRQ testing callback'''
145
evts.append((head, tuple(waiting)))
148
rq = NoisyRequestQueue('name', BlackHole(), cb)
152
self.assertEqual(evts, [(None, ()), # __init__
153
(None, ('one',)), # queue
154
(None, (cmd, 'one')), # queue_top
155
(cmd, ('one',)), # run
156
(None, ('one',)), # done