181
180
callback, message_time, new_count, icon)
184
class DelayedBufferTestCase(TestCase):
185
"""Test the delayed buffer class."""
193
"""Initialize this test instance."""
196
append = lambda *args: self.result.append(args)
197
self.buf = aggregator.DelayedBuffer(cb=append, clock=self.clock,
198
threshold=self.THRESHOLD,
199
timeout=self.TIMEOUT)
200
self.addCleanup(self.buf.cleanup)
202
def test_a_bunch_at_once_aggregated_as_one(self):
203
"""A bunch of messages sent at once are aggregated into one."""
204
for n in range(self.COUNT_MESSAGES):
205
self.buf.push_event("message1")
206
self.clock.advance(self.THRESHOLD)
207
self.assertEqual(len(self.result), 1)
209
def test_a_bunch_under_threshold_aggregated_as_one(self):
210
"""A bunch of messages sent under threshold are aggregated into one."""
211
for n in range(self.COUNT_MESSAGES):
212
self.buf.push_event("message1")
213
self.clock.advance(self.SMALL_DELAY)
214
self.clock.advance(self.THRESHOLD)
215
self.assertEqual(len(self.result), 1)
217
def test_a_bunch_over_threshold_are_not_aggregated(self):
218
"""A bunch of messages sent over the threshold are not aggregated."""
219
for n in range(self.COUNT_MESSAGES):
220
self.buf.push_event("message1")
221
self.clock.advance(self.THRESHOLD)
222
self.clock.advance(self.THRESHOLD + self.SMALL_DELAY)
223
self.assertEqual(len(self.result), self.COUNT_MESSAGES)
225
def test_a_bunch_exceeding_timeout_are_separated(self):
226
"""A bunch of messages exceeding timeout are separated."""
228
for n in range(int(self.TIMEOUT * expected_count / self.SMALL_DELAY)):
229
self.buf.push_event("message1")
230
self.clock.advance(self.SMALL_DELAY)
231
self.clock.advance(self.THRESHOLD)
232
self.assertEqual(len(self.result), expected_count)
233
test_a_bunch_exceeding_timeout_are_separated.skip = \
234
"Clock should sort calls after one is reset. See bug #4823 in twisted."
237
class DelayedBufferIntegrationTestCase(TestCase):
238
"""Test the delayed buffer class."""
245
def wait(self, delay):
246
"""return a deferred, fired after a given delay."""
248
reactor.callLater(delay, d.callback, None)
252
"""Initialize this test instance."""
255
append = lambda *args: self.result.append(args)
256
self.buf = aggregator.DelayedBuffer(cb=append,
257
threshold=self.THRESHOLD,
258
timeout=self.TIMEOUT)
260
@defer.inlineCallbacks
261
def test_a_bunch_exceeding_timeout_are_separated(self):
262
"""A bunch of messages exceeding timeout are separated."""
264
for n in range(int(self.TIMEOUT * expected_count / self.SMALL_DELAY)):
265
self.buf.push_event("message1")
266
yield self.wait(self.SMALL_DELAY)
267
yield self.wait(self.THRESHOLD)
268
self.assertEqual(len(self.result), expected_count)
272
183
class FakeStatusAggregator(object):
273
184
"""A fake status aggregator."""