160
161
return get_instance
163
class FakeMessaging(object):
164
class FakeMessaging(AbstractMessaging):
164
165
"""A fake messaging class."""
166
def __init__(self, application_name="fake app"):
167
def __init__(self): # pylint: disable=W0231
167
168
self.messages_shown = {}
168
self.application_name = application_name
169
self.messages_updated = {}
170
def show_message(self, sender, callback, message_time=None,
171
# pylint: disable=R0913
172
def show_message(self, sender, callback=None, message_time=None,
171
173
message_count=None, icon=None):
172
174
"""Show a message to the user."""
175
if message_count and sender in self.messages_shown:
176
self.update_count(sender, message_count)
173
177
self.messages_shown[sender] = (
174
178
callback, message_time, message_count, icon)
179
# pylint: enable=R0913
176
def update_count(self, sender, new_count):
181
def update_count(self, sender, add_count):
177
182
"""Update the count for an existing indicator."""
178
callback, message_time, _, icon = self.messages_shown[sender]
179
self.messages_shown[sender] = (
180
callback, message_time, new_count, icon)
183
self.messages_updated[sender] = (sender, add_count)
183
186
class FakeStatusAggregator(object):
184
187
"""A fake status aggregator."""
186
def __init__(self, clock):
189
def __init__(self, clock): # pylint: disable=W0613
187
190
"""Initialize this instance."""
188
191
self.discovered = 0
189
192
self.completed = 0
555
553
"""Initialize this test instance."""
556
554
self.patch(aggregator, "StatusAggregator", FakeAggregator)
555
self.patch(aggregator, "Notification", FakeNotificationSingleton())
556
self.patch(aggregator, "Messaging", FakeMessaging)
557
557
self.fakefsm = None
558
558
self.fakevm = FakeVolumeManager()
559
559
self.status_frontend = aggregator.StatusFrontend()
560
560
self.listener = status_listener.StatusListener(self.fakefsm,
562
562
self.status_frontend)
563
564
def test_file_published(self):
564
565
"""A file published event is processed."""
565
566
share_id = "fake share id"
566
567
node_id = "fake node id"
568
569
public_url = "http://fake_public/url"
570
570
self.listener.handle_AQ_CHANGE_PUBLIC_ACCESS_OK(share_id, node_id,
571
571
is_public, public_url)
572
event = self.status_frontend.aggregator.interesting_events[0]
573
self.assertIsInstance(event, aggregator.FilePublishingStatus)
573
1, len(self.status_frontend.notification.notifications_shown))
575
(aggregator.UBUNTUONE_TITLE,
576
'A file was just made public at http://fake_public/url', None,
578
self.status_frontend.notification.notifications_shown[0])
575
580
def test_file_unpublished(self):
576
581
"""A file unpublished event is processed."""
582
587
self.listener.handle_AQ_CHANGE_PUBLIC_ACCESS_OK(share_id, node_id,
583
588
is_public, public_url)
584
event = self.status_frontend.aggregator.interesting_events[0]
585
self.assertIsInstance(event, aggregator.FileUnpublishingStatus)
590
1, len(self.status_frontend.notification.notifications_shown))
592
(aggregator.UBUNTUONE_TITLE, 'A file is no longer published', None,
594
self.status_frontend.notification.notifications_shown[0])
587
596
def test_download_started(self):
588
597
"""A download was added to the queue."""
644
653
def test_new_share_available(self):
645
654
"""A new share is available for subscription."""
646
655
SHARE_ID = "fake share id"
647
share = Share(volume_id=SHARE_ID)
657
share = Share(volume_id=SHARE_ID, other_visible_name=FAKE_SENDER)
648
658
self.fakevm.volumes[SHARE_ID] = share
649
659
self.listener.handle_VM_SHARE_CREATED(SHARE_ID)
650
event = self.status_frontend.aggregator.interesting_events[0]
651
self.assertIsInstance(event, aggregator.ShareAvailableStatus)
661
1, len(self.status_frontend.notification.notifications_shown))
663
(aggregator.UBUNTUONE_TITLE,
664
'New cloud folder available: <%s> shared by <%s>' % (
665
'None', FAKE_SENDER), None, False),
666
self.status_frontend.notification.notifications_shown[0])
667
msg = self.status_frontend.messaging.messages_shown[FAKE_SENDER]
668
# msg did not receive a time argument
669
self.assertEqual(None, msg[1])
670
# msg did not receive a count argument
671
self.assertEqual(None, msg[2])
653
673
def test_new_udf_available(self):
654
674
"""A new udf is available for subscription."""
656
676
self.listener.handle_VM_UDF_CREATED(udf)
657
event = self.status_frontend.aggregator.interesting_events[0]
658
self.assertIsInstance(event, aggregator.UDFAvailableStatus)
678
1, len(self.status_frontend.notification.notifications_shown))
680
(aggregator.UBUNTUONE_TITLE, 'New cloud folder available: None',
682
self.status_frontend.notification.notifications_shown[0])
684
1, len(self.status_frontend.messaging.messages_shown))
686
0, len(self.status_frontend.messaging.messages_updated))
687
msg = self.status_frontend.messaging.messages_shown[
688
aggregator.NEW_UDFS_SENDER]
689
# msg did not receive a time argument
690
self.assertEqual(None, msg[1])
691
# msg did receive a count argument
692
self.assertEqual(1, msg[2])
694
def test_two_new_udfs_available(self):
695
"""A new udf is available for subscription."""
697
self.listener.handle_VM_UDF_CREATED(udf1)
699
self.listener.handle_VM_UDF_CREATED(udf2)
701
2, len(self.status_frontend.notification.notifications_shown))
703
(aggregator.UBUNTUONE_TITLE, 'New cloud folder available: None',
705
self.status_frontend.notification.notifications_shown[0])
707
(aggregator.UBUNTUONE_TITLE, 'New cloud folder available: None',
709
self.status_frontend.notification.notifications_shown[1])
711
1, len(self.status_frontend.messaging.messages_shown))
713
1, len(self.status_frontend.messaging.messages_updated))
714
msg = self.status_frontend.messaging.messages_shown[
715
aggregator.NEW_UDFS_SENDER]
716
# msg did not receive a time argument
717
self.assertEqual(None, msg[1])
718
# msg did receive a count argument
719
self.assertEqual(1, msg[2])
721
(aggregator.NEW_UDFS_SENDER, 1),
722
self.status_frontend.messaging.messages_updated[
723
aggregator.NEW_UDFS_SENDER])
661
726
class StatusEventTestCase(TestCase):
663
727
"""Test the status event class and children."""
665
729
CLASS = aggregator.StatusEvent