4
from landscape.lib.persist import (
5
path_string_to_tuple, path_tuple_to_string, Persist, RootedPersist,
6
PickleBackend, ConfigObjBackend, PersistError, PersistReadOnlyError)
7
from landscape.tests.helpers import LandscapeTest, MakePathHelper
10
class PersistHelpersTest(LandscapeTest):
14
("ab.cd", ("ab", "cd")),
15
("ab.cd.de", ("ab", "cd", "de")),
17
("ab[0][1]", ("ab", 0, 1)),
18
("ab.cd[1]", ("ab", "cd", 1)),
19
("ab[0].cd[1]", ("ab", 0, "cd", 1)),
20
("ab.cd.de[2]", ("ab", "cd", "de", 2)),
23
def test_path_string_to_tuple(self):
24
for path_string, path_tuple in self.paths:
25
self.assertEquals(path_string_to_tuple(path_string), path_tuple)
27
def test_path_string_to_tuple_error(self):
28
self.assertRaises(PersistError, path_string_to_tuple, "ab[0][c]")
30
def test_path_tuple_to_string(self):
31
for path_string, path_tuple in self.paths:
32
self.assertEquals(path_tuple_to_string(path_tuple), path_string)
35
class BasePersistTest(LandscapeTest):
43
("cd.ij.kl", (1, 2.3, "4", [5], (6,))),
44
("cd.ij.mn", [1, 2.3, "4", [5], (6,)]),
48
("qr", {"s": {"t": "u"}}),
59
"kl": (1, 2.3, "4", [5], (6,)),
60
"mn": [1, 2.3, "4", [5], (6,)],
64
"qr": {"s": {"t": "u"}},
65
"v": [0, {"v": "woot"}, 2],
72
("cd.ij.kl", (1, 2.3, "4", [5], (6,))),
74
("cd.ij.kl[3][0]", 5),
75
("cd.ij.mn", [1, 2.3, "4", [5], (6,)]),
78
("cd.ij.op", [0, 1, 2]),
82
("cd.ij.op[3]", None),
83
("qr", {"s": {"t": "u"}}),
102
"ab": [1, 2.3, "4", [5], (6,), {"cd": ["foo", "bar"]}],
106
LandscapeTest.setUp(self)
107
self.persist = self.build_persist()
111
LandscapeTest.tearDown(self)
113
def build_persist(self, *args, **kwargs):
114
return Persist(*args, **kwargs)
116
def format(self, result, expected):
117
repr_result = pprint.pformat(result)
118
repr_expected = pprint.pformat(expected)
119
return "\nResult:\n%s\nExpected:\n%s\n" % (repr_result, repr_expected)
122
class GeneralPersistTest(BasePersistTest):
125
for path, value in self.set_items:
126
self.persist.set(path, value)
127
result = self.persist.get((), hard=True)
128
self.assertEquals(result, self.set_result,
129
self.format(result, self.set_result))
131
def test_set_tuple_paths(self):
132
for path, value in self.set_items:
133
self.persist.set(path_string_to_tuple(path), value)
134
result = self.persist.get((), hard=True)
135
self.assertEquals(result, self.set_result,
136
self.format(result, self.set_result))
138
def test_set_from_result(self):
139
for path in self.set_result:
140
self.persist.set(path, self.set_result[path])
141
result = self.persist.get((), hard=True)
142
self.assertEquals(result, self.set_result,
143
self.format(result, self.set_result))
146
for path in self.set_result:
147
self.persist.set(path, self.set_result[path])
148
for path, value in self.get_items:
149
self.assertEquals(self.persist.get(path), value)
151
def test_get_tuple_paths(self):
152
for path in self.set_result:
153
self.persist.set(path_string_to_tuple(path), self.set_result[path])
154
for path, value in self.get_items:
155
self.assertEquals(self.persist.get(path), value)
158
for path, value in self.add_items:
159
self.persist.add(path, value)
160
result = self.persist.get((), hard=True)
161
self.assertEquals(result, self.add_result,
162
self.format(result, self.add_result))
164
def test_add_unique(self):
165
self.persist.add("a", "b")
166
self.assertEquals(self.persist.get("a"), ["b"])
167
self.persist.add("a", "b")
168
self.assertEquals(self.persist.get("a"), ["b", "b"])
169
self.persist.add("a", "b", unique=True)
170
self.assertEquals(self.persist.get("a"), ["b", "b"])
171
self.persist.add("a", "c", unique=True)
172
self.assertEquals(self.persist.get("a"), ["b", "b", "c"])
175
self.persist.set("a", {"b": 1, "c": {"d": 2}, "e": list("foo")})
176
keys = self.persist.keys
177
self.assertEquals(set(keys((), hard=True)), set(["a"]))
178
self.assertEquals(set(keys("a")), set(["b", "c", "e"]))
179
self.assertEquals(set(keys("a.d")), set([]))
180
self.assertEquals(set(keys("a.e")), set([0, 1, 2]))
181
self.assertEquals(set(keys("a.f")), set([]))
182
self.assertRaises(PersistError, keys, "a.b")
185
self.persist.set("a", {"b": 1, "c": {"d": 2}, "e": list("foo")})
186
has = self.persist.has
187
self.assertTrue(has("a"))
188
self.assertTrue(has(("a", "b")))
189
self.assertTrue(has("a.c"))
190
self.assertTrue(has("a.c", "d"))
191
self.assertTrue(has("a.c.d"))
192
self.assertTrue(has("a.e"))
193
self.assertTrue(has("a.e[0]"))
194
self.assertTrue(has("a.e", "f"))
195
self.assertTrue(has("a.e", "o"))
196
self.assertFalse(has("b"))
197
self.assertFalse(has("a.f"))
198
self.assertFalse(has("a.c.f"))
199
self.assertFalse(has("a.e[3]"))
200
self.assertFalse(has("a.e", "g"))
201
self.assertRaises(PersistError, has, "a.b.c")
203
def test_remove(self):
204
self.persist.set("a", {"b": [1], "c": {"d": 2}, "e": list("foot")})
205
get = self.persist.get
206
has = self.persist.has
207
remove = self.persist.remove
209
self.assertFalse(remove("a.f"))
211
self.assertRaises(PersistError, remove, "a.c.d.e")
213
self.assertTrue(remove(("a", "e", "o")))
214
self.assertEquals(get("a.e"), ["f", "t"])
216
self.assertFalse(remove("a.e[2]"))
217
self.assertEquals(get("a.e"), ["f", "t"])
219
self.assertTrue(remove("a.e[1]"))
220
self.assertEquals(get("a.e"), ["f"])
222
self.assertTrue(remove("a.e", "f"))
223
self.assertFalse(has("a.e"))
225
self.assertFalse(remove("a.b[1]"))
226
self.assertEquals(get("a.b"), [1])
228
self.assertTrue(remove("a.b", 1))
229
self.assertFalse(has("a.b"))
231
self.assertTrue(remove("a.c"))
232
self.assertFalse(has("a.c"))
234
self.assertFalse(has("a"))
237
self.persist.set("a", {"b": [1], "c": {"d": 2}})
239
move = self.persist.move
240
get = self.persist.get
242
self.assertTrue(move("a.b", "a.c.b"))
243
self.assertEquals(get("a"), {"c": {"b": [1], "d": 2}})
245
self.assertTrue(move("a.c.b[0]", "a.c.b"))
246
self.assertEquals(get("a"), {"c": {"b": 1, "d": 2}})
248
self.assertTrue(move(("a", "c", "b"), ("a", "c", "b", 0)))
249
self.assertEquals(get("a"), {"c": {"b": [1], "d": 2}})
251
def test_copy_values_on_set(self):
254
self.persist.set("a", d)
256
self.assertEquals(self.persist.get("a"), d_orig)
258
def test_copy_values_on_add(self):
261
self.persist.add("a", d)
263
self.assertEquals(self.persist.get("a[0]"), d_orig)
265
def test_copy_values_on_get(self):
266
self.persist.set("a", {"b": 1})
267
d = self.persist.get("a")
270
self.assertEquals(self.persist.get("a"), d_orig)
272
def test_root_at(self):
273
rooted = self.persist.root_at("my-module")
274
rooted.set("option", 1)
275
self.assertEquals(self.persist.get("my-module.option"), 1)
278
class SaveLoadPersistTest(BasePersistTest):
280
helpers = [MakePathHelper]
282
def test_readonly(self):
283
self.assertFalse(self.persist.readonly)
284
self.persist.readonly = True
285
self.assertTrue(self.persist.readonly)
286
self.persist.readonly = False
287
self.assertFalse(self.persist.readonly)
289
self.persist.readonly = True
290
self.assertRaises(PersistReadOnlyError, self.persist.set, "ab", 2)
291
self.assertRaises(PersistReadOnlyError, self.persist.add, "ab", 3)
292
self.assertRaises(PersistReadOnlyError, self.persist.remove, "ab", 4)
293
self.assertRaises(PersistReadOnlyError, self.persist.move, "ab", "cd")
295
for keyword in ["weak", "soft"]:
296
kwargs = {keyword: True}
297
self.persist.set("ab", 2, **kwargs)
298
self.persist.add("cd", 2, **kwargs)
299
self.persist.remove("ab", **kwargs)
300
self.persist.move("cd", "ef", **kwargs)
302
def test_assert_writable(self):
303
self.persist.assert_writable()
304
self.persist.set("ab", 1)
305
self.persist.readonly = True
306
self.assertRaises(PersistReadOnlyError, self.persist.assert_writable)
308
def test_modified(self):
309
self.assertFalse(self.persist.modified)
310
self.persist.set("ab", 1)
311
self.assertTrue(self.persist.modified)
312
self.persist.reset_modified()
313
self.assertFalse(self.persist.modified)
314
self.persist.add("cd", 2)
315
self.assertTrue(self.persist.modified)
316
self.persist.reset_modified()
317
self.assertFalse(self.persist.modified)
318
self.persist.remove("ab")
319
self.assertTrue(self.persist.modified)
320
self.persist.reset_modified()
321
self.assertFalse(self.persist.modified)
322
self.persist.move("cd", "ef")
323
self.assertTrue(self.persist.modified)
325
def test_save_and_load(self):
326
for path in self.set_result:
327
self.persist.set(path, self.set_result[path])
329
filename = self.make_path()
330
self.persist.save(filename)
332
persist = self.build_persist()
333
persist.load(filename)
335
result = persist.get((), hard=True)
336
self.assertEquals(result, self.set_result,
337
self.format(result, self.set_result))
339
def test_save_on_unexistent_dir(self):
340
dirname = self.make_path()
341
filename = os.path.join(dirname, "foobar")
343
self.assertFalse(os.path.exists(dirname))
344
self.persist.save(filename)
345
self.assertTrue(os.path.isfile(filename))
347
def test_save_creates_backup(self):
348
filename = self.make_path("foobar")
350
self.assertFalse(os.path.exists(filename+".old"))
351
self.persist.save(filename)
352
self.assertTrue(os.path.exists(filename+".old"))
354
def test_save_to_default_file(self):
356
Persist can be constructed with a filename, and Persist.save with no
357
arguments will write to that filename.
359
filename = self.make_path()
360
persist = self.build_persist(filename=filename)
361
self.assertFalse(os.path.exists(filename))
363
self.assertTrue(os.path.exists(filename))
365
def test_save_to_no_default_file(self):
367
If no default filename was given, calling Persist.save with no
368
arguments will raise a PersistError.
370
self.assertRaises(PersistError, self.persist.save)
372
def test_load_default_file(self):
374
If a Persist is created with a default filename, and the filename
375
exists, it will be loaded.
377
filename = self.make_path()
378
persist = self.build_persist(filename=filename)
379
persist.set("foo", "bar")
382
persist = self.build_persist(filename=filename)
383
self.assertEquals(persist.get("foo"), "bar")
385
def test_load_restores_backup(self):
386
filename = self.make_path("foobar")
388
self.persist.set("a", 1)
389
self.persist.save(filename+".old")
391
persist = self.build_persist()
392
persist.load(filename)
394
self.assertEquals(persist.get("a"), 1)
396
def test_load_empty_files_wont_break(self):
397
filename = self.make_path("")
398
self.persist.load(filename)
401
class PicklePersistTest(GeneralPersistTest, SaveLoadPersistTest):
403
def build_persist(self, *args, **kwargs):
404
return Persist(PickleBackend(), *args, **kwargs)
407
class ConfigObjPersistTest(GeneralPersistTest, SaveLoadPersistTest):
409
def build_persist(self, *args, **kwargs):
410
return Persist(ConfigObjBackend(), *args, **kwargs)
413
class RootedPersistTest(GeneralPersistTest):
415
def build_persist(self, *args, **kwargs):
416
return RootedPersist(Persist(), "root.path", *args, **kwargs)
418
def test_readonly(self):
419
self.assertFalse(self.persist.readonly)
420
self.assertRaises(AttributeError,
421
setattr, self.persist, "readonly", True)
422
self.persist.parent.readonly = True
423
self.assertTrue(self.persist.readonly)
425
def test_assert_writable(self):
426
self.persist.assert_writable()
427
self.persist.set("ab", 1)
428
self.persist.parent.readonly = True
429
self.assertRaises(PersistReadOnlyError, self.persist.assert_writable)
431
def test_modified(self):
432
self.assertFalse(self.persist.modified)
433
self.persist.set("ab", 1)
434
self.assertTrue(self.persist.modified)
435
self.persist.parent.reset_modified()
436
self.assertFalse(self.persist.modified)
437
self.persist.add("cd", 2)
438
self.assertTrue(self.persist.modified)
439
self.persist.parent.reset_modified()
440
self.assertFalse(self.persist.modified)
441
self.persist.remove("ab")
442
self.assertTrue(self.persist.modified)
443
self.persist.parent.reset_modified()
444
self.assertFalse(self.persist.modified)
445
self.persist.move("cd", "ef")
446
self.assertTrue(self.persist.modified)