~laney/ubuntu-archive-tools/retry-autopkgtest-regressions-bileto-v2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
#! /usr/bin/python
# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3.

from __future__ import print_function

from collections import defaultdict
import functools
import logging
import optparse
import os
import re
import subprocess
import sys
import tempfile

import apt_pkg
from launchpadlib.errors import HTTPError
from launchpadlib.launchpad import Launchpad


# Splits a "name (version)" string into group 1 (name) and group 2 (version).
# buildArchNBS applies it to 'Source' fields that contain a parenthesis.
re_extract_src_version = re.compile(r"(\S+)\s*\((.*)\)")


class ArchiveCruftCheckerError(Exception):
    """Signal that an ArchiveCruftChecker could not be set up.

    ArchiveCruftChecker.run() raises it for an unknown distribution, an
    unknown suite, or an archive path that does not exist on disk.
    """


class TagFileNotFound(Exception):
    """Signal that an expected archive tag (index) file is not on disk."""


class ArchiveCruftChecker:
    """Perform overall checks to identify and remove obsolete records.

    Use the run() method to validate the passed parameters and build the
    infrastructure variables. It will raise ArchiveCruftCheckerError if
    something goes wrong.
    """

    # XXX cprov 2006-05-15: the default archive path should come
    # from the config.
    def __init__(self, launchpad_instance='production',
                 distribution_name='ubuntu', suite=None,
                 archive_path='/srv/launchpad.net/ubuntu-archive'):
        """Log in to Launchpad and remember the requested configuration.

        All result containers start out empty; they are filled in later by
        the processing methods.
        """
        self.launchpad = Launchpad.login_anonymously(
            'archive-cruft-check', launchpad_instance)

        # Configuration, stored exactly as given by the caller.
        self.distribution_name = distribution_name
        self.suite = suite
        self.archive_path = archive_path

        # Containers for intermediate and final results.
        # source name -> version of the published source
        self.source_versions = {}
        # source name -> 'Binary' field of the published source
        self.source_binaries = {}
        # 'Not Build From Source' binaries: source -> binary -> version -> ""
        self.nbs = defaultdict(lambda: defaultdict(dict))
        # binary package name -> list of sources that declare it
        self.bin_pkgs = defaultdict(list)
        # architecture-specific binaries: package -> version ("0" if unseen)
        self.arch_any = defaultdict(lambda: "0")
        # candidate NBS before clean up: source -> version -> set(packages)
        self.dubious_nbs = defaultdict(lambda: defaultdict(set))
        # confirmed NBS after clean up: source -> version -> set(packages)
        self.real_nbs = defaultdict(lambda: defaultdict(set))
        # flat list of confirmed NBS package names, ready for removal
        self.nbs_to_remove = []

    @property
    def components_and_di(self):
        """List each component followed by its debian-installer variant."""
        return [
            entry
            for name in self.components
            for entry in (name, '%s/debian-installer' % name)]

    @property
    def dist_archive(self):
        return os.path.join(
            self.archive_path, self.distro.name, 'dists', self.suite)

    def gunzipTagFileContent(self, filename):
        """Gunzip the contents of passed filename.

        Check filename presence, if not present in the filesystem,
        raises ArchiveCruftCheckerError. Use an tempfile.mkstemp()
        to store the uncompressed content. Invoke system available
        gunzip`, raises ArchiveCruftCheckError if it fails.

        This method doesn't close the file descriptor used and does not
        remove the temporary file from the filesystem, those actions
        are required in the callsite. (apt_pkg.TagFile is lazy)

        Return a tuple containing:
         * temp file descriptor
         * temp filename
         * the contents parsed by apt_pkg.TagFile()
        """
        if not os.path.exists(filename):
            raise TagFileNotFound("File does not exist: %s" % filename)

        temp_fd, temp_filename = tempfile.mkstemp()
        subprocess.check_call(['gunzip', '-c', filename], stdout=temp_fd)

        os.lseek(temp_fd, 0, os.SEEK_SET)
        temp_file = os.fdopen(temp_fd)
        # XXX cprov 2006-05-15: maybe we need some sort of data integrity
        # check at this point, and maybe keep the uncompressed file
        # for debug purposes, let's see how it behaves in real conditions.
        parsed_contents = apt_pkg.TagFile(temp_file)

        return temp_file, temp_filename, parsed_contents

    def processSources(self):
        """Process archive sources index.

        Build source_binaries, source_versions and bin_pkgs lists.
        """
        logging.debug("Considering Sources:")
        for component in self.components:
            filename = os.path.join(
                self.dist_archive, "%s/source/Sources.gz" % component)

            logging.debug("Processing %s" % filename)
            try:
                temp_fd, temp_filename, parsed_sources = (
                    self.gunzipTagFileContent(filename))
            except TagFileNotFound as warning:
                logging.warning(warning)
                return
            try:
                for section in parsed_sources:
                    source = section.find("Package")
                    source_version = section.find("Version")
                    binaries = section.find("Binary")
                    for binary in [
                            item.strip() for item in binaries.split(',')]:
                        self.bin_pkgs[binary].append(source)

                    self.source_binaries[source] = binaries
                    self.source_versions[source] = source_version
            finally:
                # close fd and remove temporary file used to store
                # uncompressed tag file content from the filesystem.
                temp_fd.close()
                os.unlink(temp_filename)

    def buildNBS(self):
        """Collect 'not built from source' binaries from every Packages file.

        Calls buildArchNBS once per (component, architecture) pair, with
        the components taken from self.components_and_di.
        """
        logging.debug("Building not built from source list (NBS):")
        targets = [
            (area, arch)
            for area in self.components_and_di
            for arch in self.architectures]
        for area, arch in targets:
            self.buildArchNBS(area, arch)

    def buildArchNBS(self, component, architecture):
        """Build NBS per architecture.

        Store results in self.nbs, also build architecture specific
        binaries group (stored in self.arch_any)
        """
        filename = os.path.join(
            self.dist_archive,
            "%s/binary-%s/Packages.gz" % (component, architecture))

        logging.debug("Processing %s" % filename)
        try:
            temp_fd, temp_filename, parsed_packages = (
                self.gunzipTagFileContent(filename))
        except TagFileNotFound as warning:
            logging.warn(warning)
            return

        try:
            for section in parsed_packages:
                package = section.find('Package')
                source = section.find('Source', "")
                version = section.find('Version')
                architecture = section.find('Architecture')

                if source == "":
                    source = package

                if source.find("(") != -1:
                    m = re_extract_src_version.match(source)
                    source = m.group(1)
                    version = m.group(2)

                if package not in self.bin_pkgs:
                    self.nbs[source][package][version] = ""

                if architecture != "all":
                    if apt_pkg.version_compare(
                            version, self.arch_any[package]) < 1:
                        self.arch_any[package] = version
        finally:
            # close fd and remove temporary file used to store uncompressed
            # tag file content from the filesystem.
            temp_fd.close()
            os.unlink(temp_filename)

    def addNBS(self, nbs_d, source, version, package):
        """Add a new entry in given organized nbs_d list

        Ensure the package is still published in the suite before add.
        """
        result = self.archive.getPublishedBinaries(
            binary_name=package, exact_match=True, status='Published')
        result = [bpph for bpph in result
                  if bpph.distro_arch_series_link in self.das_urls]

        if result:
            nbs_d[source][version].add(package)

    def refineNBS(self):
        """ Distinguish dubious from real NBS.

        They are 'dubious' if the version numbers match and 'real'
        if the versions don't match.
        It stores results in self.dubious_nbs and self.real_nbs.
        """
        for source in self.nbs:
            for package in self.nbs[source]:
                versions = sorted(
                    self.nbs[source][package], cmp=apt_pkg.version_compare)
                latest_version = versions.pop()

                source_version = self.source_versions.get(source, "0")

                if apt_pkg.version_compare(latest_version,
                                           source_version) == 0:
                    # We don't actually do anything with dubious_nbs for
                    # now, so let's not waste time computing it.
                    #self.addNBS(self.dubious_nbs, source, latest_version,
                    #            package)
                    pass
                else:
                    self.addNBS(self.real_nbs, source, latest_version,
                                package)

    def outputNBS(self):
        """Properly display built NBS entries.

        Also organize the 'real' NBSs for removal in self.nbs_to_remove
        attribute.
        """
        output = "Not Built from Source\n"
        output += "---------------------\n\n"

        nbs_keys = sorted(self.real_nbs)

        for source in nbs_keys:
            proposed_bin = self.source_binaries.get(
                source, "(source does not exist)")
            proposed_version = self.source_versions.get(source, "??")
            output += (" * %s_%s builds: %s\n"
                       % (source, proposed_version, proposed_bin))
            output += "\tbut no longer builds:\n"
            versions = sorted(
                self.real_nbs[source], cmp=apt_pkg.version_compare)

            for version in versions:
                packages = sorted(self.real_nbs[source][version])

                for pkg in packages:
                    self.nbs_to_remove.append(pkg)

                output += "        o %s: %s\n" % (
                    version, ", ".join(packages))

            output += "\n"

        if self.nbs_to_remove:
            print(output)
        else:
            logging.debug("No NBS found")

    def run(self):
        """Validate the configuration, then compute and print the NBS list.

        Resolves the distribution, series and archive through Launchpad,
        checks that the suite's 'dists' directory exists locally, and then
        runs processSources(), buildNBS(), refineNBS() and outputNBS() in
        that order. The final result is self.nbs_to_remove; the other
        containers can be inspected to see how it was computed.

        Raises ArchiveCruftCheckerError for an invalid distribution, suite
        or archive path.
        """
        try:
            distro = self.launchpad.distributions[self.distribution_name]
        except KeyError:
            raise ArchiveCruftCheckerError(
                "Invalid distribution: '%s'" % self.distribution_name)
        self.distro = distro

        if self.suite:
            try:
                # Only the part of the suite before the first '-' is looked
                # up as a series.
                self.distroseries = self.distro.getSeries(
                    name_or_version=self.suite.split('-')[0])
            except HTTPError:
                raise ArchiveCruftCheckerError(
                    "Invalid suite: '%s'" % self.suite)
        else:
            # No suite requested: fall back to the current series.
            self.distroseries = self.distro.current_series
            self.suite = self.distroseries.name

        dists_dir = self.dist_archive
        if not os.path.exists(dists_dir):
            raise ArchiveCruftCheckerError(
                "Invalid archive path: '%s'" % dists_dir)

        self.archive = self.distro.main_archive
        self.distroarchseries = list(self.distroseries.architectures)
        self.das_urls = [
            arch_series.self_link for arch_series in self.distroarchseries]
        self.architectures = [
            arch_series.architecture_tag
            for arch_series in self.distroarchseries]
        self.components = self.distroseries.component_names

        apt_pkg.init()
        pipeline = (
            self.processSources,
            self.buildNBS,
            self.refineNBS,
            self.outputNBS,
        )
        for step in pipeline:
            step()


def main():
    """Command-line entry point.

    Parses the options and the required ARCHIVE_PATH positional argument,
    then runs an ArchiveCruftChecker over it.

    Returns the process exit status: 0 on success, 1 when the archive path
    is missing or the checker rejects its configuration.
    """
    parser = optparse.OptionParser(usage="%prog [options] ARCHIVE_PATH")

    parser.add_option(
        "-l", "--launchpad", dest="launchpad_instance", default="production")
    parser.add_option(
        "-d", "--distro", dest="distro", default="ubuntu", help="check DISTRO")
    parser.add_option(
        "-s", "--suite", dest="suite", help="only act on SUITE")
    parser.add_option(
        "-n", "--no-action", dest="action", default=True, action="store_false",
        help="unused compatibility option")
    parser.add_option(
        "-v", "--verbose", dest="verbose", default=False, action="store_true",
        help="emit verbose debugging messages")

    options, args = parser.parse_args()

    if not args:
        logging.error('Archive path is required')
        return 1
    archive_path = args[0]

    if options.verbose:
        logging.basicConfig(level=logging.DEBUG)

    checker = ArchiveCruftChecker(
        launchpad_instance=options.launchpad_instance,
        distribution_name=options.distro, suite=options.suite,
        archive_path=archive_path)
    try:
        checker.run()
    except ArchiveCruftCheckerError as error:
        # Invalid distribution, suite or archive path: report the message
        # and fail cleanly rather than dumping a traceback on the user.
        logging.error(error)
        return 1

    return 0


# When executed as a script, run main() and use its return value as the
# process exit status.
if __name__ == '__main__':
    sys.exit(main())