~free.ekanayaka/landscape-client/lucid-1.5.2.1-0ubuntu0.10.04.0

« back to all changes in this revision

Viewing changes to landscape/broker/exchange.py

  • Committer: Bazaar Package Importer
  • Author(s): Mathias Gug, Free Ekanayaka
  • Date: 2009-07-22 14:54:50 UTC
  • mfrom: (1.1.9 upstream)
  • Revision ID: james.westby@ubuntu.com-20090722145450-pvbp13gh8734c8ft
Tags: 1.3.2.2-0ubuntu0.9.10.1
[ Free Ekanayaka ]
* New upstream release:
  - Include the README file in landscape-client (LP: #396260)
  - Fix client capturing stderr from run_command when constructing
    hash-id-databases url (LP: #397480)
  - Use substvars to conditionally depend on update-motd or
    libpam-modules (LP: #393454)
  - Fix reporting wrong version to the server (LP: #391225)
  - The init script does not wait for the network to be available
    before checking for EC2 user data (LP: #383336)
  - When the broker is restarted by the watchdog, the state of the client
    is inconsistent (LP: #380633)
  - Package stays unknown forever in the client with hash-id-databases
    support (LP: #381356)
  - Standard error not captured when calling smart-update (LP: #387441)
  - Changer calls reporter without switching groups, just user (LP: #388092)
  - Run smart update in the package-reporter instead of having a cronjob (LP: #362355)
  - Package changer does not inherit proxy settings (LP: #381241)
  - The ./test script doesn't work in landscape-client (LP: #381613)
  - The source package should build on all supported releases (LP: #385098)
  - Strip smart update's output (LP: #387331)
  - The fetch() timeout isn't based on activity (#389224)
  - Client can use a UUID of "None" when fetching the hash-id-database (LP: #381291)
  - Registration should use the fqdn rather than just the hostname (LP: #385730)

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
                 monitor_interval=None,
28
28
                 max_messages=100,
29
29
                 create_time=time.time):
 
30
        """
 
31
        @param reactor: A L{TwistedReactor} used to fire events in response
 
32
            to messages received by the server.
 
33
        @param store: A L{MessageStore} used to queue outgoing messages.
 
34
        @param transport: A L{HTTPTransport} used to deliver messages.
 
35
        @param exchange_interval: time interval between subsequent
 
36
            exchanges of non-urgent messages.
 
37
        @param urgent_exachange_interval: time interval between subsequent
 
38
            exchanges of urgent messages.
 
39
        """
30
40
        self._reactor = reactor
31
41
        self._message_store = store
32
42
        self._create_time = create_time
50
60
        reactor.call_on("pre-exit", self.stop)
51
61
 
52
62
    def get_exchange_intervals(self):
 
63
        """Return a binary tuple with urgent and normal exchange intervals."""
53
64
        return (self._urgent_exchange_interval, self._exchange_interval)
54
65
 
55
66
    def send(self, message, urgent=False):
57
68
 
58
69
        If urgent is True, an exchange with the server will be
59
70
        scheduled urgently.
 
71
 
 
72
        @param message: Same as in L{MessageStore.add}.
60
73
        """
61
74
        if "timestamp" not in message:
62
75
            message["timestamp"] = int(self._reactor.time())
85
98
        If this makes existing held messages available for sending,
86
99
        urgently exchange messages.
87
100
 
88
 
        If new types are made available, a
89
 
        C{("message-type-accepted", type_name)} reactor event will
90
 
        be fired.
 
101
        If new types are made available or old types are dropped a
 
102
        C{("message-type-acceptance-changed", type, bool)} reactor
 
103
        event will be fired.
91
104
        """
92
105
        old_types = set(self._message_store.get_accepted_types())
93
106
        new_types = set(message["types"])
123
136
    def exchange(self):
124
137
        """Send pending messages to the server and process responses.
125
138
 
126
 
        @return: A deferred that is fired when exchange has completed.
 
139
        An C{pre-exchange} reactor event will be emitted just before the
 
140
        actual exchange takes place.
 
141
 
 
142
        An C{exchange-done} or C{exchange-failed} reactor event will be
 
143
        emitted after a successful or failed exchange.
 
144
 
 
145
        @return: A L{Deferred} that is fired when exchange has completed.
127
146
 
128
147
        XXX Actually that is a lie right now. It returns before exchange is
129
148
        actually complete.
182
201
        approximately 10 seconds before the exchange is started.
183
202
 
184
203
        @param urgent: If true, ensure an exchange happens within the
185
 
                       urgent interval.  This will reschedule the exchange
186
 
                       if necessary.  If another urgent exchange is already
187
 
                       scheduled, nothing happens.
 
204
            urgent interval.  This will reschedule the exchange if necessary.
 
205
            If another urgent exchange is already scheduled, nothing happens.
188
206
        @param force: If true, an exchange will necessarily be scheduled,
189
 
                      even if it was already scheduled before.
 
207
            even if it was already scheduled before.
190
208
        """
191
209
        # The 'not self._exchanging' check below is currently untested.
192
210
        # It's a bit tricky to test as it is preventing rehooking 'exchange'
217
235
        self._reactor.fire("impending-exchange")
218
236
 
219
237
    def make_payload(self):
220
 
        """Return a dict representing the complete payload."""
 
238
        """Return a dict representing the complete exchange payload.
 
239
 
 
240
        The payload will contain all pending messages eligible for
 
241
        delivery, up to a maximum of C{max_messages} as passed to
 
242
        the L{__init__} method.
 
243
        """
221
244
        store = self._message_store
222
245
        accepted_types_digest = self._hash_types(store.get_accepted_types())
223
246
        messages = store.get_pending_messages(self._max_messages)
263
286
        return md5.new(accepted_types_str).digest()
264
287
 
265
288
    def _handle_result(self, payload, result):
 
289
        """Handle a response from the server.
 
290
 
 
291
        Called by L{exchange} after a batch of messages has been
 
292
        successfully delivered to the server.
 
293
 
 
294
        If the C{server_uuid} changed, a C{"server-uuid-changed"} event
 
295
        will be fired.
 
296
 
 
297
        Call L{handle_message} for each message in C{result}.
 
298
 
 
299
        @param payload: The payload that was sent to the server.
 
300
        @param result: The response got in reply to the C{payload}.
 
301
        """
266
302
        message_store = self._message_store
267
303
        self._client_accepted_types_hash = result.get("client-accepted-types-hash")
268
304
        next_expected = result.get("next-expected-sequence")
309
345
                self.schedule_exchange(urgent=True)
310
346
 
311
347
    def register_message(self, type, handler):
312
 
        """
313
 
        Register a handler to be called when a message of the given
314
 
        type has been received from the server.
 
348
        """Register a handler for the given message type.
 
349
 
 
350
        The C{handler} callable will to be executed when a message of
 
351
        type C{type} has been received from the server.
315
352
 
316
353
        Multiple handlers for the same type will be called in the
317
354
        order they were registered.