~ubuntu-archive/ubuntu-archive-scripts/trunk

« back to all changes in this revision

Viewing changes to extra-germinate

  • Committer: Colin Watson
  • Date: 2014-06-10 16:20:34 UTC
  • Revision ID: cjwatson@canonical.com-20140610162034-y1hst950owjwjxt8
extra-germinate: Parallelise by architecture.

Show diffs side-by-side

added added

removed removed

Lines of Context:
6
6
# Copyright 2011-2012 Canonical Ltd.  This software is licensed under the
7
7
# GNU Affero General Public License version 3 (see the file LICENSE).
8
8
 
 
9
import copy
9
10
from functools import partial
10
11
import glob
11
12
import logging
12
13
from optparse import OptionParser
13
14
import os
 
15
import pickle
14
16
import shutil
15
17
import sys
 
18
import traceback
16
19
 
17
20
from germinate.archive import TagFile
18
21
from germinate.germinator import Germinator
38
41
            os.rename("%s.new" % self.filename, self.filename)
39
42
 
40
43
 
 
44
class BufferHandler(logging.Handler):
    """A log handler which stores records for emission by another logger.

    Each handled record is reduced to a picklable form and appended, as a
    pickled byte string, to ``self.records`` in the order it was emitted.
    The consumer is expected to ``pickle.loads`` each entry and pass the
    result to a logger's ``handle`` method.
    """

    def __init__(self):
        super(BufferHandler, self).__init__()
        # Pickled copies of the records seen so far, oldest first.
        self.records = []

    def emit(self, record):
        """Store a pickled, self-contained copy of ``record``.

        The caller's record object is left unmodified; all changes are
        made on a shallow copy.
        """
        # Record arguments may not be available at the other end, so
        # merge them into the message now.
        record_copy = copy.copy(record)
        record_copy.msg = record.getMessage()
        record_copy.args = None
        # Traceback objects cannot be pickled.  Render the exception to
        # text (which logging.Formatter.format appends to the message when
        # exc_text is set) and drop the live exception info.
        if record_copy.exc_info:
            if not record_copy.exc_text:
                record_copy.exc_text = logging.Formatter().formatException(
                    record_copy.exc_info)
            record_copy.exc_info = None
        self.records.append(pickle.dumps(record_copy, -1))
 
57
 
 
58
 
41
59
class ExtraGerminate:
42
60
    def __init__(self, args):
43
61
        self.germinate_logger = None
83
101
        self.germinate_logger = logging.getLogger("germinate")
84
102
        self.germinate_logger.setLevel(logging.INFO)
85
103
        self.log_file = os.path.join(self.germinateroot, "germinate.output")
86
 
        handler = logging.FileHandler(self.log_file, mode="w")
87
 
        handler.setFormatter(GerminateFormatter())
88
 
        self.germinate_logger.addHandler(handler)
 
104
        self.log_handler = logging.FileHandler(self.log_file, mode="w")
 
105
        self.log_handler.setFormatter(GerminateFormatter())
 
106
        self.germinate_logger.addHandler(self.log_handler)
89
107
        self.germinate_logger.propagate = False
90
108
 
91
109
    def setUp(self):
178
196
            germinator, structure, flavour, suite, arch,
179
197
            seed_outputs=seed_outputs)
180
198
 
181
 
    def germinateArch(self, suite, components, arch, flavours,
182
 
                      structures, seed_outputs=None):
 
199
    def germinateArch(self, suite, components, arch, flavours, structures):
183
200
        """Germinate seeds on all flavours for a single architecture."""
 
201
        # Buffer log output for each architecture so that it appears
 
202
        # sequential.
 
203
        self.germinate_logger.removeHandler(self.log_handler)
 
204
        log_handler = BufferHandler()
 
205
        self.germinate_logger.addHandler(log_handler)
 
206
 
184
207
        germinator = Germinator(arch)
185
208
 
186
209
        # Compute ogre model.
201
224
            suites, components, arch, self.options.archiveroot, cleanup=True)
202
225
        germinator.parse_archive(archive)
203
226
 
 
227
        seed_outputs = set()
204
228
        for flavour in flavours:
205
229
            logging.info(
206
230
                "Germinating for %s/%s/%s", flavour, suite, arch)
215
239
                structures[flavour], flavour == flavours[0],
216
240
                seed_outputs=seed_outputs)
217
241
 
 
242
        return log_handler.records, seed_outputs
 
243
 
 
244
    def germinateArchChild(self, close_in_child, wfd, *args):
 
245
        """Helper method to call germinateArch in a forked child process."""
 
246
        try:
 
247
            for fd in close_in_child:
 
248
                os.close(fd)
 
249
            with os.fdopen(wfd, "wb") as writer:
 
250
                pickle.dump(self.germinateArch(*args), writer, -1)
 
251
            return 0
 
252
        except:
 
253
            traceback.print_exc()
 
254
            pickle.dump(([], set()), writer, -1)
 
255
            return 1
 
256
 
218
257
    def removeStaleOutputs(self, suite, seed_outputs):
219
258
        """Remove stale outputs for a suite.
220
259
 
233
272
            suite, flavours, seed_bases=seed_bases)
234
273
 
235
274
        if structures:
 
275
            procs = []
 
276
            close_in_child = []
 
277
            for arch in architectures:
 
278
                rfd, wfd = os.pipe()
 
279
                close_in_child.append(rfd)
 
280
                pid = os.fork()
 
281
                if pid == 0:  # child
 
282
                    os._exit(self.germinateArchChild(
 
283
                        close_in_child, wfd,
 
284
                        suite, components, arch, flavours, structures))
 
285
                else:  # parent
 
286
                    os.close(wfd)
 
287
                    reader = os.fdopen(rfd, "rb")
 
288
                    procs.append((pid, reader))
 
289
 
236
290
            seed_outputs = set()
237
 
            for arch in architectures:
238
 
                self.germinateArch(
239
 
                    suite, components, arch, flavours,
240
 
                    structures, seed_outputs=seed_outputs)
 
291
            for pid, reader in procs:
 
292
                log_records, arch_seed_outputs = pickle.load(reader)
 
293
                for log_record in log_records:
 
294
                    self.germinate_logger.handle(pickle.loads(log_record))
 
295
                seed_outputs |= arch_seed_outputs
 
296
                reader.close()
 
297
                os.waitpid(pid, 0)
241
298
            self.removeStaleOutputs(suite, seed_outputs)
242
299
 
243
300
    def process(self, seed_bases=None):