1
"""The part of the broker which deals with communications with the server."""
6
from twisted.internet.defer import succeed
8
from landscape.lib.message import got_next_expected, ANCIENT
9
from landscape.log import format_delta
10
from landscape import API
13
class MessageExchange(object):
15
The Message Exchange is the place where messages are sent to go
16
out to the Landscape server.
18
The Message Exchange will accumulate messages in its message store
19
and periodically deliver them to the server.
22
plugin_name = "message-exchange"
24
def __init__(self, reactor, store, transport, registration_info,
25
exchange_interval=60*60,
26
urgent_exchange_interval=10,
27
monitor_interval=None,
29
create_time=time.time):
30
self._reactor = reactor
31
self._message_store = store
32
self._create_time = create_time
33
self._transport = transport
34
self._registration_info = registration_info
35
self._exchange_interval = exchange_interval
36
self._urgent_exchange_interval = urgent_exchange_interval
37
self._max_messages = max_messages
38
self._notification_id = None
39
self._exchange_id = None
40
self._exchanging = False
41
self._urgent_exchange = False
43
reactor.call_on("message", self._handle_message)
44
reactor.call_on("resynchronize-clients", self._resynchronize)
45
reactor.call_on("pre-exit", self.stop)
47
def get_exchange_intervals(self):
48
return (self._urgent_exchange_interval, self._exchange_interval)
50
def send(self, message, urgent=False):
51
"""Include a message to be sent in an exchange.
53
If urgent is True, an exchange with the server will be
56
if "timestamp" not in message:
57
message["timestamp"] = int(self._reactor.time())
58
message_id = self._message_store.add(message)
60
self.schedule_exchange(urgent=True)
64
"""Start scheduling exchanges. The first one will be urgent."""
65
self.schedule_exchange(urgent=True)
68
if self._exchange_id is not None:
69
self._reactor.cancel_call(self._exchange_id)
70
self._exchange_id = None
71
if self._notification_id is not None:
72
self._reactor.cancel_call(self._notification_id)
73
self._notification_id = None
75
def _handle_accepted_types(self, message):
77
When the server updates us about the types of message it
78
accepts, update our message store.
80
If this makes existing held messages available for sending,
81
urgently exchange messages.
83
If new types are made available, a
84
C{("message-type-accepted", type_name)} reactor event will
87
old_types = set(self._message_store.get_accepted_types())
88
new_types = set(message["types"])
89
diff = get_accepted_types_diff(old_types, new_types)
90
self._message_store.set_accepted_types(new_types)
91
logging.info("Accepted types changed: %s", diff)
92
if self._message_store.get_pending_messages(1):
93
self.schedule_exchange(urgent=True)
94
for type in old_types - new_types:
95
self._reactor.fire("message-type-acceptance-changed", type, False)
96
for type in new_types - old_types:
97
self._reactor.fire("message-type-acceptance-changed", type, True)
99
def _handle_message(self, message):
100
message_type = message["type"]
101
if message_type == "accepted-types":
102
self._handle_accepted_types(message)
103
elif message_type == "resynchronize":
104
self._handle_resynchronize(message)
105
elif message_type == "set-intervals":
106
self._handle_set_intervals(message)
108
def _handle_resynchronize(self, message):
109
opid = message["operation-id"]
110
self._message_store.add({"type": "resynchronize",
111
"operation-id": opid})
112
self._reactor.fire("resynchronize-clients")
114
def _resynchronize(self):
115
self.schedule_exchange(urgent=True)
117
def _handle_set_intervals(self, message):
118
if "exchange" in message:
119
self._exchange_interval = message["exchange"]
120
logging.info("Exchange interval set to %d seconds." %
121
self._exchange_interval)
122
if "urgent-exchange" in message:
123
self._urgent_exchange_interval = message["urgent-exchange"]
124
logging.info("Urgent exchange interval set to %d seconds." %
125
self._urgent_exchange_interval)
128
"""Send pending messages to the server and process responses.
130
@return: A deferred that is fired when exchange has completed.
132
XXX Actually that is a lie right now. It returns before exchange is
138
self._exchanging = True
140
self._reactor.fire("pre-exchange")
142
payload = self.make_payload()
144
start_time = self._create_time()
145
if self._urgent_exchange:
146
logging.info("Starting urgent message exchange with %s."
147
% self._transport.get_url())
149
logging.info("Starting message exchange with %s."
150
% self._transport.get_url())
152
def handle_result(result):
153
self._exchanging = False
155
if self._urgent_exchange:
156
logging.info("Switching to normal exchange mode.")
157
self._urgent_exchange = False
158
self._handle_result(payload, result)
160
self._reactor.fire("exchange-failed")
161
logging.info("Message exchange failed.")
163
self.schedule_exchange(force=True)
164
self._reactor.fire("exchange-done")
165
logging.info("Message exchange completed in %s.",
166
format_delta(self._create_time() - start_time))
168
self._reactor.call_in_thread(handle_result, None,
169
self._transport.exchange, payload,
170
self._registration_info.secure_id,
171
payload.get("server-api"))
172
# exchange will eventually return a Deferred, especially when
173
# mp-better-transport-factoring is merged.
177
"""Return a bool showing whether there is an urgent exchange scheduled.
179
return self._urgent_exchange
181
def schedule_exchange(self, urgent=False, force=False):
182
"""Schedule an exchange to happen.
184
The exchange will occur after some time based on whether C{urgent} is
185
True. An C{impending-exchange} reactor event will be emitted
186
approximately 10 seconds before the exchange is started.
188
@param urgent: If true, ensure an exchange happens within the
189
urgent interval. This will reschedule the exchange
190
if necessary. If another urgent exchange is already
191
scheduled, nothing happens.
192
@param force: If true, an exchange will necessarily be scheduled,
193
even if it was already scheduled before.
195
# The 'not self._exchanging' check below is currently untested.
196
# It's a bit tricky to test as it is preventing rehooking 'exchange'
197
# while there's a background thread doing the exchange itself.
198
if (not self._exchanging and
199
(force or self._exchange_id is None or
200
urgent and not self._urgent_exchange)):
202
self._urgent_exchange = True
203
if self._exchange_id:
204
self._reactor.cancel_call(self._exchange_id)
206
if self._urgent_exchange:
207
interval = self._urgent_exchange_interval
209
interval = self._exchange_interval
211
if self._notification_id is not None:
212
self._reactor.cancel_call(self._notification_id)
213
notification_interval = interval - 10
214
self._notification_id = self._reactor.call_later(
215
notification_interval, self._notify_impending_exchange)
217
self._exchange_id = self._reactor.call_later(interval,
220
def _notify_impending_exchange(self):
221
self._reactor.fire("impending-exchange")
223
def make_payload(self):
224
"""Return a dict representing the complete payload."""
225
store = self._message_store
226
accepted_types_str = ";".join(store.get_accepted_types())
227
accepted_types_digest = md5.new(accepted_types_str).digest()
228
messages = store.get_pending_messages(self._max_messages)
229
total_messages = store.count_pending_messages()
231
# Each message is tagged with the API that the client was
232
# using at the time the message got added to the store. The
233
# logic below will make sure that all messages which are added
234
# to the payload being built will have the same api, and any
235
# other messages will be postponed to the next exchange.
236
server_api = messages[0].get("api")
237
for i, message in enumerate(messages):
238
if message.get("api") != server_api:
245
# DEPRECATED Remove this once API 2.0 is gone:
246
if server_api is None:
247
# The per-message API logic was introduced on API 2.1, so a
248
# missing API must be 2.0.
252
payload = {"server-api": server_api,
254
"sequence": store.get_sequence(),
255
"messages": messages,
256
"total-messages": total_messages,
257
"next-expected-sequence": store.get_server_sequence(),
258
"accepted-types": accepted_types_digest,
262
def _handle_result(self, payload, result):
263
message_store = self._message_store
265
next_expected = result.get("next-expected-sequence")
266
old_sequence = message_store.get_sequence()
267
if next_expected is None:
268
next_expected = message_store.get_sequence()
269
next_expected += len(payload["messages"])
271
message_store_state = got_next_expected(message_store, next_expected)
272
message_store.commit()
273
if message_store_state == ANCIENT:
274
# The server has probably lost some data we sent it. The
275
# slate has been wiped clean (by got_next_expected), now
276
# let's fire an event to tell all the plugins that they
277
# ought to generate new messages so the server gets some
279
logging.info("Server asked for ancient data: resynchronizing all "
280
"state with the server.")
282
message_store.add({"type": "resynchronize"})
283
self._reactor.fire("resynchronize-clients")
286
sequence = message_store.get_server_sequence()
287
for message in result.get("messages", ()):
288
self._reactor.fire("message", message)
290
message_store.set_server_sequence(sequence)
291
message_store.commit()
293
if message_store.get_pending_messages(1):
294
logging.info("Pending messages remain after the last exchange.")
295
# Either the server asked us for old messages, or we
296
# otherwise have more messages even after transferring
298
if next_expected != old_sequence:
299
self.schedule_exchange(urgent=True)
302
def get_accepted_types_diff(old_types, new_types):
303
old_types = set(old_types)
304
new_types = set(new_types)
305
added_types = new_types - old_types
306
stable_types = old_types & new_types
307
removed_types = old_types - new_types
309
diff.extend(["+%s" % type for type in added_types])
310
diff.extend(["%s" % type for type in stable_types])
311
diff.extend(["-%s" % type for type in removed_types])
312
return " ".join(diff)