~ahasenack/landscape-client/landscape-client-11.02-0ubuntu0.8.04.1

« back to all changes in this revision

Viewing changes to landscape/package/reporter.py

  • Committer: Andreas Hasenack
  • Date: 2011-05-05 14:12:15 UTC
  • Revision ID: andreas@canonical.com-20110505141215-5ymuyyh5es9pwa6p
Added hardy files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import urlparse
 
2
import logging
 
3
import time
 
4
import sys
 
5
import os
 
6
 
 
7
from twisted.internet.defer import Deferred, succeed
 
8
from twisted.internet.utils import getProcessOutputAndValue
 
9
 
 
10
from landscape.lib.sequenceranges import sequence_to_ranges
 
11
from landscape.lib.twisted_util import gather_results
 
12
from landscape.lib.fetch import fetch_async
 
13
from landscape.lib.fs import touch_file
 
14
 
 
15
from landscape.package.taskhandler import (
 
16
    PackageTaskHandlerConfiguration, PackageTaskHandler, run_task_handler)
 
17
from landscape.package.store import UnknownHashIDRequest
 
18
 
 
19
 
 
20
# Age, in seconds (two hours), after which a hash=>id request whose message
# is no longer pending in the broker is discarded, allowing its hashes to be
# requested again.
HASH_ID_REQUEST_TIMEOUT = 2 * 60 * 60

# Upper bound on the number of hashes carried by a single
# "unknown-package-hashes" message.
MAX_UNKNOWN_HASHES_PER_REQUEST = 500
 
22
 
 
23
 
 
24
class PackageReporterConfiguration(PackageTaskHandlerConfiguration):
    """Configuration for the Landscape package-reporter.

    Extends the generic package task handler configuration with the options
    that only the reporter understands.
    """

    def make_parser(self):
        """Build the command line parser, including reporter-only options.

        Starts from the parser built by the base class and adds
        C{--force-smart-update}, a boolean flag (off by default) which makes
        the reporter invoke smart-update without the C{--after} throttling.
        """
        base = super(PackageReporterConfiguration, self)
        option_parser = base.make_parser()
        option_parser.add_option(
            "--force-smart-update",
            action="store_true",
            default=False,
            help="Force running smart-update.")
        return option_parser
 
37
 
 
38
 
 
39
class PackageReporter(PackageTaskHandler):
    """Report information about the system packages.

    @cvar queue_name: Name of the task queue to pick tasks from.
    @cvar smart_update_interval: Time interval in minutes to pass to
        the C{--after} command line option of C{smart-update}.
    @cvar smart_update_filename: Path of the C{smart-update} executable.
    @cvar sources_list_filename: APT sources file; a recent modification
        makes C{smart-update} run without C{--after}.
    @cvar sources_list_directory: Directory of extra APT sources files,
        checked the same way as C{sources_list_filename}.
    """
    config_factory = PackageReporterConfiguration

    queue_name = "reporter"

    smart_update_interval = 60
    smart_update_filename = "/usr/lib/landscape/smart-update"
    sources_list_filename = "/etc/apt/sources.list"
    sources_list_directory = "/etc/apt/sources.list.d"

    def run(self):
        """Run one complete reporting cycle.

        The steps are chained on a single Deferred and run in this order:
        smart-update, fetch and attach the hash=>id database, handle queued
        tasks, expire old hash=>id requests, request unknown hashes, and
        finally detect and report changes.

        @return: A L{Deferred} that fires once every step has run.
        """
        result = Deferred()

        # Run smart-update before anything else, to make sure that
        # the SmartFacade will load freshly updated channels
        result.addCallback(lambda x: self.run_smart_update())

        # If the appropriate hash=>id db is not there, fetch it
        result.addCallback(lambda x: self.fetch_hash_id_db())

        # Attach the hash=>id database if available
        result.addCallback(lambda x: self.use_hash_id_db())

        # Now, handle any queued tasks.
        result.addCallback(lambda x: self.handle_tasks())

        # Then, remove any expired hash=>id translation requests.
        result.addCallback(lambda x: self.remove_expired_hash_id_requests())

        # After that, check if we have any unknown hashes to request.
        result.addCallback(lambda x: self.request_unknown_hashes())

        # Finally, verify if we have anything new to report to the server.
        result.addCallback(lambda x: self.detect_changes())

        result.callback(None)
        return result

    def fetch_hash_id_db(self):
        """
        Fetch the appropriate pre-canned database of hash=>id mappings
        from the server. If the database is already present, it won't
        be downloaded twice.

        The format of the database filename is <uuid>_<codename>_<arch>,
        and it will be downloaded from the HTTP directory set in
        config.package_hash_id_url, or config.url/hash-id-databases if
        the former is not set.

        Fetch failures are handled gracefully and logged as appropriate.
        """

        def fetch_it(hash_id_db_filename):

            if hash_id_db_filename is None:
                # Couldn't determine which hash=>id database to fetch,
                # just ignore the failure and go on
                return

            if os.path.exists(hash_id_db_filename):
                # We don't download twice
                return

            base_url = self._get_hash_id_db_base_url()
            if not base_url:
                logging.warning("Can't determine the hash=>id database url")
                return

            # Cast to str as pycurl doesn't like unicode
            url = str(base_url + os.path.basename(hash_id_db_filename))

            def fetch_ok(data):
                written = False
                try:
                    # Write the downloaded payload verbatim.
                    hash_id_db_fd = open(hash_id_db_filename, "wb")
                    try:
                        hash_id_db_fd.write(data)
                    finally:
                        hash_id_db_fd.close()
                    written = True
                finally:
                    # Never leave a truncated database behind: its mere
                    # presence would stop every later run from downloading
                    # it again (see the os.path.exists check above).
                    if not written and os.path.exists(hash_id_db_filename):
                        os.remove(hash_id_db_filename)
                logging.info("Downloaded hash=>id database from %s" % url)

            def fetch_error(failure):
                exception = failure.value
                logging.warning("Couldn't download hash=>id database: %s" %
                                str(exception))

            result = fetch_async(url,
                                 cainfo=self._config.get("ssl_public_key"))
            result.addCallback(fetch_ok)
            # Also catches errors raised while writing the file in fetch_ok.
            result.addErrback(fetch_error)

            return result

        result = self._determine_hash_id_db_filename()
        result.addCallback(fetch_it)
        return result

    def _get_hash_id_db_base_url(self):
        """Return the base URL of the hash=>id databases, or C{None}.

        C{config.package_hash_id_url} wins when set; otherwise the URL is
        derived from C{config.url}. The returned URL always ends with a
        single slash so a database filename can be appended to it.
        """
        base_url = self._config.get("package_hash_id_url")

        if not base_url:

            if not self._config.get("url"):
                # We really have no idea where to download from
                return None

            # If config.url is http://host:123/path/to/message-system
            # then we'll use http://host:123/path/to/hash-id-databases
            base_url = urlparse.urljoin(self._config.url.rstrip("/"),
                                        "hash-id-databases")

        return base_url.rstrip("/") + "/"

    def _apt_sources_have_changed(self):
        """Return a boolean indicating if the APT sources were modified.

        A source file counts as modified when its mtime is more recent than
        C{PackageMonitor.run_interval} seconds ago.
        """
        # Imported here rather than at module level; presumably to avoid an
        # import cycle between the monitor and package modules.
        from landscape.monitor.packagemonitor import PackageMonitor

        filenames = []

        if os.path.exists(self.sources_list_filename):
            filenames.append(self.sources_list_filename)

        if os.path.exists(self.sources_list_directory):
            filenames.extend(
                [os.path.join(self.sources_list_directory, filename) for
                 filename in os.listdir(self.sources_list_directory)])

        now = time.time()
        for filename in filenames:
            try:
                last_change = os.path.getmtime(filename)
            except OSError:
                # The file disappeared between listing and stat'ing it;
                # that must not abort the whole reporter run.
                continue
            if now - last_change < PackageMonitor.run_interval:
                return True

        return False

    def run_smart_update(self):
        """Run smart-update and log a warning in case of non-zero exit code.

        C{--after <smart_update_interval>} is passed unless
        C{force_smart_update} is set in the configuration or the APT sources
        were modified recently.

        @return: a deferred returning (out, err, code)
        """
        if self._config.force_smart_update or self._apt_sources_have_changed():
            args = ()
        else:
            args = ("--after", "%d" % self.smart_update_interval)
        result = getProcessOutputAndValue(self.smart_update_filename,
                                          args=args)

        def callback(process_result):
            # Unpacked here: tuple parameters are not valid in Python 3.
            out, err, code = process_result
            # smart-update --after N will exit with error code 1 when it
            # doesn't actually run the update code because not enough time
            # has passed yet, but we don't actually consider it a failure.
            smart_failed = (code not in (0, 1) or
                            (code == 1 and err.strip() != ""))
            if smart_failed:
                logging.warning("'%s' exited with status %d (%s)" % (
                    self.smart_update_filename, code, err))
            logging.debug("'%s' exited with status %d (out='%s', err='%s')" % (
                self.smart_update_filename, code, out, err))
            touch_file(self._config.smart_update_stamp_filename)
            return (out, err, code)

        result.addCallback(callback)
        return result

    def handle_task(self, task):
        """Dispatch a queued task according to its message type.

        @param task: A queued task whose C{data} is a message dict with a
            C{"type"} key.
        @return: A L{Deferred}. Message types other than C{package-ids} and
            C{resynchronize} are ignored and get an already-fired Deferred,
            so every path returns something callers can chain on.
        """
        message = task.data
        if message["type"] == "package-ids":
            return self._handle_package_ids(message)
        if message["type"] == "resynchronize":
            return self._handle_resynchronize()
        return succeed(None)

    def _handle_package_ids(self, message):
        """Store the hash=>id translations sent by the server.

        The ids in C{message["ids"]} are paired positionally with the hashes
        of the matching stored request. Hashes the server answered with
        C{None} for are handed to L{_handle_unknown_packages}. The request is
        removed only once that has succeeded.
        """
        try:
            request = self._store.get_hash_id_request(message["request-id"])
        except UnknownHashIDRequest:
            # We've lost this request somehow.  It will be re-requested later.
            return succeed(None)

        unknown_hashes = []
        hash_ids = {}

        for package_hash, package_id in zip(request.hashes, message["ids"]):
            if package_id is None:
                unknown_hashes.append(package_hash)
            else:
                hash_ids[package_hash] = package_id

        self._store.set_hash_ids(hash_ids)

        logging.info("Received %d package hash => id translations, %d hashes "
                     "are unknown." % (len(hash_ids), len(unknown_hashes)))

        if unknown_hashes:
            result = self._handle_unknown_packages(unknown_hashes)
        else:
            result = succeed(None)

        # Remove the request if everything goes well.
        result.addCallback(lambda x: request.remove())

        return result

    def _handle_resynchronize(self):
        """Forget all locally known package state so it is reported again."""
        self._store.clear_available()
        self._store.clear_available_upgrades()
        self._store.clear_installed()
        self._store.clear_locked()
        self._store.clear_package_locks()

        # Don't clear the hash_id_requests table because the messages
        # associated with the existing requests might still have to be
        # delivered, and if we clear the table and later create a new request,
        # that new request could get the same id of one of the deleted ones,
        # and when the pending message eventually gets delivered the reporter
        # would think that the message is associated to the newly created
        # request, as it has the same id as the deleted request the message
        # actually refers to. This would cause the ids in the message to be
        # possibly mapped to the wrong hashes.
        #
        # This problem would happen for example when switching the client from
        # one Landscape server to another, because the uuid-changed event would
        # cause a resynchronize task to be created by the monitor. See #417122.

        return succeed(None)

    def _handle_unknown_packages(self, hashes):
        """Send full data for packages whose hashes the server doesn't know.

        Packages known to the facade whose hash is in C{hashes} are described
        in an "add-packages" message, sent together with a new hash=>id
        request for those hashes.

        @return: A L{Deferred} firing once the message was queued (or
            immediately if no matching package was found).
        """
        self._facade.ensure_channels_reloaded()

        hashes = set(hashes)
        added_hashes = []
        packages = []
        for package in self._facade.get_packages():
            package_hash = self._facade.get_package_hash(package)
            if package_hash in hashes:
                added_hashes.append(package_hash)
                skeleton = self._facade.get_package_skeleton(package)
                packages.append({"type": skeleton.type,
                                 "name": skeleton.name,
                                 "version": skeleton.version,
                                 "section": skeleton.section,
                                 "summary": skeleton.summary,
                                 "description": skeleton.description,
                                 "size": skeleton.size,
                                 "installed-size": skeleton.installed_size,
                                 "relations": skeleton.relations})

        if not packages:
            return succeed(None)

        logging.info("Queuing messages with data for %d packages to "
                     "exchange urgently." % len(packages))

        message = {"type": "add-packages", "packages": packages}

        return self._send_message_with_hash_id_request(message, added_hashes)

    def remove_expired_hash_id_requests(self):
        """Drop hash=>id requests that can no longer be answered.

        For every stored request:
          - with no message id: removed immediately;
          - whose message is still pending in the broker: timestamp refreshed;
          - otherwise: removed if older than C{HASH_ID_REQUEST_TIMEOUT}
            seconds.

        @return: A L{Deferred} gathering all the broker queries.
        """
        now = time.time()
        timeout = now - HASH_ID_REQUEST_TIMEOUT

        def update_or_remove(is_pending, request):
            if is_pending:
                # Request is still in the queue.  Update the timestamp.
                request.timestamp = now
            elif request.timestamp < timeout:
                # Request was delivered, and is older than the threshold.
                request.remove()

        results = []
        for request in self._store.iter_hash_id_requests():
            if request.message_id is None:
                # May happen in some rare cases, when a send_message() is
                # interrupted abruptly.  If it just fails normally, the
                # request is removed and so we don't get here.
                request.remove()
            else:
                result = self._broker.is_message_pending(request.message_id)
                result.addCallback(update_or_remove, request)
                results.append(result)

        return gather_results(results)

    def request_unknown_hashes(self):
        """Detect available packages for which we have no hash=>id mappings.

        This method will verify if there are packages that Smart knows
        about but for which we don't have an id yet (no hash => id
        translation), and deliver a message (unknown-package-hashes)
        to request them.

        Hashes previously requested won't be requested again, unless they
        have expired and been removed from the database. At most
        C{MAX_UNKNOWN_HASHES_PER_REQUEST} hashes are requested per call.
        """
        self._facade.ensure_channels_reloaded()

        unknown_hashes = set()

        for package in self._facade.get_packages():
            package_hash = self._facade.get_package_hash(package)
            if self._store.get_hash_id(package_hash) is None:
                unknown_hashes.add(package_hash)

        # Discard unknown hashes in existent requests.
        for request in self._store.iter_hash_id_requests():
            unknown_hashes -= set(request.hashes)

        if not unknown_hashes:
            return succeed(None)

        unknown_hashes = sorted(unknown_hashes)
        unknown_hashes = unknown_hashes[:MAX_UNKNOWN_HASHES_PER_REQUEST]

        logging.info("Queuing request for package hash => id "
                     "translation on %d hash(es)." % len(unknown_hashes))

        message = {"type": "unknown-package-hashes",
                   "hashes": unknown_hashes}

        return self._send_message_with_hash_id_request(message,
                                                       unknown_hashes)

    def _send_message_with_hash_id_request(self, message, unknown_hashes):
        """Create a hash_id_request and send message with "request-id".

        The request's C{message_id} is recorded once the broker accepts the
        message; if sending fails the request is removed and the failure is
        propagated.
        """
        request = self._store.add_hash_id_request(unknown_hashes)
        message["request-id"] = request.id
        result = self._broker.send_message(message, True)

        def set_message_id(message_id):
            request.message_id = message_id

        def send_message_failed(failure):
            request.remove()
            return failure

        return result.addCallbacks(set_message_id, send_message_failed)

    def detect_changes(self):
        """Detect all changes concerning packages.

        If some changes were detected with respect to our last run, then an
        event of type 'package-data-changed' will be fired in the broker
        reactor.
        """

        def changes_detected(results):
            # Release all smart locks, in case the changer runs after us.
            self._facade.deinit()
            if True in results:
                # Something has changed, notify the broker.
                return self._broker.fire_event("package-data-changed")

        result = gather_results([self.detect_packages_changes(),
                                 self.detect_package_locks_changes()])
        return result.addCallback(changes_detected)

    @staticmethod
    def _upgrades_installed_package(package):
        """Return C{True} if C{package} is an upgrade for an installed one."""
        for upgrade in package.upgrades:
            for provides in upgrade.providedby:
                for provides_package in provides.packages:
                    if provides_package.installed:
                        return True
        return False

    def detect_packages_changes(self):
        """Detect changes in the universe of known packages.

        This method will verify if there are packages that:

        - are now installed, and were not;
        - are now available, and were not;
        - are now available upgrades, and were not;
        - are now locked, and were not;
        - were previously installed but are not anymore;
        - were previously available but are not anymore;
        - were previously available upgrades but are not anymore;
        - were previously locked but are not anymore.

        Only packages whose hash already has an id are considered. In all
        cases, the server is notified of the new situation with a
        "packages" message. Package locks are handled separately by
        L{detect_package_locks_changes}.

        @return: A deferred resulting in C{True} if package changes were
            detected with respect to the previous run, or C{False} otherwise.
        """
        self._facade.ensure_channels_reloaded()

        old_installed = set(self._store.get_installed())
        old_available = set(self._store.get_available())
        old_upgrades = set(self._store.get_available_upgrades())
        old_locked = set(self._store.get_locked())

        current_installed = set()
        current_available = set()
        current_upgrades = set()
        current_locked = set()

        for package in self._facade.get_packages():
            package_hash = self._facade.get_package_hash(package)
            package_id = self._store.get_hash_id(package_hash)
            if package_id is None:
                continue

            if package.installed:
                current_installed.add(package_id)
                # Is the package also in a non-installed loader?  IOW,
                # "available".
                for loader in package.loaders:
                    if not loader.getInstalled():
                        current_available.add(package_id)
                        break
            else:
                current_available.add(package_id)

            # Are there any packages that this package is an upgrade for?
            if self._upgrades_installed_package(package):
                current_upgrades.add(package_id)

        for package in self._facade.get_locked_packages():
            package_hash = self._facade.get_package_hash(package)
            package_id = self._store.get_hash_id(package_hash)
            if package_id is not None:
                current_locked.add(package_id)

        new_installed = current_installed - old_installed
        new_available = current_available - old_available
        new_upgrades = current_upgrades - old_upgrades
        new_locked = current_locked - old_locked

        not_installed = old_installed - current_installed
        not_available = old_available - current_available
        not_upgrades = old_upgrades - current_upgrades
        not_locked = old_locked - current_locked

        # Each non-empty set of ids is sent as a list of id ranges.
        changes = [("installed", new_installed),
                   ("available", new_available),
                   ("available-upgrades", new_upgrades),
                   ("locked", new_locked),
                   ("not-installed", not_installed),
                   ("not-available", not_available),
                   ("not-available-upgrades", not_upgrades),
                   ("not-locked", not_locked)]
        message = {}
        for key, ids in changes:
            if ids:
                message[key] = list(sequence_to_ranges(sorted(ids)))

        if not message:
            return succeed(False)

        message["type"] = "packages"
        result = self._broker.send_message(message, True)

        logging.info("Queuing message with changes in known packages: "
                     "%d installed, %d available, %d available upgrades, "
                     "%d locked, %d not installed, %d not available, "
                     "%d not available upgrades, %d not locked."
                     % (len(new_installed), len(new_available),
                        len(new_upgrades), len(new_locked),
                        len(not_installed), len(not_available),
                        len(not_upgrades), len(not_locked)))

        def update_currently_known(result):
            # Only record the new state once the message has been queued.
            if new_installed:
                self._store.add_installed(new_installed)
            if not_installed:
                self._store.remove_installed(not_installed)
            if new_available:
                self._store.add_available(new_available)
            if new_locked:
                self._store.add_locked(new_locked)
            if not_available:
                self._store.remove_available(not_available)
            if new_upgrades:
                self._store.add_available_upgrades(new_upgrades)
            if not_upgrades:
                self._store.remove_available_upgrades(not_upgrades)
            if not_locked:
                self._store.remove_locked(not_locked)
            # Something has changed wrt the former run, let's return True
            return True

        result.addCallback(update_currently_known)

        return result

    def detect_package_locks_changes(self):
        """Detect changes in known package locks.

        This method will verify if there are package locks that:

        - are now set, and were not;
        - were previously set but are not anymore;

        In all cases, the server is notified of the new situation
        with a "package-locks" message.

        @return: A deferred resulting in C{True} if package lock changes were
            detected with respect to the previous run, or C{False} otherwise.
        """
        old_package_locks = set(self._store.get_package_locks())
        current_package_locks = set(self._facade.get_package_locks())

        set_package_locks = current_package_locks - old_package_locks
        unset_package_locks = old_package_locks - current_package_locks

        message = {}
        if set_package_locks:
            message["created"] = sorted(set_package_locks)
        if unset_package_locks:
            message["deleted"] = sorted(unset_package_locks)

        if not message:
            return succeed(False)

        message["type"] = "package-locks"
        result = self._broker.send_message(message, True)

        logging.info("Queuing message with changes in known package locks:"
                     " %d created, %d deleted." %
                     (len(set_package_locks), len(unset_package_locks)))

        def update_currently_known(result):
            if set_package_locks:
                self._store.add_package_locks(set_package_locks)
            if unset_package_locks:
                self._store.remove_package_locks(unset_package_locks)
            return True

        result.addCallback(update_currently_known)

        return result
 
599
 
 
600
 
 
601
def main(args):
    """Command line entry point: run L{PackageReporter} with C{args}.

    @return: Whatever C{run_task_handler} returns for the reporter.
    """
    handler_class = PackageReporter
    return run_task_handler(handler_class, args)
 
603
 
 
604
 
 
605
def find_reporter_command():
    """Return the path of C{landscape-package-reporter}.

    The command is looked up in the same directory as the script currently
    being executed (C{sys.argv[0]}, made absolute).
    """
    running_script = os.path.abspath(sys.argv[0])
    script_directory = os.path.dirname(running_script)
    return os.path.join(script_directory, "landscape-package-reporter")