~ubuntu-core-dev/update-manager/main

« back to all changes in this revision

Viewing changes to UpdateManager/Core/MyCache.py

  • Committer: Benjamin Drung
  • Date: 2023-02-13 12:45:23 UTC
  • Revision ID: benjamin.drung@canonical.com-20230213124523-4f1nv2fyvsdl35jb
Fix pycodestyle complains by formatting Python code with black

```
black -C -l 79 .
```

Show diffs side-by-side

added added

removed removed

Lines of Context:
23
23
from __future__ import absolute_import, print_function
24
24
 
25
25
import warnings
 
26
 
26
27
warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning)
27
28
import apt
28
29
import apt_pkg
29
30
import logging
30
31
import os
 
32
 
31
33
try:
32
34
    from urllib.error import HTTPError
33
35
    from urllib.request import urlopen
44
46
import re
45
47
import DistUpgrade.DistUpgradeCache
46
48
from gettext import gettext as _
 
49
 
47
50
try:
48
51
    from launchpadlib.launchpad import Launchpad
49
52
except ImportError:
55
58
 
56
59
 
57
60
class HttpsChangelogsUnsupportedError(Exception):
58
 
    """ https changelogs with credentials are unsupported because of the
59
 
        lack of certitifcation validation in urllib2 which allows MITM
60
 
        attacks to steal the credentials
 
61
    """https changelogs with credentials are unsupported because of the
 
62
    lack of certitifcation validation in urllib2 which allows MITM
 
63
    attacks to steal the credentials
61
64
    """
 
65
 
62
66
    pass
63
67
 
64
68
 
76
80
        # check if the dpkg journal is ok (we need to do that here
77
81
        # too because libapt will only do it when it tries to lock
78
82
        # the packaging system)
79
 
        assert(not self._dpkgJournalDirty())
 
83
        assert not self._dpkgJournalDirty()
80
84
        # init the regular cache
81
85
        self._initDepCache()
82
86
        self.all_changes = {}
84
88
        # on broken packages, try to fix via saveDistUpgrade()
85
89
        if self._depcache.broken_count > 0:
86
90
            self.saveDistUpgrade()
87
 
        assert (self._depcache.broken_count == 0
88
 
                and self._depcache.del_count == 0)
 
91
        assert (
 
92
            self._depcache.broken_count == 0 and self._depcache.del_count == 0
 
93
        )
89
94
        self.launchpad = None
90
95
        # generate versioned_kernel_pkgs_regexp for later use
91
96
        apt_versioned_kernel_pkgs = apt_pkg.config.value_list(
92
 
            "APT::VersionedKernelPackages")
 
97
            "APT::VersionedKernelPackages"
 
98
        )
93
99
        if apt_versioned_kernel_pkgs:
94
 
            self.versioned_kernel_pkgs_regexp = re.compile("(" + "|".join(
95
 
                ["^" + p for p in apt_versioned_kernel_pkgs]) + ")")
 
100
            self.versioned_kernel_pkgs_regexp = re.compile(
 
101
                "("
 
102
                + "|".join(["^" + p for p in apt_versioned_kernel_pkgs])
 
103
                + ")"
 
104
            )
96
105
            running_kernel_version = subprocess.check_output(
97
 
                ["uname", "-r"], universal_newlines=True).rstrip()
98
 
            self.running_kernel_pkgs_regexp = re.compile("(" + "|".join(
99
 
                [("^" + p + ".*" + running_kernel_version)
100
 
                 if not p.startswith(".*") else (running_kernel_version + p)
101
 
                 for p in apt_versioned_kernel_pkgs]) + ")")
 
106
                ["uname", "-r"], universal_newlines=True
 
107
            ).rstrip()
 
108
            self.running_kernel_pkgs_regexp = re.compile(
 
109
                "("
 
110
                + "|".join(
 
111
                    [
 
112
                        ("^" + p + ".*" + running_kernel_version)
 
113
                        if not p.startswith(".*")
 
114
                        else (running_kernel_version + p)
 
115
                        for p in apt_versioned_kernel_pkgs
 
116
                    ]
 
117
                )
 
118
                + ")"
 
119
            )
102
120
        else:
103
121
            self.versioned_kernel_pkgs_regexp = None
104
122
            self.running_kernel_pkgs_regexp = None
108
126
        test if the dpkg journal is dirty
109
127
        (similar to debSystem::CheckUpdates)
110
128
        """
111
 
        d = os.path.dirname(
112
 
            apt_pkg.config.find_file("Dir::State::status")) + "/updates"
 
129
        d = (
 
130
            os.path.dirname(apt_pkg.config.find_file("Dir::State::status"))
 
131
            + "/updates"
 
132
        )
113
133
        for f in os.listdir(d):
114
134
            if re.match("[0-9]+", f):
115
135
                return True
116
136
        return False
117
137
 
118
138
    def _initDepCache(self):
119
 
        #apt_pkg.config.set("Debug::pkgPolicy","1")
120
 
        #self.depcache = apt_pkg.GetDepCache(self.cache)
121
 
        #self._depcache = apt_pkg.GetDepCache(self._cache)
 
139
        # apt_pkg.config.set("Debug::pkgPolicy","1")
 
140
        # self.depcache = apt_pkg.GetDepCache(self.cache)
 
141
        # self._depcache = apt_pkg.GetDepCache(self._cache)
122
142
        self._depcache.read_pinfile()
123
143
        if os.path.exists(SYNAPTIC_PINFILE):
124
144
            self._depcache.read_pinfile(SYNAPTIC_PINFILE)
129
149
 
130
150
    @property
131
151
    def required_download(self):
132
 
        """ get the size of the packages that are required to download """
 
152
        """get the size of the packages that are required to download"""
133
153
        pm = apt_pkg.PackageManager(self._depcache)
134
154
        fetcher = apt_pkg.Acquire()
135
155
        pm.get_archives(fetcher, self._list, self._records)
154
174
                continue
155
175
            match = True
156
176
            for base_dep in dep_or:
157
 
                if (base_dep.name != target.package.shortname
 
177
                if (
 
178
                    base_dep.name != target.package.shortname
158
179
                    or not apt_pkg.check_dep(
159
 
                        target.version, base_dep.relation, base_dep.version)):
 
180
                        target.version, base_dep.relation, base_dep.version
 
181
                    )
 
182
                ):
160
183
                    match = False
161
184
            if match:
162
185
                return True
169
192
        for cpkg in self:
170
193
            candidate = cpkg.candidate
171
194
            if candidate is not None:
172
 
                if (self._check_dependencies(
173
 
                        target, candidate.get_dependencies("Conflicts"))
174
 
                    and self._check_dependencies(
175
 
                        target, candidate.get_dependencies("Replaces"))):
 
195
                if self._check_dependencies(
 
196
                    target, candidate.get_dependencies("Conflicts")
 
197
                ) and self._check_dependencies(
 
198
                    target, candidate.get_dependencies("Replaces")
 
199
                ):
176
200
                    logging.info(
177
 
                        "%s Conflicts/Replaces %s; allowing removal" % (
178
 
                            candidate.package.shortname, pkg.shortname))
 
201
                        "%s Conflicts/Replaces %s; allowing removal"
 
202
                        % (candidate.package.shortname, pkg.shortname)
 
203
                    )
179
204
                    return True
180
205
        return False
181
206
 
182
207
    def saveDistUpgrade(self):
183
 
        """ this functions mimics a upgrade but will never remove anything """
184
 
        #self._apply_dselect_upgrade()
 
208
        """this functions mimics a upgrade but will never remove anything"""
 
209
        # self._apply_dselect_upgrade()
185
210
        self._depcache.upgrade(True)
186
211
        wouldDelete = self._depcache.del_count
187
212
        if wouldDelete > 0:
192
217
                    wouldDelete -= 1
193
218
        if wouldDelete > 0:
194
219
            self.clear()
195
 
            assert (self._depcache.broken_count == 0
196
 
                    and self._depcache.del_count == 0)
 
220
            assert (
 
221
                self._depcache.broken_count == 0
 
222
                and self._depcache.del_count == 0
 
223
            )
197
224
        else:
198
225
            assert self._depcache.broken_count == 0
199
 
        #self._apply_dselect_upgrade()
 
226
        # self._apply_dselect_upgrade()
200
227
        self._depcache.upgrade()
201
228
        return wouldDelete
202
229
 
203
230
    def _strip_epoch(self, verstr):
204
 
        " strip of the epoch "
 
231
        "strip of the epoch"
205
232
        vers_no_epoch = verstr.split(":")
206
233
        if len(vers_no_epoch) > 1:
207
234
            verstr = "".join(vers_no_epoch[1:])
208
235
        return verstr
209
236
 
210
 
    def _get_changelog_or_news(self, name, fname, strict_versioning=False,
211
 
                               changelogs_uri=None):
212
 
        " helper that fetches the file in question "
 
237
    def _get_changelog_or_news(
 
238
        self, name, fname, strict_versioning=False, changelogs_uri=None
 
239
    ):
 
240
        "helper that fetches the file in question"
213
241
        # don't touch the gui in this function, it needs to be thread-safe
214
242
        pkg = self[name]
215
243
 
240
268
        if changelogs_uri:
241
269
            uri = changelogs_uri
242
270
        else:
243
 
            uri = CHANGELOGS_URI % (src_section, prefix, srcpkg, srcpkg,
244
 
                                    srcver, fname)
 
271
            uri = CHANGELOGS_URI % (
 
272
                src_section,
 
273
                prefix,
 
274
                srcpkg,
 
275
                srcpkg,
 
276
                srcver,
 
277
                fname,
 
278
            )
245
279
 
246
280
        # https uris are not supported when they contain a username/password
247
281
        # because the urllib2 https implementation will not check certificates
251
285
        if res.scheme == "https" and res.username:
252
286
            raise HttpsChangelogsUnsupportedError(
253
287
                "https locations with username/password are not"
254
 
                "supported to fetch changelogs")
 
288
                "supported to fetch changelogs"
 
289
            )
255
290
 
256
291
        # print("Trying: %s " % uri)
257
292
        changelog = urlopen(uri)
258
 
        #print(changelog.read())
 
293
        # print(changelog.read())
259
294
        # do only get the lines that are new
260
295
        alllines = ""
261
296
        regexp = "^%s \\((.*)\\)(.*)$" % (re.escape(srcpkg))
282
317
                #
283
318
                # for NEWS.Debian we do require the changelogver > installed
284
319
                if strict_versioning:
285
 
                    if (installed
286
 
                            and apt_pkg.version_compare(changelogver,
287
 
                                                        installed) < 0):
 
320
                    if (
 
321
                        installed
 
322
                        and apt_pkg.version_compare(changelogver, installed)
 
323
                        < 0
 
324
                    ):
288
325
                        break
289
326
                else:
290
 
                    if (installed
291
 
                            and apt_pkg.version_compare(changelogver,
292
 
                                                        installed) == 0):
 
327
                    if (
 
328
                        installed
 
329
                        and apt_pkg.version_compare(changelogver, installed)
 
330
                        == 0
 
331
                    ):
293
332
                        break
294
333
            alllines = alllines + line
295
334
        return alllines
300
339
        Return None in case of an error.
301
340
        """
302
341
        if not Launchpad:
303
 
            logging.warning("Launchpadlib not available, cannot retrieve PPA "
304
 
                            "changelog")
 
342
            logging.warning(
 
343
                "Launchpadlib not available, cannot retrieve PPA changelog"
 
344
            )
305
345
            return None
306
346
 
307
347
        cdt = self[name].candidate
308
348
        for uri in cdt.uris:
309
 
            if urlsplit(uri).hostname != 'ppa.launchpad.net':
 
349
            if urlsplit(uri).hostname != "ppa.launchpad.net":
310
350
                continue
311
 
            match = re.search('http.*/(.*)/(.*)/ubuntu/.*', uri)
 
351
            match = re.search("http.*/(.*)/(.*)/ubuntu/.*", uri)
312
352
            if match is not None:
313
353
                user, ppa = match.group(1), match.group(2)
314
354
                break
318
358
 
319
359
        # Login on launchpad if we are not already
320
360
        if self.launchpad is None:
321
 
            self.launchpad = Launchpad.login_anonymously('update-manager',
322
 
                                                         'production',
323
 
                                                         version='devel')
 
361
            self.launchpad = Launchpad.login_anonymously(
 
362
                "update-manager", "production", version="devel"
 
363
            )
324
364
 
325
365
        archive = self.launchpad.archives.getByReference(
326
 
            reference='~%s/ubuntu/%s' % (user, ppa)
 
366
            reference="~%s/ubuntu/%s" % (user, ppa)
327
367
        )
328
368
        if archive is None:
329
 
            logging.error("Unable to retrieve the archive from the Launchpad "
330
 
                          "API.")
 
369
            logging.error(
 
370
                "Unable to retrieve the archive from the Launchpad API."
 
371
            )
331
372
            return
332
373
 
333
 
        spphs = archive.getPublishedSources(source_name=cdt.source_name,
334
 
                                            exact_match=True,
335
 
                                            version=cdt.source_version)
 
374
        spphs = archive.getPublishedSources(
 
375
            source_name=cdt.source_name,
 
376
            exact_match=True,
 
377
            version=cdt.source_version,
 
378
        )
336
379
        if not spphs:
337
 
            logging.error("No published sources were retrieved from the "
338
 
                          "Launchpad API.")
 
380
            logging.error(
 
381
                "No published sources were retrieved from the "
 
382
                "Launchpad API."
 
383
            )
339
384
            return
340
385
 
341
386
        return spphs[0].changelogUrl()
358
403
        return base_uri + "/%s_%s.changelog" % (srcpkg, srcver)
359
404
 
360
405
    def _guess_third_party_changelogs_uri_by_binary(self, name):
361
 
        """ guess changelogs uri based on ArchiveURI by replacing .deb
362
 
            with .changelog
 
406
        """guess changelogs uri based on ArchiveURI by replacing .deb
 
407
        with .changelog
363
408
        """
364
409
        # there is always a pkg and a pkg.candidate, no need to add
365
410
        # check here
378
423
            pass
379
424
 
380
425
    def get_news(self, name):
381
 
        " get the NEWS.Debian file from the changelogs location "
 
426
        "get the NEWS.Debian file from the changelogs location"
382
427
        try:
383
428
            news = self._get_changelog_or_news(name, "NEWS.Debian", True)
384
429
        except Exception:
390
435
        # Special case for PPAs
391
436
        changelogs_uri_ppa = None
392
437
        for origin in origins:
393
 
            if origin.origin.startswith('LP-PPA-'):
 
438
            if origin.origin.startswith("LP-PPA-"):
394
439
                try:
395
440
                    changelogs_uri_ppa = self._extract_ppa_changelog_uri(name)
396
441
                    break
397
442
                except Exception:
398
 
                    logging.exception("Unable to connect to the Launchpad "
399
 
                                      "API.")
 
443
                    logging.exception(
 
444
                        "Unable to connect to the Launchpad API."
 
445
                    )
400
446
        # Try non official changelog location
401
 
        changelogs_uri_binary = \
 
447
        changelogs_uri_binary = (
402
448
            self._guess_third_party_changelogs_uri_by_binary(name)
403
 
        changelogs_uri_source = \
 
449
        )
 
450
        changelogs_uri_source = (
404
451
            self._guess_third_party_changelogs_uri_by_source(name)
 
452
        )
405
453
        error_message = ""
406
 
        for changelogs_uri in [changelogs_uri_ppa,
407
 
                               changelogs_uri_binary,
408
 
                               changelogs_uri_source]:
 
454
        for changelogs_uri in [
 
455
            changelogs_uri_ppa,
 
456
            changelogs_uri_binary,
 
457
            changelogs_uri_source,
 
458
        ]:
409
459
            if changelogs_uri:
410
460
                try:
411
461
                    changelog = self._get_changelog_or_news(
412
 
                        name, "changelog", False, changelogs_uri)
 
462
                        name, "changelog", False, changelogs_uri
 
463
                    )
413
464
                    self.all_changes[name] += changelog
414
465
                except (HTTPError, HttpsChangelogsUnsupportedError):
415
466
                    # no changelogs_uri or 404
416
467
                    error_message = _(
417
468
                        "This update does not come from a "
418
 
                        "source that supports changelogs.")
 
469
                        "source that supports changelogs."
 
470
                    )
419
471
                except (IOError, BadStatusLine, socket.error):
420
472
                    # network errors and others
421
473
                    logging.exception("error on changelog fetching")
422
474
                    error_message = _(
423
475
                        "Failed to download the list of changes. \n"
424
 
                        "Please check your Internet connection.")
 
476
                        "Please check your Internet connection."
 
477
                    )
425
478
        self.all_changes[name] += error_message
426
479
 
427
480
    def get_changelog(self, name):
428
 
        " get the changelog file from the changelog location "
 
481
        "get the changelog file from the changelog location"
429
482
        origins = self[name].candidate.origins
430
 
        self.all_changes[name] = _("Changes for %s versions:\n"
431
 
                                   "Installed version: %s\n"
432
 
                                   "Available version: %s\n\n") % \
433
 
            (name, getattr(self[name].installed, "version", None),
434
 
             self[name].candidate.version)
 
483
        self.all_changes[name] = _(
 
484
            "Changes for %s versions:\n"
 
485
            "Installed version: %s\n"
 
486
            "Available version: %s\n\n"
 
487
        ) % (
 
488
            name,
 
489
            getattr(self[name].installed, "version", None),
 
490
            self[name].candidate.version,
 
491
        )
435
492
        if self.CHANGELOG_ORIGIN not in [o.origin for o in origins]:
436
493
            self._fetch_changelog_for_third_party_package(name, origins)
437
494
            return
438
495
        # fixup epoch handling version
439
496
        srcpkg = self[name].candidate.source_name
440
 
        srcver_epoch = self[name].candidate.source_version.replace(':', '%3A')
 
497
        srcver_epoch = self[name].candidate.source_version.replace(":", "%3A")
441
498
        try:
442
499
            changelog = self._get_changelog_or_news(name, "changelog")
443
500
            if len(changelog) == 0:
444
 
                changelog = _("The changelog does not contain any relevant "
445
 
                              "changes.\n\n"
446
 
                              "Please use http://launchpad.net/ubuntu/+source/"
447
 
                              "%s/%s/+changelog\n"
448
 
                              "until the changes become available or try "
449
 
                              "again later.") % (srcpkg, srcver_epoch)
 
501
                changelog = _(
 
502
                    "The changelog does not contain any relevant changes.\n\n"
 
503
                    "Please use http://launchpad.net/ubuntu/+source/"
 
504
                    "%s/%s/+changelog\n"
 
505
                    "until the changes become available or try again later."
 
506
                ) % (srcpkg, srcver_epoch)
450
507
        except HTTPError:
451
 
            changelog = _("The list of changes is not available yet.\n\n"
452
 
                          "Please use http://launchpad.net/ubuntu/+source/"
453
 
                          "%s/%s/+changelog\n"
454
 
                          "until the changes become available or try again "
455
 
                          "later.") % (srcpkg, srcver_epoch)
 
508
            changelog = _(
 
509
                "The list of changes is not available yet.\n\n"
 
510
                "Please use http://launchpad.net/ubuntu/+source/"
 
511
                "%s/%s/+changelog\n"
 
512
                "until the changes become available or try again later."
 
513
            ) % (srcpkg, srcver_epoch)
456
514
        except (IOError, BadStatusLine, socket.error) as e:
457
515
            print("caught exception: ", e)
458
 
            changelog = _("Failed to download the list "
459
 
                          "of changes. \nPlease "
460
 
                          "check your Internet "
461
 
                          "connection.")
 
516
            changelog = _(
 
517
                "Failed to download the list of changes. \n"
 
518
                "Please check your Internet connection."
 
519
            )
462
520
        self.all_changes[name] += changelog