7
from twisted.internet.defer import Deferred, succeed
8
from twisted.internet.utils import getProcessOutputAndValue
10
from landscape.lib.sequenceranges import sequence_to_ranges
11
from landscape.lib.twisted_util import gather_results
12
from landscape.lib.fetch import fetch_async
13
from landscape.lib.fs import touch_file
15
from landscape.package.taskhandler import (
16
PackageTaskHandlerConfiguration, PackageTaskHandler, run_task_handler)
17
from landscape.package.store import UnknownHashIDRequest
20
# Age threshold subtracted from the current time when expiring delivered
# hash=>id requests (presumably seconds, as it is compared against request
# timestamps -- TODO confirm the timestamp unit).
HASH_ID_REQUEST_TIMEOUT = 7200

# Upper bound on the number of hashes sent in a single
# "unknown-package-hashes" message.
MAX_UNKNOWN_HASHES_PER_REQUEST = 500
24
class PackageReporterConfiguration(PackageTaskHandlerConfiguration):
    """Specialized configuration for the Landscape package-reporter."""

    def make_parser(self):
        """
        Specialize L{Configuration.make_parser}, adding reporter-specific
        options.

        @return: The parser built by the parent class, extended with the
            C{--force-smart-update} flag.
        """
        parser = super(PackageReporterConfiguration, self).make_parser()
        # The option is consumed as a boolean (see the check on
        # C{force_smart_update} in the reporter), so it must be a plain
        # flag that takes no argument.
        parser.add_option("--force-smart-update", default=False,
                          action="store_true",
                          help="Force running smart-update.")
        return parser
39
class PackageReporter(PackageTaskHandler):
    """Report information about the system packages.

    @cvar queue_name: Name of the task queue to pick tasks from.
    @cvar smart_update_interval: Time interval in minutes to pass to
        the C{--after} command line option of C{smart-update}.
    @cvar smart_update_filename: Path of the C{smart-update} executable.
    @cvar sources_list_filename: Path of the main APT sources list, checked
        for recent modifications.
    @cvar sources_list_directory: Directory whose files are also checked
        for recent modifications.
    """
    config_factory = PackageReporterConfiguration

    queue_name = "reporter"

    smart_update_interval = 60
    smart_update_filename = "/usr/lib/landscape/smart-update"
    sources_list_filename = "/etc/apt/sources.list"
    sources_list_directory = "/etc/apt/sources.list.d"

    # NOTE(review): the header of this method and the statements creating
    # and firing the Deferred were not present in the text under review.
    # The name C{run} is an assumption -- confirm against the base class.
    def run(self):
        """Chain all the reporter steps, one after the other.

        @return: The deferred that the steps below are chained on.
        """
        result = Deferred()

        # Run smart-update before anything else, to make sure that
        # the SmartFacade will load freshly updated channels
        result.addCallback(lambda x: self.run_smart_update())

        # If the appropriate hash=>id db is not there, fetch it
        result.addCallback(lambda x: self.fetch_hash_id_db())

        # Attach the hash=>id database if available
        result.addCallback(lambda x: self.use_hash_id_db())

        # Now, handle any queued tasks.
        result.addCallback(lambda x: self.handle_tasks())

        # Then, remove any expired hash=>id translation requests.
        result.addCallback(lambda x: self.remove_expired_hash_id_requests())

        # After that, check if we have any unknown hashes to request.
        result.addCallback(lambda x: self.request_unknown_hashes())

        # Finally, verify if we have anything new to report to the server.
        result.addCallback(lambda x: self.detect_changes())

        result.callback(None)
        return result

    def fetch_hash_id_db(self):
        """
        Fetch the appropriate pre-canned database of hash=>id mappings
        from the server. If the database is already present, it won't
        be downloaded twice.

        The format of the database filename is <uuid>_<codename>_<arch>,
        and it will be downloaded from the HTTP directory set in
        config.package_hash_id_url, or config.url/hash-id-databases if
        the former is not set.

        Fetch failures are handled gracefully and logged as appropriate.

        @return: A deferred firing when the fetch attempt is over.
        """

        def fetch_it(hash_id_db_filename):

            if hash_id_db_filename is None:
                # Couldn't determine which hash=>id database to fetch,
                # just ignore the failure and go on
                return

            if os.path.exists(hash_id_db_filename):
                # We don't download twice
                return

            base_url = self._get_hash_id_db_base_url()
            if not base_url:
                logging.warning("Can't determine the hash=>id database url")
                return

            # Cast to str as pycurl doesn't like unicode
            url = str(base_url + os.path.basename(hash_id_db_filename))

            def fetch_ok(data):
                hash_id_db_fd = open(hash_id_db_filename, "w")
                try:
                    hash_id_db_fd.write(data)
                finally:
                    # Release the descriptor even if the write fails.
                    hash_id_db_fd.close()
                logging.info("Downloaded hash=>id database from %s" % url)

            def fetch_error(failure):
                exception = failure.value
                logging.warning("Couldn't download hash=>id database: %s" %
                                str(exception))

            result = fetch_async(url,
                                 cainfo=self._config.get("ssl_public_key"))
            result.addCallback(fetch_ok)
            result.addErrback(fetch_error)

            return result

        result = self._determine_hash_id_db_filename()
        result.addCallback(fetch_it)
        return result

    def _get_hash_id_db_base_url(self):
        """Return the base URL to download hash=>id databases from.

        @return: The C{package_hash_id_url} configuration value if set,
            otherwise a URL derived from the C{url} configuration value,
            always with exactly one trailing slash; C{None} if neither
            value is available.
        """
        base_url = self._config.get("package_hash_id_url")

        if not base_url:

            if not self._config.get("url"):
                # We really have no idea where to download from
                return None

            # If config.url is http://host:123/path/to/message-system
            # then we'll use http://host:123/path/to/hash-id-databases
            base_url = urlparse.urljoin(self._config.url.rstrip("/"),
                                        "hash-id-databases")

        return base_url.rstrip("/") + "/"

    def _apt_sources_have_changed(self):
        """Return a boolean indicating if the APT sources were modified."""
        from landscape.monitor.packagemonitor import PackageMonitor

        filenames = []

        if os.path.exists(self.sources_list_filename):
            filenames.append(self.sources_list_filename)

        if os.path.exists(self.sources_list_directory):
            filenames.extend(
                [os.path.join(self.sources_list_directory, filename) for
                 filename in os.listdir(self.sources_list_directory)])

        # A source counts as changed when its modification time is more
        # recent than one PackageMonitor run interval ago.
        for filename in filenames:
            seconds_since_last_change = (
                time.time() - os.path.getmtime(filename))
            if seconds_since_last_change < PackageMonitor.run_interval:
                return True

        return False

    def run_smart_update(self):
        """Run smart-update and log a warning in case of non-zero exit code.

        @return: a deferred returning (out, err, code)
        """
        if self._config.force_smart_update or self._apt_sources_have_changed():
            args = ()
        else:
            args = ("--after", "%d" % self.smart_update_interval)
        result = getProcessOutputAndValue(self.smart_update_filename,
                                          args=args)

        def callback(process_result):
            # Unpack explicitly rather than in the signature: tuple
            # parameters are not valid syntax on newer interpreters.
            out, err, code = process_result

            # smart-update --after N will exit with error code 1 when it
            # doesn't actually run the update code because not enough time
            # has passed yet, but we don't actually consider it a failure.
            smart_failed = (code not in (0, 1) or
                            (code == 1 and err.strip() != ""))
            if smart_failed:
                logging.warning("'%s' exited with status %d (%s)" % (
                    self.smart_update_filename, code, err))
            logging.debug("'%s' exited with status %d (out='%s', err='%s')" % (
                self.smart_update_filename, code, out, err))
            touch_file(self._config.smart_update_stamp_filename)
            return (out, err, code)

        result.addCallback(callback)
        return result

    def handle_task(self, task):
        """Dispatch a queued task according to the type of its message.

        @param task: The task to handle; its C{data} attribute is the
            message to process.
        @return: The result of the matching handler, or C{None} for
            message types this reporter doesn't handle.
        """
        message = task.data
        if message["type"] == "package-ids":
            return self._handle_package_ids(message)
        if message["type"] == "resynchronize":
            return self._handle_resynchronize()

    def _handle_package_ids(self, message):
        """Store the hash=>id translations carried by a package-ids message.

        Hashes for which the message carries no id are handed over to
        L{_handle_unknown_packages}.

        @param message: A message with C{request-id} and C{ids} keys.
        @return: A deferred firing once the request has been dealt with.
        """
        unknown_hashes = []

        try:
            request = self._store.get_hash_id_request(message["request-id"])
        except UnknownHashIDRequest:
            # We've lost this request somehow.  It will be re-requested later.
            return succeed(None)

        hash_ids = {}

        for package_hash, package_id in zip(request.hashes, message["ids"]):
            if package_id is None:
                unknown_hashes.append(package_hash)
            else:
                hash_ids[package_hash] = package_id

        self._store.set_hash_ids(hash_ids)

        logging.info("Received %d package hash => id translations, %d hashes "
                     "are unknown." % (len(hash_ids), len(unknown_hashes)))

        if unknown_hashes:
            result = self._handle_unknown_packages(unknown_hashes)
        else:
            result = succeed(None)

        # Remove the request if everything goes well.
        result.addCallback(lambda x: request.remove())

        return result

    def _handle_resynchronize(self):
        """Clear the locally stored package state.

        @return: An already fired deferred.
        """
        self._store.clear_available()
        self._store.clear_available_upgrades()
        self._store.clear_installed()
        self._store.clear_locked()
        self._store.clear_package_locks()

        # Don't clear the hash_id_requests table because the messages
        # associated with the existing requests might still have to be
        # delivered, and if we clear the table and later create a new request,
        # that new request could get the same id of one of the deleted ones,
        # and when the pending message eventually gets delivered the reporter
        # would think that the message is associated to the newly created
        # request, as it has the same id as the deleted request the message
        # actually refers to.  This would cause the ids in the message to be
        # possibly mapped to the wrong hashes.
        #
        # This problem would happen for example when switching the client from
        # one Landscape server to another, because the uuid-changed event would
        # cause a resynchronize task to be created by the monitor. See #417122.

        return succeed(None)

    def _handle_unknown_packages(self, hashes):
        """Send the full details of the packages matching the given hashes.

        @param hashes: The package hashes to send details for.
        @return: A deferred firing when the C{add-packages} message has been
            queued, or immediately if no known package matches.
        """
        self._facade.ensure_channels_reloaded()

        hashes = set(hashes)
        added_hashes = []
        packages = []
        for package in self._facade.get_packages():
            package_hash = self._facade.get_package_hash(package)
            if package_hash in hashes:
                added_hashes.append(package_hash)
                skeleton = self._facade.get_package_skeleton(package)
                packages.append({"type": skeleton.type,
                                 "name": skeleton.name,
                                 "version": skeleton.version,
                                 "section": skeleton.section,
                                 "summary": skeleton.summary,
                                 "description": skeleton.description,
                                 "size": skeleton.size,
                                 "installed-size": skeleton.installed_size,
                                 "relations": skeleton.relations})

        if packages:
            logging.info("Queuing messages with data for %d packages to "
                         "exchange urgently." % len(packages))

            message = {"type": "add-packages", "packages": packages}

            result = self._send_message_with_hash_id_request(message,
                                                             added_hashes)
        else:
            result = succeed(None)

        return result

    def remove_expired_hash_id_requests(self):
        """Refresh pending hash=>id requests and drop the expired ones.

        @return: A deferred firing when all the requests have been checked.
        """
        now = time.time()
        timeout = now - HASH_ID_REQUEST_TIMEOUT

        def update_or_remove(is_pending, request):
            if is_pending:
                # Request is still in the queue.  Update the timestamp.
                request.timestamp = now
            elif request.timestamp < timeout:
                # Request was delivered, and is older than the threshold.
                request.remove()

        results = []
        for request in self._store.iter_hash_id_requests():
            if request.message_id is None:
                # May happen in some rare cases, when a send_message() is
                # interrupted abruptly.  If it just fails normally, the
                # request is removed and so we don't get here.
                request.remove()
            else:
                result = self._broker.is_message_pending(request.message_id)
                result.addCallback(update_or_remove, request)
                results.append(result)

        return gather_results(results)

    def request_unknown_hashes(self):
        """Detect available packages for which we have no hash=>id mappings.

        This method will verify if there are packages that Smart knows
        about but for which we don't have an id yet (no hash => id
        translation), and deliver a message (unknown-package-hashes)
        to request them.

        Hashes previously requested won't be requested again, unless they
        have already expired and been removed from the database.

        @return: A deferred firing when the request has been queued, or
            immediately if there is nothing to request.
        """
        self._facade.ensure_channels_reloaded()

        unknown_hashes = set()

        for package in self._facade.get_packages():
            package_hash = self._facade.get_package_hash(package)
            if self._store.get_hash_id(package_hash) is None:
                unknown_hashes.add(package_hash)

        # Discard unknown hashes in existent requests.
        for request in self._store.iter_hash_id_requests():
            unknown_hashes -= set(request.hashes)

        if not unknown_hashes:
            result = succeed(None)
        else:
            unknown_hashes = sorted(unknown_hashes)
            unknown_hashes = unknown_hashes[:MAX_UNKNOWN_HASHES_PER_REQUEST]

            logging.info("Queuing request for package hash => id "
                         "translation on %d hash(es)." % len(unknown_hashes))

            message = {"type": "unknown-package-hashes",
                       "hashes": unknown_hashes}

            result = self._send_message_with_hash_id_request(message,
                                                             unknown_hashes)

        return result

    def _send_message_with_hash_id_request(self, message, unknown_hashes):
        """Create a hash_id_request and send message with "request-id"."""
        request = self._store.add_hash_id_request(unknown_hashes)
        message["request-id"] = request.id
        result = self._broker.send_message(message, True)

        def set_message_id(message_id):
            request.message_id = message_id

        def send_message_failed(failure):
            # Drop the request so that its hashes get requested again
            # later, and let the failure propagate.
            request.remove()
            return failure

        return result.addCallbacks(set_message_id, send_message_failed)

    def detect_changes(self):
        """Detect all changes concerning packages.

        If some changes were detected with respect to our last run, then an
        event of type 'package-data-changed' will be fired in the broker
        reactor.
        """

        def changes_detected(results):
            # Release all smart locks, in case the changer runs after us.
            self._facade.deinit()
            if True in results:
                # Something has changed, notify the broker.
                return self._broker.fire_event("package-data-changed")

        result = gather_results([self.detect_packages_changes(),
                                 self.detect_package_locks_changes()])
        return result.addCallback(changes_detected)

    def _is_upgrade_for_installed_package(self, package):
        """Return whether C{package} upgrades some installed package."""
        for upgrade in package.upgrades:
            for provides in upgrade.providedby:
                for provides_package in provides.packages:
                    if provides_package.installed:
                        return True
        return False

    def detect_packages_changes(self):
        """Detect changes in the universe of known packages.

        This method will verify if there are packages that:

        - are now installed, and were not;
        - are now available, and were not;
        - are now locked, and were not;
        - were previously available but are not anymore;
        - were previously installed but are not anymore;
        - were previously locked but are not anymore;

        Changes in package locks are reported separately, see
        L{detect_package_locks_changes}.

        In all cases, the server is notified of the new situation
        with a "packages" message.

        @return: A deferred resulting in C{True} if package changes were
            detected with respect to the previous run, or C{False} otherwise.
        """
        self._facade.ensure_channels_reloaded()

        old_installed = set(self._store.get_installed())
        old_available = set(self._store.get_available())
        old_upgrades = set(self._store.get_available_upgrades())
        old_locked = set(self._store.get_locked())

        current_installed = set()
        current_available = set()
        current_upgrades = set()
        current_locked = set()

        for package in self._facade.get_packages():
            package_hash = self._facade.get_package_hash(package)
            package_id = self._store.get_hash_id(package_hash)
            if package_id is None:
                # No id known for this package yet, nothing to report.
                continue

            if package.installed:
                current_installed.add(package_id)
                for loader in package.loaders:
                    # Is the package also in a non-installed
                    # loader?  IOW, "available".
                    if not loader.getInstalled():
                        current_available.add(package_id)
                        break
            else:
                current_available.add(package_id)

            # Are there any packages that this package is an upgrade for?
            if self._is_upgrade_for_installed_package(package):
                current_upgrades.add(package_id)

        for package in self._facade.get_locked_packages():
            package_hash = self._facade.get_package_hash(package)
            package_id = self._store.get_hash_id(package_hash)
            if package_id is not None:
                current_locked.add(package_id)

        new_installed = current_installed - old_installed
        new_available = current_available - old_available
        new_upgrades = current_upgrades - old_upgrades
        new_locked = current_locked - old_locked

        not_installed = old_installed - current_installed
        not_available = old_available - current_available
        not_upgrades = old_upgrades - current_upgrades
        not_locked = old_locked - current_locked

        # Only non-empty differences make it into the message.
        message = {}
        for key, ids in (("installed", new_installed),
                         ("available", new_available),
                         ("available-upgrades", new_upgrades),
                         ("locked", new_locked),
                         ("not-installed", not_installed),
                         ("not-available", not_available),
                         ("not-available-upgrades", not_upgrades),
                         ("not-locked", not_locked)):
            if ids:
                message[key] = list(sequence_to_ranges(sorted(ids)))

        if not message:
            return succeed(False)

        message["type"] = "packages"
        result = self._broker.send_message(message, True)

        logging.info("Queuing message with changes in known packages: "
                     "%d installed, %d available, %d available upgrades, "
                     "%d locked, %d not installed, %d not available, "
                     "%d not available upgrades, %d not locked."
                     % (len(new_installed), len(new_available),
                        len(new_upgrades), len(new_locked),
                        len(not_installed), len(not_available),
                        len(not_upgrades), len(not_locked)))

        def update_currently_known(result):
            if new_installed:
                self._store.add_installed(new_installed)
            if not_installed:
                self._store.remove_installed(not_installed)
            if new_available:
                self._store.add_available(new_available)
            if new_locked:
                self._store.add_locked(new_locked)
            if not_available:
                self._store.remove_available(not_available)
            if new_upgrades:
                self._store.add_available_upgrades(new_upgrades)
            if not_upgrades:
                self._store.remove_available_upgrades(not_upgrades)
            if not_locked:
                self._store.remove_locked(not_locked)
            # Something has changed wrt the former run, let's return True
            return True

        result.addCallback(update_currently_known)

        return result

    def detect_package_locks_changes(self):
        """Detect changes in known package locks.

        This method will verify if there are package locks that:

        - are now set, and were not;
        - were previously set but are not anymore;

        In all cases, the server is notified of the new situation
        with a "package-locks" message.

        @return: A deferred resulting in C{True} if package lock changes were
            detected with respect to the previous run, or C{False} otherwise.
        """
        old_package_locks = set(self._store.get_package_locks())
        current_package_locks = set(self._facade.get_package_locks())

        set_package_locks = current_package_locks - old_package_locks
        unset_package_locks = old_package_locks - current_package_locks

        message = {}
        if set_package_locks:
            message["created"] = sorted(set_package_locks)
        if unset_package_locks:
            message["deleted"] = sorted(unset_package_locks)

        if not message:
            return succeed(False)

        message["type"] = "package-locks"
        result = self._broker.send_message(message, True)

        logging.info("Queuing message with changes in known package locks:"
                     " %d created, %d deleted." %
                     (len(set_package_locks), len(unset_package_locks)))

        def update_currently_known(result):
            if set_package_locks:
                self._store.add_package_locks(set_package_locks)
            if unset_package_locks:
                self._store.remove_package_locks(unset_package_locks)
            return True

        result.addCallback(update_currently_known)
        return result
602
return run_task_handler(PackageReporter, args)
605
def find_reporter_command():
    """Return the expected path of the package-reporter executable.

    The C{landscape-package-reporter} script is looked for in the same
    directory as the program currently running (C{sys.argv[0]}).

    @return: The absolute path, whether or not the file actually exists.
    """
    program_path = os.path.abspath(sys.argv[0])
    return os.path.join(os.path.dirname(program_path),
                        "landscape-package-reporter")