~ahasenack/landscape-client/landscape-client-1.5.5-0ubuntu0.9.04.0

« back to all changes in this revision

Viewing changes to landscape/broker/exchange.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20080908163557-l3ixzj5dxz37wnw2
Tags: 1.0.18-0ubuntu1
New upstream release 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""The part of the broker which deals with communications with the server."""
 
2
import time
 
3
import logging
 
4
import md5
 
5
 
 
6
from twisted.internet.defer import succeed
 
7
 
 
8
from landscape.lib.message import got_next_expected, ANCIENT
 
9
from landscape.log import format_delta
 
10
from landscape import API
 
11
 
 
12
 
 
13
class MessageExchange(object):
    """
    The Message Exchange is the place where messages are sent to go
    out to the Landscape server.

    The Message Exchange will accumulate messages in its message store
    and periodically deliver them to the server.

    @param reactor: Object used for events and scheduling.  This class
        calls its C{call_on}, C{call_later}, C{cancel_call},
        C{call_in_thread}, C{fire} and C{time} methods.
    @param store: The message store holding pending messages, sequence
        numbers and the accepted message types.
    @param transport: Object whose C{exchange} method performs the actual
        communication and whose C{get_url} names the server in logs.
    @param registration_info: Object providing the C{secure_id} attribute
        passed to the transport.
    @param exchange_interval: Seconds between normal exchanges.
    @param urgent_exchange_interval: Seconds before an urgent exchange.
    @param monitor_interval: Accepted for compatibility with callers; it
        is not used by this class.
    @param max_messages: Maximum number of pending messages put in a
        single payload.
    @param create_time: Callable returning the current time, used to
        measure how long an exchange took.
    """

    plugin_name = "message-exchange"

    def __init__(self, reactor, store, transport, registration_info,
                 exchange_interval=60*60,
                 urgent_exchange_interval=10,
                 monitor_interval=None,
                 max_messages=100,
                 create_time=time.time):
        self._reactor = reactor
        self._message_store = store
        self._create_time = create_time
        self._transport = transport
        self._registration_info = registration_info
        self._exchange_interval = exchange_interval
        self._urgent_exchange_interval = urgent_exchange_interval
        self._max_messages = max_messages
        # Ids of the scheduled "impending-exchange" notification and of the
        # scheduled exchange itself; None while nothing is scheduled.
        self._notification_id = None
        self._exchange_id = None
        # True while the transport is busy with an exchange.
        self._exchanging = False
        # True while the next exchange should use the urgent interval.
        self._urgent_exchange = False

        reactor.call_on("message", self._handle_message)
        reactor.call_on("resynchronize-clients", self._resynchronize)
        reactor.call_on("pre-exit", self.stop)

    def get_exchange_intervals(self):
        """Return a C{(urgent_interval, normal_interval)} tuple, in seconds.
        """
        return (self._urgent_exchange_interval, self._exchange_interval)

    def send(self, message, urgent=False):
        """Include a message to be sent in an exchange.

        If urgent is True, an exchange with the server will be
        scheduled urgently.

        A C{"timestamp"} key is added to the message, based on the
        reactor's time, if the message doesn't have one yet.

        @return: The value returned by the message store's C{add}.
        """
        if "timestamp" not in message:
            message["timestamp"] = int(self._reactor.time())
        message_id = self._message_store.add(message)
        if urgent:
            self.schedule_exchange(urgent=True)
        return message_id

    def start(self):
        """Start scheduling exchanges. The first one will be urgent."""
        self.schedule_exchange(urgent=True)

    def stop(self):
        """Cancel the scheduled exchange and its notification, if any."""
        if self._exchange_id is not None:
            self._reactor.cancel_call(self._exchange_id)
            self._exchange_id = None
        if self._notification_id is not None:
            self._reactor.cancel_call(self._notification_id)
            self._notification_id = None

    def _handle_accepted_types(self, message):
        """
        When the server updates us about the types of message it
        accepts, update our message store.

        If this makes existing held messages available for sending,
        urgently exchange messages.

        For every type whose acceptance changed, a
        C{("message-type-acceptance-changed", type_name, accepted)}
        reactor event will be fired, where C{accepted} is C{False} for
        types which are no longer accepted and C{True} for new ones.
        """
        old_types = set(self._message_store.get_accepted_types())
        new_types = set(message["types"])
        diff = get_accepted_types_diff(old_types, new_types)
        self._message_store.set_accepted_types(new_types)
        logging.info("Accepted types changed: %s", diff)
        if self._message_store.get_pending_messages(1):
            self.schedule_exchange(urgent=True)
        for type_name in old_types - new_types:
            self._reactor.fire("message-type-acceptance-changed",
                               type_name, False)
        for type_name in new_types - old_types:
            self._reactor.fire("message-type-acceptance-changed",
                               type_name, True)

    def _handle_message(self, message):
        """Dispatch a server message this class is interested in.

        Messages of any other type are ignored here.
        """
        message_type = message["type"]
        if message_type == "accepted-types":
            self._handle_accepted_types(message)
        elif message_type == "resynchronize":
            self._handle_resynchronize(message)
        elif message_type == "set-intervals":
            self._handle_set_intervals(message)

    def _handle_resynchronize(self, message):
        """Queue a C{resynchronize} reply and ask clients to resynchronize.
        """
        opid = message["operation-id"]
        self._message_store.add({"type": "resynchronize",
                                 "operation-id": opid})
        self._reactor.fire("resynchronize-clients")

    def _resynchronize(self):
        """Handle C{resynchronize-clients} by exchanging urgently."""
        self.schedule_exchange(urgent=True)

    def _handle_set_intervals(self, message):
        """Update the exchange intervals with those present in the message.

        The new values are used the next time an exchange is scheduled.
        """
        if "exchange" in message:
            self._exchange_interval = message["exchange"]
            # Arguments are handed to logging rather than formatted eagerly,
            # so an unexpected value can't raise from inside this handler.
            logging.info("Exchange interval set to %d seconds.",
                         self._exchange_interval)
        if "urgent-exchange" in message:
            self._urgent_exchange_interval = message["urgent-exchange"]
            logging.info("Urgent exchange interval set to %d seconds.",
                         self._urgent_exchange_interval)

    def exchange(self):
        """Send pending messages to the server and process responses.

        Nothing is done if another exchange is still in progress.

        @return: A deferred that is fired when exchange has completed.

        XXX Actually that is a lie right now. It returns before exchange is
        actually complete.
        """
        if self._exchanging:
            # Still honour the documented return type for callers which
            # chain on the result.
            return succeed(None)

        self._exchanging = True

        self._reactor.fire("pre-exchange")

        payload = self.make_payload()

        start_time = self._create_time()
        if self._urgent_exchange:
            logging.info("Starting urgent message exchange with %s.",
                         self._transport.get_url())
        else:
            logging.info("Starting message exchange with %s.",
                         self._transport.get_url())

        def handle_result(result):
            self._exchanging = False
            if result:
                if self._urgent_exchange:
                    logging.info("Switching to normal exchange mode.")
                    self._urgent_exchange = False
                self._handle_result(payload, result)
            else:
                self._reactor.fire("exchange-failed")
                logging.info("Message exchange failed.")

            # Whatever the outcome, make sure another exchange is coming.
            self.schedule_exchange(force=True)
            self._reactor.fire("exchange-done")
            logging.info("Message exchange completed in %s.",
                         format_delta(self._create_time() - start_time))

        self._reactor.call_in_thread(handle_result, None,
                                     self._transport.exchange, payload,
                                     self._registration_info.secure_id,
                                     payload.get("server-api"))
        # exchange will eventually return a Deferred, especially when
        # mp-better-transport-factoring is merged.
        return succeed(None)

    def is_urgent(self):
        """Return a bool showing whether there is an urgent exchange scheduled.
        """
        return self._urgent_exchange

    def schedule_exchange(self, urgent=False, force=False):
        """Schedule an exchange to happen.

        The exchange will occur after some time based on whether C{urgent} is
        True. An C{impending-exchange} reactor event will be emitted
        approximately 10 seconds before the exchange is started, or right
        away if the exchange is less than 10 seconds ahead.

        @param urgent: If true, ensure an exchange happens within the
                       urgent interval.  This will reschedule the exchange
                       if necessary.  If another urgent exchange is already
                       scheduled, nothing happens.
        @param force: If true, an exchange will necessarily be scheduled,
                      even if it was already scheduled before.
        """
        # The 'not self._exchanging' check below is currently untested.
        # It's a bit tricky to test as it is preventing rehooking 'exchange'
        # while there's a background thread doing the exchange itself.
        if (not self._exchanging and
            (force or self._exchange_id is None or
             urgent and not self._urgent_exchange)):
            if urgent:
                self._urgent_exchange = True
            if self._exchange_id is not None:
                self._reactor.cancel_call(self._exchange_id)

            if self._urgent_exchange:
                interval = self._urgent_exchange_interval
            else:
                interval = self._exchange_interval

            if self._notification_id is not None:
                self._reactor.cancel_call(self._notification_id)
            # Intervals may be changed by the server to less than 10
            # seconds; never ask the reactor for a negative delay.
            notification_interval = max(0, interval - 10)
            self._notification_id = self._reactor.call_later(
                notification_interval, self._notify_impending_exchange)

            self._exchange_id = self._reactor.call_later(interval,
                                                         self.exchange)

    def _notify_impending_exchange(self):
        """Fire the C{impending-exchange} reactor event."""
        self._reactor.fire("impending-exchange")

    def make_payload(self):
        """Return a dict representing the complete payload."""
        store = self._message_store
        accepted_types_str = ";".join(store.get_accepted_types())
        accepted_types_digest = md5.new(accepted_types_str).digest()
        messages = store.get_pending_messages(self._max_messages)
        total_messages = store.count_pending_messages()
        if messages:
            # Each message is tagged with the API that the client was
            # using at the time the message got added to the store.  The
            # logic below will make sure that all messages which are added
            # to the payload being built will have the same api, and any
            # other messages will be postponed to the next exchange.
            server_api = messages[0].get("api")
            for index, message in enumerate(messages):
                if message.get("api") != server_api:
                    del messages[index:]
                    break

            # DEPRECATED Remove this once API 2.0 is gone:
            if server_api is None:
                # The per-message API logic was introduced on API 2.1, so a
                # missing API must be 2.0.
                server_api = "2.0"
        else:
            server_api = API
        payload = {"server-api": server_api,
                   "client-api": API,
                   "sequence": store.get_sequence(),
                   "messages": messages,
                   "total-messages": total_messages,
                   "next-expected-sequence": store.get_server_sequence(),
                   "accepted-types": accepted_types_digest,
                  }
        return payload

    def _handle_result(self, payload, result):
        """Process the server's answer to an exchange.

        @param payload: The payload which was sent, as built by
            L{make_payload}.
        @param result: The dict received from the server.  Its
            C{"next-expected-sequence"} and C{"messages"} keys are both
            optional.
        """
        message_store = self._message_store

        next_expected = result.get("next-expected-sequence")
        old_sequence = message_store.get_sequence()
        if next_expected is None:
            # Without a hint from the server, assume it took everything
            # we have just sent.
            next_expected = old_sequence + len(payload["messages"])

        message_store_state = got_next_expected(message_store, next_expected)
        message_store.commit()
        if message_store_state == ANCIENT:
            # The server has probably lost some data we sent it. The
            # slate has been wiped clean (by got_next_expected), now
            # let's fire an event to tell all the plugins that they
            # ought to generate new messages so the server gets some
            # up-to-date data.
            logging.info("Server asked for ancient data: resynchronizing all "
                         "state with the server.")

            message_store.add({"type": "resynchronize"})
            self._reactor.fire("resynchronize-clients")

        # The server sequence is committed after each message, once the
        # "message" event for it has been fired.
        sequence = message_store.get_server_sequence()
        for message in result.get("messages", ()):
            self._reactor.fire("message", message)
            sequence += 1
            message_store.set_server_sequence(sequence)
            message_store.commit()

        if message_store.get_pending_messages(1):
            logging.info("Pending messages remain after the last exchange.")
            # Either the server asked us for old messages, or we
            # otherwise have more messages even after transferring
            # what we could.
            if next_expected != old_sequence:
                self.schedule_exchange(urgent=True)
 
300
 
 
301
 
 
302
def get_accepted_types_diff(old_types, new_types):
    """Describe how the accepted message types changed, as one string.

    The result lists, separated by spaces, the types which were added
    (prefixed with C{+}), then the ones present both before and after
    (no prefix), then the ones which were removed (prefixed with C{-}).

    @param old_types: Iterable of the previously accepted type names.
    @param new_types: Iterable of the type names accepted now.
    """
    before = set(old_types)
    after = set(new_types)
    pieces = ["+%s" % name for name in after - before]
    pieces += ["%s" % name for name in before & after]
    pieces += ["-%s" % name for name in before - after]
    return " ".join(pieces)