~wesley-wiedenmeier/curtin/curtin-1543263

« back to all changes in this revision

Viewing changes to tests/vmtests/image_sync.py

  • Committer: Scott Moser
  • Date: 2016-02-10 01:02:53 UTC
  • mfrom: (344.1.18 trunk.sync-cleanup)
  • Revision ID: smoser@ubuntu.com-20160210010253-n4u75udwoq0v54nb
vmtest: fix importing of xenial images.

This cleans up the importing of images to use a simplestreams
mirror concept rather than the cmdline tools sstream-mirror and
sstream-query.

Theres a main in tests/vmtests/image_sync that allows query and
mirror from a cmdline interface:
   $ python3 -m tests.vmtests.image_sync query vmtest.d
   $ python3 -m tests.vmtests.image_sync mirror -vv \
      vmtest.d arch=amd64 release=trusty

The ./tools/vmtest-sync-images is still functional, and supports
a '--clean' option that clears the mirrors internal and derived data.
This might be useful in the event of a broken mirror output.

The default max images to keep is still 1, and old images should be
removed on a sync/mirror.

The mirror (IMAGE_DIR) now knows about the derived images and names them
vmtest.*. It can be mirrored with something like:
   sstream-mirror mirror $IMAGE_DIR out.d
If that was published on a http mirror, then http mirror would mean
no need of creating the derived files locally.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python3
 
2
 
 
3
from simplestreams import util as sutil
 
4
from simplestreams import contentsource
 
5
from simplestreams import objectstores
 
6
from simplestreams import log
 
7
from simplestreams.log import LOG
 
8
from simplestreams import mirrors
 
9
from simplestreams import filters
 
10
 
 
11
import argparse
 
12
import errno
 
13
import hashlib
 
14
import os
 
15
import shutil
 
16
import signal
 
17
import sys
 
18
import tempfile
 
19
 
 
20
from curtin import util
 
21
 
 
22
IMAGE_SRC_URL = os.environ.get(
 
23
    'IMAGE_SRC_URL',
 
24
    "http://maas.ubuntu.com/images/ephemeral-v2/daily/streams/v1/index.sjson")
 
25
 
 
26
KEYRING = '/usr/share/keyrings/ubuntu-cloudimage-keyring.gpg'
 
27
ITEM_NAME_FILTERS = ['ftype~(root-image.gz|boot-initrd|boot-kernel)']
 
28
FORMAT_JSON = 'JSON'
 
29
VMTEST_CONTENT_ID = 'com.ubuntu.maas:daily:v2:download'
 
30
VMTEST_JSON_PATH = "streams/v1/vmtest.json"
 
31
 
 
32
 
 
33
DEFAULT_ARCHES = {
 
34
    'i386': ['i386'],
 
35
    'i586': ['i386'],
 
36
    'i686': ['i386'],
 
37
    'x86_64': ['amd64'],
 
38
    'ppc64le': ['ppc64el'],
 
39
    'armhf': ['armhf'],
 
40
    'aarch64': ['arm64'],
 
41
}
 
42
 
 
43
 
 
44
def get_file_info(path, sums=None):
 
45
    # return dictionary with size and checksums of existing file
 
46
    LOG.info("getting info for %s" % path)
 
47
    buflen = 1024*1024
 
48
 
 
49
    if sums is None:
 
50
        sums = ['sha256']
 
51
    sumers = {k: hashlib.new(k) for k in sums}
 
52
 
 
53
    ret = {'size': os.path.getsize(path)}
 
54
    with open(path, "rb") as fp:
 
55
        while True:
 
56
            buf = fp.read(buflen)
 
57
            for sumer in sumers.values():
 
58
                sumer.update(buf)
 
59
            if len(buf) != buflen:
 
60
                break
 
61
 
 
62
    ret.update({k: sumers[k].hexdigest() for k in sumers})
 
63
    LOG.info("done getting ifo for %s: %s" % (path, ret))
 
64
    return ret
 
65
 
 
66
 
 
67
def generate_root_derived(path_gz, base_d="/", info_func=get_file_info):
 
68
    fpath_gz = os.path.join(base_d, path_gz)
 
69
    ri_name = 'vmtest.root-image'
 
70
    rtgz_name = 'vmtest.root-tgz'
 
71
    ri_path = os.path.dirname(path_gz) + "/" + ri_name
 
72
    rtgz_path = os.path.dirname(path_gz) + "/" + rtgz_name
 
73
    ri_fpath = os.path.join(base_d, ri_path)
 
74
    rtgz_fpath = os.path.join(base_d, rtgz_path)
 
75
    new_items = {ri_name: {'ftype': ri_name, 'path': ri_path},
 
76
                 rtgz_name: {'ftype': rtgz_name, 'path': rtgz_path}}
 
77
 
 
78
    tmpd = None
 
79
    try:
 
80
        # create tmpdir under output dir
 
81
        tmpd = tempfile.mkdtemp(dir=os.path.dirname(fpath_gz))
 
82
        tmp_img = ri_fpath
 
83
        tmp_rtgz = rtgz_fpath
 
84
        if not os.path.exists(ri_fpath):
 
85
            # uncompress path_gz to tmpdir/root-image
 
86
            tmp_img = os.path.join(tmpd, ri_name)
 
87
            LOG.info("uncompressing %s to %s" % (fpath_gz, tmp_img))
 
88
            util.subp(['sh', '-c', 'exec gunzip -c "$0" > "$1"',
 
89
                      fpath_gz, tmp_img])
 
90
        if not os.path.exists(rtgz_fpath):
 
91
            tmp_rtgz = os.path.join(tmpd, rtgz_name)
 
92
            m2r = ['tools/maas2roottar', tmp_img, tmp_rtgz]
 
93
            LOG.info("creating root-tgz from %s" % tmp_img)
 
94
            util.subp(m2r)
 
95
 
 
96
        if tmp_img != ri_fpath:
 
97
            os.rename(tmp_img, ri_fpath)
 
98
        if tmp_rtgz != rtgz_fpath:
 
99
            os.rename(tmp_rtgz, rtgz_fpath)
 
100
 
 
101
    finally:
 
102
        if tmpd:
 
103
            shutil.rmtree(tmpd)
 
104
 
 
105
    new_items[ri_name].update(info_func(ri_fpath))
 
106
    new_items[rtgz_name].update(info_func(rtgz_fpath))
 
107
 
 
108
    return new_items
 
109
 
 
110
 
 
111
def remove_empty_dir(dirpath):
 
112
    if os.path.exists(dirpath):
 
113
        try:
 
114
            os.rmdir(dirpath)
 
115
            print("removed empty dir %s" % dirpath)
 
116
            if dirpath.endswith(os.path.sep):
 
117
                dirpath = dirpath[:-1]
 
118
                remove_empty_dir(os.path.dirname(dirpath))
 
119
        except OSError as e:
 
120
            if e.errno == errno.ENOTEMPTY:
 
121
                pass
 
122
 
 
123
 
 
124
class FakeContentSource(contentsource.ContentSource):
 
125
    def __init__(self, path):
 
126
        self.url = path
 
127
 
 
128
    def open(self):
 
129
        raise ValueError(
 
130
            "'%s' content source never expected to be read" % self.url)
 
131
 
 
132
 
 
133
def products_version_get(tree, pedigree):
 
134
    tprod = tree.get('products', {}).get(pedigree[0], {})
 
135
    return tprod.get('versions', {}).get(pedigree[1], {})
 
136
 
 
137
 
 
138
class CurtinVmTestMirror(mirrors.ObjectFilterMirror):
 
139
    # This class works as a 'target' mirror.
 
140
    # it creates the vmtest files as it needs them and
 
141
    # writes the maas image files and maas json files intact.
 
142
    # but adds a streams/v1/vmtest.json file the created data.
 
143
    def __init__(self, config, out_d, verbosity=0):
 
144
 
 
145
        self.config = config
 
146
        self.filters = self.config.get('filters', [])
 
147
        self.out_d = os.path.abspath(out_d)
 
148
        self.objectstore = objectstores.FileStore(
 
149
            out_d, complete_callback=self.callback)
 
150
        self.file_info = {}
 
151
        self.data_path = ".vmtest-data"
 
152
        super(CurtinVmTestMirror, self).__init__(config=config,
 
153
                                                 objectstore=self.objectstore)
 
154
 
 
155
        self.verbosity = verbosity
 
156
        self.dlstatus = {'columns': 80, 'total': 0, 'curpath': None}
 
157
 
 
158
    def callback(self, path, cur_bytes, tot_bytes):
 
159
        # progress written to screen
 
160
        if self.verbosity == 0:
 
161
            return
 
162
 
 
163
        # this is taken logically from simplstreams DotProgress
 
164
        if self.dlstatus['curpath'] != path:
 
165
            self.dlstatus['printed'] = 0
 
166
            self.dlstatus['curpath'] = path
 
167
            sys.stderr.write('=> %s [%s]\n' % (path, tot_bytes))
 
168
 
 
169
        if cur_bytes == tot_bytes:
 
170
            self.dlstatus['total'] += tot_bytes
 
171
            sys.stderr.write("\n")
 
172
            return
 
173
 
 
174
        columns = self.dlstatus['columns']
 
175
        printed = self.dlstatus['printed']
 
176
        toprint = int(cur_bytes * columns / tot_bytes) - printed
 
177
        if toprint <= 0:
 
178
            return
 
179
        sys.stderr.write('.' * toprint)
 
180
        sys.stderr.flush()
 
181
        self.dlstatus['printed'] += toprint
 
182
 
 
183
    def fpath(self, path):
 
184
        # return the full path to a local file in the mirror
 
185
        return os.path.join(self.out_d, path)
 
186
 
 
187
    def products_data_path(self, content_id):
 
188
        # our data path is .vmtest-data rather than .data
 
189
        return self.data_path + os.path.sep + content_id
 
190
 
 
191
    def _reference_count_data_path(self):
 
192
        # overridden from ObjectStoreMirrorWriter
 
193
        return self.data_path + os.path.sep + "references.json"
 
194
 
 
195
    def load_products(self, path=None, content_id=None):
 
196
        # overridden from ObjectStoreMirrorWriter
 
197
        # the reason is that we have copied here from trunk
 
198
        # is bug 1511364 which is not fixed in all ubuntu versions
 
199
        if content_id:
 
200
            try:
 
201
                dpath = self.products_data_path(content_id)
 
202
                return sutil.load_content(self.source(dpath).read())
 
203
            except IOError as e:
 
204
                if e.errno != errno.ENOENT:
 
205
                    raise
 
206
 
 
207
        if path:
 
208
            return {}
 
209
 
 
210
        raise TypeError("unable to load_products with no path")
 
211
 
 
212
    def insert_version(self, data, src, target, pedigree):
 
213
        # this is called for any version that had items inserted
 
214
        # data target['products'][pedigree[0]]['versions'][pedigree[1]]
 
215
        # a dictionary with possibly some tags and 'items': {'boot_initrd}...
 
216
        ri_name = 'vmtest.root-image'
 
217
        rtgz_name = 'vmtest.root-tgz'
 
218
        tver_data = products_version_get(target, pedigree)
 
219
        titems = tver_data.get('items')
 
220
 
 
221
        if ('root-image.gz' in titems and
 
222
                not (ri_name in titems and rtgz_name in titems)):
 
223
            # generate the root-image and root-tgz
 
224
            derived_items = generate_root_derived(
 
225
                titems['root-image.gz']['path'], base_d=self.out_d,
 
226
                info_func=self.get_file_info)
 
227
            for fname, item in derived_items.items():
 
228
                self.insert_item(item, src, target, pedigree + (fname,),
 
229
                                 FakeContentSource(item['path']))
 
230
 
 
231
    def get_file_info(self, path):
 
232
        # check and see if we might know checksum and size
 
233
        if path in self.file_info:
 
234
            return self.file_info[path]
 
235
        found = get_file_info(path)
 
236
        self.file_info[path] = found
 
237
        return found
 
238
 
 
239
    def remove_version(self, data, src, target, pedigree):
 
240
        # called for versions that were removed.
 
241
        # we want to remove empty paths that have been cleaned
 
242
        for item in data.get('items', {}).values():
 
243
            if 'path' in item:
 
244
                remove_empty_dir(self.fpath(os.path.dirname(item['path'])))
 
245
 
 
246
    def insert_products(self, path, target, content):
 
247
        # The super classes' insert_products will
 
248
        # we override this because default  mirror inserts content
 
249
        # where as we want to insert the rendered 'target' tree
 
250
        # the difference is that 'content' is the original (with gpg sign)
 
251
        # so our content will no longer have that signature.
 
252
 
 
253
        dpath = self.products_data_path(target['content_id'])
 
254
        self.store.insert_content(dpath, util.json_dumps(target))
 
255
        if not path:
 
256
            return
 
257
        # this will end up writing the content exactly as it
 
258
        # was in the source, leaving the signed data in-tact
 
259
        self.store.insert_content(path, content)
 
260
 
 
261
        # for our vmtest content id, we want to write
 
262
        # a vmtest.json in streams/v1/vmtest.json that can be queried
 
263
        # even though it will not appear in index
 
264
        if target['content_id'] == VMTEST_CONTENT_ID:
 
265
            self.store.insert_content(VMTEST_JSON_PATH,
 
266
                                      util.json_dumps(target))
 
267
 
 
268
    def insert_index_entry(self, data, src, pedigree, contentsource):
 
269
        # this is overridden, because the default implementation
 
270
        # when syncing an index.json will call insert_products
 
271
        # and also insert_index_entry. And both actually end up
 
272
        # writing the .[s]json file that they should write. Since
 
273
        # insert_products will do that, we just no-op this.
 
274
        return
 
275
 
 
276
 
 
277
def set_logging(verbose, log_file):
 
278
    vlevel = min(verbose, 2)
 
279
    level = (log.ERROR, log.INFO, log.DEBUG)[vlevel]
 
280
    log.basicConfig(stream=log_file, level=level)
 
281
    return vlevel
 
282
 
 
283
 
 
284
def main_mirror(args):
 
285
    if len(args.arches) == 0:
 
286
        try:
 
287
            karch = os.uname()[4]
 
288
            arches = DEFAULT_ARCHES[karch]
 
289
        except KeyError:
 
290
            msg = "No default arch list for kernel arch '%s'. Try '--arches'."
 
291
            sys.stderr.write(msg % karch + "\n")
 
292
            return False
 
293
    else:
 
294
        arches = []
 
295
        for f in args.arches:
 
296
            arches.extend(f.split(","))
 
297
 
 
298
    arch_filter = "arch~(" + "|".join(arches) + ")"
 
299
 
 
300
    mirror_filters = [arch_filter] + ITEM_NAME_FILTERS + args.filters
 
301
 
 
302
    vlevel = set_logging(args.verbose, args.log_file)
 
303
 
 
304
    sys.stderr.write(
 
305
        "summary: \n " + '\n '.join([
 
306
            "source: %s" % args.source,
 
307
            "output: %s" % args.output_d,
 
308
            "arches: %s" % arches,
 
309
            "filters: %s" % mirror_filters,
 
310
        ]) + '\n')
 
311
 
 
312
    mirror(output_d=args.output_d, source=args.source,
 
313
           mirror_filters=mirror_filters, max_items=args.max_items,
 
314
           keyring=args.keyring, verbosity=vlevel)
 
315
 
 
316
 
 
317
def mirror(output_d, source=IMAGE_SRC_URL, mirror_filters=None, max_items=1,
 
318
           keyring=KEYRING, verbosity=0):
 
319
    if mirror_filters is None:
 
320
        mirror_filters = [f for f in ITEM_NAME_FILTERS]
 
321
 
 
322
    filter_list = filters.get_filters(mirror_filters)
 
323
 
 
324
    (source_url, initial_path) = sutil.path_from_mirror_url(source, None)
 
325
 
 
326
    def policy(content, path):  # pylint: disable=W0613
 
327
        if initial_path.endswith('sjson'):
 
328
            return sutil.read_signed(content, keyring=keyring)
 
329
        else:
 
330
            return content
 
331
 
 
332
    smirror = mirrors.UrlMirrorReader(source_url, policy=policy)
 
333
 
 
334
    LOG.debug(
 
335
        "summary: \n " + '\n '.join([
 
336
            "source: %s" % source_url,
 
337
            "path: %s" % initial_path,
 
338
            "output: %s" % output_d,
 
339
            "filters: %s" % filter_list,
 
340
        ]) + '\n')
 
341
 
 
342
    mirror_config = {'max_items': max_items, 'filters': filter_list}
 
343
    tmirror = CurtinVmTestMirror(config=mirror_config, out_d=output_d,
 
344
                                 verbosity=verbosity)
 
345
 
 
346
    tmirror.sync(smirror, initial_path)
 
347
 
 
348
 
 
349
def query_ptree(ptree, max_num=None, ifilters=None, path2url=None):
 
350
    results = []
 
351
    pkey = 'products'
 
352
    verkey = 'versions'
 
353
    for prodname, proddata in sorted(ptree.get(pkey, {}).items()):
 
354
        if verkey not in proddata:
 
355
            continue
 
356
        cur = 0
 
357
        for vername in sorted(proddata[verkey].keys(), reverse=True):
 
358
            if max_num is not None and cur >= max_num:
 
359
                break
 
360
            verdata = proddata[verkey][vername]
 
361
            cur += 1
 
362
            for itemname, itemdata in sorted(verdata.get('items', {}).items()):
 
363
                flat = sutil.products_exdata(ptree,
 
364
                                             (prodname, vername, itemname))
 
365
                if ifilters is not None and len(ifilters) > 0:
 
366
                    if not filters.filter_dict(ifilters, flat):
 
367
                        continue
 
368
                if path2url and 'path' in flat:
 
369
                    flat['item_url'] = path2url(flat['path'])
 
370
                results.append(flat)
 
371
    return results
 
372
 
 
373
 
 
374
def query(mirror, max_items=1, filter_list=None, verbosity=0):
 
375
    if filter_list is None:
 
376
        filter_list = []
 
377
 
 
378
    ifilters = filters.get_filters(filter_list)
 
379
 
 
380
    def fpath(path):
 
381
        # return the full path to a local file in the mirror
 
382
        return os.path.join(mirror, path)
 
383
 
 
384
    try:
 
385
        stree = sutil.load_content(util.load_file(fpath(VMTEST_JSON_PATH)))
 
386
    except OSError:
 
387
        raise
 
388
    results = query_ptree(stree, max_num=max_items, ifilters=ifilters,
 
389
                          path2url=fpath)
 
390
    return results
 
391
 
 
392
 
 
393
def main_query(args):
 
394
    vlevel = set_logging(args.verbose, args.log_file)
 
395
 
 
396
    results = query(args.mirror_url, args.max_items, args.filters,
 
397
                    verbosity=vlevel)
 
398
    try:
 
399
        print(util.json_dumps(results).decode())
 
400
    except IOError as e:
 
401
        if e.errno == errno.EPIPE:
 
402
            sys.exit(0x80 | signal.SIGPIPE)
 
403
        raise
 
404
 
 
405
 
 
406
def main():
 
407
    parser = argparse.ArgumentParser()
 
408
 
 
409
    parser.add_argument('--log-file', default=sys.stderr,
 
410
                        type=argparse.FileType('w'))
 
411
    parser.add_argument('--verbose', '-v', action='count', default=0)
 
412
 
 
413
    parser.set_defaults(func=None)
 
414
    subparsers = parser.add_subparsers(help='subcommand help')
 
415
    mirror_p = subparsers.add_parser(
 
416
        'mirror', help='like sstream-mirror but for vmtest images')
 
417
    mirror_p.set_defaults(func=main_mirror)
 
418
    mirror_p.add_argument('--max', type=int, default=1, dest='max_items',
 
419
                          help='store at most MAX items in the target')
 
420
    mirror_p.add_argument('--verbose', '-v', action='count', default=0)
 
421
    mirror_p.add_argument('--dry-run', action='store_true', default=False,
 
422
                          help='only report what would be done')
 
423
    mirror_p.add_argument('--arches', action='append',
 
424
                          default=[], help='which arches to mirror.')
 
425
    mirror_p.add_argument('--source', default=IMAGE_SRC_URL,
 
426
                          help='maas images mirror')
 
427
    mirror_p.add_argument('--keyring', action='store', default=KEYRING,
 
428
                          help='keyring to be specified to gpg via --keyring')
 
429
    mirror_p.add_argument('output_d')
 
430
    mirror_p.add_argument('filters', nargs='*', default=[])
 
431
 
 
432
    query_p = subparsers.add_parser(
 
433
        'query', help='like sstream-query but for vmtest mirror')
 
434
    query_p.set_defaults(func=main_query)
 
435
    query_p.add_argument('--max', type=int, default=None, dest='max_items',
 
436
                         help='store at most MAX items in the target')
 
437
    query_p.add_argument('--path', default=None,
 
438
                         help='sync from index or products file in mirror')
 
439
 
 
440
    fmt_group = query_p.add_mutually_exclusive_group()
 
441
    fmt_group.add_argument('--output-format', '-o', action='store',
 
442
                           dest='output_format', default=None,
 
443
                           help="specify output format per python str.format")
 
444
    fmt_group.add_argument('--json', action='store_const',
 
445
                           const=FORMAT_JSON, dest='output_format',
 
446
                           help="output in JSON as a list of dicts.")
 
447
    query_p.add_argument('--verbose', '-v', action='count', default=0)
 
448
 
 
449
    query_p.add_argument('mirror_url')
 
450
    query_p.add_argument('filters', nargs='*', default=[])
 
451
 
 
452
    args = parser.parse_args()
 
453
 
 
454
    if args.func is None:
 
455
        parser.print_help()
 
456
        sys.exit(1)
 
457
    args.func(args)
 
458
 
 
459
 
 
460
if __name__ == '__main__':
 
461
    main()
 
462
    sys.exit(0)
 
463
 
 
464
# vi: ts=4 expandtab syntax=python