~laney/ubuntu-archive-tools/cm-show-fix-released

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
#! /usr/bin/python

# Copyright (C) 2009, 2010, 2011, 2012 Canonical Ltd.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Apply suitable overrides to new kernel binaries, matching previous ones."""

from __future__ import print_function

import atexit
from collections import defaultdict
from contextlib import closing
import gzip
from optparse import OptionParser, Values
import os
import shutil
import sys
import tempfile
try:
    from urllib.request import urlopen
except ImportError:
    from urllib2 import urlopen

import apt_pkg
from launchpadlib.launchpad import Launchpad
from lazr.restfulclient.errors import ServerError
from ubuntutools.question import YesNoQuestion

import lputils


CONSUMER_KEY = "kernel-overrides"


tempdir = None


def ensure_tempdir():
    global tempdir
    if not tempdir:
        tempdir = tempfile.mkdtemp(prefix='kernel-overrides')
        atexit.register(shutil.rmtree, tempdir)


class FakeBPPH:
    def __init__(self, pkg, component, das):
        self.binary_package_name = pkg
        self.component_name = component
        self.distro_arch_series = das


def get_published_binaries(options, source):
    """If getPublishedBinaries times out, fall back to doing it by hand."""
    try:
        for binary in source.getPublishedBinaries():
            if not binary.is_debug:
                yield binary
    except ServerError as e:
        if e.response.status != 503:
            raise
        print("getPublishedBinaries timed out; fetching Packages instead ...")
        ensure_tempdir()
        for section_name in ("", "debian-installer"):
            for component in ("main", "restricted", "universe", "multiverse"):
                for das in options.old.series.architectures:
                    arch = das.architecture_tag
                    if arch in ("amd64", "i386"):
                        base = "http://archive.ubuntu.com/ubuntu"
                    else:
                        base = "http://ports.ubuntu.com/ubuntu-ports"
                    url = ("%s/dists/%s/%s%s/binary-%s/Packages.gz" %
                           (base, options.old.suite, component,
                            "/%s" % section_name if section_name else "",
                            arch))
                    path = os.path.join(
                        tempdir, "Ubuntu_%s_%s%s_Packages_%s" %
                        (options.old.suite, component,
                         "_%s" % section_name if section_name else "", arch))
                    with closing(urlopen(url)) as url_file:
                        with open("%s.gz" % path, "wb") as comp_file:
                            comp_file.write(url_file.read())
                    with closing(gzip.GzipFile("%s.gz" % path)) as gz_file:
                        with open(path, "wb") as out_file:
                            out_file.write(gz_file.read())
                    with open(path) as packages_file:
                        apt_packages = apt_pkg.TagFile(packages_file)
                        for section in apt_packages:
                            pkg = section["Package"]
                            src = section.get("Source", pkg).split(" ", 1)[0]
                            if src != options.source:
                                continue
                            yield FakeBPPH(pkg, component, das)


def find_current_binaries(options):
    print("Checking existing binaries in %s ..." % options.old.suite,
          file=sys.stderr)
    sources = options.old.archive.getPublishedSources(
        source_name=options.source, distro_series=options.old.series,
        pocket=options.old.pocket, exact_match=True, status="Published")
    for source in sources:
        binaries = defaultdict(dict)
        for binary in get_published_binaries(options, source):
            print(".", end="")
            sys.stdout.flush()
            arch = binary.distro_arch_series.architecture_tag
            name = binary.binary_package_name
            component = binary.component_name
            if name not in binaries[arch]:
                binaries[arch][name] = component
        if binaries:
            print()
            return binaries
    print()
    return []


def find_matching_uploads(options, newabi):
    print("Checking %s uploads to %s ..." %
          (options.queue.lower(), options.suite), file=sys.stderr)
    uploads = options.series.getPackageUploads(
        name=options.source, exact_match=True, archive=options.archive,
        pocket=options.pocket, status=options.queue)
    for upload in uploads:
        if upload.contains_build:
            # display_name is inaccurate for the theoretical case of an
            # upload containing multiple builds, but in practice it's close
            # enough.
            source = upload.display_name.split(",")[0]
            if source == options.source:
                binaries = upload.getBinaryProperties()
                binaries = [b for b in binaries if "customformat" not in b]
                if [b for b in binaries if newabi in b["version"]]:
                    yield upload, binaries


def equal_except_abi(old, new, abi):
    """Are OLD and NEW the same package name aside from ABI?"""
    # Make sure new always contains the ABI.
    if abi in old:
        old, new = new, old
    if abi not in new:
        return False

    left, _, right = new.partition(abi)
    if not old.startswith(left) or not old.endswith(right):
        return False
    old_abi = old[len(left):]
    if right:
        old_abi = old_abi[:-len(right)]
    return old_abi[0].isdigit() and old_abi[-1].isdigit()


def apply_kernel_overrides(options, newabi):
    current_binaries = find_current_binaries(options)
    all_changes = []

    for upload, binaries in find_matching_uploads(options, newabi):
        print("%s/%s (%s):" %
              (upload.package_name, upload.package_version,
               upload.display_arches.split(",")[0]))
        changes = []
        for binary in binaries:
            if binary["architecture"] not in current_binaries:
                continue
            current_binaries_arch = current_binaries[binary["architecture"]]
            for name, component in current_binaries_arch.items():
                if (binary["component"] != component and
                        equal_except_abi(name, binary["name"], newabi)):
                    print("\t%s: %s -> %s" %
                          (binary["name"], binary["component"], component))
                    changes.append(
                        {"name": binary["name"], "component": component})
        if changes:
            all_changes.append((upload, changes))

    if all_changes:
        if options.dry_run:
            print("Dry run; no changes made.")
        else:
            if not options.confirm_all:
                if YesNoQuestion().ask("Override", "no") == "no":
                    return
            for upload, changes in all_changes:
                upload.overrideBinaries(changes=changes)


def main():
    parser = OptionParser(usage="usage: %prog [options] NEW-ABI")
    parser.add_option(
        "-l", "--launchpad", dest="launchpad_instance", default="production")
    parser.add_option(
        "-d", "--distribution", metavar="DISTRO", default="ubuntu",
        help="look in distribution DISTRO")
    parser.add_option(
        "-S", "--suite", metavar="SUITE",
        help="look in suite SUITE (default: <current series>-proposed)")
    parser.add_option(
        "--old-suite", metavar="SUITE",
        help="look for previous binaries in suite SUITE "
             "(default: value of --suite without -proposed)")
    parser.add_option(
        "-s", "--source", metavar="SOURCE", default="linux",
        help="operate on source package SOURCE")
    parser.add_option(
        "-Q", "--queue", metavar="QUEUE", default="new",
        help="consider packages in QUEUE")
    parser.add_option(
        "-n", "--dry-run", default=False, action="store_true",
        help="don't make any modifications")
    parser.add_option(
        "-y", "--confirm-all", default=False, action="store_true",
        help="do not ask for confirmation")
    options, args = parser.parse_args()
    if len(args) != 1:
        parser.error("must supply NEW-ABI")
    newabi = args[0]

    options.launchpad = Launchpad.login_with(
        CONSUMER_KEY, options.launchpad_instance, version="devel")

    if options.suite is None:
        distribution = options.launchpad.distributions[options.distribution]
        options.suite = "%s-proposed" % distribution.current_series.name
    if options.old_suite is None:
        options.old_suite = options.suite
        if options.old_suite.endswith("-proposed"):
            options.old_suite = options.old_suite[:-9]
    options.queue = options.queue.title()
    options.version = None
    lputils.setup_location(options)
    options.old = Values()
    options.old.launchpad = options.launchpad
    options.old.distribution = options.distribution
    options.old.suite = options.old_suite
    lputils.setup_location(options.old)

    apply_kernel_overrides(options, newabi)


if __name__ == '__main__':
    main()