~free.ekanayaka/landscape-client/karmic-updates-1.4.4-0ubuntu0.9.10

« back to all changes in this revision

Viewing changes to landscape/broker/broker.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20080908163557-l3ixzj5dxz37wnw2
Tags: 1.0.18-0ubuntu1
New upstream release 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""The DBUS service which interfaces to the broker."""
 
2
 
 
3
import logging
 
4
 
 
5
from dbus.service import signal
 
6
import dbus.glib
 
7
 
 
8
from landscape.lib.dbus_util import (get_object, Object, method,
 
9
                                     byte_array, array_to_string)
 
10
from landscape.lib.bpickle import loads, dumps
 
11
from landscape.lib.twisted_util import gather_results
 
12
 
 
13
from landscape.manager.manager import FAILED
 
14
 
 
15
 
 
16
BUS_NAME = "com.canonical.landscape.Broker"
 
17
OBJECT_PATH = "/com/canonical/landscape/Broker"
 
18
IFACE_NAME = BUS_NAME
 
19
 
 
20
 
 
21
class BrokerDBusObject(Object):
 
22
    """A DBus-published object which allows adding messages to the queue."""
 
23
 
 
24
    bus_name = BUS_NAME
 
25
    object_path = OBJECT_PATH
 
26
 
 
27
    def __init__(self, config, reactor, exchange, registration,
 
28
                 message_store, bus):
 
29
        """
 
30
        @param exchange: The
 
31
            L{MessageExchange<landscape.exchange.MessageExchange>} to send
 
32
            messages with.
 
33
        @param bus: The L{Bus} that represents where we're listening.
 
34
        """
 
35
        super(BrokerDBusObject, self).__init__(bus)
 
36
        self._registered_plugins = set()
 
37
        self.bus = bus
 
38
        self.config = config
 
39
        self.reactor = reactor
 
40
        self.exchange = exchange
 
41
        self.registration = registration
 
42
        self.message_store = message_store
 
43
        reactor.call_on("message", self._broadcast_message)
 
44
        reactor.call_on("impending-exchange", self.impending_exchange)
 
45
        reactor.call_on("exchange-failed", self.exchange_failed)
 
46
        reactor.call_on("registration-done", self.registration_done)
 
47
        reactor.call_on("registration-failed", self.registration_failed)
 
48
        reactor.call_on("message-type-acceptance-changed",
 
49
                        self.message_type_acceptance_changed)
 
50
        reactor.call_on("resynchronize-clients", self.resynchronize)
 
51
 
 
52
    @signal(IFACE_NAME)
 
53
    def resynchronize(self):
 
54
        pass
 
55
 
 
56
    @signal(IFACE_NAME)
 
57
    def impending_exchange(self):
 
58
        pass
 
59
 
 
60
    @signal(IFACE_NAME)
 
61
    def exchange_failed(self):
 
62
        pass
 
63
 
 
64
    def _broadcast_message(self, message):
 
65
        blob = byte_array(dumps(message))
 
66
        results = []
 
67
        for plugin in self.get_plugin_objects():
 
68
            results.append(plugin.message(blob))
 
69
        return gather_results(results).addCallback(self._message_delivered,
 
70
                                                   message)
 
71
 
 
72
    def _message_delivered(self, results, message):
 
73
        """
 
74
        If the message wasn't handled, and it's an operation request (i.e. it
 
75
        has an operation-id), then respond with a failing operation result
 
76
        indicating as such.
 
77
        """
 
78
        opid = message.get("operation-id")
 
79
        if (True not in results
 
80
            and opid is not None
 
81
            and message["type"] != "resynchronize"):
 
82
            mtype = message["type"]
 
83
            logging.error("Nobody handled the %s message." % (mtype,))
 
84
 
 
85
            result_text = """\
 
86
Landscape client failed to handle this request (%s) because the
 
87
plugin which should handle it isn't available.  This could mean that the
 
88
plugin has been intentionally disabled, or that the client isn't running
 
89
properly, or you may be running an older version of the client that doesn't
 
90
support this feature.
 
91
 
 
92
Please contact the Landscape team for more information.
 
93
""" % (mtype,)
 
94
            response = {
 
95
                "type": "operation-result",
 
96
                "status": FAILED,
 
97
                "result-text": result_text,
 
98
                "operation-id": opid}
 
99
            self.exchange.send(response, urgent=True)
 
100
 
 
101
 
 
102
    @method(IFACE_NAME)
 
103
    def ping(self):
 
104
        """Return True"""
 
105
        return True
 
106
 
 
107
    @method(IFACE_NAME)
 
108
    def send_message(self, message, urgent=False):
 
109
        """Queue the given message in the message exchange.
 
110
 
 
111
        This method is DBUS-published.
 
112
 
 
113
        @param message: The message dict.
 
114
        @param urgent: If True, exchange urgently. Defaults to False.
 
115
        """
 
116
        message = loads(array_to_string(message))
 
117
        try:
 
118
            logging.debug("Got a %r message over DBUS." % (message["type"],))
 
119
        except (KeyError, TypeError), e:
 
120
            logging.exception(str(e))
 
121
        return self.exchange.send(message, urgent=urgent)
 
122
 
 
123
    @method(IFACE_NAME)
 
124
    def is_message_pending(self, message_id):
 
125
        return self.message_store.is_pending(message_id)
 
126
 
 
127
    @method(IFACE_NAME)
 
128
    def reload_configuration(self):
 
129
        self.config.reload()
 
130
        # Now we'll kill off everything else so that they can be restarted and
 
131
        # notice configuration changes.
 
132
        return self.stop_plugins()
 
133
 
 
134
    @method(IFACE_NAME)
 
135
    def register(self):
 
136
        return self.registration.register()
 
137
 
 
138
    @signal(IFACE_NAME)
 
139
    def registration_done(self):
 
140
        pass
 
141
 
 
142
    @signal(IFACE_NAME)
 
143
    def registration_failed(self):
 
144
        pass
 
145
 
 
146
    @method(IFACE_NAME, out_signature="as")
 
147
    def get_accepted_message_types(self):
 
148
        return self.message_store.get_accepted_types()
 
149
 
 
150
    @signal(IFACE_NAME)
 
151
    def message_type_acceptance_changed(self, type, accepted):
 
152
        pass
 
153
 
 
154
    @method(IFACE_NAME)
 
155
    def register_plugin(self, bus_name, object_path):
 
156
        self._registered_plugins.add((bus_name, object_path))
 
157
 
 
158
    @method(IFACE_NAME)
 
159
    def get_registered_plugins(self):
 
160
        return list(self._registered_plugins)
 
161
 
 
162
    def get_plugin_objects(self, retry_timeout=None):
 
163
        return [get_object(self.bus, bus_name, object_path,
 
164
                           retry_timeout=retry_timeout)
 
165
                for bus_name, object_path in self._registered_plugins]
 
166
 
 
167
    def stop_plugins(self):
 
168
        """Tell all plugins to exit."""
 
169
        results = []
 
170
        # We disable our timeout with retry_timeout=0 here.  The process might
 
171
        # already have exited, or be truly wedged, so the default DBus timeout
 
172
        # is good enough.
 
173
        for plugin in self.get_plugin_objects(retry_timeout=0):
 
174
            results.append(plugin.exit())
 
175
        result = gather_results(results, consume_errors=True)
 
176
        result.addCallback(lambda ignored: None)
 
177
        return result
 
178
 
 
179
    @method(IFACE_NAME)
 
180
    def exit(self):
 
181
        """Request a graceful exit from the broker.
 
182
 
 
183
        Before this method returns, all plugins will be notified of the
 
184
        broker's intention of exiting, so that they have the chance to
 
185
        stop whatever they're doing in a graceful way, and then exit
 
186
        themselves.
 
187
 
 
188
        This method will only return a result when all plugins returned
 
189
        their own results.
 
190
        """
 
191
        # Fire pre-exit before calling any of the plugins, so that everything
 
192
        # in the broker acknowledges that we're about to exit and asking
 
193
        # plugins to die.  This prevents any exchanges from happening,
 
194
        # for instance.
 
195
        self.reactor.fire("pre-exit")
 
196
 
 
197
        result = self.stop_plugins()
 
198
 
 
199
        def fire_post_exit(ignored):
 
200
            # Fire it shortly, to give us a chance to send a DBUS reply.
 
201
            self.reactor.call_later(1, lambda: self.reactor.fire("post-exit"))
 
202
        result.addBoth(fire_post_exit)
 
203
 
 
204
        return result