5
from landscape.lib.persist import Persist
6
from landscape.broker.store import MessageStore
7
from landscape.schema import InvalidError, Message, Int, String, UnicodeOrString
9
from landscape.tests.helpers import LandscapeTest
10
from landscape.tests.mocker import ANY
11
from landscape import API
14
# Test case for MessageStore.  The statements below build the shared fixture
# (temporary directory, Persist, MessageStore with schemas) and then remove
# the temporary directory again.
class MessageStoreTest(LandscapeTest):
17
super(MessageStoreTest, self).setUp()
19
# Scratch directory handed to the store; removed again at the end.
self.temp_dir = tempfile.mkdtemp()
20
self.persist = Persist()
21
# NOTE(review): the third argument (20) is presumably the number of
# messages per directory -- the clean-up test sees 60 messages spread over
# directories "0", "1" and "2".  Confirm against MessageStore.
self.store = MessageStore(self.persist,
22
self.temp_dir, 20, get_time=self.get_time)
23
# Only "empty" and "data" are accepted; "empty2" and "unaccepted" get a
# schema below but are not in the accepted list.
self.store.set_accepted_types(["empty", "data"])
24
self.store.add_schema(Message("empty", {}))
25
self.store.add_schema(Message("empty2", {}))
26
self.store.add_schema(Message("data", {"data": String()}))
27
self.store.add_schema(Message("unaccepted", {"data": String()}))
30
super(MessageStoreTest, self).tearDown()
31
# Remove the scratch directory created with tempfile.mkdtemp() above.
shutil.rmtree(self.temp_dir)
36
def test_get_sequence(self):
    """The sequence reads 0 on a fresh store and then the value last set."""
    store = self.store
    self.assertEquals(store.get_sequence(), 0)
    store.set_sequence(3)
    self.assertEquals(store.get_sequence(), 3)
41
def test_get_pending_offset(self):
    """The pending offset reads 0 on a fresh store, then the value set."""
    store = self.store
    self.assertEquals(store.get_pending_offset(), 0)
    store.set_pending_offset(3)
    self.assertEquals(store.get_pending_offset(), 3)
46
def test_add_pending_offset(self):
    """add_pending_offset() moves the offset by a signed delta."""
    self.assertEquals(self.store.get_pending_offset(), 0)
    # Each pair is (delta to apply, offset expected afterwards).
    for delta, expected in ((3, 3), (3, 6), (-3, 3)):
        self.store.add_pending_offset(delta)
        self.assertEquals(self.store.get_pending_offset(), expected)
55
def test_no_pending_messages(self):
    """A store with nothing added reports no pending messages."""
    pending = self.store.get_pending_messages(1)
    self.assertEquals(pending, [])
58
def test_delete_no_messages(self):
    """delete_old_messages() can be called on a store with no messages."""
    store = self.store
    store.delete_old_messages()
61
def test_delete_old_messages_does_not_delete_held(self):
    """
    Deleting old messages must leave held messages in place.
    """
    store = self.store
    # "unaccepted" is not an accepted type at this point; "empty" is.
    store.add({"type": "unaccepted", "data": "blah"})
    store.add({"type": "empty"})
    store.set_pending_offset(1)
    store.delete_old_messages()

    # Accept the other type and rewind: only that message should remain.
    store.set_accepted_types(["empty", "unaccepted"])
    store.set_pending_offset(0)
    pending = store.get_pending_messages()
    self.assertEquals(len(pending), 1)
    self.assertEquals(pending[0]["type"], "unaccepted")
75
def test_delete_all_messages(self):
    """Resetting the message store means removing *ALL* messages."""
    store = self.store
    store.set_accepted_types(["empty"])
    # Alternate between a type that is not accepted and one that is.
    for message in ({"type": "unaccepted", "data": "blah"},
                    {"type": "empty"},
                    {"type": "unaccepted", "data": "blah"},
                    {"type": "empty"}):
        store.add(message)
    store.set_pending_offset(2)

    store.delete_all_messages()

    # Even after accepting both types nothing is left, and the offset is 0.
    store.set_accepted_types(["empty", "unaccepted"])
    self.assertEquals(store.get_pending_offset(), 0)
    self.assertEquals(store.get_pending_messages(), [])
88
def test_one_message(self):
# Add a single "data" message, fetch the pending messages with a limit of
# 200, and check the result through the assertMessages helper.
89
self.store.add(dict(type="data", data="A thing"))
90
messages = self.store.get_pending_messages(200)
91
self.assertMessages(messages,
96
def test_max_pending(self):
    """get_pending_messages(5) returns only the first five messages."""
    # NOTE(review): ten messages matches the sibling tests; any count above
    # five satisfies the assertion below -- confirm the intended number.
    for i in range(10):
        self.store.add(dict(type="data", data=str(i)))
    il = [m["data"] for m in self.store.get_pending_messages(5)]
    # Compare against a real list rather than the result of map().
    self.assertEquals(il, ["0", "1", "2", "3", "4"])
102
def test_offset(self):
    """Messages before the pending offset are skipped."""
    self.store.set_pending_offset(5)
    # NOTE(review): ten messages is the smallest count that yields the five
    # expected entries after an offset of five -- confirm the intended number.
    for i in range(10):
        self.store.add(dict(type="data", data=str(i)))
    il = [m["data"] for m in self.store.get_pending_messages(5)]
    # Compare against a real list rather than the result of map().
    self.assertEquals(il, ["5", "6", "7", "8", "9"])
109
def test_exercise_multi_dir(self):
    """35 messages are all read back, in insertion order."""
    # 35 exceeds the 20 passed to MessageStore in setUp -- presumably the
    # per-directory capacity, so more than one directory is exercised.
    for i in range(35):
        self.store.add(dict(type="data", data=str(i)))
    il = [m["data"] for m in self.store.get_pending_messages(50)]
    # Build a real list so the comparison does not depend on map()'s type.
    self.assertEquals(il, [str(n) for n in range(35)])
115
def test_wb_clean_up_empty_directories(self):
    """delete_old_messages() leaves no directories once all are consumed."""
    for i in range(60):
        self.store.add(dict(type="data", data=str(i)))
    il = [m["data"] for m in self.store.get_pending_messages(60)]
    # Build a real list so the comparison does not depend on map()'s type.
    self.assertEquals(il, [str(n) for n in range(60)])
    # The 60 messages end up in three directories.
    self.assertEquals(set(os.listdir(self.temp_dir)), set(["0", "1", "2"]))

    # Move the offset past every message; clean-up must empty temp_dir.
    self.store.set_pending_offset(60)
    self.store.delete_old_messages()
    self.assertEquals(os.listdir(self.temp_dir), [])
126
def test_unaccepted(self):
    """Messages of a type that is not accepted are left out."""
    # Even indexes get the accepted "data" type, odd ones "unaccepted".
    # NOTE(review): ten messages matches the sibling tests -- confirm.
    for i in range(10):
        self.store.add(dict(type=["data", "unaccepted"][i % 2], data=str(i)))
    il = [m["data"] for m in self.store.get_pending_messages(20)]
    # Compare against a real list rather than the result of map().
    self.assertEquals(il, ["0", "2", "4", "6", "8"])
132
def test_unaccepted_with_offset(self):
    """The pending offset applies on top of accepted-type filtering."""
    # Even indexes get the accepted "data" type, odd ones "unaccepted".
    # NOTE(review): ten messages matches the sibling tests -- confirm.
    for i in range(10):
        self.store.add(dict(type=["data", "unaccepted"][i % 2], data=str(i)))
    self.store.set_pending_offset(2)
    il = [m["data"] for m in self.store.get_pending_messages(20)]
    # Compare against a real list rather than the result of map().
    self.assertEquals(il, ["4", "6", "8"])
139
def test_unaccepted_reaccepted(self):
    """Newly accepted messages are delivered after the ones already due."""
    # Even indexes get the accepted "data" type, odd ones "unaccepted".
    for i in range(10):
        self.store.add(dict(type=["data", "unaccepted"][i % 2], data=str(i)))
    self.store.set_pending_offset(2)
    # The result is not needed; the call is kept for its effect on the
    # store (presumably marking unaccepted messages as held -- confirm).
    self.store.get_pending_messages(2)
    self.store.set_accepted_types(["data", "unaccepted"])
    il = [m["data"] for m in self.store.get_pending_messages(20)]
    # Compare against a real list rather than the result of map().
    self.assertEquals(il, ["4", "6", "8", "1", "3", "5", "7", "9"])
148
def test_accepted_unaccepted(self):
    """Messages already passed by the offset are not put on hold."""
    # Even indexes get the "data" type, odd ones "unaccepted".
    for i in range(10):
        self.store.add(dict(type=["data", "unaccepted"][i % 2], data=str(i)))
    # Setting pending offset here means that the first two
    # messages, even though becoming unaccepted now, were already
    # accepted before, so they shouldn't be marked for hold.
    self.store.set_pending_offset(2)
    self.store.set_accepted_types(["unaccepted"])
    il = [m["data"] for m in self.store.get_pending_messages(20)]
    # Compare against real lists rather than the result of map().
    self.assertEquals(il, ["1", "3", "5", "7", "9"])
    self.store.set_accepted_types(["data", "unaccepted"])
    il = [m["data"] for m in self.store.get_pending_messages(20)]
    self.assertEquals(il, ["1", "3", "5", "7", "9", "4", "6", "8"])
162
def test_accepted_unaccepted_old(self):
    """Rewinding over now-unaccepted messages puts them on hold."""
    # Even indexes get the "data" type, odd ones "unaccepted".
    for i in range(10):
        self.store.add(dict(type=["data", "unaccepted"][i % 2], data=str(i)))
    self.store.set_pending_offset(2)
    self.store.set_accepted_types(["unaccepted"])
    il = [m["data"] for m in self.store.get_pending_messages(20)]
    # Compare against real lists rather than the result of map().
    self.assertEquals(il, ["1", "3", "5", "7", "9"])
    # Now, if the server asks us to go back and process
    # previously accepted messages that are now unaccepted,
    # they should be put on hold.
    self.store.set_pending_offset(0)
    il = [m["data"] for m in self.store.get_pending_messages(20)]
    self.assertEquals(il, ["1", "3", "5", "7", "9"])
    # When the server starts accepting them again, these old
    # messages will also be delivered.
    self.store.set_accepted_types(["data", "unaccepted"])
    il = [m["data"] for m in self.store.get_pending_messages(20)]
    self.assertEquals(il, ["1", "3", "5", "7", "9",
                           "0", "2", "4", "6", "8"])
181
def test_wb_handle_broken_messages(self):
    """An undecodable message file is logged and not returned."""
    self.log_helper.ignore_errors(ValueError)
    self.store.add({"type": "empty"})
    self.store.add({"type": "empty2"})

    filename = os.path.join(self.temp_dir, "0", "0")
    self.assertTrue(os.path.isfile(filename))

    # Overwrite the first stored message with garbage.  Close the handle
    # explicitly so the bytes are on disk before the store reads the file
    # (try/finally rather than "with" to stay valid on old interpreters).
    broken = open(filename, "w")
    try:
        broken.write("bpickle will break reading this")
    finally:
        broken.close()

    self.assertEquals(self.store.get_pending_messages(), [])

    # FIXME This is an unfortunate assertion because it relies on
    # a message generated by external code.  As it turns out, this
    # message is different between Python 2.4 and 2.5.  The
    # snippet checked here is the largest common chunk between
    # Python 2.4 and 2.5.  It might be worth making the message
    # store call an event handler when it encounters a broken
    # message and hooking on that for this assertion instead of
    # relying on this fragile check.
    self.assertTrue("invalid literal for int()" in self.logfile.getvalue(),
                    self.logfile.getvalue())

    # Rewind before truncating: truncate() without a size cuts at the
    # current position, which would leave the log intact and make the
    # second assertion below pass vacuously.
    # NOTE(review): assumes self.logfile is a StringIO-like object.
    self.logfile.seek(0)
    self.logfile.truncate()

    # Unholding will also load the message.
    self.store.set_accepted_types([])
    self.store.set_accepted_types(["empty", "empty2"])

    self.assertTrue("invalid literal for int()" in self.logfile.getvalue(),
                    self.logfile.getvalue())
216
def test_wb_delete_messages_with_broken(self):
    """delete_old_messages() copes with an undecodable message file."""
    self.log_helper.ignore_errors(ValueError)
    self.store.add({"type": "data", "data": "1"})
    self.store.add({"type": "data", "data": "2"})

    filename = os.path.join(self.temp_dir, "0", "0")
    self.assertTrue(os.path.isfile(filename))

    # Overwrite the first stored message with garbage.  Close the handle
    # explicitly so the bytes are on disk before the store reads the file.
    broken = open(filename, "w")
    try:
        broken.write("bpickle will break reading this")
    finally:
        broken.close()

    messages = self.store.get_pending_messages()

    # Only the intact second message is returned.
    # NOTE(review): the "api" entry follows test_default_api_on_messages,
    # where the store adds "api": API to messages that lack it -- confirm.
    self.assertEquals(messages, [{"type": "data", "data": "2",
                                  "api": API}])

    self.store.set_pending_offset(len(messages))

    messages = self.store.get_pending_messages()
    self.store.delete_old_messages()
    self.assertEquals(messages, [])
    self.assertTrue("ValueError" in self.logfile.getvalue())
240
def test_atomic_message_writing(self):
# Checks that a write failing part-way does not leave a half-written
# message: a first message is stored normally, then open() is replaced
# through self.mocker so that write() raises IOError for the second one.
242
If the server gets unplugged halfway through writing a file,
243
the message should not be half-written.
245
# A separate store built on the shared persist object and directory, with
# an integer "data" schema.
store = MessageStore(self.persist, self.temp_dir,
246
get_time=self.get_time)
247
store.add_schema(Message("data", {"data": Int()}))
248
store.add({"type": "data", "data": 1})
249
# We simulate it by creating a fake file which raises halfway through
251
replaced_file_factory = self.mocker.replace("__builtin__.open",
253
# Expectation: a file opened for writing whose write() raises IOError.
replaced_file = replaced_file_factory(ANY, "w")
254
replaced_file.write(ANY)
255
self.mocker.throw(IOError("Sorry, pal!"))
257
# This kind of ensures that raising an exception is somewhat
258
# similar to unplugging the power -- i.e., we're not relying
259
# on special exception-handling in the file-writing code.
260
self.assertRaises(IOError, store.add, {"type": "data", "data": 2})
263
# The pending messages are then compared against the first message.
self.assertEquals(store.get_pending_messages(),
264
[{"type": "data", "data": 1,
267
def test_api_attribute(self):
    """The store's api attribute starts as API and can be reassigned."""
    store = self.store
    self.assertEquals(store.api, API)
    replacement = "New API version!"
    store.api = replacement
    self.assertEquals(store.api, replacement)
273
def test_default_api_on_messages(self):
    """A message added without an "api" key comes back with API."""
    self.store.add({"type": "empty"})
    pending = self.store.get_pending_messages()
    self.assertEquals(pending, [{"type": "empty", "api": API}])
278
def test_custom_api_on_store(self):
    """The api set on the store is used for messages without one."""
    store = self.store
    store.api = "X.Y"
    store.add({"type": "empty"})
    pending = store.get_pending_messages()
    self.assertEquals(pending, [{"type": "empty", "api": "X.Y"}])
284
def test_custom_api_on_messages(self):
    """An "api" key given on the message itself is kept as is."""
    self.store.add({"type": "empty", "api": "X.Y"})
    pending = self.store.get_pending_messages()
    self.assertEquals(pending, [{"type": "empty", "api": "X.Y"}])
289
def test_coercion(self):
    """
    When adding a message to the message store, it should be
    coerced according to the message schema for its type; a value
    that does not fit the schema raises InvalidError.
    """
    # The "data" schema declares a String, so an integer is rejected.
    bad_message = {"type": "data", "data": 3}
    self.assertRaises(InvalidError, self.store.add, bad_message)
299
def test_coercion_ignores_custom_api(self):
    """
    If a custom 'api' key is specified in the message, it should
    not be considered during schema verification.
    """
    # The "empty" schema declares no fields; this add must not raise.
    message = {"type": "empty", "api": "whatever"}
    self.store.add(message)
306
def test_message_is_actually_coerced(self):
    """
    The message that eventually gets sent should be the result of
    the schema coercion, not the value originally passed to add().
    """
    self.store.add_schema(
        Message("data", {"data": UnicodeOrString("utf-8")}))
    # UTF-8 encoded bytes go in; the expected result holds the decoded text.
    # The "api" entry matches the value asserted on the stored message.
    self.store.add({"type": "data",
                    "data": u"\N{HIRAGANA LETTER A}".encode("utf-8"),
                    "api": "whatever"})
    self.assertEquals(self.store.get_pending_messages(),
                      [{"type": "data", "api": "whatever",
                        "data": u"\N{HIRAGANA LETTER A}"}])
320
def test_count_pending_messages(self):
    """It is possible to get the total number of pending messages."""
    store = self.store
    self.assertEquals(store.count_pending_messages(), 0)
    # Each pair is (message to add, count expected afterwards).
    for message, expected in (({"type": "empty"}, 1),
                              ({"type": "data", "data": "yay"}, 2)):
        store.add(message)
        self.assertEquals(store.count_pending_messages(), expected)
328
def test_commit(self):
# Checks that the store's persistent data reaches the file behind its
# Persist object, and that a second store built on that file sees it.
330
The Message Store can be told to save its persistent data to disk on
333
# File backing the Persist object; obtained from the make_path() helper.
filename = self.make_path()
334
store = MessageStore(Persist(filename=filename), self.temp_dir)
335
store.set_accepted_types(["foo", "bar"])
337
# Setting the accepted types alone has not created the file.
self.assertFalse(os.path.exists(filename))
339
self.assertTrue(os.path.exists(filename))
341
# A fresh store on the same file is asked for its accepted types.
store = MessageStore(Persist(filename=filename), self.temp_dir)
342
self.assertEquals(set(store.get_accepted_types()),
345
def test_is_pending_pre_and_post_message_delivery(self):
    """is_pending() is true until the pending offset passes the message."""
    self.log_helper.ignore_errors(ValueError)

    # We add a couple of messages held and broken, and also a few normal
    # messages before and after, just to increase the chances of breaking
    # due to picking the pending offset incorrectly.
    self.store.set_accepted_types(["empty"])

    # For the same reason we break the first message.
    self.store.add({"type": "empty"})

    filename = os.path.join(self.temp_dir, "0", "0")
    self.assertTrue(os.path.isfile(filename))

    # Close the handle explicitly so the garbage is on disk before the
    # store reads the file.
    broken = open(filename, "w")
    try:
        broken.write("bpickle will break reading this")
    finally:
        broken.close()

    # And hold the second one.
    self.store.add({"type": "data", "data": "A thing"})

    self.store.add({"type": "empty"})
    self.store.add({"type": "empty"})
    message_id = self.store.add({"type": "empty"})
    self.store.add({"type": "empty"})
    self.store.add({"type": "empty"})

    # Broken messages will be processed here.  assertTrue(x, 5) would
    # treat 5 as the failure message and never check the count, so compare
    # explicitly: the five "empty" messages added after the broken and the
    # held one are expected.
    self.assertEquals(len(self.store.get_pending_messages()), 5)

    # The tracked message is the third of those five: it stays pending
    # after the offset moves by two and stops being pending after three.
    self.assertTrue(self.store.is_pending(message_id))
    self.store.add_pending_offset(2)
    self.assertTrue(self.store.is_pending(message_id))
    self.store.add_pending_offset(1)
    self.assertFalse(self.store.is_pending(message_id))
381
def test_is_pending_with_held_message(self):
    """A held message is still pending after the offset moves past it."""
    store = self.store
    store.set_accepted_types(["empty"])
    held_id = store.add({"type": "data", "data": "A thing"})

    # Add another normal message and increment the pending offset
    # to make the held message stay "behind" in the queue.
    store.add({"type": "empty"})
    store.add_pending_offset(1)

    self.assertTrue(store.is_pending(held_id))
393
def test_is_pending_with_broken_message(self):
    """When a message breaks we consider it to be no longer there."""
    self.log_helper.ignore_errors(ValueError)

    message_id = self.store.add({"type": "empty"})

    filename = os.path.join(self.temp_dir, "0", "0")
    self.assertTrue(os.path.isfile(filename))

    # Overwrite the stored message with garbage.  Close the handle
    # explicitly so the bytes are on disk before the store reads the file.
    broken = open(filename, "w")
    try:
        broken.write("bpickle will break reading this")
    finally:
        broken.close()

    # Reading the pending messages processes the broken file.
    self.assertEquals(self.store.get_pending_messages(), [])

    self.assertFalse(self.store.is_pending(message_id))