2
# Author: Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
4
# Copyright 2009 Canonical Ltd.
6
# This program is free software: you can redistribute it and/or modify it
7
# under the terms of the GNU General Public License version 3, as published
8
# by the Free Software Foundation.
10
# This program is distributed in the hope that it will be useful, but
11
# WITHOUT ANY WARRANTY; without even the implied warranties of
12
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
13
# PURPOSE. See the GNU General Public License for more details.
15
# You should have received a copy of the GNU General Public License along
16
# with this program. If not, see <http://www.gnu.org/licenses/>.
17
""" Base tests cases and test utilities """
20
import dbus.mainloop.glib
24
from warnings import warn
27
from ubuntuone.syncdaemon import (
30
filesystem_manager as fs_manager,
35
from ubuntuone.syncdaemon.dbus_interface import (
39
NM_STATE_DISCONNECTED,
41
from twisted.internet import defer, reactor
42
from twisted.trial.unittest import TestCase as TwistedTestCase
43
from zope.interface import implements
45
class FakeOAuthClient(object):
46
""" Fake OAuthClient"""
48
def __init__(self, realm):
49
""" create the instance. """
51
self.consumer = 'ubuntuone'
53
def get_access_token(self):
54
""" returns a Token"""
58
class FakeHashQueue(object):
59
"""A fake hash queue"""
61
"""are we empty? sure we are"""
65
""" length is 0. we are empty, right?"""
69
class FakeMain(main.Main):
70
""" A fake Main class to setup the tests """
72
# don't call Main.__init__ we take care of creating a fake main and
73
# all its attributes. pylint: disable-msg=W0231
74
def __init__(self, root_dir, shares_dir, data_dir):
    """ Create the instance, building every collaborator by hand.

    Main.__init__ is not called here; each attribute is assigned
    directly, and the action queue and the hash queue are the fake
    implementations.

    root_dir, shares_dir and data_dir are stored on the instance
    unchanged; data_dir is also handed to the FileSystemManager.
    """
    self.logger = logging.getLogger('ubuntuone.SyncDaemon.FakeMain')
    self.root_dir = root_dir
    self.data_dir = data_dir
    self.shares_dir = shares_dir
    self.realm = 'http://test.ubuntuone.com'
    self.oauth_client = FakeOAuthClient(self.realm)
    # construction order matters: each object below receives one that
    # was created just above it
    self.vm = volume_manager.VolumeManager(self)
    self.fs = fs_manager.FileSystemManager(self.data_dir, self.vm)
    self.event_q = event_queue.EventQueue(self.fs)
    self.action_q = FakeActionQueue(self.event_q)
    self.state = main.SyncDaemonStateManager(self)
    self.event_q.subscribe(self.vm)
    # NOTE(review): the listing this block was restored from skips one
    # source line at this point -- confirm against the upstream file
    # that no statement was dropped.
    self.hash_q = FakeHashQueue()
91
def _connect_aq(self, token):
    """ Connect the fake action queue.

    The token argument is not used; the call is forwarded straight to
    self.action_q.connect().
    """
    self.action_q.connect()
95
def _disconnect_aq(self):
    """ Disconnect the fake action queue.

    Forwards the call to self.action_q.disconnect().
    """
    self.action_q.disconnect()
100
""" shutdown all created objects """
101
self.event_q.shutdown()
103
def get_access_token(self):
107
def check_version(self):
    """ Pretend the client protocol version matches that of the server.

    No check is made: SYS_PROTOCOL_VERSION_OK is pushed to the event
    queue unconditionally.
    """
    self.event_q.push('SYS_PROTOCOL_VERSION_OK')
111
def authenticate(self):
    """ Pretend to do the OAuth dance.

    No authentication is attempted: SYS_OAUTH_OK is pushed to the event
    queue unconditionally.
    """
    self.event_q.push('SYS_OAUTH_OK')
115
def get_root(self, root_mdid):
    """ Ask the AQ for our root's uuid.

    Nothing is actually asked and root_mdid is not used: the result is
    a deferred that has already fired with the fixed value 'root_uuid'.
    """
    return defer.succeed('root_uuid')
119
def server_rescan(self):
    """ Do the server rescan? naaa!

    Only the events are emitted, in this order:
    SYS_SERVER_RESCAN_STARTING and then SYS_SERVER_RESCAN_DONE.
    Returns a deferred that has already fired with 'root_uuid'.
    """
    for event_name in ('SYS_SERVER_RESCAN_STARTING',
                       'SYS_SERVER_RESCAN_DONE'):
        self.event_q.push(event_name)
    return defer.succeed('root_uuid')
127
class BaseTwistedTestCase(TwistedTestCase):
128
""" Base TestCase that provides:
129
mktemp(name): helper to create temporary dirs
132
def mktemp(self, name='temp'):
133
""" Customized mktemp that accepts an optional name argument. """
134
tempdir = os.path.join(self.tmpdir, name)
135
if os.path.exists(tempdir):
137
self.makedirs(tempdir)
142
""" default tmpdir: 'module name'/'class name'"""
143
MAX_FILENAME = 32 # some platforms limit lengths of filenames
144
base = os.path.join(self.__class__.__module__[:MAX_FILENAME],
145
self.__class__.__name__[:MAX_FILENAME],
146
self._testMethodName[:MAX_FILENAME])
147
# use _trial_temp dir, it should be os.getcwd()
148
# define the root temp dir of the testcase, pylint: disable-msg=W0201
149
self.__root = os.path.join(os.getcwd(), base)
150
return os.path.join(self.__root, 'tmpdir')
152
def rmtree(self, path):
153
""" rmtree that handle ro parent(s) and childs. """
154
# change perms to rw, so we can delete the temp dir
155
if path != getattr(self, '__root', None):
156
os.chmod(os.path.dirname(path), 0755)
158
for dirpath, dirs, files in os.walk(path):
160
os.chmod(os.path.join(dirpath, dir), 0777)
163
def makedirs(self, path):
164
""" makedirs that handle ro parent. """
165
parent = os.path.dirname(path)
166
if os.path.exists(parent):
167
os.chmod(parent, 0755)
171
""" cleanup the temp dir. """
172
if hasattr(self, '__root'):
173
self.rmtree(self.__root)
174
return TwistedTestCase.tearDown(self)
177
class DBusTwistedTestCase(BaseTwistedTestCase):
178
""" Test the DBus event handling """
181
""" Setup the infrastructure fo the test (dbus service). """
182
BaseTwistedTestCase.setUp(self)
183
self.log = logging.getLogger("ubuntuone.SyncDaemon.TEST")
184
self.log.info("starting test %s.%s", self.__class__.__name__,
185
self._testMethodName)
187
self.data_dir = self.mktemp('data_dir')
188
self.root_dir = self.mktemp('root_dir')
189
self.shares_dir = self.mktemp('shares_dir')
190
self.main = FakeMain(self.root_dir, self.shares_dir, self.data_dir)
191
self.fs_manager = self.main.fs
192
self.event_q = self.main.event_q
193
self.action_q = self.main.action_q
194
self.loop = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
195
self.bus = dbus.bus.BusConnection(mainloop=self.loop)
196
self.nm = FakeNetworkManager(self.bus)
197
# monkeypatch busName.__del__ to avoid errors on gc
198
# we take care of releasing the name in shutdown
199
dbus.service.BusName.__del__ = lambda _: None
200
self.dbus_iface = DBusInterface(self.bus, self.main,
202
self.busName = self.dbus_iface.busName
203
self.bus.set_exit_on_disconnect(False)
204
self.dbus_iface.connect()
205
self.event_q.push('SYS_WAIT_FOR_LOCAL_RESCAN')
206
self.event_q.push('SYS_LOCAL_RESCAN_DONE')
207
self.signal_receivers = set()
210
""" Cleanup the test. """
211
d = self.cleanup_signal_receivers()
212
d.addBoth(self._tearDown)
215
def _tearDown(self, *args):
218
self.dbus_iface.shutdown()
220
self.dbus_iface.bus.flush()
223
self.rmtree(self.shares_dir)
224
self.rmtree(self.root_dir)
225
self.rmtree(self.data_dir)
226
self.log.info("finished test %s.%s", self.__class__.__name__,
227
self._testMethodName)
228
return BaseTwistedTestCase.tearDown(self)
230
def error_handler(self, *error):
231
""" default error handler for DBus calls. """
234
def cleanup_signal_receivers(self):
235
""" cleanup self.signal_receivers and returns a deferred """
237
for match in self.signal_receivers:
241
""" callback that accepts *args. """
243
self.bus.call_async(dbus.bus.BUS_DAEMON_NAME,
244
dbus.bus.BUS_DAEMON_PATH,
245
dbus.bus.BUS_DAEMON_IFACE, 'RemoveMatch', 's',
246
(str(match),), callback, self.error_handler)
249
return defer.DeferredList(deferreds)
251
return defer.succeed(True)
255
class FakeActionQueue(object):
256
""" stub implementation """
258
implements(interfaces.IActionQueue)
260
def __init__(self, eq, *args, **kwargs):
261
""" Creates the instance """
263
self.client = action_queue.ActionQueueProtocol()
265
self.downloading = {}
266
class UUID_Map(object):
268
def set(self, *args):
269
"""mock set method"""
272
self.uuid_map = UUID_Map()
273
self.content_queue = action_queue.ContentQueue('CONTENT', self)
274
self.meta_queue = action_queue.MetaQueue('META', self)
276
def connect(self, host=None, port=None, user_ssl=False):
    """ Stub implementation: no connection is opened.

    host, port and user_ssl are accepted but not used; the only effect
    is pushing SYS_CONNECTION_MADE to self.eq.
    """
    self.eq.push('SYS_CONNECTION_MADE')
280
def disconnect(self):
281
""" stub implementation """
284
def cancel_download(self, share_id, node_id):
285
""" stub implementation """
288
def cancel_upload(self, share_id, node_id):
289
""" stub implementation """
292
def download(self, share_id, node_id, server_hash, fileobj):
293
""" stub implementation """
296
def upload(self, share_id, node_id, previous_hash, hash, crc32,
297
size, deflated_size, fileobj):
298
""" stub implementation """
301
def make_file(self, share_id, parent_id, name, marker):
302
""" stub implementation """
305
def make_dir(self, share_id, parent_id, name, marker):
306
""" stub implementation """
309
def move(self, share_id, node_id, old_parent_id, new_parent_id, new_name):
310
""" stub implementation """
313
def unlink(self, share_id, node_id):
314
""" stub implementation """
317
def query(self, items):
318
""" stub implementation """
321
def listdir(self, share_id, node_id, server_hash, fileobj):
322
""" stub implementation """
325
def list_shares(self):
326
""" stub implementation """
329
def answer_share(self, share_id, answer):
330
""" stub implementation """
332
reactor.callLater(0.2, d.callback, True)
335
def create_share(self, *args):
336
""" sutb implementation """
340
class MementoHandler(logging.Handler):
341
""" A handler class which store logging records in a list """
343
def __init__(self, *args, **kwargs):
344
""" Create the instance, and add a records attribute. """
345
logging.Handler.__init__(self, *args, **kwargs)
348
def emit(self, record):
    """ Store the record instead of writing it anywhere.

    The record is appended, unmodified, to self.records so that it can
    be inspected later.
    """
    self.records.append(record)
353
class FakeNetworkManager(DBusExposedObject):
354
""" A fake NetworkManager that only emits StatusChanged signal. """
358
def __init__(self, bus):
359
""" Creates the instance. """
360
self.path = '/org/freedesktop/NetworkManager'
362
self.bus.request_name('org.freedesktop.NetworkManager',
363
flags=dbus.bus.NAME_FLAG_REPLACE_EXISTING | \
364
dbus.bus.NAME_FLAG_DO_NOT_QUEUE | \
365
dbus.bus.NAME_FLAG_ALLOW_REPLACEMENT)
366
self.busName = dbus.service.BusName('org.freedesktop.NetworkManager',
368
DBusExposedObject.__init__(self, bus_name=self.busName,
372
""" Shutdown the fake NetworkManager """
373
self.busName.get_bus().release_name(self.busName.get_name())
374
self.remove_from_connection()
376
@dbus.service.signal('org.freedesktop.NetworkManager', signature='i')
377
def StateChanged(self, state):
378
""" Fire DBus signal StatusChanged. """
381
def emit_connected(self):
    """ Emit the StateChanged signal with NM_STATE_CONNECTED. """
    self.StateChanged(NM_STATE_CONNECTED)
385
def emit_disconnected(self):
    """ Emit the StateChanged signal with NM_STATE_DISCONNECTED. """
    self.StateChanged(NM_STATE_DISCONNECTED)
389
@dbus.service.method(dbus.PROPERTIES_IFACE,
390
in_signature='ss', out_signature='v',
391
async_callbacks=('reply_handler', 'error_handler'))
392
def Get(self, interface, propname, reply_handler=None, error_handler=None):
394
Fake dbus's Get method to get at the State property
397
reply_handler(getattr(self, propname, None))
398
except Exception, e: # pylint: disable-msg=W0703
401
class FakeVolumeManager(object):
402
""" A volume manager that only knows one share, the root"""
404
def __init__(self, root_path):
    """ Create the instance with the root as its only share.

    The root is a volume_manager.Share built from root_path with
    access_level 'Modify'; it is registered in self.shares under the
    empty-string key.
    """
    self.root = volume_manager.Share(root_path, access_level='Modify')
    self.shares = {'': self.root}
    self.log = logging.getLogger('ubuntuone.SyncDaemon.VM-test')
410
def add_share(self, share):
411
""" Adss share to the shares dict """
412
self.shares[share.id] = share
413
# if the share don't exists, create it
414
if not os.path.exists(share.path):
416
# if it's a ro share, change the perms
417
if not share.can_write():
418
os.chmod(share.path, 0555)
420
def on_server_root(self, _):