# tests.syncdaemon.test_status_listener
#
# Author: Alejandro J. Cura <alecu@canonical.com>
#
# Copyright 2011 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""Test the syncdaemon status listener."""

import os

from twisted.internet import defer
from twisted.trial.unittest import TestCase

from contrib.testing.testcase import FakeMainTestCase
from ubuntuone.syncdaemon import status_listener
from ubuntuone.syncdaemon.volume_manager import Share


class GetListenerTestCase(TestCase):
    """The status listener is created."""

    def test_returns_listener(self):
        """get_listener returns a listener if status reporting is enabled."""
        self.patch(status_listener, "should_start_listener", lambda: True)
        # Opaque sentinels: the test only checks that get_listener stores
        # exactly the objects it was handed, so any unique objects will do.
        fsm = object()
        vm = object()
        listener = status_listener.get_listener(fsm, vm)
        self.assertIsInstance(listener, status_listener.StatusListener)
        self.assertEqual(listener.fsm, fsm)
        self.assertEqual(listener.vm, vm)
        self.assertNotEqual(listener.aggregator, None)

    def test_zeitgeist_not_installed_returns_none(self):
        """get_listener returns None if status reporting is disabled."""
        self.patch(status_listener, "should_start_listener", lambda: False)
        listener = status_listener.get_listener(None, None)
        self.assertEqual(listener, None)


def listen_for(event_q, event, callback, count=1, collect=False):
    """Setup a EQ listener for the specified event.

    A throwaway listener exposing ``handle_<event>`` is subscribed to
    *event_q*.  Once the event has been seen *count* times the listener
    unsubscribes itself and *callback* is called exactly once:

    - with the list of every ``(args, kwargs)`` seen, if *collect* is true;
    - with the ``(args, kwargs)`` of the last event otherwise.

    Returns the subscribed listener.
    """
    class Listener(object):
        """A basic listener to handle the pushed event."""

        def __init__(self):
            """Initialize this instance."""
            self.hits = 0
            self.events = []

        def _handle_event(self, *args, **kwargs):
            """Count the event and fire the callback on the last hit."""
            self.hits += 1
            if collect:
                self.events.append((args, kwargs))
            if self.hits == count:
                # Unsubscribe first so the callback can never fire twice.
                event_q.unsubscribe(self)
                if collect:
                    callback(self.events)
                else:
                    callback((args, kwargs))

    listener = Listener()
    # The event queue dispatches by method name, so expose the generic
    # handler under the name matching the requested event.
    setattr(listener, 'handle_' + event, listener._handle_event)
    event_q.subscribe(listener)
    return listener


class FakeStatusAggregator(object):
    """A fake status aggregator.

    Any attribute that is not found normally is served as a callable that
    records ``(name, args, kwargs)`` in ``call_log`` and returns None.
    """

    def __init__(self):
        """Initialize this instance."""
        # Must be set here: __getattr__ is only consulted for *missing*
        # attributes, so a real call_log keeps it out of the fake path.
        self.call_log = []

    def __getattr__(self, f_name):
        """Return a method that will log its args when called."""

        def f(*args, **kwargs):
            """Log the args I'm passed."""
            self.call_log.append((f_name, args, kwargs))

        return f


class StatusListenerTestCase(FakeMainTestCase):
    """Tests for StatusListener."""

    def setUp(self):
        """Initialize this instance."""
        super(StatusListenerTestCase, self).setUp()
        # Use a recording fake so tests can assert on the forwarded calls.
        self.aggregator = FakeStatusAggregator()
        self.listener = status_listener.StatusListener(self.fs, self.vm,
                                                       self.aggregator)
        self.event_q.subscribe(self.listener)

    def _listen_for(self, *args, **kwargs):
        """Call listen_for on the main event queue with the given args."""
        return listen_for(self.main.event_q, *args, **kwargs)


class PublicFilesStatusTestCase(StatusListenerTestCase):
    """Public files events are passed to the status object."""

    def _push_public_access_change(self, is_public, public_url):
        """Push AQ_CHANGE_PUBLIC_ACCESS_OK for a file inside a fresh share.

        Returns a deferred that fires once the event has been delivered to
        a listener registered through ``_listen_for``.
        """
        # NOTE(review): the ids are arbitrary test values; 'share' was
        # chosen to match the volume_id of the Share registered below --
        # confirm against the filesystem manager's expectations.
        share_id = 'share'
        node_id = 'node_id'

        share_path = os.path.join(self.shares_dir, 'share')
        self.main.vm.add_share(Share(path=share_path, volume_id='share',
                                     other_username='other username'))
        path = os.path.join(share_path, "foo.mp3")
        self.main.fs.create(path, str(share_id))
        self.main.fs.set_node_id(path, str(node_id))

        d = defer.Deferred()
        self._listen_for('AQ_CHANGE_PUBLIC_ACCESS_OK', d.callback)
        self.main.event_q.push('AQ_CHANGE_PUBLIC_ACCESS_OK',
                               share_id=share_id, node_id=node_id,
                               is_public=is_public, public_url=public_url)
        return d

    @defer.inlineCallbacks
    def test_publish_url_is_forwarded(self):
        """Publishing a file with a url is forwarded."""
        public_url = 'http://example.com/foo.mp3'
        # Wait for delivery before asserting on the aggregator's log.
        yield self._push_public_access_change(True, public_url)
        call = ("file_published", (public_url,), {})
        self.assertIn(call, self.aggregator.call_log)

    @defer.inlineCallbacks
    def test_unpublish_url_is_forwarded(self):
        """Unpublishing a file with a url is forwarded."""
        public_url = 'http://example.com/foo.mp3'
        # Wait for delivery before asserting on the aggregator's log.
        yield self._push_public_access_change(False, public_url)
        call = ("file_unpublished", (public_url,), {})
        self.assertIn(call, self.aggregator.call_log)