~ahasenack/landscape-client/landscape-client-1.5.5-0ubuntu0.9.10.0

« back to all changes in this revision

Viewing changes to landscape/broker/tests/test_deployment.py

  • Committer: Bazaar Package Importer
  • Author(s): Free Ekanayaka
  • Date: 2010-06-28 18:07:18 UTC
  • mfrom: (1.2.5 upstream)
  • Revision ID: james.westby@ubuntu.com-20100628180718-g9l55c9c5bnch03b
Tags: 1.5.2.1-0ubuntu0.9.10.0
Filter duplicate network interfaces in get_active_interfaces (LP: #597000)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
import os
2
 
 
3
 
from landscape.lib.bpickle import dumps
4
 
from landscape.tests.mocker import ANY
5
 
 
6
 
from landscape.broker.deployment import BrokerConfiguration, BrokerService
7
 
from landscape.tests.helpers import (
8
 
    LandscapeTest, LandscapeIsolatedTest, FakeRemoteBrokerHelper,
9
 
    RemoteBrokerHelper, EnvironSaverHelper)
10
 
from landscape.reactor import FakeReactor
11
 
from landscape.broker.transport import FakeTransport
12
 
from landscape.lib.fetch import fetch_async
13
 
 
14
 
 
15
 
class DBusTestTest(LandscapeIsolatedTest):
16
 
 
17
 
    helpers = [RemoteBrokerHelper]
18
 
 
19
 
    def test_session_bus(self):
20
 
        """
21
 
        Deploying the broker should start a MessageExchange listener
22
 
        on the session bus.
23
 
        """
24
 
        service = self.broker_service
25
 
        self.assertTrue(service.bus.get_object(service.dbus_object.bus_name,
26
 
                                               service.dbus_object.object_path,
27
 
                                               introspect=False))
28
 
 
29
 
 
30
 
class DeploymentTest(LandscapeIsolatedTest):
31
 
    # Ideally most of these tests won't need to be isolated.  But since the
32
 
    # deployment.BrokerService listens on DBUS unconditionally during
33
 
    # startService(),
34
 
    # we need them to be for now.
35
 
 
36
 
    helpers = [FakeRemoteBrokerHelper]
37
 
 
38
 
    def test_pinger(self):
39
 
        """
40
 
        The BrokerDBusObject sets up an active pinger which will cause
41
 
        exchanges to occur.
42
 
        """
43
 
        patched_fetch = self.mocker.replace("landscape.lib.fetch.fetch")
44
 
 
45
 
        # The FakeRemoteBrokerHelper defines this URL in the configuration
46
 
        patched_fetch("http://localhost:91910/", post=True,
47
 
                      data="insecure_id=42", headers=ANY)
48
 
        self.mocker.result(dumps({"messages": True}))
49
 
        self.mocker.count(2)
50
 
 
51
 
        self.mocker.replay()
52
 
        self.broker_service.identity.insecure_id = 42
53
 
        self.broker_service.startService()
54
 
        # 30 is the default interval between pings, and 60 is the urgent
55
 
        # exchange interval.  If we wait 60 seconds, we should get 2
56
 
        # pings and one exchange.
57
 
        self.broker_service.reactor.advance(60)
58
 
        self.assertEquals(len(self.broker_service.transport.payloads), 1)
59
 
 
60
 
    def test_post_exit_event_will_stop_reactor(self):
61
 
        reactor_mock = self.mocker.replace("twisted.internet.reactor")
62
 
        reactor_mock.stop()
63
 
        self.mocker.replay()
64
 
 
65
 
        self.broker_service.reactor.fire("post-exit")
66
 
 
67
 
 
68
 
    def test_registration_instantiation(self):
69
 
        class MyBrokerConfiguration(BrokerConfiguration):
70
 
            default_config_filenames = [self.config_filename]
71
 
 
72
 
        config = MyBrokerConfiguration()
73
 
        config.load(["--bus", "session", "--data-path", self.data_path])
74
 
 
75
 
        class FakeBrokerService(BrokerService):
76
 
            """A broker which uses a fake reactor and fake transport."""
77
 
            reactor_factory = FakeReactor
78
 
            transport_factory = FakeTransport
79
 
 
80
 
        self.assertFalse(config.cloud)
81
 
        service = FakeBrokerService(config)
82
 
        self.assertFalse(service.registration._cloud)
83
 
        self.assertIdentical(service.registration._fetch_async, fetch_async)
84
 
 
85
 
        config.cloud = True
86
 
        service = FakeBrokerService(config)
87
 
        self.assertTrue(service.registration._cloud)
88
 
 
89
 
 
90
 
class ConfigurationTests(LandscapeTest):
91
 
    helpers = [EnvironSaverHelper]
92
 
 
93
 
    def test_loading_sets_http_proxies(self):
94
 
        if "http_proxy" in os.environ:
95
 
            del os.environ["http_proxy"]
96
 
        if "https_proxy" in os.environ:
97
 
            del os.environ["https_proxy"]
98
 
 
99
 
        configuration = BrokerConfiguration()
100
 
        configuration.load(["--http-proxy", "foo",
101
 
                            "--https-proxy", "bar",
102
 
                            "--url", "whatever"])
103
 
        self.assertEquals(os.environ["http_proxy"], "foo")
104
 
        self.assertEquals(os.environ["https_proxy"], "bar")
105
 
 
106
 
    def test_loading_without_http_proxies_does_not_touch_environment(self):
107
 
        os.environ["http_proxy"] = "heyo"
108
 
        os.environ["https_proxy"] = "baroo"
109
 
 
110
 
        configuration = BrokerConfiguration()
111
 
        configuration.load(["--url", "whatever"])
112
 
        self.assertEquals(os.environ["http_proxy"], "heyo")
113
 
        self.assertEquals(os.environ["https_proxy"], "baroo")
114
 
 
115
 
    def test_loading_resets_http_proxies(self):
116
 
        """
117
 
        User scenario:
118
 
 
119
 
        Runs landscape-config, fat-fingers a random character into the
120
 
        http_proxy field when he didn't mean to. runs it again, this time
121
 
        leaving it blank. The proxy should be reset to whatever
122
 
        environment-supplied proxy there was at startup.
123
 
        """
124
 
        os.environ["http_proxy"] = "original"
125
 
        os.environ["https_proxy"] = "originals"
126
 
 
127
 
        configuration = BrokerConfiguration()
128
 
        configuration.load(["--http-proxy", "x",
129
 
                            "--https-proxy", "y",
130
 
                            "--url", "whatever"])
131
 
        self.assertEquals(os.environ["http_proxy"], "x")
132
 
        self.assertEquals(os.environ["https_proxy"], "y")
133
 
 
134
 
        configuration.load(["--url", "whatever"])
135
 
        self.assertEquals(os.environ["http_proxy"], "original")
136
 
        self.assertEquals(os.environ["https_proxy"], "originals")
137
 
 
138
 
    def test_intervals_are_ints(self):
139
 
        filename = self.makeFile("[client]\n"
140
 
                                 "urgent_exchange_interval = 12\n"
141
 
                                 "exchange_interval = 34\n")
142
 
 
143
 
        configuration = BrokerConfiguration()
144
 
        configuration.load(["--config", filename, "--url", "whatever"])
145
 
 
146
 
        self.assertEquals(configuration.urgent_exchange_interval, 12)
147
 
        self.assertEquals(configuration.exchange_interval, 34)