~al-maisan/launchpad/ejdt-484819

« back to all changes in this revision

Viewing changes to lib/lp/soyuz/model/buildqueue.py

  • Committer: Muharem Hrnjadovic
  • Date: 2009-12-10 07:14:59 UTC
  • Revision ID: muharem@ubuntu.com-20091210071459-npx4yt39v9zmd5wk
Re-implementation complete.

Show diffs side-by-side

added added

removed removed

Lines of Context:
191
191
        #     it in the queue that may execute on any builder/processor
192
192
        # It is used to determine which builders should be considered when
193
193
        # looking for the builder that will become available next.
194
 
        sum_of_delays, mixed_builders = self._delayCausedByPendingJobsAhead()
 
194
        sum_of_delays = self._delayCausedByPendingJobsAhead()
195
195
 
196
196
        # Get the minimum time duration until the next builder becomes
197
197
        # available.
198
 
        min_wait_time = self._minTimeToNextBuilderAvailable(mixed_builders)
 
198
        min_wait_time = self._minTimeToNextBuilderAvailable()
199
199
 
200
200
        # This is the estimated build job start time in seconds from now.
201
201
        start_time = 0
217
217
 
218
218
        return result
219
219
 
220
 
    def _getBuilderPoolSize(self):
221
 
        """How many builders can run the job at hand?"""
222
 
        result = 0
223
 
        my_processor = self.specific_job.processor
224
 
        my_virtualized = self.specific_job.virtualized
225
 
        builder_counts = self._getBuilderCounts()
226
 
 
227
 
        if my_processor is None:
228
 
            # This job does not care about processor types, any builder will
229
 
            # do, so, sum them all up.
230
 
            builder_count = itemgetter(2)
231
 
            result = sum(builder_count(row) for row in builder_counts)
232
 
        else:
233
 
            # Only builders with the matching processor and virtualization
234
 
            # setting please.
235
 
            for processor, virtualized, count in builder_counts:
236
 
                if (processor == my_processor.id and
237
 
                    virtualized == my_virtualized):
238
 
                    result = count
239
 
                    break
240
 
 
241
 
        return result
242
 
 
243
 
    def _getBuilderCounts(self):
244
 
        """Builder numbers grouped by processor and virtualization flag."""
245
 
        # Avoid circular imports.
246
 
        from lp.soyuz.model.builder import Builder
 
220
    def _prepareBuilderData(self):
 
221
        """How many working builders are there, how are they configured?"""
 
222
        # Please note: this method will send only one request to the database.
247
223
 
248
224
        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
249
 
        query = Select(
250
 
            columns=[Builder.processorID, Builder.virtualized, Count()],
251
 
            tables=[Builder],
252
 
            where=And([Builder.builderok == True, Builder.manual == False]),
253
 
            group_by=[Builder.processorID, Builder.virtualized])
254
 
        return store.execute(query)
255
 
 
256
 
    def _delayCausedByPendingJobsAhead(self):
 
225
        temp_builder_data = """
 
226
            CREATE TEMPORARY TABLE __tmp_builder_data(
 
227
                processor integer NOT NULL,
 
228
                virtualized boolean NOT NULL,
 
229
                quantity integer NOT NULL);
 
230
            INSERT INTO __tmp_builder_data(
 
231
                processor, virtualized, quantity)
 
232
            SELECT processor, virtualized, COUNT(id) FROM builder
 
233
            WHERE builderok = TRUE AND manual = FALSE
 
234
            GROUP BY processor, virtualized;
 
235
            SELECT SUM(quantity) FROM __tmp_builder_data
 
236
        """
 
237
        # How many working builders in total?
 
238
        total_builders = store.execute(temp_builder_data).get_one()[0]
 
239
        if total_builders is None:
 
240
            total_builders = 0
 
241
 
 
242
        return total_builders
 
243
 
 
244
    def _delayCausedByPendingJobsAhead(self, total_builders):
257
245
        """Sum of estimated durations for *pending* jobs ahead in queue."""
258
 
        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
259
 
 
260
 
        # For a given pending job in position N in the build farm queue the
261
 
        # query below sums up the estimated build durations for the
262
 
        # pending jobs [1 .. N-1] i.e. for the jobs that are ahead of job N.
263
 
        result_set = None
 
246
        # Please note: this method will send only one request to the database.
 
247
 
 
248
        # This will create a temporary table that will hold dispatch data for
 
249
        # jobs that
 
250
        #   - have a score that's equal or above the score of the job of
 
251
        #     interest (JOI)
 
252
        #   - (potentially) compete with the JOI for builders
 
253
        temp_dispatch_data = """
 
254
            CREATE TEMPORARY TABLE __tmp_dispatch_data(
 
255
                job integer NOT NULL,
 
256
                score integer NOT NULL,
 
257
                -- the job's estimated duration
 
258
                duration interval NOT NULL,
 
259
                processor integer,
 
260
                virtualized boolean);
 
261
        """
 
262
 
 
263
        # Ask all build farm job type classes for a plain SQL SELECT query
 
264
        # that will yield the pending jobs whose score is equal or better
 
265
        # than the score of the job of interest (JOI).
 
266
        # Chain these queries in "SELECT .. UNION SELECT .." fashion and
 
267
        # use them to populate the temporary dispatch data table.
264
268
        queries = []
265
269
        for job_class in self.specific_job_classes.values():
266
270
            queries.append(job_class.pendingJobsQuery(self.lastscore))
267
 
 
268
 
        query = "%s ORDER BY lastscore, job" % ' UNION '.join(queries)
 
271
        dispatch_query = """
 
272
            INSERT INTO __tmp_dispatch_data(
 
273
                job, score, duration, processor, virtualized) %s;
 
274
        """ % ' UNION '.join(queries)
 
275
        temp_dispatch_data += dispatch_query
269
276
 
270
277
        my_processor = self.specific_job.processor
271
278
        my_virtualized = self.specific_job.virtualized
272
 
        sum_of_delays = datetime.timedelta()
273
 
 
274
 
        # If the job at hand
275
 
        #   - does not care about processor types
276
 
        #   - is tied to a specific processor but there are jobs ahead of
277
 
        #     it in the queue that may execute on any builder/processor
278
 
        # then this flag should be set.
279
 
        # It is returned as part of the result data and used to determine
280
 
        # which builders should be considered when looking for the builder
281
 
        # that will become available next.
282
 
        mixed_builders = my_processor is None
283
 
 
284
 
        # Iterate over all jobs that are ahead of me (score wise) in the
285
 
        # queue.
286
 
        result_set = store.execute(query)
287
 
        for job_data in result_set:
288
 
            (bq_id, bq_score, bq_duration, job_processor,
289
 
             job_virtualized) = job_data
290
 
 
291
 
            # Ignore the job at hand.
292
 
            if bq_id == self.id:
293
 
                continue
294
 
            # Does this job care about processor types?
295
 
            if my_processor is None:
296
 
                # No, any other job ahead in the queue thus competes for
297
 
                # builder time.
298
 
                sum_of_delays += bq_duration
299
 
            elif (my_processor.id == job_processor and
300
 
                  my_virtualized == job_virtualized) or job_processor is None:
301
 
                # This job is tied to a particular processor type. Hence only
302
 
                # jobs
303
 
                #   - with the same processor and virtualization settings
304
 
                #   - that do not care about processor types
305
 
                # compete for builder time.
306
 
                sum_of_delays += bq_duration
307
 
                # Please see the comment where `mixed_builders` is initialized
308
 
                # for details.
309
 
                if job_processor is None and mixed_builders == False:
310
 
                    mixed_builders = True
311
 
 
312
 
        return (sum_of_delays.seconds, mixed_builders)
 
279
        if my_processor is not None:
 
280
            # If the job of interest (JOI) is tied to a particular processor
 
281
            # then purge all jobs that are tied to a different processor since
 
282
            # these do not compete for the same builders.
 
283
            # Please note: this will retain all jobs that are processor
 
284
            # independent.
 
285
            purge_jobs_with_mismatching_processors = """
 
286
                DELETE FROM __tmp_dispatch_data
 
287
               WHERE processor != %s AND virtualized != %s;
 
288
            """ % sqlvalues(my_processor, my_virtualized)
 
289
            temp_dispatch_data += purge_jobs_with_mismatching_processors
 
290
 
 
291
        # Apply weights to the estimated duration of the jobs as follows:
 
292
        #   - if a job is tied to a processor TP then divide the estimated
 
293
        #     duration of that job by the number of builders that target TP
 
294
        #     since only these can build the job.
 
295
        #   - if the job is processor independent then divide its estimated
 
296
        #     duration by the total number of builders because any one of
 
297
        #     them may run it.
 
298
        weight_durations = """
 
299
            UPDATE __tmp_dispatch_data dd
 
300
            SET duration = duration/quantity FROM __tmp_builder_data bd
 
301
            WHERE
 
302
                dd.processor = bd.processor AND 
 
303
                dd.virtualized = bd.virtualized;
 
304
            UPDATE __tmp_dispatch_data dd
 
305
            SET duration = duration/%s
 
306
            WHERE
 
307
                dd.processor IS NULL;
 
308
        """ % sqlvalues(total_builders)
 
309
        temp_dispatch_data += dispatch_query
 
310
 
 
311
        # Now sum up the estimated durations of jobs where
 
312
        #   - the score is above the score of the job of interest (JOI)
 
313
        #   - the score is equal to the score of the JOI but the value of its
 
314
        #     job foreign key is lower i.e. it is older and should thus run
 
315
        #     before the JOI.
 
316
        get_the_sum_of_delays = """
 
317
            SELECT SUM(duration) FROM __tmp_dispatch_data
 
318
            WHERE score > %s OR job < %s
 
319
        """ % sqlvalues(self.lastscore, self.job)
 
320
        temp_dispatch_data += get_the_sum_of_delays
 
321
 
 
322
        # Finally, run the accumulated queries and fetch the sum of delays.
 
323
        sum_of_delays = store.execute(temp_builder_data).get_one()[0]
 
324
        if sum_of_delays is None:
 
325
            sum_of_delays = datetime.timedelta()
 
326
 
 
327
        return sum_of_delays.seconds
313
328
 
314
329
    def _minTimeToNextBuilderAvailable(self, mixed_builders):
315
 
        """Get estimated time until a builder becomes available.
316
 
 
317
 
        Calculate and return the minimum remaining job execution time (in
318
 
        seconds since EPOCH) for the jobs that are currently running on the
319
 
        machine pool of interest.
320
 
        """
 
330
        """Get estimated time until a builder becomes available."""
 
331
        # Which builders should be considered? If the job of interest (JOI)
 
332
        # is processor independent then we should look at all builders since
 
333
        # any one of them can run the job.
 
334
        # In the opposite case (JOI is tied to a processor TP) we should only
 
335
        # look at builders that target TP.
 
336
        #
 
337
        # However, if there are processor independent jobs ahead of the JOI
 
338
        # in the queue we should again expand our selection to all builders.
321
339
        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
322
340
        my_processor = self.specific_job.processor
323
341
        my_virtualized = self.specific_job.virtualized
324
 
        if my_processor is None or mixed_builders:
325
 
            # Look at all builders irrespective of processor type.
326
 
            looking_at_all_builders = True
327
 
            tables = "BuildQueue, Job"
328
 
            extra_clauses = ""
329
 
        else:
330
 
            # Only look at builders with specific processor types.
331
 
            looking_at_all_builders = False
332
 
            tables = "BuildQueue, Job, Builder"
333
 
            extra_clauses = """
334
 
                AND BuildQueue.builder = Builder.id
335
 
                AND Builder.processor = %s
336
 
                AND Builder.virtualized = %s
337
 
                """ % sqlvalues(my_processor, my_virtualized)
338
 
        params = (tables,) + sqlvalues(JobStatus.RUNNING) + (extra_clauses,)
 
342
 
 
343
        # If the count in the query below is above zero we should consider all
 
344
        # builders.
 
345
        case_query = """
 
346
            SELECT COUNT(job) FROM __tmp_dispatch_data
 
347
            WHERE
 
348
                -- Is the job of interest (JOI) processor independent?
 
349
                (job = %s AND processor IS NULL) OR
 
350
                -- Are there jobs ahead of the JOI that are
 
351
                -- processor independent?
 
352
                ((score > %s OR job < %s) AND processor IS NULL)
 
353
        """ % sqlvalues(self.job, self.lastscore, self.job)
 
354
 
 
355
        # This is the processor independent query, considers all builders.
 
356
        all_builders = """
 
357
            SELECT MIN(
 
358
                CAST (EXTRACT(EPOCH FROM
 
359
                        (BuildQueue.estimated_duration -
 
360
                        (NOW() - Job.date_started))) AS INTEGER))
 
361
            FROM
 
362
                BuildQueue, Job
 
363
            WHERE
 
364
                BuildQueue.job = Job.id
 
365
                AND Job.status = %s
 
366
        """ % sqlvalues(JobStatus.RUNNING)
 
367
 
 
368
        # This is the query that only considers builders that target a
 
369
        # particular processor.
 
370
        specific_builders = """
 
371
            SELECT MIN(
 
372
                CAST (EXTRACT(EPOCH FROM
 
373
                        (BuildQueue.estimated_duration -
 
374
                        (NOW() - Job.date_started))) AS INTEGER))
 
375
            FROM
 
376
                BuildQueue, Job, Builder
 
377
            WHERE
 
378
                BuildQueue.job = Job.id
 
379
                AND Job.status = %s
 
380
                AND Builder.processor = %s AND Builder.virtualized = %s
 
381
        """ % sqlvalues(JobStatus.RUNNING, my_processor, my_virtualized)
 
382
 
339
383
        delay_query = """
340
 
            SELECT MIN(
341
 
                CAST (EXTRACT(EPOCH FROM
342
 
                        (BuildQueue.estimated_duration -
343
 
                        (NOW() - Job.date_started))) AS INTEGER))
344
 
            FROM
345
 
                %s
346
 
            WHERE
347
 
                BuildQueue.job = Job.id
348
 
                AND Job.status = %s
349
 
                %s
350
 
            """ % params
 
384
            SELECT CASE
 
385
              WHEN (%s) > 0
 
386
              THEN (
 
387
                %s
 
388
              )
 
389
              ELSE (
 
390
                %s
 
391
              )
 
392
            END
 
393
        """ % (case_query, all_builders, specific_builders)
 
394
 
351
395
        result_set = store.execute(delay_query)
352
396
        headjob_delay = result_set.get_one()[0]
353
397
        if headjob_delay is None: