~nataliabidart/ubuntuone-client/get-sharesdir

« back to all changes in this revision

Viewing changes to tests/status/test_aggregator.py

  • Committer: Alejandro J. Cura
  • Date: 2011-02-04 02:25:03 UTC
  • mto: This revision was merged to the branch mainline in revision 844.
  • Revision ID: alecu@canonical.com-20110204022503-vnny1l2mjt5413sj
remove delayedbuffer test cases

Show diffs side-by-side

added

removed

Lines of Context:
19
19
 
20
20
import logging
21
21
 
22
 
from twisted.internet import defer, reactor
23
22
from twisted.internet.task import Clock
24
23
from twisted.trial.unittest import TestCase
25
24
 
181
180
            callback, message_time, new_count, icon)
182
181
 
183
182
 
184
 
class DelayedBufferTestCase(TestCase):
    """Check how DelayedBuffer groups events, driven by a manual Clock."""

    THRESHOLD = 1.0
    SMALL_DELAY = 0.5
    COUNT_MESSAGES = 4
    TIMEOUT = 2.0

    def setUp(self):
        """Create a buffer whose callback records each call in self.result."""
        self.result = []
        self.clock = Clock()

        def record(*args):
            """Store the positional arguments of one callback invocation."""
            self.result.append(args)

        self.buf = aggregator.DelayedBuffer(
            cb=record, clock=self.clock,
            threshold=self.THRESHOLD, timeout=self.TIMEOUT)
        self.addCleanup(self.buf.cleanup)

    def _push_events(self, count, delay=None):
        """Push `count` events, advancing the clock by `delay` after each.

        When `delay` is None the clock is left untouched between pushes.
        """
        pushed = 0
        while pushed < count:
            self.buf.push_event("message1")
            if delay is not None:
                self.clock.advance(delay)
            pushed += 1

    def test_a_bunch_at_once_aggregated_as_one(self):
        """Events pushed with no time in between produce a single callback."""
        self._push_events(self.COUNT_MESSAGES)
        self.clock.advance(self.THRESHOLD)
        self.assertEqual(len(self.result), 1)

    def test_a_bunch_under_threshold_aggregated_as_one(self):
        """Events spaced below the threshold produce a single callback."""
        self._push_events(self.COUNT_MESSAGES, self.SMALL_DELAY)
        self.clock.advance(self.THRESHOLD)
        self.assertEqual(len(self.result), 1)

    def test_a_bunch_over_threshold_are_not_aggregated(self):
        """Events spaced a full threshold apart each get their own callback."""
        self._push_events(self.COUNT_MESSAGES, self.THRESHOLD)
        self.clock.advance(self.THRESHOLD + self.SMALL_DELAY)
        self.assertEqual(len(self.result), self.COUNT_MESSAGES)

    def test_a_bunch_exceeding_timeout_are_separated(self):
        """A steady stream lasting several timeouts is split once per timeout."""
        expected_count = 3
        # Enough closely spaced events to span `expected_count` timeouts.
        total = int(self.TIMEOUT * expected_count / self.SMALL_DELAY)
        self._push_events(total, self.SMALL_DELAY)
        self.clock.advance(self.THRESHOLD)
        self.assertEqual(len(self.result), expected_count)

    # trial reads the `skip` attribute and reports the test as skipped.
    test_a_bunch_exceeding_timeout_are_separated.skip = (
        "Clock should sort calls after one is reset. See bug #4823 in twisted.")
235
 
 
236
 
 
237
 
class DelayedBufferIntegrationTestCase(TestCase):
    """Test the delayed buffer class using real reactor delays.

    No clock is handed to DelayedBuffer here, and the tests wait through
    reactor.callLater, so they take real wall-clock time to run.
    """

    THRESHOLD = 0.5
    SMALL_DELAY = 0.2
    COUNT_MESSAGES = 4
    TIMEOUT = 1.0

    def wait(self, delay):
        """Return a deferred that fires with None after `delay` seconds."""
        d = defer.Deferred()
        reactor.callLater(delay, d.callback, None)
        return d

    def setUp(self):
        """Initialize this test instance."""
        self.result = []

        def append(*args):
            """Record the arguments of each buffer callback invocation."""
            self.result.append(args)

        self.buf = aggregator.DelayedBuffer(cb=append,
                                            threshold=self.THRESHOLD,
                                            timeout=self.TIMEOUT)
        # Always release the buffer when the test ends, even on failure, so
        # nothing it scheduled is left behind in the real reactor.
        # NOTE(review): presumably cleanup() cancels the buffer's pending
        # delayed calls -- confirm against DelayedBuffer.cleanup.
        self.addCleanup(self.buf.cleanup)

    @defer.inlineCallbacks
    def test_a_bunch_exceeding_timeout_are_separated(self):
        """A bunch of messages exceeding timeout are separated."""
        expected_count = 3
        # Enough closely spaced events to span `expected_count` timeouts.
        pushes = int(self.TIMEOUT * expected_count / self.SMALL_DELAY)
        for _ in range(pushes):
            self.buf.push_event("message1")
            yield self.wait(self.SMALL_DELAY)
        # Let the last pending batch be delivered before counting.
        yield self.wait(self.THRESHOLD)
        self.assertEqual(len(self.result), expected_count)
269
 
 
270
 
 
271
 
 
272
183
class FakeStatusAggregator(object):
273
184
    """A fake status aggregator."""
274
185
 
435
346
        self.bubble.new_file_found()
436
347
        self.clock.advance(self.updates_delay)
437
348
        self.assertEqual(2, len(self.get_notifications_shown()))
438
 
    test_new_files_found_while_updating_are_shown_after_a_delay.skip = \
439
 
        "Requires thisfred's branch with notification.update_notification."
440
349
 
441
350
    def test_update_resets_progress_bubble(self):
442
351
        """The update callback resets the progress bubble."""