# -*- coding: utf-8 -*-
#
# Copyright 2012 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the
# OpenSSL library under certain conditions as described in each
# individual source file, and distribute linked combinations
# including the two.
# You must obey the GNU General Public License in all respects
# for all of the code used other than OpenSSL. If you modify
# file(s) with this exception, you may extend this exception to your
# version of the file(s), but you are not obligated to do so. If you
# do not wish to do so, delete this exception statement from your
# version. If you delete this exception statement from all source
# files in the program, then also delete it here.
29
"""Test the Sync Menu."""
32
from collections import Callable
import time

from twisted.internet import defer
from twisted.trial.unittest import TestCase

from ubuntuone.platform import sync_menu
from ubuntuone.platform.sync_menu import linux
41
def fake_call_later(*args):
    """Fake reactor.callLater: accept any arguments and do nothing."""
45
class FakeStatusFrontend(object):
    """Fake StatusFrontend that serves whatever data the test stored on it."""

    def __init__(self):
        """Start with no recent transfers and no uploads in progress."""
        self.recent_transfers_data = []
        self.uploading_data = []

    def recent_transfers(self):
        """Return the fake recent transfers files."""
        return self.recent_transfers_data

    def files_uploading(self):
        """Return the fake files being uploaded."""
        return self.uploading_data
61
class FakeAppLaunchContext(object):
    """Fake launch context that remembers the timestamp it is given."""

    def set_timestamp(self, timestamp):
        """Store timestamp on the instance so tests can read it back."""
        self.timestamp = timestamp
66
class FakeGdkDisplay(object):
    """Fake Gdk.Display."""

    def get_app_launch_context(self):
        """Return a new FakeAppLaunchContext."""
        return FakeAppLaunchContext()
72
class FakeNullGdk(object):
    """Fake Gdk.Display with no default."""

    # NOTE(review): this method was not visible in the source as received.
    # It is restored from the class docstring and from the test that swaps
    # this class in for Gdk.Display and expects no launch context; confirm
    # the exact definition against the upstream file.
    @staticmethod
    def get_default():
        """Return None, i.e. report that there is no default display."""
        return None
79
class FakeAppInfo(object):
    """Fake Gio.AppInfo that records how it was used.

    The most recently created object is kept in ``instance`` so that the
    class-level factory can re-initialize it and hand it back.

    NOTE(review): several one-line statements of this class (the returns,
    the decorators and the ``name``/``flags``/``opened_uri``/``launched``/
    ``files`` assignments) were not visible in the source as received. They
    are restored from the ``cls`` parameters and from the attributes the
    tests in this module read back; confirm against the upstream file.
    """

    # Last object created through __new__ (per class, subclasses included).
    instance = None

    def __new__(cls, *args, **kwargs):
        """Create the object and remember it as the current instance."""
        # Constructor arguments are consumed by __init__ only: passing them
        # on to object.__new__ raises TypeError on Python 3.
        cls.instance = super(FakeAppInfo, cls).__new__(cls)
        return cls.instance

    def __init__(self, command_line="", name="", flags=0):
        """Store the values the application info was created with."""
        self.command_line = command_line
        self.name = name
        self.flags = flags

    @classmethod
    def launch_default_for_uri(cls, uri, context):
        """Record, on the class, the uri and launch context requested."""
        cls.opened_uri = uri
        cls.context = context

    @classmethod
    def create_from_commandline(cls, command_line, name, flags):
        """Re-initialize the current instance with the values; return it."""
        cls.instance.__init__(command_line, name, flags)
        return cls.instance

    def launch(self, files, context):
        """Record that a launch was requested, with its files and context."""
        self.launched = True
        self.files = files
        self.context = context
116
class FakeDesktopAppInfo(FakeAppInfo):
    """Fake Gio.DesktopAppInfo."""

    def __init__(self, desktop_id=""):
        """Initialize the base fake with its defaults and keep desktop_id."""
        super(FakeDesktopAppInfo, self).__init__()
        self.desktop_id = desktop_id

    # NOTE(review): the decorator and the return statement were not visible
    # in the source as received; they are restored from the ``cls`` parameter
    # and from the sibling factory pattern -- confirm against upstream.
    @classmethod
    def new(cls, desktop_id):
        """Re-initialize the current instance with desktop_id; return it."""
        cls.instance.__init__(desktop_id)
        return cls.instance
128
class FakeSyncdaemonService(object):
    """Fake SyncdaemonService that records connect/disconnect requests."""

    def __init__(self):
        """Start with no calls recorded and a fixed fake root folder."""
        self.connect_called = False
        self.disconnect_called = False
        self.fake_root_path = "/home/user/Magicicada"

    def connect(self):
        """Record that connect was called."""
        self.connect_called = True

    def disconnect(self):
        """Record that disconnect was called."""
        self.disconnect_called = True

    def get_rootdir(self):
        """Return a fake ubuntu one folder path."""
        return self.fake_root_path
149
class FakeSyncMenuApp(object):
    """Fake SyncMenu.App that records every call in the shared ``data`` dict.

    NOTE(review): the ``data`` attribute, the factory header and the
    ``clean`` header were not visible in the source as received. ``data``
    and ``clean`` are implied by their uses in this module; the factory name
    ``new`` and its signature are an inference -- confirm against upstream.
    """

    # Shared by all instances; clean() swaps in a fresh dict between tests.
    data = {}

    @classmethod
    def new(cls, *args):
        """Return a new FakeSyncMenuApp, ignoring the arguments."""
        return FakeSyncMenuApp()

    @classmethod
    def clean(cls):
        """Clear the values stored in data."""
        FakeSyncMenuApp.data = {}

    def set_menu(self, server):
        """Set the menu for SyncMenu App."""
        self.data['server'] = server

    def connect(self, signal, callback):
        """Record the signal name and the callback connected to it."""
        self.data['connect'] = (signal, callback)

    def set_paused(self, status):
        """Set the pause state."""
        self.data['paused'] = status
176
class SyncMenuDummyTestCase(TestCase):
    """Test the DummySyncMenu."""

    def test_dummy_support(self):
        """Check that the Dummy object can be created properly."""
        dummy = linux.DummySyncMenu('random', 'args')
        self.assertIsInstance(dummy, linux.DummySyncMenu)

    def test_dummy_has_update_transfers(self):
        """Check that the dummy has the proper methods required by the API."""
        dummy = linux.DummySyncMenu('random', 'args')
        # The callable() builtin performs the same check as an isinstance
        # test against collections.Callable, without relying on an alias
        # that Python 3.10 removed from the ``collections`` namespace.
        self.assertTrue(callable(dummy.update_transfers))
        self.assertTrue(callable(dummy.sync_status_changed))
191
class SyncMenuTestCase(TestCase):
    """Test the SyncMenu.

    NOTE(review): several short statements of this class were not visible
    in the source as received and were restored from context: the ``setUp``
    and ``test_init`` headers, the ``data = []`` / ``self.patch(`` /
    ``self.assertEqual(`` openers, the list and dict initializers, the
    separator skip (``if text is None: continue``), the reversal in
    ``test_only_recent`` and the literal assigned to ``arg``. Confirm them
    against the upstream file.
    """

    skip = None if linux.use_syncmenu else "SyncMenu not installed."

    @defer.inlineCallbacks
    def setUp(self):
        """Patch the toolkit entry points with fakes and build the menu."""
        yield super(SyncMenuTestCase, self).setUp()
        self.patch(linux.SyncMenu, "App", FakeSyncMenuApp)
        self.patch(linux.Gdk.Display, "get_default", FakeGdkDisplay)
        FakeSyncMenuApp.clean()
        self.syncdaemon_service = FakeSyncdaemonService()
        self.status_frontend = FakeStatusFrontend()
        self.sync_menu = sync_menu.UbuntuOneSyncMenu(
            self.status_frontend, self.syncdaemon_service)

    def _label(self, item):
        """Return the label property of a menu item."""
        return item.property_get(linux.Dbusmenu.MENUITEM_PROP_LABEL)

    def _refresh_transfers(self):
        """Update the transfers menu and return its current children."""
        self.sync_menu.transfers.update_progress()
        return self.sync_menu.transfers.get_children()

    def _record_open_uri(self):
        """Replace ``_open_uri`` with a recorder and return its call list.

        Every call appends a ``(timestamp, uri)`` tuple to the list.
        """
        calls = []
        self.patch(
            self.sync_menu, "_open_uri", lambda u, t: calls.append((t, u)))
        return calls

    def _assert_progress(self, items, transfers, skip_unlabelled=False):
        """Check the progress shown by each item against the fake data.

        ``transfers`` is a list of ``(name, size, written)`` tuples. With
        ``skip_unlabelled`` set, items whose label is None are ignored.
        """
        uploading_data = {
            name: (size, written) for name, size, written in transfers}
        for item in items:
            text = self._label(item)
            if skip_unlabelled and text is None:
                continue
            self.assertIn(text, uploading_data)
            size, written = uploading_data[text]
            # Floor division keeps the expected value an integer on Python 3
            # as well, matching what property_get_int returns.
            percentage = written * 100 // size
            self.assertEqual(
                item.property_get_int(
                    linux.SyncMenu.PROGRESS_MENUITEM_PROP_PERCENT_DONE),
                percentage)

    def test_init(self):
        """Check that the menu is properly initialized."""
        self.assertIsInstance(
            FakeSyncMenuApp.data['server'], linux.Dbusmenu.Server)
        menu = self.sync_menu
        labelled_items = [
            (menu.open_u1, linux.OPEN_U1),
            (menu.open_u1_folder, linux.OPEN_U1_FOLDER),
            (menu.share_file, linux.SHARE_A_FILE),
            (menu.go_to_web, linux.GO_TO_WEB),
            (menu.transfers, linux.TRANSFERS),
            (menu.more_storage, linux.MORE_STORAGE),
            (menu.get_help, linux.GET_HELP),
        ]
        for item, _ in labelled_items:
            self.assertEqual(item.get_parent(), menu.root_menu)
        for item, label in labelled_items:
            self.assertEqual(self._label(item), label)

        menu.transfers.update_progress()
        self.assertIsInstance(
            menu.transfers.separator, linux.Dbusmenu.Menuitem)

    def test_get_launch_context_with_display(self):
        """Check that the proper context is returned."""
        timestamp = time.time()
        context = self.sync_menu._get_launch_context(timestamp)
        self.assertEqual(timestamp, context.timestamp)

    def test_get_launch_context_with_no_display(self):
        """Check that no context is returned without a default display."""
        self.patch(linux.Gdk, "Display", FakeNullGdk)
        context = self.sync_menu._get_launch_context(time.time())
        self.assertIsNone(context)

    def test_open_control_panel_by_command_line(self):
        """Check that the proper action is executed."""
        appinfo = FakeAppInfo()
        self.patch(linux.Gio, "AppInfo", appinfo)
        timestamp = time.time()
        self.sync_menu._open_control_panel_by_command_line(timestamp)

        self.assertEqual(appinfo.command_line, linux.CLIENT_COMMAND_LINE)
        self.assertEqual(appinfo.context.timestamp, timestamp)

    def test_open_control_panel_by_command_line_with_arg(self):
        """Check that the extra argument is appended to the command line."""
        appinfo = FakeAppInfo()
        self.patch(linux.Gio, "AppInfo", appinfo)
        timestamp = time.time()
        # Any string works here: the test only checks that it is passed on.
        arg = "--test-arg"
        self.sync_menu._open_control_panel_by_command_line(timestamp, arg)

        self.assertEqual(
            appinfo.command_line, "%s %s" % (linux.CLIENT_COMMAND_LINE, arg))
        self.assertEqual(appinfo.context.timestamp, timestamp)

    def test_open_uri(self):
        """Check that the proper action is executed."""
        appinfo = FakeAppInfo()
        self.patch(linux.Gio, "AppInfo", appinfo)
        timestamp = time.time()

        self.sync_menu._open_uri(linux.UBUNTUONE_LINK, timestamp)
        self.assertEqual(appinfo.opened_uri, linux.UBUNTUONE_LINK)
        self.assertEqual(appinfo.context.timestamp, timestamp)

    def test_open_u1(self):
        """Check that the proper action is executed."""
        appinfo = FakeDesktopAppInfo()
        timestamp = time.time()
        self.patch(linux.Gio, "DesktopAppInfo", appinfo)

        self.sync_menu.open_control_panel(timestamp=timestamp)
        self.assertEqual(appinfo.desktop_id, linux.CLIENT_DESKTOP_ID)
        self.assertTrue(appinfo.launched)
        self.assertEqual(appinfo.files, [])
        self.assertEqual(appinfo.context.timestamp, timestamp)

    def test_open_share_tab(self):
        """Check that the proper action is executed."""
        timestamp = time.time()
        data = []
        self.patch(
            self.sync_menu, "_open_control_panel_by_command_line",
            lambda t, a: data.append((t, a)))
        self.sync_menu.open_share_file_tab(timestamp=timestamp)
        self.assertEqual(data, [(timestamp, "--switch-to share_links")])

    def test_go_to_web(self):
        """Check that the proper action is executed."""
        timestamp = time.time()
        data = self._record_open_uri()
        self.sync_menu.open_go_to_web(timestamp=timestamp)
        self.assertEqual(data, [(timestamp, linux.DASHBOARD)])

    def test_open_ubuntu_one_folder(self):
        """Check that the proper action is executed."""
        timestamp = time.time()
        data = self._record_open_uri()
        self.sync_menu.open_ubuntu_one_folder(timestamp=timestamp)
        self.assertEqual(
            data,
            [(timestamp, "file://" + self.syncdaemon_service.fake_root_path)])

    def test_get_help(self):
        """Check that the proper action is executed."""
        timestamp = time.time()
        data = self._record_open_uri()
        self.sync_menu.open_web_help(timestamp=timestamp)
        self.assertEqual(data, [(timestamp, linux.HELP_LINK)])

    def test_more_storage(self):
        """Check that the proper action is executed."""
        timestamp = time.time()
        data = self._record_open_uri()
        self.sync_menu.open_get_more_storage(timestamp=timestamp)
        self.assertEqual(data, [(timestamp, linux.GET_STORAGE_LINK)])

    def test_empty_transfers(self):
        """Check that the Transfers menu is empty."""
        self.assertEqual(self.sync_menu.transfers.get_children(), [])

    def test_only_recent(self):
        """Check that only recent transfers items are loaded."""
        data = ['file1', 'file2', 'file3']
        self.status_frontend.recent_transfers_data = data
        children = self._refresh_transfers()
        self.assertEqual(len(children), 4)
        # Labels are compared in reverse order, as the other recent-transfer
        # tests in this class do.
        for item, name in zip(children, reversed(data)):
            self.assertEqual(self._label(item), name)

    def test_only_progress(self):
        """Check that only progress items are loaded."""
        data = [
            ('file1', 3000, 400),
            ('file2', 2000, 100),
            ('file3', 5000, 4600)]
        self.status_frontend.uploading_data = data
        children = self._refresh_transfers()
        self.assertEqual(len(children), 4)
        self._assert_progress(children, data, skip_unlabelled=True)

    def test_full_transfers(self):
        """Check that the transfers menu contains the maximum transfers."""
        # The api of recent transfers always returns a maximum of 5 items
        data_recent = ['file1', 'file2', 'file3', 'file4', 'file5']
        self.status_frontend.recent_transfers_data = data_recent
        children = self._refresh_transfers()
        self.assertEqual(len(children), 6)
        # Reversed in place: the fake frontend holds this same list object,
        # so the refresh further down sees the reversed list too.
        data_recent.reverse()
        for item, name in zip(children, data_recent):
            self.assertEqual(self._label(item), name)

        data_current = [
            ('file0', 1200, 600),
            ('file1', 3000, 400),
            ('file2', 2000, 100),
            ('file3', 2500, 150),
            ('file4', 1000, 600),
            ('file5', 5000, 4600)]
        self.status_frontend.uploading_data = data_current
        children = self._refresh_transfers()
        # The menu should only show 5 current transfers.
        self.assertEqual(len(children), 11)
        self._assert_progress(children[6:], data_current)

    def test_mnemonics_labels(self):
        """Check that the transfers menu sanitizes the underscores."""
        data_recent = ['file_1', 'file__2']
        self.status_frontend.recent_transfers_data = data_recent
        children = self._refresh_transfers()
        for item, name in zip(children, reversed(data_recent)):
            self.assertEqual(self._label(item), name.replace('_', '__'))

    def test_update_transfers(self):
        """Check that everything is ok when updating the transfers value."""
        data_current = [
            ('file0', 1200, 600),
            ('file1', 3000, 400),
            ('file4', 1000, 600),
            ('file5', 5000, 4600)]
        self.status_frontend.uploading_data = data_current
        children = self._refresh_transfers()
        self.assertEqual(len(children), 5)
        # Reversed in place: the fake frontend keeps serving this same list
        # object until ``uploading_data`` is replaced below.
        data_current.reverse()
        self._assert_progress(children, data_current, skip_unlabelled=True)

        data_recent = ['file5']
        self.status_frontend.recent_transfers_data = data_recent
        children = self._refresh_transfers()
        self.assertEqual(len(children), 6)
        for item, name in zip(children, data_recent):
            self.assertEqual(self._label(item), name)

        data_current = [
            ('file0', 1200, 700),
            ('file1', 3000, 600),
            ('file4', 1000, 800)]
        self.status_frontend.uploading_data = data_current
        children = self._refresh_transfers()
        self.assertEqual(len(children), 5)
        # NOTE(review): with 5 children the slice below is empty, so no
        # progress value is actually checked here; the intended start index
        # should be confirmed before tightening this.
        self._assert_progress(children[6:], data_current)

    def test_transfers_order(self):
        """Check that the proper transfers are shown first."""
        data_current = [
            ('file0', 1200, 610),
            ('file1', 3000, 400),
            ('file2', 2000, 100),
            ('file3', 2500, 150),
            ('file4', 2500, 950),
            ('file5', 3500, 550),
            ('file6', 1000, 600),
            ('file7', 5000, 4600)]
        expected = [
            ('file7', 5000, 4600),
            ('file4', 2500, 950),
            ('file0', 1200, 610),
            ('file6', 1000, 600),
            ('file5', 3500, 550)]
        self.status_frontend.uploading_data = data_current
        children = self._refresh_transfers()
        # The menu should only show 5 current transfers and a separator item.
        self.assertEqual(len(children), 6)

        i = 0
        for item in children:
            text = self._label(item)
            if text is None:
                continue
            percentage = item.property_get_int(
                linux.SyncMenu.PROGRESS_MENUITEM_PROP_PERCENT_DONE)
            name, size, written = expected[i]
            i += 1
            # Floor division: see _assert_progress.
            percentage_expected = written * 100 // size
            self.assertEqual(text, name)
            self.assertEqual(percentage, percentage_expected)

    def test_update_transfers_delay(self):
        """Check that the timer is being handled properly."""
        self.sync_menu.next_update = time.time()
        self.sync_menu.update_transfers()
        self.sync_menu.timer = None
        self.sync_menu.next_update = time.time() * 2
        self.sync_menu.update_transfers()
        self.assertEqual(self.sync_menu.timer.delay, 3)

    def test_status_change_from_menu(self):
        """Check the behavior when the status is changed from the menu."""
        service = self.sync_menu._syncdaemon_service
        self.sync_menu.change_sync_status()
        self.assertFalse(self.sync_menu._connected)
        self.assertFalse(self.sync_menu._ignore_status_event)
        self.assertFalse(service.connect_called)
        self.assertTrue(service.disconnect_called)

        service.disconnect_called = False
        self.sync_menu.change_sync_status()
        self.assertTrue(self.sync_menu._connected)
        self.assertFalse(self.sync_menu._ignore_status_event)
        self.assertTrue(service.connect_called)
        self.assertFalse(service.disconnect_called)

    def test_ignore_status(self):
        """Check that neither connect nor disconnect are called."""
        service = self.sync_menu._syncdaemon_service
        self.sync_menu._ignore_status_event = True
        self.assertTrue(self.sync_menu._ignore_status_event)
        self.sync_menu.change_sync_status()
        self.assertTrue(self.sync_menu._connected)
        self.assertFalse(self.sync_menu._ignore_status_event)
        self.assertFalse(service.connect_called)
        self.assertFalse(service.disconnect_called)

    def test_sync_status_changed(self):
        """Check sync_status_changed behavior."""
        self.sync_menu.sync_status_changed(True)
        self.assertNotIn('paused', self.sync_menu.app.data)
        self.sync_menu.sync_status_changed(False)
        self.assertFalse(self.sync_menu._connected)
        self.assertTrue(self.sync_menu._ignore_status_event)
        self.assertTrue(self.sync_menu.app.data['paused'])