~madteam/mg5amcnlo/series2.0

« back to all changes in this revision

Viewing changes to madgraph/various/cluster.py

  • Committer: olivier Mattelaer
  • Date: 2015-03-05 00:14:16 UTC
  • mfrom: (258.1.9 2.3)
  • mto: (258.8.1 2.3)
  • mto: This revision was merged to the branch mainline in revision 259.
  • Revision ID: olivier.mattelaer@uclouvain.be-20150305001416-y9mzeykfzwnl9t0j
partial merge

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
import re
19
19
import glob
20
20
import inspect
 
21
import sys
21
22
 
22
23
logger = logging.getLogger('madgraph.cluster') 
23
24
 
100
101
        self.cluster_retry_wait = opts['cluster_retry_wait'] if 'cluster_retry_wait' in opts else 300
101
102
        self.options = dict(opts)
102
103
        self.retry_args = {}
103
 
 
 
104
        # controlling jobs in controlled type submision
 
105
        self.packet = {}
 
106
        self.id_to_packet = {}
104
107
 
105
108
    def submit(self, prog, argument=[], cwd=None, stdout=None, stderr=None, 
106
109
               log=None, required_output=[], nb_submit=0):
107
110
        """How to make one submission. Return status id on the cluster."""
108
111
        raise NotImplemented, 'No implementation of how to submit a job to cluster \'%s\'' % self.name
109
112
 
 
113
 
110
114
    @store_input()
111
115
    def submit2(self, prog, argument=[], cwd=None, stdout=None, stderr=None, 
112
 
                log=None, input_files=[], output_files=[], required_output=[],nb_submit=0):
 
116
                log=None, input_files=[], output_files=[], required_output=[],
 
117
                nb_submit=0):
113
118
        """How to make one submission. Return status id on the cluster.
114
119
        NO SHARE DISK"""
115
120
 
162
167
            rm -rf $MYTMP
163
168
#        fi
164
169
        """
 
170
        
165
171
        dico = {'tmpdir' : self.temp_dir, 'script': os.path.basename(prog),
166
172
                'cwd': cwd, 'job_id': self.job_id,
167
173
                'input_files': ' '.join(input_files + [prog]),
176
182
        
177
183
        return self.submit(new_prog, argument, cwd, stdout, stderr, log, 
178
184
                               required_output=required_output, nb_submit=nb_submit)
179
 
        
180
 
 
 
185
 
 
186
 
 
187
    def cluster_submit(self, prog, argument=[], cwd=None, stdout=None, stderr=None, 
 
188
                log=None, input_files=[], output_files=[], required_output=[],
 
189
                nb_submit=0, packet_member=None):
 
190
            """This function wrap the cluster submition with cluster independant
 
191
               method should not be overwritten (but for DAG type submission)"""
 
192
               
 
193
            id = self.submit2(prog, argument, cwd, stdout, stderr, log, input_files, 
 
194
                             output_files, required_output, nb_submit)               
 
195
               
 
196
            
 
197
            if not packet_member:
 
198
                return id
 
199
            else:
 
200
                if isinstance(packet_member, Packet):
 
201
                    self.id_to_packet[id] = packet_member
 
202
                    packet_member.put(id)
 
203
                    if packet_member.tag not in self.packet:
 
204
                        self.packet[packet_member.tag] = packet_member
 
205
                else:
 
206
                    if packet_member in self.packet:
 
207
                        packet = self.packet[packet_member]
 
208
                        packet.put(id)
 
209
                        self.id_to_packet[id] = packet
 
210
                return id
 
211
                
181
212
    def control(self, me_dir=None):
182
213
        """Check the status of job associated to directory me_dir. return (idle, run, finish, fail)"""
183
214
        if not self.submitted_ids:
231
262
 
232
263
 
233
264
    @check_interupt()
234
 
    def wait(self, me_dir, fct, minimal_job=0):
 
265
    def wait(self, me_dir, fct, minimal_job=0, update_first=None):
235
266
        """Wait that all job are finish.
236
267
        if minimal_job set, then return if idle + run is lower than that number"""
237
268
        
240
271
        nb_iter = 0
241
272
        nb_short = 0 
242
273
        change_at = 5 # number of iteration from which we wait longer between update.
 
274
 
 
275
        if update_first:
 
276
            idle, run, finish, fail = self.control(me_dir)
 
277
            update_first(idle, run, finish)
 
278
        
243
279
        #usefull shortcut for readibility
244
280
        longtime, shorttime = self.options['cluster_status_update']
245
281
        
 
282
        nb_job = 0
246
283
        while 1: 
247
284
            old_mode = mode
248
285
            nb_iter += 1
249
286
            idle, run, finish, fail = self.control(me_dir)
 
287
            if nb_job:
 
288
                if  idle + run + finish + fail != nb_job:
 
289
                    nb_job = idle + run + finish + fail
 
290
                    nb_iter = 1 # since some packet finish prevent to pass in long waiting mode
 
291
            else:
 
292
                nb_job = idle + run + finish + fail
250
293
            if fail:
251
294
                raise ClusterManagmentError('Some Jobs are in a Hold/... state. Please try to investigate or contact the IT team')
252
295
            if idle + run == 0:
253
296
                #time.sleep(20) #security to ensure that the file are really written on the disk
254
297
                logger.info('All jobs finished')
 
298
                fct(idle, run, finish)
255
299
                break
256
300
            if idle + run < minimal_job:
257
301
                return
326
370
                logger.info('Job %s Finally found the missing output.' % (job_id))
327
371
            del self.retry_args[job_id]
328
372
            self.submitted_ids.remove(job_id)
 
373
            # check if the job_id is in a packet
 
374
            if job_id in self.id_to_packet:
 
375
                nb_in_packet = self.id_to_packet[job_id].remove_one()
 
376
                if nb_in_packet == 0:
 
377
                    # packet done run the associate function
 
378
                    packet = self.id_to_packet[job_id]
 
379
                    # fully ensure that the packet is finished (thread safe)
 
380
                    packet.queue.join()
 
381
                    #running the function
 
382
                    packet.fct(*packet.args)                    
 
383
                del self.id_to_packet[job_id]
 
384
                return 'resubmit'
 
385
            
329
386
            return 'done'
330
387
        
331
388
        if time_check == 0:
437
494
        logger.warning("""This cluster didn't support job removal, 
438
495
    the jobs are still running on the cluster.""")
439
496
 
 
497
 
 
498
class Packet(object):
 
499
    """ an object for handling packet of job, it is designed to be thread safe
 
500
    """
 
501
 
 
502
    def __init__(self, name, fct, args, opts={}):
 
503
        import Queue
 
504
        import threading
 
505
        self.queue = Queue.Queue()
 
506
        self.tag = name
 
507
        self.fct = fct
 
508
        self.args = args
 
509
        self.opts = opts
 
510
        self.done = threading.Event()
 
511
 
 
512
    def put(self, *args, **opts):
 
513
        self.queue.put(*args, **opts)
 
514
 
 
515
    append = put
 
516
 
 
517
    def remove_one(self):
 
518
        self.queue.get(True)
 
519
        self.queue.task_done()
 
520
        return self.queue.qsize()
 
521
        
440
522
class MultiCore(Cluster):
441
 
    """ class for dealing with the submission in multiple node"""
442
 
    
443
 
    job_id = '$'
444
 
    
 
523
    """class for dealing with the submission in multiple node"""
 
524
 
 
525
    job_id = "$"
 
526
 
445
527
    def __init__(self, *args, **opt):
446
 
        """Init the cluster"""
447
 
        import thread
 
528
        """Init the cluster """
 
529
        
 
530
        
448
531
        super(MultiCore, self).__init__(self, *args, **opt)
449
532
        
450
 
        
451
 
        self.submitted = 0
452
 
        self.finish = 0
 
533
        import Queue
 
534
        import threading
 
535
        import thread
 
536
        self.queue = Queue.Queue() # list of job to do
 
537
        self.done = Queue.Queue()  # list of job finisned
 
538
        self.submitted = Queue.Queue() # one entry by job submitted
 
539
        self.stoprequest = threading.Event() #flag to ensure everything to close
 
540
        self.demons = []
 
541
        self.nb_done =0
453
542
        if 'nb_core' in opt:
454
543
            self.nb_core = opt['nb_core']
455
544
        elif isinstance(args[0],int):
458
547
            self.nb_core = 1
459
548
        self.update_fct = None
460
549
        
461
 
        # initialize the thread controler
462
 
        self.need_waiting = False
463
 
        self.nb_used = 0
464
 
        self.lock = thread.allocate_lock()
465
 
        self.done = 0 
466
 
        self.waiting_submission = []
467
 
        self.pids = []
 
550
        self.lock = threading.Event() # allow nice lock of the main thread
 
551
        self.pids = Queue.Queue() # allow to clean jobs submit via subprocess
 
552
        self.done_pid = []  # list of job finisned
 
553
        self.done_pid_queue = Queue.Queue()
468
554
        self.fail_msg = None
 
555
 
 
556
        # starting the worker node
 
557
        for _ in range(self.nb_core):
 
558
            self.start_demon()
 
559
 
 
560
        
 
561
    def start_demon(self):
 
562
        import threading
 
563
        t = threading.Thread(target=self.worker)
 
564
        t.daemon = True
 
565
        t.start()
 
566
        self.demons.append(t)
 
567
 
 
568
 
 
569
    def worker(self):
 
570
        import Queue
 
571
        import thread
 
572
        while not self.stoprequest.isSet():
 
573
            try:
 
574
                args = self.queue.get()
 
575
                tag, exe, arg, opt = args
 
576
                try:
 
577
                    # check for executable case
 
578
                    if isinstance(exe,str):
 
579
                        if os.path.exists(exe) and not exe.startswith('/'):
 
580
                            exe = './' + exe
 
581
                        if opt['stderr'] == None:
 
582
                            opt['stderr'] = subprocess.STDOUT
 
583
                        proc = misc.Popen([exe] + arg,  **opt)
 
584
                        pid = proc.pid
 
585
                        self.pids.put(pid)
 
586
                        proc.wait()
 
587
                        if proc.returncode not in [0, 143, -15] and not self.stoprequest.isSet():
 
588
                            fail_msg = 'program %s launch ends with non zero status: %s. Stop all computation' % \
 
589
                            (' '.join([exe]+arg), proc.returncode)
 
590
                            logger.warning(fail_msg)
 
591
                            self.stoprequest.set()
 
592
                            self.remove(fail_msg)
 
593
                    # handle the case when this is a python function. Note that
 
594
                    # this use Thread so they are NO built-in parralelization this is 
 
595
                    # going to work on a single core! (but this is fine for IO intensive 
 
596
                    # function. for CPU intensive fct this will slow down the computation
 
597
                    else:
 
598
                        pid = tag
 
599
                        self.pids.put(pid)
 
600
                        # the function should return 0 if everything is fine
 
601
                        # the error message otherwise
 
602
                        returncode = exe(*arg, **opt)
 
603
                        if returncode != 0:
 
604
                            logger.warning("fct %s does not return 0", exe)
 
605
                            self.stoprequest.set()
 
606
                            self.remove("fct %s does not return 0" % exe)
 
607
                except Exception,error:
 
608
                    self.fail_msg = sys.exc_info()
 
609
                    logger.warning(str(error))
 
610
                    self.stoprequest.set()
 
611
                    self.remove(error)
 
612
                    
 
613
                    if __debug__:
 
614
                        raise self.fail_msg[0], self.fail_msg[1],self.fail_msg[2]
 
615
 
 
616
                self.queue.task_done()
 
617
                self.done.put(tag)
 
618
                self.done_pid_queue.put(pid)
 
619
                #release the mother to print the status on the screen
 
620
                try:
 
621
                    self.lock.set()
 
622
                except thread.error:
 
623
                    continue
 
624
            except Queue.Empty:
 
625
                continue
 
626
            
 
627
            
 
628
            
 
629
    
 
630
    def submit(self, prog, argument=[], cwd=None, stdout=None, stderr=None,
 
631
               log=None, required_output=[], nb_submit=0):
 
632
        """submit a job on multicore machine"""
 
633
        
 
634
        tag = (prog, tuple(argument), cwd, nb_submit)
 
635
        if isinstance(prog, str):
 
636
            
 
637
    
 
638
            opt = {'cwd': cwd, 
 
639
                   'stdout':stdout,
 
640
                   'stderr': stderr}
 
641
            self.queue.put((tag, prog, argument, opt))                                                                                                                                
 
642
            self.submitted.put(1)
 
643
            return tag
 
644
        else:
 
645
            # python function
 
646
            self.queue.put((tag, prog, argument, {}))
 
647
            self.submitted.put(1)
 
648
            return tag            
469
649
        
470
650
    def launch_and_wait(self, prog, argument=[], cwd=None, stdout=None, 
471
651
                                stderr=None, log=None, **opts):
475
655
        if isinstance(stderr, str):
476
656
            stdout = open(stderr, 'w')        
477
657
        return misc.call([prog] + argument, stdout=stdout, stderr=stderr, cwd=cwd) 
478
 
    
479
 
    
480
 
    def submit(self, prog, argument=[], cwd=None, stdout=None, stderr=None,
481
 
               log=None, required_output=[], nb_submit=0):
482
 
        """submit a job on multicore machine"""
483
 
        
484
 
        self.submitted +=1
485
 
        if cwd is None:
486
 
            cwd = os.getcwd()
487
 
        if isinstance(prog, str):
488
 
            if not os.path.exists(prog) and not misc.which(prog):
489
 
                prog = os.path.join(cwd, prog)
490
 
        
491
 
        import thread
492
 
        if self.waiting_submission or self.nb_used == self.nb_core:
493
 
            self.waiting_submission.append((prog, argument,cwd, stdout))
494
 
            # check that none submission is already finished
495
 
            while self.nb_used <  self.nb_core and self.waiting_submission:
496
 
                arg = self.waiting_submission.pop(0)
497
 
                self.nb_used += 1 # udpate the number of running thread
498
 
                thread.start_new_thread(self.launch, arg)              
499
 
        elif self.nb_used <  self.nb_core -1:
500
 
            self.nb_used += 1 # upate the number of running thread
501
 
            thread.start_new_thread(self.launch, (prog, argument, cwd, stdout))
502
 
        elif self.nb_used ==  self.nb_core -1:
503
 
            self.nb_used += 1 # upate the number of running thread            
504
 
            thread.start_new_thread(self.launch, (prog, argument, cwd, stdout))
505
 
        
506
 
        
507
 
    def launch(self, exe, argument, cwd, stdout):
508
 
        """ way to launch for multicore. If exe is a string then treat it as
509
 
        an executable. Otherwise treat it as a function"""
510
 
        import thread
511
 
        def end(self, pid):
512
 
            self.nb_used -= 1
513
 
            self.done += 1
514
 
            try:
515
 
                self.pids.remove(pid)
516
 
            except:
517
 
                pass
518
 
            
519
 
        fail_msg = None
520
 
        try:  
521
 
            if isinstance(exe,str):
522
 
                if os.path.exists(exe) and not exe.startswith('/'):
523
 
                    exe = './' + exe
524
 
                proc = misc.Popen([exe] + argument, cwd=cwd, stdout=stdout, 
525
 
                                                               stderr=subprocess.STDOUT)
526
 
                pid = proc.pid
527
 
                self.pids.append(pid)
528
 
                proc.wait()
529
 
                if proc.returncode not in [0, 143, -15]:
530
 
                    fail_msg = 'program %s launch ends with non zero status: %s. Stop all computation' % \
531
 
                            (' '.join([exe]+argument), proc.returncode)
532
 
                    #self.fail_msg = fail_msg
533
 
                    logger.warning(fail_msg)
534
 
                    try:
535
 
                        log = open(glob.glob(pjoin(cwd,'*','log.txt'))[0]).read()
536
 
                        logger.warning('Last 15 lines of logfile %s:\n%s\n' % \
537
 
                                (pjoin(cwd,'*','log.txt'), '\n'.join(log.split('\n')[-15:-1]) + '\n'))
538
 
                    except (IOError, AttributeError, IndexError):
539
 
                        logger.warning('Please look for possible logfiles in %s' % cwd)
540
 
                        pass
541
 
                    self.remove(fail_msg)
542
 
            else:
543
 
                pid = tuple([id(o) for o in [exe] + argument])
544
 
                self.pids.append(pid)
545
 
                # the function should return 0 if everything is fine
546
 
                # the error message otherwise
547
 
                returncode = exe(argument)
548
 
                if returncode != 0:
549
 
                    logger.warning(returncode)
550
 
                    self.remove()
551
 
 
552
 
 
553
 
            
554
 
            # release the lock for allowing to launch the next job
555
 
            security = 0       
556
 
            # check that the status is locked to avoid coincidence unlock
557
 
            while 1:
558
 
                while not self.lock.locked():
559
 
                    if not self.need_waiting:
560
 
                        # Main is not yet locked
561
 
                        end(self, pid)
562
 
                        return
563
 
                    elif security > 60:
564
 
                        end(self, pid)
565
 
                        return 
566
 
                    security += 1
567
 
                    time.sleep(1)
568
 
                try:
569
 
                    self.lock.release()
570
 
                except thread.error:
571
 
                    continue
572
 
                break
573
 
            end(self, pid)
574
 
 
575
 
 
576
 
        except Exception, error:
577
 
            #logger.critical('one core fails with %s' % error)
578
 
            self.remove()
579
 
            raise
580
 
 
581
 
            
582
 
          
583
 
 
584
 
    def wait(self, me_dir, update_status):
585
 
        """Wait that all thread finish
586
 
        self.nb_used and self.done are update via each jobs (thread and local)
587
 
        self.submitted is the nb of times that submitted has been call (local)
588
 
        remaining is the nb of job that we still have to wait. (local)
589
 
        self.pids is the list of the BASH pid of the submitted jobs. (thread)
590
 
        
591
 
        WARNING: In principle all those value are coherent but since some are
592
 
        modified in various thread, those data can be corrupted. (not the local 
593
 
        one). Nb_used in particular shouldn't be trusted too much.
594
 
        This code check in different ways that all jobs have finished.
595
 
 
596
 
        In principle, the statement related to  '#security #X' are not used.
597
 
        In practise they are times to times.
598
 
        """
599
 
        
600
 
        import thread
601
 
 
602
 
        remaining = self.submitted - self.done
603
 
 
604
 
        while self.nb_used < self.nb_core:
605
 
            if self.waiting_submission:
606
 
                arg = self.waiting_submission.pop(0)
607
 
                thread.start_new_thread(self.launch, arg)
608
 
                self.nb_used += 1 # update the number of running thread
609
 
            else:
610
 
                break
611
 
                    
612
 
        try:            
613
 
            self.need_waiting = True
614
 
            self.lock.acquire()
615
 
            no_in_queue = 0
616
 
            secure_mode = False # forbid final acauire if in securemode
617
 
            while self.waiting_submission or self.nb_used:
618
 
                if self.fail_msg:
619
 
                    msg,  self.fail_msg = self.fail_msg, None
620
 
                    self.remove()
621
 
                    raise Exception, msg
622
 
                if update_status:
623
 
                    update_status(len(self.waiting_submission), self.nb_used, self.done)
624
 
                # security#1 that all job expected to be launched since 
625
 
                # we enter in this function are indeed launched.
626
 
                if len(self.waiting_submission) == 0 == remaining :
627
 
                    self.done = self.submitted
628
 
                    break
629
 
                
630
 
                # security #2: nb_used >0 but nothing remains as BASH PID
631
 
                if len(self.waiting_submission) == 0 and len(self.pids) == 0:
632
 
                    if self.submitted == self.done:
633
 
                        break
634
 
                    logger.debug('Found too many jobs. Recovering')
635
 
                    no_in_queue += 1
636
 
                    time.sleep(min(180, 5 * no_in_queue))
637
 
                    if no_in_queue > 3:
638
 
                        logger.debug('Still too many jobs. Continue')
639
 
                        break
640
 
                    continue
641
 
                
642
 
                # security #3: if nb_used not reliable pass in secure mode
643
 
                if not secure_mode and len(self.waiting_submission) != 0:
644
 
                    if self.nb_used != self.nb_core:
645
 
                        if self.nb_used != len(self.pids):
646
 
                            secure_mode = True
647
 
                # security #4: nb_used not reliable use secure mode to finish the run
648
 
                if secure_mode and not self.waiting_submission:
649
 
                    self.need_waiting = False
650
 
                    if self.lock.locked():
651
 
                        self.lock.release()
652
 
                    break
653
 
                
654
 
                # Wait for core to finish               
655
 
                self.lock.acquire()
656
 
                remaining -=1    # update remaining job
657
 
                #submit next one
658
 
                if self.waiting_submission:
659
 
                    arg = self.waiting_submission.pop(0)
660
 
                    thread.start_new_thread(self.launch, arg)
661
 
                    self.nb_used += 1 # update the number of running thread
662
 
 
663
 
            if self.fail_msg:
664
 
                msg,  self.fail_msg = self.fail_msg, None
665
 
                self.remove()
666
 
                raise Exception, msg            
667
 
            # security #5: checked that self.nb_used is not lower than expected
668
 
            #This is the most current problem.
669
 
            no_in_queue = 0
670
 
            while self.submitted > self.done:
671
 
                if self.fail_msg:
672
 
                    msg,  self.fail_msg = self.fail_msg, None
673
 
                    self.remove()
674
 
                    raise Exception, msg
675
 
                if no_in_queue == 0:
676
 
                    logger.debug('Some jobs have been lost. Try to recover')
677
 
                #something bad happens
678
 
                if not len(self.pids):
679
 
                    # The job is not running 
680
 
                    logger.critical('Some jobs have been lost in the multicore treatment.')
681
 
                    logger.critical('The results might be incomplete. (Trying to continue anyway)')
682
 
                    break
683
 
                elif update_status:
684
 
                    update_status(len(self.waiting_submission), len(self.pids) ,
685
 
                                                                      self.done)
686
 
                # waiting that those jobs ends.
687
 
                if not secure_mode:
688
 
                    self.lock.acquire()
689
 
                else:
690
 
                    no_in_queue += 1
691
 
                    try:
692
 
                        time.sleep(min(180,5*no_in_queue))
693
 
                        if no_in_queue > 5 * 3600.0 / 162:
694
 
                            break
695
 
                    except KeyboardInterrupt:
696
 
                        logger.warning('CTRL-C assumes that all jobs are done. Continue the code')
697
 
                        self.pids = [] # avoid security 6
698
 
                        break
699
 
                    
700
 
            # security #6. check that queue is empty. don't
701
 
            no_in_queue = 0
702
 
            while len(self.pids):
703
 
                if self.fail_msg:
704
 
                    msg,  self.fail_msg = self.fail_msg, None
705
 
                    self.remove()
706
 
                    raise Exception, msg
707
 
                self.need_waiting = False
708
 
                if self.lock.locked():
709
 
                        self.lock.release()
710
 
                secure_mode = True
711
 
                if no_in_queue == 0 : 
712
 
                    logger.warning('Some jobs have been lost. Try to recover.')
713
 
                    logger.warning('Hitting ctrl-c will consider that all jobs are done and continue the code.')
714
 
                try:
715
 
                    #something very bad happens
716
 
                    if update_status:
717
 
                        update_status(len(self.waiting_submission), len(self.pids) ,
718
 
                                                                      self.done)
719
 
                    time.sleep(min(5*no_in_queue, 180))
720
 
                    no_in_queue += 1
721
 
                    if no_in_queue > 5 * 3600.0 / 162:
722
 
                            break
723
 
                except KeyboardInterrupt:
724
 
                    break
725
 
                
726
 
            # print a last time the status (forcing 0 for the running)  
727
 
            if update_status:
728
 
                self.next_update = 0
729
 
                update_status(len(self.waiting_submission), 0, self.done)             
730
 
            
731
 
            # reset variable for next submission
732
 
            self.need_waiting = False
733
 
            security = 0 
734
 
            while not self.lock.locked() and security < 10:
735
 
                # check that the status is locked to avoid coincidence unlock
736
 
                if secure_mode:
737
 
                    security = 10
738
 
                security +=1
739
 
                time.sleep(1)
740
 
            if security < 10:
741
 
                self.lock.release()
742
 
            self.done = 0
743
 
            self.nb_used = 0
744
 
            self.submitted = 0
745
 
            self.pids = []
746
 
            
747
 
        except KeyboardInterrupt:
748
 
            self.remove()
749
 
            raise
750
 
        if self.fail_msg:
751
 
            msg,  self.fail_msg = self.fail_msg, None
752
 
            self.remove()
753
 
            raise Exception, msg 
754
 
        
755
 
            
 
658
 
756
659
    def remove(self, error=None):
757
660
        """Ensure that all thread are killed"""
758
 
        logger.info('remove job currently running')
759
 
        self.waiting_submission = []
760
 
        if error:
 
661
        
 
662
        # ensure the worker to stop
 
663
        self.stoprequest.set()
 
664
        if error and not self.fail_msg:
761
665
            self.fail_msg = error
762
 
        for pid in list(self.pids):
 
666
            
 
667
        # cleaning the queue done_pid_queue and move them to done_pid        
 
668
        while not self.done_pid_queue.empty():
 
669
            pid = self.done_pid_queue.get()
 
670
            self.done_pid.append(pid)
 
671
#            self.done_pid_queue.task_done()
 
672
 
 
673
        while not self.pids.empty():
 
674
            pid = self.pids.get()
 
675
            self.pids.task_done()
763
676
            if isinstance(pid, tuple):
764
677
                continue
 
678
            if pid in self.done_pid:
 
679
                continue
765
680
            out = os.system('CPIDS=$(pgrep -P %(pid)s); kill -15 $CPIDS > /dev/null 2>&1' \
766
681
                            % {'pid':pid} )
767
682
            out = os.system('kill -15 %(pid)s > /dev/null 2>&1' % {'pid':pid} )            
768
 
            if out == 0:
769
 
                try:
770
 
                    self.pids.remove(pid)
771
 
                except:
772
 
                    pass
773
 
            #out = os.system('kill -9 %s &> /dev/null' % pid)
774
 
 
775
 
        time.sleep(1) # waiting if some were submitting at the time of ctrl-c
776
 
        for pid in list(self.pids):
777
 
            if isinstance(pid, tuple):
778
 
                continue
779
 
            out = os.system('CPIDS=$(pgrep -P %s); kill -15 $CPIDS > /dev/null 2>&1' % pid )
780
 
            out = os.system('kill -15 %(pid)s > /dev/null 2>&1' % {'pid':pid} ) 
781
 
            if out == 0:
782
 
                try:
783
 
                    self.pids.remove(pid)
784
 
                except:
785
 
                    pass
786
 
                    
 
683
 
 
684
 
 
685
    def wait(self, me_dir, update_status, update_first=None):
 
686
        """Waiting that all the jobs are done. This function also control that
 
687
        the submission by packet are handle correctly (i.e. submit the function)"""
 
688
        import Queue
 
689
        import threading
 
690
 
 
691
        last_status = (0, 0, 0)
 
692
        sleep_time = 1
 
693
        use_lock = True
 
694
        first = True
 
695
        while True:
 
696
            force_one_more_loop = False # some security
 
697
                        
 
698
            # Loop over the job tagged as done to check if some packet of jobs
 
699
            # are finished in case, put the associate function in the queue
 
700
            while self.done.qsize():
 
701
                try:
 
702
                    tag = self.done.get(True, 1)
 
703
                except Queue.Empty:
 
704
                    pass
 
705
                else:
 
706
                    if self.id_to_packet and tuple(tag) in self.id_to_packet:
 
707
                        packet = self.id_to_packet[tuple(tag)]
 
708
                        remaining = packet.remove_one()
 
709
                        if remaining == 0:
 
710
                            # fully ensure that the packet is finished (thread safe)
 
711
                            packet.queue.join()
 
712
                            self.submit(packet.fct, packet.args)
 
713
                            force_one_more_loop = True
 
714
                    self.nb_done += 1
 
715
                    self.done.task_done()
 
716
 
 
717
            # Get from the various queue the Idle/Done/Running information 
 
718
            # Those variable should be thread safe but approximate.
 
719
            Idle = self.queue.qsize()
 
720
            Done = self.nb_done + self.done.qsize()
 
721
            Running = max(0, self.submitted.qsize() - Idle - Done) 
 
722
                       
 
723
            if Idle + Running <= 0 and not force_one_more_loop:
 
724
                update_status(Idle, Running, Done)
 
725
                # Going the quit since everything is done
 
726
                # Fully Ensure that everything is indeed done.
 
727
                self.queue.join()
 
728
                break
 
729
            
 
730
            if (Idle, Running, Done) != last_status:
 
731
                if first and update_first:
 
732
                    update_first(Idle, Running, Done)
 
733
                    first = False
 
734
                else:
 
735
                    update_status(Idle, Running, Done)
 
736
                last_status = (Idle, Running, Done)
 
737
            
 
738
            # cleaning the queue done_pid_queue and move them to done_pid
 
739
            while not self.done_pid_queue.empty():
 
740
                pid = self.done_pid_queue.get()
 
741
                self.done_pid.append(pid)
 
742
                self.done_pid_queue.task_done()
 
743
                     
 
744
                
 
745
            # Define how to wait for the next iteration
 
746
            if use_lock:
 
747
                # simply wait that a worker release the lock
 
748
                use_lock = self.lock.wait(300)
 
749
                self.lock.clear()
 
750
                if not use_lock and Idle > 0:
 
751
                    use_lock = True
 
752
            else:
 
753
                # to be sure that we will never fully lock at the end pass to 
 
754
                # a simple time.sleep()
 
755
                time.sleep(sleep_time)
 
756
                sleep_time = min(sleep_time + 2, 180)
 
757
        if update_first:
 
758
            update_first(Idle, Running, Done)
 
759
        
 
760
        if self.stoprequest.isSet():
 
761
            if isinstance(self.fail_msg, Exception):
 
762
                raise self.fail_msg
 
763
            elif isinstance(self.fail_msg, str):
 
764
                raise Exception, self.fail_msg
 
765
            else:
 
766
                raise self.fail_msg[0], self.fail_msg[1], self.fail_msg[2]
 
767
        # reset variable for next submission
 
768
        try:
 
769
            self.lock.clear()
 
770
        except Exception:
 
771
            pass
 
772
        self.done = Queue.Queue()
 
773
        self.done_pid = []
 
774
        self.done_pid_queue = Queue.Queue()
 
775
        self.nb_done = 0
 
776
        self.submitted = Queue.Queue()
 
777
        self.pids = Queue.Queue()
 
778
        self.stoprequest.clear()
 
779
 
787
780
class CondorCluster(Cluster):
788
781
    """Basic class for dealing with cluster submission"""
789
782
    
1268
1261
 
1269
1262
        me_dir = self.get_jobs_identifier(me_dir)
1270
1263
 
 
1264
        finished = list(self.submitted_ids)
 
1265
 
1271
1266
        idle, run, fail = 0, 0, 0
1272
1267
        for line in status.stdout:
1273
1268
            if me_dir in line:
1274
 
                status = line.split()[4]
 
1269
                id,_,_,_,status = line.split()[:5]
1275
1270
                if status in self.idle_tag:
1276
1271
                    idle += 1
 
1272
                    finished.remove(id)
1277
1273
                elif status in self.running_tag:
1278
1274
                    run += 1
 
1275
                    finished.remove(id)
1279
1276
                else:
1280
1277
                    logger.debug(line)
1281
1278
                    fail += 1
 
1279
                    finished.remove(id)
 
1280
 
 
1281
        for id in finished:
 
1282
            self.check_termination(id)
1282
1283
 
1283
1284
        return idle, run, self.submitted - (idle+run+fail), fail
1284
1285
 
1554
1555
    mc = MultiCore(1)
1555
1556
    mc.submit(exe, argument, cwd, stdout, **opt)
1556
1557
    mc.need_waiting = True
1557
 
    mc.lock.acquire()
1558
1558
    return mc.lock
1559
1559
 
1560
1560