4
from urllib.request import urlopen
9
class KernelRoutingEntry:
10
def __init__(self, ks, source, data):
11
name = "{}:{}".format(source.series.codename, source.name)
12
if isinstance(data, str):
14
table = source.series.routing_table
16
raise ValueError("unable to map routing alias {}, "
17
"no series routing table".format(data))
19
raise ValueError("unable to map routing alias {}, "
20
"not listed in series routing table".format(data))
23
# Clear out any entries that have been overriden to None.
24
for entry in dict(data):
25
if data[entry] is None:
31
self._data = data if data else {}
41
def __eq__(self, other):
42
if isinstance(self, other.__class__):
43
return list(self) == list(other)
46
def __ne__(self, other):
47
return not self.__eq__(other)
50
return iter(list(self._data.items()))
52
def __getitem__(self, which):
53
return self._data[which]
55
def lookup_destination(self, dest, primary=False):
56
data = self._data.get(dest, None)
57
if primary is False or data is None:
62
return str(self._data)
65
class KernelRepoEntry:
66
def __init__(self, ks, owner, data):
67
if isinstance(data, list):
68
new_data = {'url': data[0]}
70
new_data['branch'] = 'master'
72
new_data['branch'] = data[1]
77
self._data = data if data else {}
83
# XXX: should this object have a name ?
85
def __eq__(self, other):
86
if isinstance(self, other.__class__):
87
return self.url == other.url and self.branch == other.branch
90
def __ne__(self, other):
91
return not self.__eq__(other)
95
return self._data['url']
99
return self._data.get('branch', None)
102
return "{} {}".format(self.url, self.branch)
105
class KernelSnapEntry:
106
def __init__(self, ks, source, name, data):
108
self._source = source
110
self._data = data if data else {}
112
# Convert arches/track to publish-to form.
113
if 'publish-to' not in self._data:
114
if 'arches' in self._data:
116
for arch in self._data['arches']:
117
publish_to[arch] = [self._data.get('track', 'latest')]
118
self._data['publish-to'] = publish_to
120
# Convert stable to promote-to form.
121
if 'promote-to' not in self._data and 'stable' in self._data:
122
if self._data['stable'] is True:
123
self._data['promote-to'] = 'stable'
125
self._data['promote-to'] = 'candidate'
126
# Assume no promote-to data to mean just to edge.
127
promote_to = self._data.get('promote-to', 'edge')
128
if isinstance(promote_to, str):
129
expand_promote_to = []
130
for risk in ('edge', 'beta', 'candidate', 'stable'):
131
expand_promote_to.append(risk)
132
if risk == promote_to:
134
self._data['promote-to'] = expand_promote_to
135
# Ensure we have stable when promote-to is present.
136
if 'promote-to' in self._data and 'stable' not in self._data:
137
if 'stable' in self._data['promote-to']:
138
self._data['stable'] = True
140
self._data['stable'] = False
142
def __eq__(self, other):
143
if isinstance(self, other.__class__):
144
return self.name == other.name and self.source == other.source
147
def __ne__(self, other):
148
return not self.__eq__(other)
152
return self._source.series
164
data = self._data.get('repo', None)
167
return KernelRepoEntry(self._ks, self, data)
171
return self._data.get('primary', False)
175
return self._data.get('gated', False)
179
return self._data.get('stable', False)
183
return self._data.get('qa', False)
187
return self._data.get('hw-cert', False)
191
# XXX: should this be []
192
return self._data.get('arches', None)
196
return self._data.get('track', None)
199
def publish_to(self):
200
return self._data.get('publish-to', None)
203
def promote_to(self):
204
return self._data.get('promote-to', None)
206
def promote_to_risk(self, risk):
207
return risk in self._data.get('promote-to', [])
210
return "{} {}".format(str(self.source), self.name)
213
class KernelPackageEntry:
214
def __init__(self, ks, source, name, data):
216
self._source = source
218
self._data = data if data else {}
220
def __eq__(self, other):
221
if isinstance(self, other.__class__):
222
return self.name == other.name and self.source == other.source
225
def __ne__(self, other):
226
return not self.__eq__(other)
230
return self._source.series
242
return self._data.get('type', None)
246
data = self._data.get('repo', None)
249
return KernelRepoEntry(self._ks, self, data)
252
return "{} {} {}".format(str(self.source), self.name, self.type)
255
class KernelSourceEntry:
256
def __init__(self, ks, series, name, data):
258
self._series = series
260
self._data = data if data else {}
262
def __eq__(self, other):
263
if isinstance(self, other.__class__):
264
return self.name == other.name and self.series == other.series
267
def __ne__(self, other):
268
return not self.__eq__(other)
280
if 'versions' in self._data:
281
return self._data['versions']
283
derived_from = self.derived_from
284
if derived_from is not None:
285
return derived_from.versions
287
copy_forward = self.copy_forward
288
if copy_forward is not None:
289
return copy_forward.versions
291
# XXX: should this be []
296
versions = self.versions
302
def development(self):
303
return self._data.get('development', self.series.development)
307
return self._data.get('supported', self.series.supported)
310
def severe_only(self):
311
return self._data.get('severe-only', False)
314
def stakeholder(self):
315
return self._data.get('stakeholder', None)
319
# XXX: should this return None when empty
321
packages = self._data.get('packages')
323
for package_key, package in list(packages.items()):
324
result.append(KernelPackageEntry(self._ks, self, package_key, package))
327
def lookup_package(self, package_key):
328
packages = self._data.get('packages')
329
if not packages or package_key not in packages:
331
return KernelPackageEntry(self._ks, self, package_key, packages[package_key])
335
# XXX: should this return None when empty
337
snaps = self._data.get('snaps')
339
for snap_key, snap in list(snaps.items()):
340
result.append(KernelSnapEntry(self._ks, self, snap_key, snap))
343
def lookup_snap(self, snap_key):
344
snaps = self._data.get('snaps')
345
if not snaps or snap_key not in snaps:
347
return KernelSnapEntry(self._ks, self, snap_key, snaps[snap_key])
350
def derived_from(self):
351
if 'derived-from' not in self._data:
354
(series_key, source_key) = self._data['derived-from']
356
series = self._ks.lookup_series(series_key)
357
source = series.lookup_source(source_key)
362
def testable_flavours(self):
364
if (self._data.get('testing') is not None and
365
self._data['testing'].get('flavours') is not None
367
for flavour in self._data['testing']['flavours'].keys():
368
fdata = self._data['testing']['flavours'][flavour]
369
# If we have neither arches nor clouds we represent a noop
372
arches = fdata.get('arches', None)
373
arches = arches if arches is not None else []
374
clouds = fdata.get('clouds', None)
375
clouds = clouds if clouds is not None else []
376
retval.append(KernelSourceTestingFlavourEntry(flavour, arches, clouds))
380
def invalid_tasks(self):
381
retval = self._data.get('invalid-tasks', [])
387
def copy_forward(self):
388
if 'copy-forward' not in self._data:
391
# XXX: backwards compatibility.
392
if self._data['copy-forward'] is False:
394
if self._data['copy-forward'] is True:
395
derived_from = self.derived_from
396
if derived_from is None:
398
return self.derived_from
400
(series_key, source_key) = self._data['copy-forward']
402
series = self._ks.lookup_series(series_key)
403
source = series.lookup_source(source_key)
409
return self._data.get('backport', False)
414
if self.series.development:
418
data = self._data.get('routing', default)
421
return KernelRoutingEntry(self._ks, self, data)
425
return self._data.get('swm')
429
return self._data.get('private', False)
432
return "{} {}".format(self.series.name, self.name)
434
class KernelSourceTestingFlavourEntry:
435
def __init__(self, name, arches, clouds):
437
self._arches = arches
438
self._clouds = clouds
452
class KernelSeriesEntry:
453
def __init__(self, ks, name, data, defaults=None):
457
if defaults is not None:
458
self._data.update(defaults)
460
self._data.update(data)
462
def __eq__(self, other):
463
if isinstance(self, other.__class__):
464
return self.name == other.name
467
def __ne__(self, other):
468
return not self.__eq__(other)
476
return self._data.get('codename', None)
480
if 'opening' in self._data:
481
if self._data['opening'] is not False:
485
def opening_ready(self, *flags):
486
if 'opening' not in self._data:
488
allow = self._data['opening']
491
if allow in (True, False):
494
flag_allow = allow.get(flag, False)
495
if flag_allow is None or flag_allow is False:
498
opening_allow = opening_ready
501
def development(self):
502
return self._data.get('development', False)
506
return self._data.get('supported', False)
510
return self._data.get('lts', False)
514
return self._data.get('esm', False)
517
return "{} ({})".format(self.name, self.codename)
522
sources = self._data.get('sources')
524
for source_key, source in list(sources.items()):
525
result.append(KernelSourceEntry(
526
self._ks, self, source_key, source))
530
def routing_table(self):
531
return self._data.get('routing-table', None)
533
def lookup_source(self, source_key):
534
sources = self._data.get('sources')
535
if not sources or source_key not in sources:
537
return KernelSourceEntry(self._ks, self, source_key, sources[source_key])
543
_url = 'https://git.launchpad.net/~canonical-kernel/' \
544
'+git/kteam-tools/plain/info/kernel-series.yaml'
545
_url_local = 'file://' + os.path.realpath(os.path.join(os.path.dirname(__file__),
546
'..', 'info', 'kernel-series.yaml'))
547
#_url = 'file:///home/apw/git2/kteam-tools/info/kernel-series.yaml'
548
#_url = 'file:///home/work/kteam-tools/info/kernel-series.yaml'
552
def __load_once(cls, url):
553
if url not in cls._data_txt:
554
response = urlopen(url)
555
data = response.read()
556
if not isinstance(data, str):
557
data = data.decode('utf-8')
558
cls._data_txt[url] = data
559
return cls._data_txt[url]
561
def __init__(self, url=None, data=None, use_local=os.getenv("USE_LOCAL_KERNEL_SERIES_YAML", False)):
564
response = urlopen(url)
565
data = response.read()
566
if not isinstance(data, str):
567
data = data.decode('utf-8')
569
data = self.__load_once(self._url_local if use_local else self._url)
570
self._data = yaml.safe_load(data)
572
self._development_series = None
573
self._codename_to_series = {}
574
for series_key, series in list(self._data.items()):
577
if series.get('development', False):
578
self._development_series = series_key
579
if 'codename' in series:
580
self._codename_to_series[series['codename']] = series_key
582
# Pull out the defaults.
583
self._defaults_series = {}
584
if 'defaults' in self._data:
585
self._defaults_series = self._data['defaults']
586
del self._data['defaults']
589
def key_series_name(series):
590
return [int(x) for x in series.name.split('.')]
594
return [KernelSeriesEntry(self, series_key, series,
595
defaults=self._defaults_series)
596
for series_key, series in list(self._data.items())]
598
def lookup_series(self, series=None, codename=None, development=False):
599
if not series and not codename and not development:
600
raise ValueError("series/codename/development required")
601
if not series and codename:
602
if codename not in self._codename_to_series:
604
series = self._codename_to_series[codename]
605
if not series and development:
606
if not self._development_series:
608
series = self._development_series
609
if series and series not in self._data:
611
return KernelSeriesEntry(self, series, self._data[series],
612
defaults=self._defaults_series)
615
if __name__ == '__main__':
618
series = db.lookup_series('16.04')
619
if series.name != '16.04':
620
print('series.name != 16.04')
621
if series.codename != 'xenial':
622
print('series.codename != xenial')
624
series2 = db.lookup_series(codename='xenial')
625
if series2.name != '16.04':
626
print('series2.name != 16.04')
627
if series2.codename != 'xenial':
628
print('series2.codename != xenial')
630
series3 = db.lookup_series(development=True)
631
if series3.name != '18.04':
632
print('series3.name != 18.04')
633
if series3.codename != 'bionic':
634
print('series3.codename != bionic')
636
print(str(series), str(series2), str(series3))
638
for series2 in sorted(db.series, key=db.key_series_name):
641
for source in series.sources:
642
print(str(source), source.series.name, source.name)
644
print(source.derived_from)
645
print(source.versions)
647
for package in source.packages:
648
print("PACKAGE", str(package))
650
for snap in source.snaps:
651
print("SNAP", str(snap), snap.arches)
654
# vi:set ts=4 sw=4 expandtab: