~ahasenack/landscape-client/landscape-client-1.5.5-0ubuntu0.9.04.0

« back to all changes in this revision

Viewing changes to landscape/broker/tests/test_store.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20080908163557-l3ixzj5dxz37wnw2
Tags: 1.0.18-0ubuntu1
New upstream release 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import tempfile
 
2
import shutil
 
3
import os
 
4
 
 
5
from landscape.lib.persist import Persist
 
6
from landscape.broker.store import MessageStore
 
7
from landscape.schema import InvalidError, Message, Int, String, UnicodeOrString
 
8
 
 
9
from landscape.tests.helpers import LandscapeTest
 
10
from landscape.tests.mocker import ANY
 
11
from landscape import API
 
12
 
 
13
 
 
14
class MessageStoreTest(LandscapeTest):
 
15
 
 
16
    def setUp(self):
 
17
        super(MessageStoreTest, self).setUp()
 
18
        self.time = 0
 
19
        self.temp_dir = tempfile.mkdtemp()
 
20
        self.persist = Persist()
 
21
        self.store = MessageStore(self.persist,
 
22
                                  self.temp_dir, 20, get_time=self.get_time)
 
23
        self.store.set_accepted_types(["empty", "data"])
 
24
        self.store.add_schema(Message("empty", {}))
 
25
        self.store.add_schema(Message("empty2", {}))
 
26
        self.store.add_schema(Message("data", {"data": String()}))
 
27
        self.store.add_schema(Message("unaccepted", {"data": String()}))
 
28
 
 
29
    def tearDown(self):
 
30
        super(MessageStoreTest, self).tearDown()
 
31
        shutil.rmtree(self.temp_dir)
 
32
 
 
33
    def get_time(self):
 
34
        return self.time
 
35
 
 
36
    def test_get_sequence(self):
 
37
        self.assertEquals(self.store.get_sequence(), 0)
 
38
        self.store.set_sequence(3)
 
39
        self.assertEquals(self.store.get_sequence(), 3)
 
40
 
 
41
    def test_get_pending_offset(self):
 
42
        self.assertEquals(self.store.get_pending_offset(), 0)
 
43
        self.store.set_pending_offset(3)
 
44
        self.assertEquals(self.store.get_pending_offset(), 3)
 
45
 
 
46
    def test_add_pending_offset(self):
 
47
        self.assertEquals(self.store.get_pending_offset(), 0)
 
48
        self.store.add_pending_offset(3)
 
49
        self.assertEquals(self.store.get_pending_offset(), 3)
 
50
        self.store.add_pending_offset(3)
 
51
        self.assertEquals(self.store.get_pending_offset(), 6)
 
52
        self.store.add_pending_offset(-3)
 
53
        self.assertEquals(self.store.get_pending_offset(), 3)
 
54
 
 
55
    def test_no_pending_messages(self):
 
56
        self.assertEquals(self.store.get_pending_messages(1), [])
 
57
 
 
58
    def test_delete_no_messages(self):
 
59
        self.store.delete_old_messages()
 
60
 
 
61
    def test_delete_old_messages_does_not_delete_held(self):
 
62
        """
 
63
        Deleting old messages should avoid deleting held messages.
 
64
        """
 
65
        self.store.add({"type": "unaccepted", "data": "blah"})
 
66
        self.store.add({"type": "empty"})
 
67
        self.store.set_pending_offset(1)
 
68
        self.store.delete_old_messages()
 
69
        self.store.set_accepted_types(["empty", "unaccepted"])
 
70
        self.store.set_pending_offset(0)
 
71
        messages = self.store.get_pending_messages()
 
72
        self.assertEquals(len(messages), 1)
 
73
        self.assertEquals(messages[0]["type"], "unaccepted")
 
74
 
 
75
    def test_delete_all_messages(self):
 
76
        """Resetting the message store means removing *ALL* messages."""
 
77
        self.store.set_accepted_types(["empty"])
 
78
        self.store.add({"type": "unaccepted", "data": "blah"})
 
79
        self.store.add({"type": "empty"})
 
80
        self.store.add({"type": "unaccepted", "data": "blah"})
 
81
        self.store.add({"type": "empty"})
 
82
        self.store.set_pending_offset(2)
 
83
        self.store.delete_all_messages()
 
84
        self.store.set_accepted_types(["empty", "unaccepted"])
 
85
        self.assertEquals(self.store.get_pending_offset(), 0)
 
86
        self.assertEquals(self.store.get_pending_messages(), [])
 
87
 
 
88
    def test_one_message(self):
 
89
        self.store.add(dict(type="data", data="A thing"))
 
90
        messages = self.store.get_pending_messages(200)
 
91
        self.assertMessages(messages,
 
92
                            [{"type": "data",
 
93
                              "data": "A thing",
 
94
                              "api": API}])
 
95
 
 
96
    def test_max_pending(self):
 
97
        for i in range(10):
 
98
            self.store.add(dict(type="data", data=str(i)))
 
99
        il = [m["data"] for m in self.store.get_pending_messages(5)]
 
100
        self.assertEquals(il, map(str, [0, 1, 2, 3, 4]))
 
101
 
 
102
    def test_offset(self):
 
103
        self.store.set_pending_offset(5)
 
104
        for i in range(15):
 
105
            self.store.add(dict(type="data", data=str(i)))
 
106
        il = [m["data"] for m in self.store.get_pending_messages(5)]
 
107
        self.assertEquals(il, map(str, [5, 6, 7, 8, 9]))
 
108
 
 
109
    def test_exercise_multi_dir(self):
 
110
        for i in range(35):
 
111
            self.store.add(dict(type="data", data=str(i)))
 
112
        il = [m["data"] for m in self.store.get_pending_messages(50)]
 
113
        self.assertEquals(il, map(str, range(35)))
 
114
 
 
115
    def test_wb_clean_up_empty_directories(self):
 
116
        for i in range(60):
 
117
            self.store.add(dict(type="data", data=str(i)))
 
118
        il = [m["data"] for m in self.store.get_pending_messages(60)]
 
119
        self.assertEquals(il, map(str, range(60)))
 
120
        self.assertEquals(set(os.listdir(self.temp_dir)), set(["0", "1", "2"]))
 
121
 
 
122
        self.store.set_pending_offset(60)
 
123
        self.store.delete_old_messages()
 
124
        self.assertEquals(os.listdir(self.temp_dir), [])
 
125
 
 
126
    def test_unaccepted(self):
 
127
        for i in range(10):
 
128
            self.store.add(dict(type=["data", "unaccepted"][i%2], data=str(i)))
 
129
        il = [m["data"] for m in self.store.get_pending_messages(20)]
 
130
        self.assertEquals(il, map(str, [0, 2, 4, 6, 8]))
 
131
 
 
132
    def test_unaccepted_with_offset(self):
 
133
        for i in range(10):
 
134
            self.store.add(dict(type=["data", "unaccepted"][i%2], data=str(i)))
 
135
        self.store.set_pending_offset(2)
 
136
        il = [m["data"] for m in self.store.get_pending_messages(20)]
 
137
        self.assertEquals(il, map(str, [4, 6, 8]))
 
138
 
 
139
    def test_unaccepted_reaccepted(self):
 
140
        for i in range(10):
 
141
            self.store.add(dict(type=["data", "unaccepted"][i%2], data=str(i)))
 
142
        self.store.set_pending_offset(2)
 
143
        il = [m["data"] for m in self.store.get_pending_messages(2)]
 
144
        self.store.set_accepted_types(["data", "unaccepted"])
 
145
        il = [m["data"] for m in self.store.get_pending_messages(20)]
 
146
        self.assertEquals(il, map(str, [4, 6, 8, 1, 3, 5, 7, 9]))
 
147
 
 
148
    def test_accepted_unaccepted(self):
 
149
        for i in range(10):
 
150
            self.store.add(dict(type=["data", "unaccepted"][i%2], data=str(i)))
 
151
        # Setting pending offset here means that the first two
 
152
        # messages, even though becoming unaccepted now, were already
 
153
        # accepted before, so they shouldn't be marked for hold.
 
154
        self.store.set_pending_offset(2)
 
155
        self.store.set_accepted_types(["unaccepted"])
 
156
        il = [m["data"] for m in self.store.get_pending_messages(20)]
 
157
        self.assertEquals(il, map(str, [1, 3, 5, 7, 9]))
 
158
        self.store.set_accepted_types(["data", "unaccepted"])
 
159
        il = [m["data"] for m in self.store.get_pending_messages(20)]
 
160
        self.assertEquals(il, map(str, [1, 3, 5, 7, 9, 4, 6, 8]))
 
161
 
 
162
    def test_accepted_unaccepted_old(self):
 
163
        for i in range(10):
 
164
            self.store.add(dict(type=["data", "unaccepted"][i%2], data=str(i)))
 
165
        self.store.set_pending_offset(2)
 
166
        self.store.set_accepted_types(["unaccepted"])
 
167
        il = [m["data"] for m in self.store.get_pending_messages(20)]
 
168
        self.assertEquals(il, map(str, [1, 3, 5, 7, 9]))
 
169
        # Now, if the server asks us to go back and process
 
170
        # previously accepted messages that are now unaccepted,
 
171
        # they should be put on hold.
 
172
        self.store.set_pending_offset(0)
 
173
        il = [m["data"] for m in self.store.get_pending_messages(20)]
 
174
        self.assertEquals(il, map(str, [1, 3, 5, 7, 9]))
 
175
        # When the server starts accepting them again, these old
 
176
        # messages will also be delivered.
 
177
        self.store.set_accepted_types(["data", "unaccepted"])
 
178
        il = [m["data"] for m in self.store.get_pending_messages(20)]
 
179
        self.assertEquals(il, map(str, [1, 3, 5, 7, 9, 0, 2, 4, 6, 8]))
 
180
 
 
181
    def test_wb_handle_broken_messages(self):
 
182
        self.log_helper.ignore_errors(ValueError)
 
183
        self.store.add({"type": "empty"})
 
184
        self.store.add({"type": "empty2"})
 
185
 
 
186
        filename = os.path.join(self.temp_dir, "0", "0")
 
187
        self.assertTrue(os.path.isfile(filename))
 
188
 
 
189
        file = open(filename, "w")
 
190
        file.write("bpickle will break reading this")
 
191
        file.close()
 
192
 
 
193
        self.assertEquals(self.store.get_pending_messages(), [])
 
194
 
 
195
        # FIXME This is an unfortunate assertion because it relies on
 
196
        # a message generated by external code.  As it turns out, this
 
197
        # message is different between Python 2.4 and 2.5.  The
 
198
        # snippet checked here is the largest common chunk between
 
199
        # Python 2.4 and 2.5.  It might be worth making the message
 
200
        # store call an event handler when it encounters a broken
 
201
        # message and hooking on that for this assertion instead of
 
202
        # relying on this fragile check.
 
203
        self.assertTrue("invalid literal for int()" in self.logfile.getvalue(),
 
204
                        self.logfile.getvalue())
 
205
 
 
206
        self.logfile.seek(0)
 
207
        self.logfile.truncate()
 
208
 
 
209
        # Unholding will also load the message.
 
210
        self.store.set_accepted_types([])
 
211
        self.store.set_accepted_types(["empty", "empty2"])
 
212
 
 
213
        self.assertTrue("invalid literal for int()" in self.logfile.getvalue(),
 
214
                        self.logfile.getvalue())
 
215
 
 
216
    def test_wb_delete_messages_with_broken(self):
 
217
        self.log_helper.ignore_errors(ValueError)
 
218
        self.store.add({"type": "data", "data": "1"})
 
219
        self.store.add({"type": "data", "data": "2"})
 
220
 
 
221
        filename = os.path.join(self.temp_dir, "0", "0")
 
222
        self.assertTrue(os.path.isfile(filename))
 
223
 
 
224
        file = open(filename, "w")
 
225
        file.write("bpickle will break reading this")
 
226
        file.close()
 
227
 
 
228
        messages = self.store.get_pending_messages()
 
229
 
 
230
        self.assertEquals(messages, [{"type": "data", "data": "2",
 
231
                                      "api": API}])
 
232
 
 
233
        self.store.set_pending_offset(len(messages))
 
234
 
 
235
        messages = self.store.get_pending_messages()
 
236
        self.store.delete_old_messages()
 
237
        self.assertEquals(messages, [])
 
238
        self.assertTrue("ValueError" in self.logfile.getvalue())
 
239
 
 
240
    def test_atomic_message_writing(self):
 
241
        """
 
242
        If the server gets unplugged halfway through writing a file,
 
243
        the message should not be half-written.
 
244
        """
 
245
        store = MessageStore(self.persist, self.temp_dir,
 
246
                             get_time=self.get_time)
 
247
        store.add_schema(Message("data", {"data": Int()}))
 
248
        store.add({"type": "data", "data": 1})
 
249
        # We simulate it by creating a fake file which raises halfway through
 
250
        # writing a file.
 
251
        replaced_file_factory = self.mocker.replace("__builtin__.open",
 
252
                                                    passthrough=False)
 
253
        replaced_file = replaced_file_factory(ANY, "w")
 
254
        replaced_file.write(ANY)
 
255
        self.mocker.throw(IOError("Sorry, pal!"))
 
256
        self.mocker.replay()
 
257
        # This kind of ensures that raising an exception is somewhat
 
258
        # similar to unplugging the power -- i.e., we're not relying
 
259
        # on special exception-handling in the file-writing code.
 
260
        self.assertRaises(IOError, store.add, {"type": "data", "data": 2})
 
261
        self.mocker.verify()
 
262
        self.mocker.reset()
 
263
        self.assertEquals(store.get_pending_messages(),
 
264
                          [{"type": "data", "data": 1,
 
265
                            "api": API}])
 
266
 
 
267
    def test_api_attribute(self):
 
268
        self.assertEquals(self.store.api, API)
 
269
        new_api = "New API version!"
 
270
        self.store.api = new_api
 
271
        self.assertEquals(self.store.api, new_api)
 
272
 
 
273
    def test_default_api_on_messages(self):
 
274
        self.store.add({"type": "empty"})
 
275
        self.assertEquals(self.store.get_pending_messages(),
 
276
                          [{"type": "empty", "api": API}])
 
277
 
 
278
    def test_custom_api_on_store(self):
 
279
        self.store.api = "X.Y"
 
280
        self.store.add({"type": "empty"})
 
281
        self.assertEquals(self.store.get_pending_messages(),
 
282
                          [{"type": "empty", "api": "X.Y"}])
 
283
 
 
284
    def test_custom_api_on_messages(self):
 
285
        self.store.add({"type": "empty", "api": "X.Y"})
 
286
        self.assertEquals(self.store.get_pending_messages(),
 
287
                          [{"type": "empty", "api": "X.Y"}])
 
288
 
 
289
    def test_coercion(self):
 
290
        """
 
291
        When adding a message to the mesage store, it should be
 
292
        coerced according to the message schema for the type of the
 
293
        message.
 
294
        """
 
295
        self.assertRaises(InvalidError,
 
296
                          self.store.add, {"type": "data", "data": 3})
 
297
 
 
298
 
 
299
    def test_coercion_ignores_custom_api(self):
 
300
        """
 
301
        If a custom 'api' key is specified in the message, it should
 
302
        not be considered during schema verification.
 
303
        """
 
304
        self.store.add({"type": "empty", "api": "whatever"})
 
305
 
 
306
    def test_message_is_actually_coerced(self):
 
307
        """
 
308
        The message that eventually gets sent should be the result of
 
309
        the coercion.
 
310
        """
 
311
        self.store.add_schema(
 
312
            Message("data", {"data": UnicodeOrString("utf-8")}))
 
313
        self.store.add({"type": "data",
 
314
                        "data": u"\N{HIRAGANA LETTER A}".encode("utf-8"),
 
315
                        "api": "whatever"})
 
316
        self.assertEquals(self.store.get_pending_messages(),
 
317
                          [{"type": "data", "api": "whatever",
 
318
                            "data": u"\N{HIRAGANA LETTER A}"}])
 
319
 
 
320
    def test_count_pending_messages(self):
 
321
        """It is possible to get the total number of pending messages."""
 
322
        self.assertEquals(self.store.count_pending_messages(), 0)
 
323
        self.store.add({"type": "empty"})
 
324
        self.assertEquals(self.store.count_pending_messages(), 1)
 
325
        self.store.add({"type": "data", "data": "yay"})
 
326
        self.assertEquals(self.store.count_pending_messages(), 2)
 
327
 
 
328
    def test_commit(self):
 
329
        """
 
330
        The Message Store can be told to save its persistent data to disk on
 
331
        demand.
 
332
        """
 
333
        filename = self.make_path()
 
334
        store = MessageStore(Persist(filename=filename), self.temp_dir)
 
335
        store.set_accepted_types(["foo", "bar"])
 
336
 
 
337
        self.assertFalse(os.path.exists(filename))
 
338
        store.commit()
 
339
        self.assertTrue(os.path.exists(filename))
 
340
 
 
341
        store = MessageStore(Persist(filename=filename), self.temp_dir)
 
342
        self.assertEquals(set(store.get_accepted_types()),
 
343
                          set(["foo", "bar"]))
 
344
 
 
345
    def test_is_pending_pre_and_post_message_delivery(self):
 
346
        self.log_helper.ignore_errors(ValueError)
 
347
 
 
348
        # We add a couple of messages held and broken, and also a few normal
 
349
        # messages before and after, just to increase the chances of breaking
 
350
        # due to picking the pending offset incorrectly.
 
351
        self.store.set_accepted_types(["empty"])
 
352
 
 
353
        # For the same reason we break the first message.
 
354
        self.store.add({"type": "empty"})
 
355
 
 
356
        filename = os.path.join(self.temp_dir, "0", "0")
 
357
        self.assertTrue(os.path.isfile(filename))
 
358
 
 
359
        file = open(filename, "w")
 
360
        file.write("bpickle will break reading this")
 
361
        file.close()
 
362
 
 
363
        # And hold the second one.
 
364
        self.store.add({"type": "data", "data": "A thing"})
 
365
 
 
366
        self.store.add({"type": "empty"})
 
367
        self.store.add({"type": "empty"})
 
368
        id = self.store.add({"type": "empty"})
 
369
        self.store.add({"type": "empty"})
 
370
        self.store.add({"type": "empty"})
 
371
 
 
372
        # Broken messages will be processed here.
 
373
        self.assertTrue(len(self.store.get_pending_messages()), 5)
 
374
 
 
375
        self.assertTrue(self.store.is_pending(id))
 
376
        self.store.add_pending_offset(2)
 
377
        self.assertTrue(self.store.is_pending(id))
 
378
        self.store.add_pending_offset(1)
 
379
        self.assertFalse(self.store.is_pending(id))
 
380
 
 
381
    def test_is_pending_with_held_message(self):
 
382
 
 
383
        self.store.set_accepted_types(["empty"])
 
384
        id = self.store.add({"type": "data", "data": "A thing"})
 
385
 
 
386
        # Add another normal message and increment the pending offset
 
387
        # to make the held message stay "behind" in the queue.
 
388
        self.store.add({"type": "empty"})
 
389
        self.store.add_pending_offset(1)
 
390
 
 
391
        self.assertTrue(self.store.is_pending(id))
 
392
 
 
393
    def test_is_pending_with_broken_message(self):
 
394
        """When a message breaks we consider it to be no longer there."""
 
395
 
 
396
        self.log_helper.ignore_errors(ValueError)
 
397
 
 
398
        id = self.store.add({"type": "empty"})
 
399
 
 
400
        filename = os.path.join(self.temp_dir, "0", "0")
 
401
        self.assertTrue(os.path.isfile(filename))
 
402
 
 
403
        file = open(filename, "w")
 
404
        file.write("bpickle will break reading this")
 
405
        file.close()
 
406
 
 
407
        self.assertEquals(self.store.get_pending_messages(), [])
 
408
 
 
409
        self.assertFalse(self.store.is_pending(id))