~ahasenack/landscape-client/landscape-client-1.5.5-0ubuntu0.9.04.0

« back to all changes in this revision

Viewing changes to landscape/lib/tests/test_persist.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20080908163557-l3ixzj5dxz37wnw2
Tags: 1.0.18-0ubuntu1
New upstream release 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import pprint
 
2
import os
 
3
 
 
4
from landscape.lib.persist import (
 
5
    path_string_to_tuple, path_tuple_to_string, Persist, RootedPersist,
 
6
    PickleBackend, ConfigObjBackend, PersistError, PersistReadOnlyError)
 
7
from landscape.tests.helpers import LandscapeTest, MakePathHelper
 
8
 
 
9
 
 
10
class PersistHelpersTest(LandscapeTest):
    """
    Tests for the path_string_to_tuple and path_tuple_to_string helpers.
    """

    # Each entry pairs a path string with the tuple the helpers are
    # expected to map it to (and back from).
    paths = [
        ("ab", ("ab",)),
        ("ab.cd", ("ab", "cd")),
        ("ab.cd.de", ("ab", "cd", "de")),
        ("ab[0]", ("ab", 0)),
        ("ab[0][1]", ("ab", 0, 1)),
        ("ab.cd[1]", ("ab", "cd", 1)),
        ("ab[0].cd[1]", ("ab", 0, "cd", 1)),
        ("ab.cd.de[2]", ("ab", "cd", "de", 2)),
    ]

    def test_path_string_to_tuple(self):
        """
        Every string in C{paths} converts to its paired tuple.
        """
        for text, expected_tuple in self.paths:
            converted = path_string_to_tuple(text)
            self.assertEquals(converted, expected_tuple)

    def test_path_string_to_tuple_error(self):
        """
        The path "ab[0][c]" must make the helper raise PersistError.
        """
        self.assertRaises(PersistError, path_string_to_tuple, "ab[0][c]")

    def test_path_tuple_to_string(self):
        """
        Every tuple in C{paths} converts back to its paired string.
        """
        for expected_text, as_tuple in self.paths:
            rendered = path_tuple_to_string(as_tuple)
            self.assertEquals(rendered, expected_text)
 
33
 
 
34
 
 
35
class BasePersistTest(LandscapeTest):
    """
    Common fixtures for the persist tests: shared data tables and a
    persist object, obtained from build_persist(), created in setUp.
    """

    # (path, value) pairs to be applied with set(), in order.  Some
    # paths appear more than once.
    set_items = [
        ("ab", 1),
        ("ab", 2),
        ("cd.ef", 3.4),
        ("cd.gh", "4"),
        ("cd.gh", "5"),
        ("cd.ij.kl", (1, 2.3, "4", [5], (6,))),
        ("cd.ij.mn", [1, 2.3, "4", [5], (6,)]),
        ("cd.ij.op[1]", 0),
        ("cd.ij.op[1]", 1),
        ("cd.ij.op[2]", 2),
        ("qr", {"s": {"t": "u"}}),
        ("v", [0, {}, 2]),
        ("v[1].v", "woot"),
    ]

    # Whole-tree content the tests compare against after applying
    # set_items.
    set_result = {
        "ab": 2,
        "cd": {
            "ef": 3.4,
            "gh": "5",
            "ij": {
                "kl": (1, 2.3, "4", [5], (6,)),
                "mn": [1, 2.3, "4", [5], (6,)],
                "op": [0, 1, 2],
            },
        },
        "qr": {"s": {"t": "u"}},
        "v": [0, {"v": "woot"}, 2],
    }

    # (path, expected value) pairs for get() on a persist that holds
    # set_result; None is the expected value for the missing entries.
    get_items = [
        ("ab", 2),
        ("cd.ef", 3.4),
        ("cd.gh", "5"),
        ("cd.ij.kl", (1, 2.3, "4", [5], (6,))),
        ("cd.ij.kl[3]", [5]),
        ("cd.ij.kl[3][0]", 5),
        ("cd.ij.mn", [1, 2.3, "4", [5], (6,)]),
        ("cd.ij.mn.4", "4"),
        ("cd.ij.mn.5", None),
        ("cd.ij.op", [0, 1, 2]),
        ("cd.ij.op[0]", 0),
        ("cd.ij.op[1]", 1),
        ("cd.ij.op[2]", 2),
        ("cd.ij.op[3]", None),
        ("qr", {"s": {"t": "u"}}),
        ("qr.s", {"t": "u"}),
        ("qr.s.t", "u"),
        ("x", None),
        ("x.y.z", None),
    ]

    # (path, value) pairs to be applied with add(), in order.
    add_items = [
        ("ab", 1),
        ("ab", 2.3),
        ("ab", "4"),
        ("ab", [5]),
        ("ab", (6,)),
        ("ab", {}),
        ("ab[5].cd", "foo"),
        ("ab[5].cd", "bar"),
    ]

    # Whole-tree content the tests compare against after applying
    # add_items.
    add_result = {
        "ab": [1, 2.3, "4", [5], (6,), {"cd": ["foo", "bar"]}],
    }

    def setUp(self):
        """
        Run the base setUp, then store a new persist in C{self.persist}.
        """
        LandscapeTest.setUp(self)
        self.persist = self.build_persist()

    def tearDown(self):
        """
        Drop C{self.persist} before running the base tearDown.
        """
        del self.persist
        LandscapeTest.tearDown(self)

    def build_persist(self, *args, **kwargs):
        """
        Return a Persist built from the given arguments.  Subclasses
        override this to select what kind of persist gets tested.
        """
        return Persist(*args, **kwargs)

    def format(self, result, expected):
        """
        Return a failure message showing pretty-printed C{result} and
        C{expected}.
        """
        return "\nResult:\n%s\nExpected:\n%s\n" % (
            pprint.pformat(result), pprint.pformat(expected))
 
120
 
 
121
 
 
122
class GeneralPersistTest(BasePersistTest):
    """
    Tests of set/get/add/keys/has/remove/move/root_at on the persist
    object provided by build_persist().
    """

    def test_set(self):
        """
        Applying C{set_items} with string paths yields C{set_result}.
        """
        for key, item in self.set_items:
            self.persist.set(key, item)
        stored = self.persist.get((), hard=True)
        message = self.format(stored, self.set_result)
        self.assertEquals(stored, self.set_result, message)

    def test_set_tuple_paths(self):
        """
        Applying C{set_items} with tuple paths yields C{set_result}.
        """
        for key, item in self.set_items:
            self.persist.set(path_string_to_tuple(key), item)
        stored = self.persist.get((), hard=True)
        message = self.format(stored, self.set_result)
        self.assertEquals(stored, self.set_result, message)

    def test_set_from_result(self):
        """
        Setting each top-level entry of C{set_result} reproduces it.
        """
        for key, item in self.set_result.items():
            self.persist.set(key, item)
        stored = self.persist.get((), hard=True)
        message = self.format(stored, self.set_result)
        self.assertEquals(stored, self.set_result, message)

    def test_get(self):
        """
        With C{set_result} stored, each C{get_items} path returns its
        paired value.
        """
        for key, item in self.set_result.items():
            self.persist.set(key, item)
        for lookup, wanted in self.get_items:
            self.assertEquals(self.persist.get(lookup), wanted)

    def test_get_tuple_paths(self):
        """
        Same as test_get, but the data is stored using tuple paths.
        """
        # NOTE(review): only the set() calls use tuple paths; the get()
        # calls below still receive string paths -- confirm this is the
        # intended coverage.
        for key, item in self.set_result.items():
            self.persist.set(path_string_to_tuple(key), item)
        for lookup, wanted in self.get_items:
            self.assertEquals(self.persist.get(lookup), wanted)

    def test_add(self):
        """
        Applying C{add_items} with add() yields C{add_result}.
        """
        for key, item in self.add_items:
            self.persist.add(key, item)
        stored = self.persist.get((), hard=True)
        message = self.format(stored, self.add_result)
        self.assertEquals(stored, self.add_result, message)

    def test_add_unique(self):
        """
        add() appends duplicates unless C{unique=True} is passed, in
        which case a value already present is not appended again.
        """
        persist = self.persist
        persist.add("a", "b")
        self.assertEquals(persist.get("a"), ["b"])
        persist.add("a", "b")
        self.assertEquals(persist.get("a"), ["b", "b"])
        persist.add("a", "b", unique=True)
        self.assertEquals(persist.get("a"), ["b", "b"])
        persist.add("a", "c", unique=True)
        self.assertEquals(persist.get("a"), ["b", "b", "c"])

    def test_keys(self):
        """
        keys() results for dict, list and missing paths, and the
        PersistError expected for the path "a.b".
        """
        persist = self.persist
        persist.set("a", {"b": 1, "c": {"d": 2}, "e": list("foo")})
        self.assertEquals(set(persist.keys((), hard=True)), set(["a"]))
        self.assertEquals(set(persist.keys("a")), set(["b", "c", "e"]))
        self.assertEquals(set(persist.keys("a.d")), set([]))
        self.assertEquals(set(persist.keys("a.e")), set([0, 1, 2]))
        self.assertEquals(set(persist.keys("a.f")), set([]))
        self.assertRaises(PersistError, persist.keys, "a.b")

    def test_has(self):
        """
        has() results for existing and missing paths and values, and
        the PersistError expected for the path "a.b.c".
        """
        persist = self.persist
        persist.set("a", {"b": 1, "c": {"d": 2}, "e": list("foo")})

        # Lookups expected to succeed.
        self.assertTrue(persist.has("a"))
        self.assertTrue(persist.has(("a", "b")))
        self.assertTrue(persist.has("a.c"))
        self.assertTrue(persist.has("a.c", "d"))
        self.assertTrue(persist.has("a.c.d"))
        self.assertTrue(persist.has("a.e"))
        self.assertTrue(persist.has("a.e[0]"))
        self.assertTrue(persist.has("a.e", "f"))
        self.assertTrue(persist.has("a.e", "o"))

        # Lookups expected to fail.
        self.assertFalse(persist.has("b"))
        self.assertFalse(persist.has("a.f"))
        self.assertFalse(persist.has("a.c.f"))
        self.assertFalse(persist.has("a.e[3]"))
        self.assertFalse(persist.has("a.e", "g"))

        self.assertRaises(PersistError, persist.has, "a.b.c")

    def test_remove(self):
        """
        remove() return values and the content left behind, step by
        step, until the whole "a" entry is gone.
        """
        persist = self.persist
        persist.set("a", {"b": [1], "c": {"d": 2}, "e": list("foot")})

        self.assertFalse(persist.remove("a.f"))

        self.assertRaises(PersistError, persist.remove, "a.c.d.e")

        self.assertTrue(persist.remove(("a", "e", "o")))
        self.assertEquals(persist.get("a.e"), ["f", "t"])

        self.assertFalse(persist.remove("a.e[2]"))
        self.assertEquals(persist.get("a.e"), ["f", "t"])

        self.assertTrue(persist.remove("a.e[1]"))
        self.assertEquals(persist.get("a.e"), ["f"])

        self.assertTrue(persist.remove("a.e", "f"))
        self.assertFalse(persist.has("a.e"))

        self.assertFalse(persist.remove("a.b[1]"))
        self.assertEquals(persist.get("a.b"), [1])

        self.assertTrue(persist.remove("a.b", 1))
        self.assertFalse(persist.has("a.b"))

        self.assertTrue(persist.remove("a.c"))
        self.assertFalse(persist.has("a.c"))

        self.assertFalse(persist.has("a"))

    def test_move(self):
        """
        move() relocates values between paths, given either string or
        tuple paths.
        """
        persist = self.persist
        persist.set("a", {"b": [1], "c": {"d": 2}})

        self.assertTrue(persist.move("a.b", "a.c.b"))
        self.assertEquals(persist.get("a"), {"c": {"b": [1], "d": 2}})

        self.assertTrue(persist.move("a.c.b[0]", "a.c.b"))
        self.assertEquals(persist.get("a"), {"c": {"b": 1, "d": 2}})

        self.assertTrue(persist.move(("a", "c", "b"), ("a", "c", "b", 0)))
        self.assertEquals(persist.get("a"), {"c": {"b": [1], "d": 2}})

    def test_copy_values_on_set(self):
        """
        Mutating a dict after passing it to set() must not affect the
        stored value.
        """
        value = {"b": 1}
        snapshot = value.copy()
        self.persist.set("a", value)
        value["c"] = 2
        self.assertEquals(self.persist.get("a"), snapshot)

    def test_copy_values_on_add(self):
        """
        Mutating a dict after passing it to add() must not affect the
        stored value.
        """
        value = {"b": 1}
        snapshot = value.copy()
        self.persist.add("a", value)
        value["c"] = 2
        self.assertEquals(self.persist.get("a[0]"), snapshot)

    def test_copy_values_on_get(self):
        """
        Mutating a dict returned by get() must not affect the stored
        value.
        """
        self.persist.set("a", {"b": 1})
        fetched = self.persist.get("a")
        snapshot = fetched.copy()
        fetched["c"] = 2
        self.assertEquals(self.persist.get("a"), snapshot)

    def test_root_at(self):
        """
        A value set through root_at("my-module") is readable from the
        original persist under the "my-module." prefix.
        """
        sub_persist = self.persist.root_at("my-module")
        sub_persist.set("option", 1)
        self.assertEquals(self.persist.get("my-module.option"), 1)
 
276
 
 
277
 
 
278
class SaveLoadPersistTest(BasePersistTest):
    """
    Tests of the readonly and modified state and of save()/load() on
    the persist object provided by build_persist().
    """

    helpers = [MakePathHelper]

    def test_readonly(self):
        """
        The readonly flag can be toggled; while it is set, the mutating
        calls raise PersistReadOnlyError unless C{weak} or C{soft} is
        passed.
        """
        persist = self.persist

        self.assertFalse(persist.readonly)
        persist.readonly = True
        self.assertTrue(persist.readonly)
        persist.readonly = False
        self.assertFalse(persist.readonly)

        persist.readonly = True
        self.assertRaises(PersistReadOnlyError, persist.set, "ab", 2)
        self.assertRaises(PersistReadOnlyError, persist.add, "ab", 3)
        self.assertRaises(PersistReadOnlyError, persist.remove, "ab", 4)
        self.assertRaises(PersistReadOnlyError, persist.move, "ab", "cd")

        # Still read-only here: these calls are expected not to raise.
        for keyword in ("weak", "soft"):
            options = {keyword: True}
            persist.set("ab", 2, **options)
            persist.add("cd", 2, **options)
            persist.remove("ab", **options)
            persist.move("cd", "ef", **options)

    def test_assert_writable(self):
        """
        assert_writable() passes on a writable persist and raises
        PersistReadOnlyError once readonly is set.
        """
        persist = self.persist
        persist.assert_writable()
        persist.set("ab", 1)
        persist.readonly = True
        self.assertRaises(PersistReadOnlyError, persist.assert_writable)

    def test_modified(self):
        """
        set(), add(), remove() and move() each turn C{modified} on, and
        reset_modified() turns it off again.
        """
        persist = self.persist
        self.assertFalse(persist.modified)

        persist.set("ab", 1)
        self.assertTrue(persist.modified)
        persist.reset_modified()
        self.assertFalse(persist.modified)

        persist.add("cd", 2)
        self.assertTrue(persist.modified)
        persist.reset_modified()
        self.assertFalse(persist.modified)

        persist.remove("ab")
        self.assertTrue(persist.modified)
        persist.reset_modified()
        self.assertFalse(persist.modified)

        persist.move("cd", "ef")
        self.assertTrue(persist.modified)

    def test_save_and_load(self):
        """
        Data saved by one persist is read back intact by another one.
        """
        for key, item in self.set_result.items():
            self.persist.set(key, item)

        target = self.make_path()
        self.persist.save(target)

        reloaded = self.build_persist()
        reloaded.load(target)

        stored = reloaded.get((), hard=True)
        message = self.format(stored, self.set_result)
        self.assertEquals(stored, self.set_result, message)

    def test_save_on_unexistent_dir(self):
        """
        save() produces the file even when its directory does not
        exist beforehand.
        """
        missing_dir = self.make_path()
        target = os.path.join(missing_dir, "foobar")

        self.assertFalse(os.path.exists(missing_dir))
        self.persist.save(target)
        self.assertTrue(os.path.isfile(target))

    def test_save_creates_backup(self):
        """
        After save(), a file named like the target plus ".old" exists.
        """
        target = self.make_path("foobar")
        backup = target + ".old"

        self.assertFalse(os.path.exists(backup))
        self.persist.save(target)
        self.assertTrue(os.path.exists(backup))

    def test_save_to_default_file(self):
        """
        Persist can be constructed with a filename, and Persist.save
        called without arguments will write to that filename.
        """
        target = self.make_path()
        with_default = self.build_persist(filename=target)
        self.assertFalse(os.path.exists(target))
        with_default.save()
        self.assertTrue(os.path.exists(target))

    def test_save_to_no_default_file(self):
        """
        Without a default filename, calling Persist.save with no
        arguments raises a PersistError.
        """
        self.assertRaises(PersistError, self.persist.save)

    def test_load_default_file(self):
        """
        A Persist created with a default filename that already exists
        loads that file's content.
        """
        target = self.make_path()
        writer = self.build_persist(filename=target)
        writer.set("foo", "bar")
        writer.save()

        reader = self.build_persist(filename=target)
        self.assertEquals(reader.get("foo"), "bar")

    def test_load_restores_backup(self):
        """
        Loading a filename returns the data that was saved under that
        filename plus ".old".
        """
        target = self.make_path("foobar")

        self.persist.set("a", 1)
        self.persist.save(target + ".old")

        reloaded = self.build_persist()
        reloaded.load(target)

        self.assertEquals(reloaded.get("a"), 1)

    def test_load_empty_files_wont_break(self):
        """
        load() on the path returned by make_path("") must not raise.
        """
        target = self.make_path("")
        self.persist.load(target)
 
399
 
 
400
 
 
401
class PicklePersistTest(GeneralPersistTest, SaveLoadPersistTest):
    """
    Runs the general and save/load tests on a Persist that uses a
    PickleBackend.
    """

    def build_persist(self, *args, **kwargs):
        """
        Return a Persist whose first argument is a new PickleBackend.
        """
        backend = PickleBackend()
        return Persist(backend, *args, **kwargs)
 
405
 
 
406
 
 
407
class ConfigObjPersistTest(GeneralPersistTest, SaveLoadPersistTest):
    """
    Runs the general and save/load tests on a Persist that uses a
    ConfigObjBackend.
    """

    def build_persist(self, *args, **kwargs):
        """
        Return a Persist whose first argument is a new ConfigObjBackend.
        """
        backend = ConfigObjBackend()
        return Persist(backend, *args, **kwargs)
 
411
 
 
412
 
 
413
class RootedPersistTest(GeneralPersistTest):
    """
    Runs the general tests on a RootedPersist rooted at "root.path",
    plus checks of the state it exposes from its parent.
    """

    def build_persist(self, *args, **kwargs):
        """
        Return a RootedPersist at "root.path" over a new Persist.
        """
        parent = Persist()
        return RootedPersist(parent, "root.path", *args, **kwargs)

    def test_readonly(self):
        """
        C{readonly} cannot be assigned on the rooted persist
        (AttributeError) but follows the value set on its parent.
        """
        rooted = self.persist
        self.assertFalse(rooted.readonly)
        self.assertRaises(AttributeError,
                          setattr, rooted, "readonly", True)
        rooted.parent.readonly = True
        self.assertTrue(rooted.readonly)

    def test_assert_writable(self):
        """
        assert_writable() raises PersistReadOnlyError once the parent
        has been made read-only.
        """
        rooted = self.persist
        rooted.assert_writable()
        rooted.set("ab", 1)
        rooted.parent.readonly = True
        self.assertRaises(PersistReadOnlyError, rooted.assert_writable)

    def test_modified(self):
        """
        set(), add(), remove() and move() each turn C{modified} on, and
        the parent's reset_modified() turns it off again.
        """
        rooted = self.persist
        self.assertFalse(rooted.modified)

        rooted.set("ab", 1)
        self.assertTrue(rooted.modified)
        rooted.parent.reset_modified()
        self.assertFalse(rooted.modified)

        rooted.add("cd", 2)
        self.assertTrue(rooted.modified)
        rooted.parent.reset_modified()
        self.assertFalse(rooted.modified)

        rooted.remove("ab")
        self.assertTrue(rooted.modified)
        rooted.parent.reset_modified()
        self.assertFalse(rooted.modified)

        rooted.move("cd", "ef")
        self.assertTrue(rooted.modified)