~ahasenack/landscape-client/landscape-client-1.5.5-0ubuntu0.9.04.0

« back to all changes in this revision

Viewing changes to landscape/package/tests/test_facade.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20080908163557-l3ixzj5dxz37wnw2
Tags: 1.0.18-0ubuntu1
New upstream release 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import gdbm
 
2
import time
 
3
import os
 
4
 
 
5
from smart.control import Control
 
6
from smart.cache import Provides
 
7
 
 
8
import smart
 
9
 
 
10
from landscape.package.facade import (
 
11
    SmartFacade, TransactionError, DependencyError, SmartError)
 
12
 
 
13
from landscape.tests.helpers import LandscapeTest
 
14
from landscape.package.tests.helpers import (
 
15
    SmartFacadeHelper, HASH1, HASH2, HASH3, PKGNAME1)
 
16
 
 
17
 
 
18
class SmartFacadeTest(LandscapeTest):
 
19
 
 
20
    helpers = [SmartFacadeHelper]
 
21
 
 
22
    def test_get_packages(self):
 
23
        self.facade.reload_channels()
 
24
        pkgs = self.facade.get_packages()
 
25
        self.assertEquals(sorted(pkg.name for pkg in pkgs),
 
26
                          ["name1", "name2", "name3"])
 
27
 
 
28
    def test_get_packages_wont_return_non_debian_packages(self):
 
29
        self.facade.reload_channels()
 
30
        ctrl_mock = self.mocker.patch(Control)
 
31
        class StubPackage(object): pass
 
32
        cache_mock = ctrl_mock.getCache()
 
33
        cache_mock.getPackages()
 
34
        self.mocker.result([StubPackage(), StubPackage()])
 
35
        self.mocker.replay()
 
36
        self.assertEquals(self.facade.get_packages(), [])
 
37
 
 
38
    def test_get_packages_by_name(self):
 
39
        self.facade.reload_channels()
 
40
        pkgs = self.facade.get_packages_by_name("name1")
 
41
        self.assertEquals([pkg.name for pkg in pkgs], ["name1"])
 
42
        pkgs = self.facade.get_packages_by_name("name2")
 
43
        self.assertEquals([pkg.name for pkg in pkgs], ["name2"])
 
44
 
 
45
    def test_get_packages_by_name_wont_return_non_debian_packages(self):
 
46
        self.facade.reload_channels()
 
47
        ctrl_mock = self.mocker.patch(Control)
 
48
        class StubPackage(object): pass
 
49
        cache_mock = ctrl_mock.getCache()
 
50
        cache_mock.getPackages("name")
 
51
        self.mocker.result([StubPackage(), StubPackage()])
 
52
        self.mocker.replay()
 
53
        self.assertEquals(self.facade.get_packages_by_name("name"), [])
 
54
 
 
55
    def test_get_package_skeleton(self):
 
56
        self.facade.reload_channels()
 
57
        pkg1 = self.facade.get_packages_by_name("name1")[0]
 
58
        pkg2 = self.facade.get_packages_by_name("name2")[0]
 
59
        skeleton1 = self.facade.get_package_skeleton(pkg1)
 
60
        skeleton2 = self.facade.get_package_skeleton(pkg2)
 
61
        self.assertEquals(skeleton1.get_hash(), HASH1)
 
62
        self.assertEquals(skeleton2.get_hash(), HASH2)
 
63
 
 
64
    def test_build_skeleton_with_info(self):
 
65
        self.facade.reload_channels()
 
66
        pkg = self.facade.get_packages_by_name("name1")[0]
 
67
        skeleton = self.facade.get_package_skeleton(pkg, True)
 
68
        self.assertEquals(skeleton.section, "Group1")
 
69
        self.assertEquals(skeleton.summary, "Summary1")
 
70
        self.assertEquals(skeleton.description, "Description1")
 
71
        self.assertEquals(skeleton.size, 1038)
 
72
        self.assertEquals(skeleton.installed_size, 28672)
 
73
 
 
74
    def test_get_package_hash(self):
 
75
        self.facade.reload_channels()
 
76
        pkg = self.facade.get_packages_by_name("name1")[0]
 
77
        self.assertEquals(self.facade.get_package_hash(pkg), HASH1)
 
78
        pkg = self.facade.get_packages_by_name("name2")[0]
 
79
        self.assertEquals(self.facade.get_package_hash(pkg), HASH2)
 
80
 
 
81
    def test_get_package_by_hash(self):
 
82
        self.facade.reload_channels()
 
83
        pkg = self.facade.get_package_by_hash(HASH1)
 
84
        self.assertEquals(pkg.name, "name1")
 
85
        pkg = self.facade.get_package_by_hash(HASH2)
 
86
        self.assertEquals(pkg.name, "name2")
 
87
        pkg = self.facade.get_package_by_hash("none")
 
88
        self.assertEquals(pkg, None)
 
89
 
 
90
    def test_reload_channels_clears_hash_cache(self):
 
91
        # Load hashes.
 
92
        self.facade.reload_channels()
 
93
        start = time.time()
 
94
 
 
95
        # Hold a reference to packages.
 
96
        pkg1 = self.facade.get_packages_by_name("name1")[0]
 
97
        pkg2 = self.facade.get_packages_by_name("name2")[0]
 
98
        pkg3 = self.facade.get_packages_by_name("name3")[0]
 
99
        self.assertTrue(pkg1 and pkg2)
 
100
 
 
101
        # Remove the package from the repository.
 
102
        os.unlink(os.path.join(self.repository_dir, PKGNAME1))
 
103
 
 
104
        # Forcibly change the mtime of our repository, so that Smart
 
105
        # will consider it as changed (if the change is inside the
 
106
        # same second the directory's mtime will be the same)
 
107
        mtime = int(time.time()+1)
 
108
        os.utime(self.repository_dir, (mtime, mtime))
 
109
 
 
110
        # Reload channels.
 
111
        self.facade.reload_channels()
 
112
 
 
113
        # Only packages with name2 and name3 should be loaded, and they're
 
114
        # not the same objects anymore.
 
115
        self.assertEquals(
 
116
            sorted([pkg.name for pkg in self.facade.get_packages()]),
 
117
            ["name2", "name3"])
 
118
        self.assertNotEquals(set(self.facade.get_packages()),
 
119
                             set([pkg2, pkg3]))
 
120
 
 
121
        # The hash cache shouldn't include either of the old packages.
 
122
        self.assertEquals(self.facade.get_package_hash(pkg1), None)
 
123
        self.assertEquals(self.facade.get_package_hash(pkg2), None)
 
124
        self.assertEquals(self.facade.get_package_hash(pkg3), None)
 
125
 
 
126
        # Also, the hash for package1 shouldn't be present at all.
 
127
        self.assertEquals(self.facade.get_package_by_hash(HASH1), None)
 
128
 
 
129
        # While HASH2 and HASH3 should point to the new packages.
 
130
        new_pkgs = self.facade.get_packages()
 
131
        self.assertTrue(self.facade.get_package_by_hash(HASH2)
 
132
                        in new_pkgs)
 
133
        self.assertTrue(self.facade.get_package_by_hash(HASH3)
 
134
                        in new_pkgs)
 
135
 
 
136
        # Which are not the old packages.
 
137
        self.assertFalse(pkg2 in new_pkgs)
 
138
        self.assertFalse(pkg3 in new_pkgs)
 
139
 
 
140
    def test_perform_changes_with_nothing_to_do(self):
 
141
        """perform_changes() should return None when there's nothing to do.
 
142
        """
 
143
        self.facade.reload_channels()
 
144
        self.assertEquals(self.facade.perform_changes(), None)
 
145
 
 
146
    def test_reset_marks(self):
 
147
        """perform_changes() should return None when there's nothing to do.
 
148
        """
 
149
        self.facade.reload_channels()
 
150
        pkg = self.facade.get_packages_by_name("name1")[0]
 
151
        self.facade.mark_install(pkg)
 
152
        self.facade.reset_marks()
 
153
        self.assertEquals(self.facade.perform_changes(), None)
 
154
 
 
155
    def test_mark_install_transaction_error(self):
 
156
        """
 
157
        Mark package 'name1' for installation, and try to perform changes.
 
158
        It should fail because 'name1' depends on 'requirename1'.
 
159
        """
 
160
        self.facade.reload_channels()
 
161
 
 
162
        pkg = self.facade.get_packages_by_name("name1")[0]
 
163
        self.facade.mark_install(pkg)
 
164
        try:
 
165
            self.facade.perform_changes()
 
166
        except TransactionError, exception:
 
167
            pass
 
168
        else:
 
169
            exception = None
 
170
        self.assertTrue(exception, "TransactionError not raised")
 
171
        self.assertIn("requirename", exception.args[0])
 
172
 
 
173
    def test_mark_install_dependency_error(self):
 
174
        """
 
175
        Now we artificially inject the needed dependencies of 'name1'
 
176
        in 'name2', but we don't mark 'name2' for installation, and
 
177
        that should make perform_changes() fail with a dependency
 
178
        error on the needed package.
 
179
        """
 
180
        self.facade.reload_channels()
 
181
 
 
182
        provide1 = Provides("prerequirename1", "prerequireversion1")
 
183
        provide2 = Provides("requirename1", "requireversion1")
 
184
        pkg2 = self.facade.get_packages_by_name("name2")[0]
 
185
        pkg2.provides += (provide1, provide2)
 
186
 
 
187
        # We have to satisfy *both* packages.
 
188
        provide1 = Provides("prerequirename2", "prerequireversion2")
 
189
        provide2 = Provides("requirename2", "requireversion2")
 
190
        pkg1 = self.facade.get_packages_by_name("name1")[0]
 
191
        pkg1.provides += (provide1, provide2)
 
192
 
 
193
        # Ask Smart to reprocess relationships.
 
194
        self.facade.reload_cache()
 
195
 
 
196
        self.assertEquals(pkg1.requires[0].providedby[0].packages[0], pkg2)
 
197
        self.assertEquals(pkg1.requires[1].providedby[0].packages[0], pkg2)
 
198
 
 
199
        self.facade.mark_install(pkg1)
 
200
        try:
 
201
            self.facade.perform_changes()
 
202
        except DependencyError, exception:
 
203
            pass
 
204
        else:
 
205
            exception = None
 
206
        self.assertTrue(exception, "DependencyError not raised")
 
207
        self.assertEquals(exception.packages, [pkg2])
 
208
 
 
209
    def test_mark_remove_dependency_error(self):
 
210
        """
 
211
        Besides making 'name1' satisfy 'name2' and the contrary.  We'll
 
212
        mark both packages installed, so that we can get an error on
 
213
        removal.
 
214
        """
 
215
        self.facade.reload_channels()
 
216
 
 
217
        provide1 = Provides("prerequirename1", "prerequireversion1")
 
218
        provide2 = Provides("requirename1", "requireversion1")
 
219
        pkg2 = self.facade.get_packages_by_name("name2")[0]
 
220
        pkg2.provides += (provide1, provide2)
 
221
 
 
222
        # We have to satisfy *both* packages.
 
223
        provide1 = Provides("prerequirename2", "prerequireversion2")
 
224
        provide2 = Provides("requirename2", "requireversion2")
 
225
        pkg1 = self.facade.get_packages_by_name("name1")[0]
 
226
        pkg1.provides += (provide1, provide2)
 
227
 
 
228
        # Ask Smart to reprocess relationships.
 
229
        self.facade.reload_cache()
 
230
 
 
231
        pkg1.installed = True
 
232
        pkg2.installed = True
 
233
 
 
234
        self.assertEquals(pkg1.requires[0].providedby[0].packages[0], pkg2)
 
235
        self.assertEquals(pkg1.requires[1].providedby[0].packages[0], pkg2)
 
236
 
 
237
        self.facade.mark_remove(pkg2)
 
238
        try:
 
239
            output = self.facade.perform_changes()
 
240
        except DependencyError, exception:
 
241
            output = ""
 
242
        else:
 
243
            exception = None
 
244
        self.assertTrue(exception, "DependencyError not raised. Output: %s"
 
245
                                   % repr(output))
 
246
        self.assertEquals(exception.packages, [pkg1])
 
247
 
 
248
    def test_mark_upgrade_dependency_error(self):
 
249
        """Artificially make pkg2 upgrade pkg1, and mark pkg1 for upgrade."""
 
250
 
 
251
        # The backend only works after initialized.
 
252
        from smart.backends.deb.base import DebUpgrades, DebConflicts
 
253
 
 
254
        self.facade.reload_channels()
 
255
 
 
256
        pkg1 = self.facade.get_packages_by_name("name1")[0]
 
257
        pkg2 = self.facade.get_packages_by_name("name2")[0]
 
258
 
 
259
        # Artificially make pkg2 be self-satisfied, and make it upgrade and
 
260
        # conflict with pkg1.
 
261
        pkg2.requires = []
 
262
        pkg2.upgrades = [DebUpgrades("name1", "=", "version1-release1")]
 
263
        pkg2.conflicts = [DebConflicts("name1", "=", "version1-release1")]
 
264
 
 
265
        # pkg1 will also be self-satisfied.
 
266
        pkg1.requires = []
 
267
 
 
268
        # Ask Smart to reprocess relationships.
 
269
        self.facade.reload_cache()
 
270
 
 
271
        # Mark the pkg1 as installed.  Must be done after reloading
 
272
        # the cache as reloading will reset it to the loader installed
 
273
        # status.
 
274
        pkg1.installed = True
 
275
 
 
276
        # Check that the linkage worked.
 
277
        self.assertEquals(pkg2.upgrades[0].providedby[0].packages[0], pkg1)
 
278
 
 
279
        # Perform the upgrade test.
 
280
        self.facade.mark_upgrade(pkg1)
 
281
        try:
 
282
            self.facade.perform_changes()
 
283
        except DependencyError, exception:
 
284
            pass
 
285
        else:
 
286
            exception = None
 
287
        self.assertTrue(exception, "DependencyError not raised")
 
288
 
 
289
        # Both packages should be included in the dependency error. One
 
290
        # must be removed, and the other installed.
 
291
        self.assertEquals(set(exception.packages), set([pkg1, pkg2]))
 
292
 
 
293
    def test_perform_changes_with_logged_error(self):
 
294
        self.log_helper.ignore_errors(".*dpkg")
 
295
 
 
296
        self.facade.reload_channels()
 
297
 
 
298
        pkg = self.facade.get_packages_by_name("name1")[0]
 
299
        pkg.requires = ()
 
300
 
 
301
        self.facade.reload_cache()
 
302
 
 
303
        self.facade.mark_install(pkg)
 
304
 
 
305
        try:
 
306
            output = self.facade.perform_changes()
 
307
        except SmartError, exception:
 
308
            output = ""
 
309
        else:
 
310
            exception = None
 
311
 
 
312
        self.assertTrue(exception,
 
313
                        "SmartError not raised. Output: %s" % repr(output))
 
314
        # We can't check the whole message because the dpkg error can be
 
315
        # localized. We can't use str(exception) either because it can contain
 
316
        # unicode
 
317
        self.assertIn("ERROR", exception.args[0])
 
318
        self.assertIn("(2)", exception.args[0])
 
319
        self.assertIn("\n[unpack] name1_version1-release1\ndpkg: ",
 
320
                      exception.args[0])
 
321
 
 
322
    def test_perform_changes_is_non_interactive(self):
 
323
        from smart.backends.deb.pm import DebPackageManager
 
324
 
 
325
        self.facade.reload_channels()
 
326
 
 
327
        pkg = self.facade.get_packages_by_name("name1")[0]
 
328
        pkg.requires = ()
 
329
 
 
330
        self.facade.reload_cache()
 
331
 
 
332
        self.facade.mark_install(pkg)
 
333
 
 
334
        environ = []
 
335
        def check_environ(self, argv, output):
 
336
            environ.append(os.environ.get("DEBIAN_FRONTEND"))
 
337
            environ.append(os.environ.get("APT_LISTCHANGES_FRONTEND"))
 
338
            return 0
 
339
 
 
340
        DebPackageManager.dpkg, olddpkg = check_environ, DebPackageManager.dpkg
 
341
 
 
342
        try:
 
343
            self.facade.perform_changes()
 
344
        finally:
 
345
            DebPackageManager.dpkg = olddpkg
 
346
 
 
347
        self.assertEquals(environ, ["noninteractive", "none",
 
348
                                    "noninteractive", "none"])
 
349
 
 
350
    def test_deinit_cleans_the_state(self):
 
351
        self.facade.reload_channels()
 
352
        self.assertTrue(self.facade.get_package_by_hash(HASH1))
 
353
        self.facade.deinit()
 
354
        self.assertFalse(self.facade.get_package_by_hash(HASH1))
 
355
 
 
356
    def test_deinit_deinits_smart(self):
 
357
        self.facade.reload_channels()
 
358
        self.assertTrue(smart.iface.object)
 
359
        self.facade.deinit()
 
360
        self.assertFalse(smart.iface.object)
 
361
 
 
362
    def test_deinit_when_smart_wasnt_initialized(self):
 
363
        self.assertFalse(smart.iface.object)
 
364
        # Nothing bad should happen.
 
365
        self.facade.deinit()
 
366
 
 
367
    def test_reload_channels_wont_consider_non_debian_packages(self):
 
368
        class StubPackage(object): pass
 
369
        pkg = StubPackage()
 
370
 
 
371
        ctrl_mock = self.mocker.patch(Control)
 
372
        cache_mock = ctrl_mock.getCache()
 
373
        cache_mock.getPackages()
 
374
        self.mocker.result([pkg])
 
375
        self.mocker.replay()
 
376
 
 
377
        self.facade.reload_channels()
 
378
        self.assertEquals(self.facade.get_package_hash(pkg), None)