1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
|
#!/usr/bin/python3
"""Subscribe uploaders to the unapproved queue to their SRU bugs
Author: Robie Basak <robie.basak@canonical.com>
Subscriptions are made on the principle that if you signed an upload, you need
to follow up on review comments on the SRU team. The SRU team have been having
trouble getting response to comments in bugs, noticed that often the sponsor
isn't subscribed, and hopes this will help.
This is intended to be run from a cronjob or timer. Run with --debug to see
what is going on, and additionally with --dry-run to see what would happen.
Principle of operation: get all key fingerprints of everyone in
~ubuntu-uploaders. Then download the .dsc file from everything in Unapproved
targetted at the proposed pockets of stable series and see who signed them.
Then subscribe this person to all bugs referenced in Launchpad-Bugs-Fixed in
the changes file of those uploads if they aren't subscribed at "Discussion"
level already.
To stop this script from acting on a particular bug, tag it "bot-stop-nagging".
It takes a while to get going; hitting the Launchpad API for all uploaders'
keys takes about a minute, then a further minute to fetch all the keys to get
their subkey fingerprints from keyserver.ubuntu.com.
All Ubuntu uploaders are supposed to be in ~ubuntu-uploaders but this isn't
enforced except by the correct action of the DMB. If somebody has uploaded but
is not a member of the team, this script will not find their key and therefore
will not subscribe them to their bugs.
The key 6309259455EFCAA8 also signs uploads and is ~ci-train-bot; it isn't a
member of ~ubuntu-uploaders and is therefore also ignored. We don't have a way
of identifying the actual sponsor in this case.
"""
import argparse
import itertools
import logging
import re
import subprocess
import tempfile
import urllib
import debian.deb822
import launchpadlib.launchpad
GPG_OUTPUT_PATTERN = re.compile(
r"""gpg: Signature made.*
gpg: +using (?:RSA|DSA) key (?P<fingerprint>[0-9A-F]+)
"""
)
def find_specific_url(urls, suffix):
for url in urls:
if url.endswith(suffix):
return url
raise ValueError
def get_upload_source_urls(upload):
if upload.contains_source:
return upload.sourceFileUrls()
elif upload.contains_copy:
try:
copy_source_archive = upload.copy_source_archive
# Trigger a problem ValueError exception now rather than later
# This is magic launchpadlib behaviour: accessing an attribute of
# copy_source_archive may fail later on an access permission issue
# due to lazy loading.
getattr(copy_source_archive, "self_link")
except ValueError as e:
raise RuntimeError(
f"Cannot access {upload} copy_source_archive attribute: no permission?"
) from e
return next(
iter(
upload.copy_source_archive.getPublishedSources(
source_name=upload.package_name,
version=upload.package_version,
exact_match=True,
order_by_date=True,
)
)
).sourceFileUrls()
else:
raise RuntimeError(f"Cannot find source for {upload}")
def find_dsc_signing_fingerprint(dsc_url):
with urllib.request.urlopen(
dsc_url
) as dsc_fobj, tempfile.TemporaryDirectory() as tmpdir:
result = subprocess.run(
["gpg", "--verify"],
input=dsc_fobj.read(),
capture_output=True,
env={"GNUPGHOME": tmpdir},
)
if result.returncode != 2:
raise RuntimeError("Unknown exit status from gpg")
m = GPG_OUTPUT_PATTERN.search(result.stderr.decode())
if not m:
raise ValueError("Signing key fingerprint not found")
return m.group("fingerprint")
def find_changes_bugs(changes_url):
with urllib.request.urlopen(changes_url) as changes_fobj:
changes = debian.deb822.Changes(changes_fobj)
try:
bugs_str = changes["Launchpad-Bugs-Fixed"]
except KeyError:
return []
return bugs_str.split()
def ensure_subscribed(person, bug, dry_run):
for subscription in bug.subscriptions:
if (
subscription.person_link == person.self_link
and subscription.bug_notification_level == "Discussion"
):
logging.debug(f"{person.name} is already subscribed to {bug.id}")
return
logging.debug(f"Subscribing {person.name} to {bug.id}")
if "bot-stop-nagging" in bug.tags:
logging.debug("bot-stop-nagging detected; not subscribing")
return
if not dry_run:
bug.subscribe(level="Discussion", person=person)
def parse_fingerprints(output):
for line in output.decode().splitlines():
fields = line.split(":")
if fields[0] == "fpr":
yield fields[9]
def fetch_subkey_fingerprints(primary_fingerprints):
with tempfile.TemporaryDirectory() as tmpdir:
for primary_fingerprint in primary_fingerprints:
logging.debug(f"Fetching key for {primary_fingerprint}")
result = subprocess.run(
[
"gpg",
"--keyserver",
"keyserver.ubuntu.com",
"--recv-key",
primary_fingerprint,
],
env={"GNUPGHOME": tmpdir},
capture_output=True,
)
if (
result.returncode == 2
and result.stderr == "gpg: keyserver receive failed: No data\n".encode()
):
# Some keys cannot be fetched. See: https://irclogs.ubuntu.com/2022/09/07/%23launchpad.html#t14:03
continue
result.check_returncode()
result = subprocess.run(
[
"gpg",
"--list-keys",
"--with-colons",
"--with-fingerprint",
"--with-fingerprint", # duplicate to also give subkey fingerprints
primary_fingerprint,
],
env={"GNUPGHOME": tmpdir},
capture_output=True,
check=True,
)
yield primary_fingerprint, (
fpr
for fpr in parse_fingerprints(result.stdout)
if fpr != primary_fingerprint
)
def determine_fingerprint_to_person_map(lp):
logging.debug("Fetching uploader key fingerprints")
fingerprint_to_person = {
gpg_key.fingerprint: person
for person, gpg_key in itertools.chain.from_iterable(
((person, gpg_key) for gpg_key in person.gpg_keys)
for person in lp.people["ubuntu-uploaders"].participants
)
}
logging.debug(f"{len(fingerprint_to_person)} fingerprints fetched")
# Add subkey fingerprints
logging.debug(f"Retrieving subkey fingerprints from keyserver.ubuntu.com")
fingerprint_to_person.update(
{
subkey_fingerprint: fingerprint_to_person[primary_fingerprint]
for primary_fingerprint, subkey_fingerprint in itertools.chain.from_iterable(
(
(primary_fingerprint, subkey_fingerprint)
for subkey_fingerprint in subkey_fingerprints
)
for primary_fingerprint, subkey_fingerprints in fetch_subkey_fingerprints(
fingerprint_to_person.keys()
)
)
}
)
logging.debug("Subkey fingerprints fetched")
# Some signers are only embedding 16 nibble keyids, so identify using these
# as well. It's easier to arrange collisions, but I think we can trust
# Ubuntu uploaders not to do this; and in the worst case scenario we'll
# only misidentify the uploader and give the wrong person the subscription
# anyway.
fingerprint_to_person.update(
{
fingerprint[-16:]: person
for fingerprint, person in fingerprint_to_person.items()
}
)
return fingerprint_to_person
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--debug", action="store_true")
args = parser.parse_args()
if args.debug:
logging.basicConfig(level=logging.DEBUG)
lp = launchpadlib.launchpad.Launchpad.login_with(
"~racb sru_autosubscribe", "production", version="devel"
)
fingerprint_to_person = determine_fingerprint_to_person_map(lp)
distro_seriess = [
series
for series in lp.distributions["ubuntu"].series
if series.status in {"Current Stable Release", "Supported"}
]
uploads = itertools.chain.from_iterable(
distro_series.getPackageUploads(pocket="Proposed", status="Unapproved")
for distro_series in distro_seriess
)
for upload in uploads:
logging.debug(f"Considering {upload}")
try:
urls = get_upload_source_urls(upload)
except RuntimeError:
logging.debug(f"Could not get source URLs for {upload}")
continue
try:
dsc_url = find_specific_url(urls, ".dsc")
except ValueError:
logging.debug(f"Could not find .dsc for {upload}")
continue
if not upload.changes_file_url:
logging.debug(f"Could not find changes file for {upload}")
continue
bug_numbers = find_changes_bugs(upload.changes_file_url)
fingerprint = find_dsc_signing_fingerprint(dsc_url)
try:
signer = fingerprint_to_person[fingerprint]
except KeyError:
logging.debug(f"Could not find signer with fingerprint {fingerprint}")
continue
for bug_number in bug_numbers:
try:
bug = lp.bugs[bug_number]
except KeyError:
logging.debug(f"Could not find bug {bug_number}")
continue
ensure_subscribed(person=signer, bug=bug, dry_run=args.dry_run)
|