~ahasenack/landscape-client/landscape-client-1.5.5-0ubuntu0.9.04.0

« back to all changes in this revision

Viewing changes to landscape/broker/tests/test_deployment.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20080908163557-l3ixzj5dxz37wnw2
Tags: 1.0.18-0ubuntu1
New upstream release 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import os
 
2
 
 
3
from landscape.lib.bpickle import dumps
 
4
from landscape.lib.fetch import fetch
 
5
from landscape.lib.persist import Persist
 
6
from landscape.tests.mocker import ANY
 
7
 
 
8
from landscape.broker.broker import BUS_NAME
 
9
from landscape.broker.deployment import BrokerConfiguration
 
10
from landscape.tests.helpers import (
 
11
    LandscapeTest, LandscapeIsolatedTest, FakeRemoteBrokerHelper,
 
12
    RemoteBrokerHelper, EnvironSaverHelper)
 
13
 
 
14
 
 
15
class DBusTestTest(LandscapeIsolatedTest):
    """Checks that the broker service's DBus object can be looked up."""

    helpers = [RemoteBrokerHelper]

    def test_session_bus(self):
        """
        Deploying the broker should start a MessageExchange listener
        on the session bus.
        """
        broker = self.broker_service
        exported = broker.dbus_object
        # introspect=False: only fetch the proxy, do not query the remote
        # object for its interface description.
        proxy = broker.bus.get_object(exported.bus_name,
                                      exported.object_path,
                                      introspect=False)
        self.assertTrue(proxy)
 
28
 
 
29
 
 
30
class DeploymentTest(LandscapeIsolatedTest):
    """Tests for the behaviour of a started (deployed) broker service."""

    # Ideally most of these tests won't need to be isolated.  But since the
    # deployment.BrokerService listens on DBUS unconditionally during
    # startService(), we need them to be for now.

    helpers = [FakeRemoteBrokerHelper]

    def test_pinger(self):
        """
        The BrokerDBusObject sets up an active pinger which will cause
        exchanges to occur.
        """
        fetch_mock = self.mocker.replace("landscape.lib.fetch.fetch")

        # The FakeRemoteBrokerHelper defines this URL in the configuration
        fetch_mock("http://localhost:91910/", post=True,
                   data="insecure_id=42", headers=ANY)
        self.mocker.result(dumps({"messages": True}))
        # Exactly two pings are expected during the simulated minute.
        self.mocker.count(2)

        self.mocker.replay()
        broker = self.broker_service
        broker.identity.insecure_id = 42
        broker.startService()
        # 30 is the default interval between pings, and 60 is the urgent
        # exchange interval.  If we wait 60 seconds, we should get 2
        # pings and one exchange.
        broker.reactor.advance(60)
        self.assertEqual(len(broker.transport.payloads), 1)

    def test_post_exit_event_will_stop_reactor(self):
        """
        Firing the "post-exit" event on the broker service's reactor
        should result in a call to stop() on the Twisted reactor.
        """
        twisted_reactor = self.mocker.replace("twisted.internet.reactor")
        twisted_reactor.stop()
        self.mocker.replay()

        self.broker_service.reactor.fire("post-exit")
 
66
 
 
67
 
 
68
class ConfigurationTests(LandscapeTest):
    """Tests for option loading in BrokerConfiguration."""

    # NOTE(review): presumably EnvironSaverHelper restores os.environ after
    # each test, since these tests mutate it -- confirm against the helper.
    helpers = [EnvironSaverHelper]

    def test_loading_sets_http_proxies(self):
        """
        Loading a configuration with --http-proxy and --https-proxy puts
        the given values into the http_proxy and https_proxy environment
        variables.
        """
        # Start from an environment with no proxy variables defined.
        for variable in ("http_proxy", "https_proxy"):
            if variable in os.environ:
                del os.environ[variable]

        configuration = BrokerConfiguration()
        configuration.load(["--http-proxy", "foo",
                            "--https-proxy", "bar",
                            "--url", "whatever"])
        self.assertEqual(os.environ["http_proxy"], "foo")
        self.assertEqual(os.environ["https_proxy"], "bar")

    def test_loading_without_http_proxies_does_not_touch_environment(self):
        """
        Loading a configuration without any proxy option leaves the
        existing http_proxy and https_proxy environment variables as
        they were.
        """
        os.environ["http_proxy"] = "heyo"
        os.environ["https_proxy"] = "baroo"

        configuration = BrokerConfiguration()
        configuration.load(["--url", "whatever"])
        self.assertEqual(os.environ["http_proxy"], "heyo")
        self.assertEqual(os.environ["https_proxy"], "baroo")

    def test_loading_resets_http_proxies(self):
        """
        User scenario:

        Runs landscape-config, fat-fingers a random character into the
        http_proxy field when he didn't mean to. Runs it again, this time
        leaving it blank. The proxy should be reset to whatever
        environment-supplied proxy there was at startup.
        """
        os.environ["http_proxy"] = "original"
        os.environ["https_proxy"] = "originals"

        configuration = BrokerConfiguration()

        # First load: explicit proxies override the environment.
        configuration.load(["--http-proxy", "x",
                            "--https-proxy", "y",
                            "--url", "whatever"])
        self.assertEqual(os.environ["http_proxy"], "x")
        self.assertEqual(os.environ["https_proxy"], "y")

        # Second load on the same object without proxies: the values the
        # environment held before the first load must come back.
        configuration.load(["--url", "whatever"])
        self.assertEqual(os.environ["http_proxy"], "original")
        self.assertEqual(os.environ["https_proxy"], "originals")

    def test_intervals_are_ints(self):
        """
        The exchange interval options read from a configuration file are
        exposed as integers, not as the strings found in the file.
        """
        filename = self.makeFile("[client]\n"
                                 "urgent_exchange_interval = 12\n"
                                 "exchange_interval = 34\n")

        configuration = BrokerConfiguration()
        configuration.load(["--config", filename, "--url", "whatever"])

        self.assertEqual(configuration.urgent_exchange_interval, 12)
        self.assertEqual(configuration.exchange_interval, 34)