~didrocks/ubuntuone-client/dont-suffer-zg-crash

« back to all changes in this revision

Viewing changes to tests/syncdaemon/test_status_listener.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2011-01-25 16:42:52 UTC
  • mto: This revision was merged to the branch mainline in revision 64.
  • Revision ID: james.westby@ubuntu.com-20110125164252-rl1pybasx1nsqgoy
Tags: upstream-1.5.3
Import upstream version 1.5.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# tests.syncdaemon.test_status_listener
 
2
#
 
3
# Author: Alejandro J. Cura <alecu@canonical.com>
 
4
#
 
5
# Copyright 2011 Canonical Ltd.
 
6
#
 
7
# This program is free software: you can redistribute it and/or modify it
 
8
# under the terms of the GNU General Public License version 3, as published
 
9
# by the Free Software Foundation.
 
10
#
 
11
# This program is distributed in the hope that it will be useful, but
 
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
14
# PURPOSE.  See the GNU General Public License for more details.
 
15
#
 
16
# You should have received a copy of the GNU General Public License along
 
17
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
"""Test the syncdaemon status listener."""
 
19
 
 
20
import os
 
21
 
 
22
from twisted.internet import defer
 
23
from twisted.trial.unittest import TestCase
 
24
 
 
25
from contrib.testing.testcase import FakeMainTestCase
 
26
from ubuntuone.syncdaemon import status_listener
 
27
from ubuntuone.syncdaemon.volume_manager import Share
 
28
 
 
29
 
 
30
class GetListenerTestCase(TestCase):
    """Check what status_listener.get_listener hands back."""

    def test_returns_listener(self):
        """With status reporting enabled, a wired-up listener is returned."""
        self.patch(status_listener, "should_start_listener", lambda: True)
        fake_fsm, fake_vm = object(), object()

        result = status_listener.get_listener(fake_fsm, fake_vm)

        self.assertIsInstance(result, status_listener.StatusListener)
        # The listener must keep the very objects it was given.
        self.assertEqual(result.fsm, fake_fsm)
        self.assertEqual(result.vm, fake_vm)
        self.assertNotEqual(result.aggregator, None)

    def test_zeitgeist_not_installed_returns_none(self):
        """With status reporting disabled, get_listener gives back None."""
        self.patch(status_listener, "should_start_listener", lambda: False)

        result = status_listener.get_listener(None, None)

        self.assertEqual(result, None)
 
49
 
 
50
 
 
51
def listen_for(event_q, event, callback, count=1, collect=False):
    """Subscribe a one-shot listener for `event` on `event_q`.

    The listener gets a `handle_<event>` attribute and is subscribed to
    the queue.  On the `count`-th hit it unsubscribes itself and invokes
    `callback` with:

    - the list of every `(args, kwargs)` seen, when `collect` is true;
    - `(args, kwargs)` of the last hit, when it carried keyword arguments;
    - `args` of the last hit otherwise.

    The subscribed listener object is returned.
    """
    class Listener(object):
        """Counts hits of the event and fires the callback on the last one."""

        def __init__(self):
            self.hits = 0
            self.events = []

        def _handle_event(self, *args, **kwargs):
            self.hits += 1
            if collect:
                self.events.append((args, kwargs))

            # Nothing more to do until exactly the requested hit arrives.
            if self.hits != count:
                return

            event_q.unsubscribe(self)
            if collect:
                payload = self.events
            elif kwargs:
                payload = (args, kwargs)
            else:
                payload = args
            callback(payload)

    listener = Listener()
    # The handler is attached under the name built from the event.
    setattr(listener, 'handle_' + event, listener._handle_event)
    event_q.subscribe(listener)
    return listener
 
77
 
 
78
 
 
79
class FakeStatusAggregator(object):
    """A fake status aggregator that records the calls made on it.

    Any attribute not found through normal lookup is served by
    __getattr__ as a callable that appends ``(name, args, kwargs)`` to
    ``call_log``.
    """

    def __init__(self):
        """Initialize this instance with an empty call log."""
        self.call_log = []

    def __getattr__(self, f_name):
        """Return a method that will log its args when called.

        Special (double-underscore) names are not faked and raise
        AttributeError instead.  Otherwise protocol probes such as
        ``hasattr(obj, '__setstate__')`` would succeed and the fake
        callable would be invoked in place of the real protocol, which
        breaks e.g. ``copy.copy()`` on this object.
        """
        if f_name.startswith('__') and f_name.endswith('__'):
            raise AttributeError(f_name)

        def f(*args, **kwargs):
            """Log the args I'm passed."""
            self.call_log.append((f_name, args, kwargs))

        return f
 
94
 
 
95
 
 
96
class StatusListenerTestCase(FakeMainTestCase):
    """Base fixture for tests that exercise a StatusListener."""

    def setUp(self):
        """Build a StatusListener over a fake aggregator and subscribe it."""
        super(StatusListenerTestCase, self).setUp()
        self.aggregator = FakeStatusAggregator()
        self.listener = status_listener.StatusListener(
            self.fs, self.vm, self.aggregator)
        self.event_q.subscribe(self.listener)

    def _listen_for(self, *args, **kwargs):
        """Call listen_for on the main event queue with the given args."""
        event_q = self.main.event_q
        return listen_for(event_q, *args, **kwargs)
 
109
 
 
110
 
 
111
class PublicFilesStatusTestCase(StatusListenerTestCase):
    """Public access changes are passed on to the status aggregator."""

    def _push_public_access_change(self, is_public, public_url):
        """Create a shared file and push AQ_CHANGE_PUBLIC_ACCESS_OK for it.

        Returns the deferred whose callback is handed to _listen_for for
        that same event.
        """
        share_id = "share"
        node_id = "node_id"

        share_path = os.path.join(self.shares_dir, 'share')
        self.main.vm.add_share(Share(path=share_path, volume_id='share',
                               other_username='other username'))
        file_path = os.path.join(share_path, "foo.mp3")
        self.main.fs.create(file_path, str(share_id))
        self.main.fs.set_node_id(file_path, str(node_id))

        handled = defer.Deferred()
        self._listen_for('AQ_CHANGE_PUBLIC_ACCESS_OK', handled.callback)
        self.main.event_q.push('AQ_CHANGE_PUBLIC_ACCESS_OK',
                               share_id=share_id, node_id=node_id,
                               is_public=is_public, public_url=public_url)
        return handled

    @defer.inlineCallbacks
    def test_publish_url_is_forwarded(self):
        """Publishing a file with a url is forwarded."""
        public_url = 'http://example.com/foo.mp3'

        yield self._push_public_access_change(True, public_url)

        expected = ("file_published", (public_url,), {})
        self.assertIn(expected, self.aggregator.call_log)

    @defer.inlineCallbacks
    def test_unpublish_url_is_forwarded(self):
        """Unpublishing a file with a url is forwarded."""
        public_url = 'http://example.com/foo.mp3'

        yield self._push_public_access_change(False, public_url)

        expected = ("file_unpublished", (public_url,), {})
        self.assertIn(expected, self.aggregator.call_log)