~vorlon/ubuntu-archive-tools/what-next

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
#! /usr/bin/env python

from __future__ import print_function

import atexit
from collections import namedtuple
import gzip
import optparse
import os
import re
import shutil
import subprocess
import tempfile
try:
    from urllib.parse import unquote
    from urllib.request import urlretrieve
except ImportError:
    from urllib import unquote, urlretrieve

import apt_pkg
from launchpadlib.launchpad import Launchpad


# from dak, more or less
# Matches a leading epoch ("<digits>:") at the start of a version string.
re_no_epoch = re.compile(r"^\d+:")
# Matches the trailing "-<revision>" part: the last hyphen through the end.
re_strip_revision = re.compile(r"-[^-]+$")
# Matches a changelog entry header line and captures the parenthesised
# version that follows the leading package-name token.
re_changelog_versions = re.compile(r"^\w[-+0-9a-z.]+ \(([^\(\) \t]+)\)")

# Default for the --mirrors option: colon-separated local mirror roots.
default_mirrors = ":".join([
    '/home/ubuntu-archive/mirror/ubuntu',
    '/srv/archive.ubuntu.com/ubuntu',
])
# Path of the shared scratch directory; None until ensure_tempdir() runs.
tempdir = None

# Cache of series objects keyed by series name; filled by get_series().
series_by_name = {}


def ensure_tempdir():
    """Create the shared scratch directory the first time it is needed.

    The new directory's path is stored in the module-level ``tempdir`` and
    its removal is registered with ``atexit``.  Once ``tempdir`` is set,
    further calls do nothing.
    """
    global tempdir
    if tempdir:
        return
    tempdir = tempfile.mkdtemp(prefix='copy-report')
    atexit.register(shutil.rmtree, tempdir)


def decompress_open(tagfile):
    """Open a tag file for reading, downloading and gunzipping it if needed.

    :param tagfile: a local path, or an ``http:``, ``https:`` or ``ftp:``
        URL.  A name ending in ``.gz`` is treated as gzip-compressed.
    :return: a file object opened in text mode (``'r'``).  For compressed
        input this is a decompressed copy created inside the shared
        temporary directory (see ``ensure_tempdir``).
    """
    # Decide on decompression from the name the caller gave: the temporary
    # file urlretrieve() downloads to does not necessarily keep the ".gz"
    # suffix of the URL.
    is_gzipped = tagfile.endswith('.gz')

    if tagfile.startswith(('http:', 'https:', 'ftp:')):
        tagfile = urlretrieve(tagfile)[0]

    if not is_gzipped:
        return open(tagfile, 'r')

    ensure_tempdir()
    # mkstemp creates the file atomically, unlike the race-prone mktemp.
    fd, decompressed = tempfile.mkstemp(dir=tempdir)
    with os.fdopen(fd, 'wb') as fout:
        with gzip.GzipFile(filename=tagfile) as fin:
            # Stream in chunks rather than holding the whole index in memory.
            shutil.copyfileobj(fin, fout)
    return open(decompressed, 'r')


# One source package as recorded by tagfiletodict(): the stanza's Version
# and Directory fields plus the file names listed in its Files field.
Section = namedtuple("Section", ["version", "directory", "files"])


def tagfiletodict(tagfile):
    """Parse a tag file into a dict mapping package name to ``Section``.

    :param tagfile: path or URL accepted by ``decompress_open``.
    :return: dict keyed by each stanza's ``Package`` field.  Each value
        holds the stanza's ``Version``, ``Directory`` and the third
        whitespace-separated token of every line of its ``Files`` field.
    """
    by_package = {}
    for stanza in apt_pkg.TagFile(decompress_open(tagfile)):
        names = []
        for entry in stanza["Files"].split('\n'):
            # Each entry is whitespace-separated; the third token is kept.
            names.append(entry.strip().split()[2])
        by_package[stanza["Package"]] = Section(
            version=stanza["Version"],
            directory=stanza["Directory"],
            files=names)
    return by_package


def find_dsc(options, pkg, section):
    """Yield candidate paths to the ``.dsc`` file of a source package.

    Every mirror in ``options.mirrors`` that holds the file under
    ``section.directory`` is yielded first.  If the caller keeps iterating,
    the source files are then downloaded from ``options.archive`` into a
    fresh directory under the shared temporary directory, and the
    downloaded ``.dsc`` is yielded last.

    :param options: object providing ``mirrors`` and ``archive``.
    :param pkg: source package name used for the archive lookup.
    :param section: ``Section`` describing the package.
    """
    dsc_candidates = [name for name in section.files if name.endswith('.dsc')]
    dsc_filename = dsc_candidates[0]
    for mirror in options.mirrors:
        local_path = '%s/%s/%s' % (mirror, section.directory, dsc_filename)
        if not os.path.exists(local_path):
            continue
        yield local_path

    # Fallback: fetch the published source files from the archive.
    ensure_tempdir()
    publications = options.archive.getPublishedSources(
        source_name=pkg, version=section.version, exact_match=True)
    spph = publications[0]
    download_dir = tempfile.mkdtemp(dir=tempdir)
    downloaded = []
    for url in spph.sourceFileUrls():
        target = os.path.join(download_dir, unquote(os.path.basename(url)))
        urlretrieve(url, target)
        downloaded.append(target)
    yield [name for name in downloaded if name.endswith('.dsc')][0]


class BrokenSourcePackage(Exception):
    """Raised when ``dpkg-source`` fails to unpack a source package."""


def get_changelog_versions(pkg, dsc, version):
    """Return the set of versions named in a source package's changelog.

    The package is unpacked with ``dpkg-source`` inside the shared temporary
    directory, ``debian/changelog`` is scanned for entry header lines, and
    the unpacked tree is removed again before returning.

    :param pkg: source package name; part of the unpacked directory name.
    :param dsc: path to the ``.dsc`` file to unpack.
    :param version: package version; with its epoch and revision stripped it
        forms the rest of the unpacked directory name.
    :return: set of version strings captured from the changelog headers.
    :raises BrokenSourcePackage: if ``dpkg-source`` exits non-zero.
    """
    ensure_tempdir()

    upstream_version = re_no_epoch.sub('', version)
    upstream_version = re_strip_revision.sub('', upstream_version)
    unpacked = '%s/%s-%s' % (tempdir, pkg, upstream_version)

    with open(os.devnull, 'w') as devnull:
        ret = subprocess.call(
            ['dpkg-source', '-q', '--no-check', '-sn', '-x', dsc],
            stdout=devnull, cwd=tempdir)

    if ret != 0:
        # Drop whatever a failed extraction left behind so that a retry
        # with another copy of the .dsc is not blocked by an existing
        # unpack target.
        shutil.rmtree(unpacked, ignore_errors=True)
        raise BrokenSourcePackage(dsc)

    # It's in the archive, so these assertions must hold.
    assert os.path.isdir(unpacked)
    try:
        changelog_path = '%s/debian/changelog' % unpacked
        assert os.path.exists(changelog_path)

        with open(changelog_path) as changelog:
            matches = (re_changelog_versions.match(line) for line in changelog)
            versions = set(m.group(1) for m in matches if m)
    finally:
        # Always remove the unpacked tree, even if reading the changelog
        # failed.
        shutil.rmtree(unpacked)

    return versions


def descended_from(options, pkg, section1, section2):
    """Report whether ``section1`` is a descendant of ``section2``.

    ``section1`` counts as a descendant when its version is strictly newer
    than ``section2``'s and ``section2``'s version appears in ``section1``'s
    changelog.

    :param options: passed through to ``find_dsc``.
    :param pkg: source package name.
    :param section1: ``Section`` of the newer candidate.
    :param section2: ``Section`` it is compared against.
    :return: True if descended, False otherwise.
    :raises BrokenSourcePackage: if every available copy of the ``.dsc``
        fails to unpack (the last failure is re-raised).
    """
    if apt_pkg.version_compare(section1.version, section2.version) <= 0:
        return False
    failure = None
    for dsc in find_dsc(options, pkg, section1):
        try:
            versions = get_changelog_versions(pkg, dsc, section1.version)
        except BrokenSourcePackage as e:
            # Python 3 unbinds the "as" target when the handler exits, so
            # keep the exception under a separate name for the final raise.
            failure = e
            continue
        # The ancestor's version must be listed in the descendant's
        # changelog; testing section1's own version would be trivially true.
        return section2.version in versions
    raise failure


# A proposed copy of `package` at `version1` from `suite1` to `suite2`.
# `version2` is the version recorded for `suite2`, or None (see main()).
Candidate = namedtuple(
    "Candidate", ["package", "suite1", "suite2", "version1", "version2"])


def get_series(options, name):
    """Return the series object for ``name``, looking it up at most once.

    Results of ``options.distro.getSeries`` are remembered in the
    module-level ``series_by_name`` dict.
    """
    if name in series_by_name:
        return series_by_name[name]
    series = options.distro.getSeries(name_or_version=name)
    series_by_name[name] = series
    return series


def already_copied(options, candidate):
    """Check whether ``candidate`` is already present in its target suite.

    The target suite ``candidate.suite2`` is split at its first hyphen into
    a series name and a pocket; without a hyphen the pocket is "Release".

    :return: True if ``options.archive`` has a publication of
        ``candidate.package`` at ``candidate.version1`` in that series and
        pocket whose status is "Pending" or "Published".
    """
    series_name, hyphen, pocket_name = candidate.suite2.partition("-")
    pocket = pocket_name.title() if hyphen else "Release"
    series = get_series(options, series_name)
    pubs = options.archive.getPublishedSources(
        source_name=candidate.package, version=candidate.version1,
        exact_match=True, distro_series=series, pocket=pocket)
    return any(pub.status in ("Pending", "Published") for pub in pubs)


def copy(options, candidate):
    """Ask the archive to copy ``candidate`` into its target suite.

    ``candidate.suite2`` is split at its first hyphen into the target
    series and pocket; without a hyphen the pocket is "Release".  The copy
    is requested within ``options.archive`` itself, with binaries included
    and auto-approval enabled.
    """
    suite = candidate.suite2
    if "-" not in suite:
        to_series, to_pocket = suite, "Release"
    else:
        to_series, raw_pocket = suite.split("-", 1)
        to_pocket = raw_pocket.title()
    options.archive.copyPackage(
        source_name=candidate.package, version=candidate.version1,
        from_archive=options.archive, to_pocket=to_pocket, to_series=to_series,
        include_binaries=True, auto_approve=True)


def candidate_string(candidate):
    """Format ``candidate`` as a ``copy-package`` command line.

    When ``candidate.version2`` is not None, a trailing comment naming the
    target suite and that version is appended.
    """
    command = 'copy-package -y -b -s %s --to-suite %s -e %s %s' % (
        candidate.suite1, candidate.suite2, candidate.version1,
        candidate.package)
    if candidate.version2 is None:
        return command
    return command + '  # %s: %s' % (candidate.suite2, candidate.version2)


def main():
    """Report packages in each -security suite that are not in -updates.

    For every suite (those given on the command line, otherwise the series
    whose status is "Supported" or "Current Stable Release") and every
    component, the -security and -updates source indexes on the first
    mirror are compared.  Candidates are sorted into three lists: safe to
    copy, to be checked by hand (``--quick`` mode), and needing a manual
    merge.  The lists are printed as ``copy-package`` commands, and with
    ``--copy-safe`` the safe candidates are copied as well.
    """
    apt_pkg.init_system()

    parser = optparse.OptionParser(usage="usage: %prog [options] [suites]")
    parser.add_option(
        "-l", "--launchpad", dest="launchpad_instance", default="production")
    parser.add_option(
        "--quick", action="store_true", help="don't examine changelogs")
    parser.add_option(
        "--copy-safe", action="store_true",
        help="automatically copy safe candidates")
    parser.add_option(
        "--mirrors", default=default_mirrors,
        help="colon-separated list of local mirrors")
    options, args = parser.parse_args()

    options.launchpad = Launchpad.login_with(
        "copy-report", options.launchpad_instance, version="devel")
    options.distro = options.launchpad.distributions["ubuntu"]
    options.archive = options.distro.main_archive
    options.mirrors = options.mirrors.split(":")

    suites = args
    if not suites:
        stable_names = []
        for series in options.launchpad.distributions["ubuntu"].series:
            if series.status in ("Supported", "Current Stable Release"):
                stable_names.append(series.name)
        suites = reversed(stable_names)

    def print_heading(title, rule):
        # Title, underline, then a blank line.
        print(title)
        print(rule)
        print()

    yes = []
    maybe = []
    no = []

    for suite in suites:
        name1 = '%s-security' % suite
        name2 = '%s-updates' % suite
        for component in 'main', 'restricted', 'universe', 'multiverse':
            tagfile1 = '%s/dists/%s-security/%s/source/Sources.gz' % (
                options.mirrors[0], suite, component)
            tagfile2 = '%s/dists/%s-updates/%s/source/Sources.gz' % (
                options.mirrors[0], suite, component)

            suite1 = tagfiletodict(tagfile1)
            suite2 = tagfiletodict(tagfile2)

            for package in sorted(suite1):
                section1 = suite1[package]
                section2 = suite2.get(package)

                # Safe when -updates lacks the package entirely, or (unless
                # --quick) when the changelog check passes.
                if section2 is None:
                    safe = True
                elif options.quick:
                    safe = False
                else:
                    safe = descended_from(
                        options, package, section1, section2)

                if safe:
                    candidate = Candidate(
                        package=package, suite1=name1, suite2=name2,
                        version1=section1.version, version2=None)
                    if not already_copied(options, candidate):
                        yes.append(candidate)
                    continue

                if apt_pkg.version_compare(
                        section1.version, section2.version) <= 0:
                    continue

                candidate = Candidate(
                    package=package, suite1=name1, suite2=name2,
                    version1=section1.version, version2=section2.version)
                if already_copied(options, candidate):
                    continue
                if options.quick:
                    # Changelogs were not examined, so a human must check.
                    maybe.append(candidate)
                else:
                    no.append(candidate)

    if yes:
        print_heading(
            "The following packages can be copied safely:",
            "--------------------------------------------")
        for candidate in yes:
            print(candidate_string(candidate))
        print()

        if options.copy_safe:
            for candidate in yes:
                copy(options, candidate)

    if maybe:
        print_heading(
            "Check that these packages are descendants before copying:",
            "---------------------------------------------------------")
        for candidate in maybe:
            print('#%s' % candidate_string(candidate))
        print()

    if no:
        print_heading(
            "The following packages need to be merged by hand:",
            "-------------------------------------------------")
        for candidate in no:
            print('#%s' % candidate_string(candidate))
        print()


# Run the report only when executed as a script, not when imported.
if __name__ == '__main__':
    main()