459
548
self.update_fct = None
461
# initialize the thread controller
462
self.need_waiting = False
464
self.lock = thread.allocate_lock()
466
self.waiting_submission = []
550
self.lock = threading.Event() # allow nice lock of the main thread
551
self.pids = Queue.Queue() # allow to clean jobs submit via subprocess
552
self.done_pid = [] # list of job finisned
553
self.done_pid_queue = Queue.Queue()
468
554
self.fail_msg = None
556
# starting the worker node
557
for _ in range(self.nb_core):
561
def start_demon(self):
563
t = threading.Thread(target=self.worker)
566
self.demons.append(t)
572
while not self.stoprequest.isSet():
574
args = self.queue.get()
575
tag, exe, arg, opt = args
577
# check for executable case
578
if isinstance(exe,str):
579
if os.path.exists(exe) and not exe.startswith('/'):
581
if opt['stderr'] == None:
582
opt['stderr'] = subprocess.STDOUT
583
proc = misc.Popen([exe] + arg, **opt)
587
if proc.returncode not in [0, 143, -15] and not self.stoprequest.isSet():
588
fail_msg = 'program %s launch ends with non zero status: %s. Stop all computation' % \
589
(' '.join([exe]+arg), proc.returncode)
590
logger.warning(fail_msg)
591
self.stoprequest.set()
592
self.remove(fail_msg)
593
# handle the case when this is a python function. Note that
594
# this uses Thread, so there is NO built-in parallelization: this is
595
# going to work on a single core! (but this is fine for IO intensive
596
# function. for CPU intensive fct this will slow down the computation
600
# the function should return 0 if everything is fine
601
# the error message otherwise
602
returncode = exe(*arg, **opt)
604
logger.warning("fct %s does not return 0", exe)
605
self.stoprequest.set()
606
self.remove("fct %s does not return 0" % exe)
607
except Exception,error:
608
self.fail_msg = sys.exc_info()
609
logger.warning(str(error))
610
self.stoprequest.set()
614
raise self.fail_msg[0], self.fail_msg[1],self.fail_msg[2]
616
self.queue.task_done()
618
self.done_pid_queue.put(pid)
619
#release the mother to print the status on the screen
630
def submit(self, prog, argument=[], cwd=None, stdout=None, stderr=None,
631
log=None, required_output=[], nb_submit=0):
632
"""submit a job on multicore machine"""
634
tag = (prog, tuple(argument), cwd, nb_submit)
635
if isinstance(prog, str):
641
self.queue.put((tag, prog, argument, opt))
642
self.submitted.put(1)
646
self.queue.put((tag, prog, argument, {}))
647
self.submitted.put(1)
470
650
def launch_and_wait(self, prog, argument=[], cwd=None, stdout=None,
                    stderr=None, log=None, **opts):
    """Launch a single job locally and block until it finishes.

    prog/argument form the command line and cwd the working directory.
    stdout/stderr may be open file objects or file names: a string is
    replaced by a file object opened for writing before the call.
    Returns the exit status reported by misc.call.
    """
    if isinstance(stdout, str):
        stdout = open(stdout, 'w')
    if isinstance(stderr, str):
        # BUGFIX: this branch previously did `stdout = open(stderr, 'w')`,
        # clobbering stdout and silently dropping the requested stderr
        # redirection.  Assign the opened file to stderr instead.
        stderr = open(stderr, 'w')
    return misc.call([prog] + argument, stdout=stdout, stderr=stderr, cwd=cwd)
480
def submit(self, prog, argument=[], cwd=None, stdout=None, stderr=None,
481
log=None, required_output=[], nb_submit=0):
482
"""submit a job on multicore machine"""
487
if isinstance(prog, str):
488
if not os.path.exists(prog) and not misc.which(prog):
489
prog = os.path.join(cwd, prog)
492
if self.waiting_submission or self.nb_used == self.nb_core:
493
self.waiting_submission.append((prog, argument,cwd, stdout))
494
# check whether some earlier submission has already finished
495
while self.nb_used < self.nb_core and self.waiting_submission:
496
arg = self.waiting_submission.pop(0)
497
self.nb_used += 1 # udpate the number of running thread
498
thread.start_new_thread(self.launch, arg)
499
elif self.nb_used < self.nb_core -1:
500
self.nb_used += 1 # upate the number of running thread
501
thread.start_new_thread(self.launch, (prog, argument, cwd, stdout))
502
elif self.nb_used == self.nb_core -1:
503
self.nb_used += 1 # upate the number of running thread
504
thread.start_new_thread(self.launch, (prog, argument, cwd, stdout))
507
def launch(self, exe, argument, cwd, stdout):
508
""" way to launch for multicore. If exe is a string then treat it as
509
an executable. Otherwise treat it as a function"""
515
self.pids.remove(pid)
521
if isinstance(exe,str):
522
if os.path.exists(exe) and not exe.startswith('/'):
524
proc = misc.Popen([exe] + argument, cwd=cwd, stdout=stdout,
525
stderr=subprocess.STDOUT)
527
self.pids.append(pid)
529
if proc.returncode not in [0, 143, -15]:
530
fail_msg = 'program %s launch ends with non zero status: %s. Stop all computation' % \
531
(' '.join([exe]+argument), proc.returncode)
532
#self.fail_msg = fail_msg
533
logger.warning(fail_msg)
535
log = open(glob.glob(pjoin(cwd,'*','log.txt'))[0]).read()
536
logger.warning('Last 15 lines of logfile %s:\n%s\n' % \
537
(pjoin(cwd,'*','log.txt'), '\n'.join(log.split('\n')[-15:-1]) + '\n'))
538
except (IOError, AttributeError, IndexError):
539
logger.warning('Please look for possible logfiles in %s' % cwd)
541
self.remove(fail_msg)
543
pid = tuple([id(o) for o in [exe] + argument])
544
self.pids.append(pid)
545
# the function should return 0 if everything is fine
546
# the error message otherwise
547
returncode = exe(argument)
549
logger.warning(returncode)
554
# release the lock for allowing to launch the next job
556
# check that the status is locked to avoid coincidence unlock
558
while not self.lock.locked():
559
if not self.need_waiting:
560
# Main is not yet locked
576
except Exception, error:
577
#logger.critical('one core fails with %s' % error)
584
def wait(self, me_dir, update_status):
585
"""Wait that all thread finish
586
self.nb_used and self.done are update via each jobs (thread and local)
587
self.submitted is the nb of times that submitted has been call (local)
588
remaining is the nb of job that we still have to wait. (local)
589
self.pids is the list of the BASH pid of the submitted jobs. (thread)
591
WARNING: In principle all those value are coherent but since some are
592
modified in various thread, those data can be corrupted. (not the local
593
one). Nb_used in particular shouldn't be trusted too much.
594
This code check in different ways that all jobs have finished.
596
In principle, the statement related to '#security #X' are not used.
597
In practice they are, from time to time.
602
remaining = self.submitted - self.done
604
while self.nb_used < self.nb_core:
605
if self.waiting_submission:
606
arg = self.waiting_submission.pop(0)
607
thread.start_new_thread(self.launch, arg)
608
self.nb_used += 1 # update the number of running thread
613
self.need_waiting = True
616
secure_mode = False # forbid final acauire if in securemode
617
while self.waiting_submission or self.nb_used:
619
msg, self.fail_msg = self.fail_msg, None
623
update_status(len(self.waiting_submission), self.nb_used, self.done)
624
# security#1 that all job expected to be launched since
625
# we enter in this function are indeed launched.
626
if len(self.waiting_submission) == 0 == remaining :
627
self.done = self.submitted
630
# security #2: nb_used >0 but nothing remains as BASH PID
631
if len(self.waiting_submission) == 0 and len(self.pids) == 0:
632
if self.submitted == self.done:
634
logger.debug('Found too many jobs. Recovering')
636
time.sleep(min(180, 5 * no_in_queue))
638
logger.debug('Still too many jobs. Continue')
642
# security #3: if nb_used not reliable pass in secure mode
643
if not secure_mode and len(self.waiting_submission) != 0:
644
if self.nb_used != self.nb_core:
645
if self.nb_used != len(self.pids):
647
# security #4: nb_used not reliable use secure mode to finish the run
648
if secure_mode and not self.waiting_submission:
649
self.need_waiting = False
650
if self.lock.locked():
654
# Wait for core to finish
656
remaining -=1 # update remaining job
658
if self.waiting_submission:
659
arg = self.waiting_submission.pop(0)
660
thread.start_new_thread(self.launch, arg)
661
self.nb_used += 1 # update the number of running thread
664
msg, self.fail_msg = self.fail_msg, None
667
# security #5: checked that self.nb_used is not lower than expected
668
#This is the most current problem.
670
while self.submitted > self.done:
672
msg, self.fail_msg = self.fail_msg, None
676
logger.debug('Some jobs have been lost. Try to recover')
677
#something bad happens
678
if not len(self.pids):
679
# The job is not running
680
logger.critical('Some jobs have been lost in the multicore treatment.')
681
logger.critical('The results might be incomplete. (Trying to continue anyway)')
684
update_status(len(self.waiting_submission), len(self.pids) ,
686
# waiting that those jobs ends.
692
time.sleep(min(180,5*no_in_queue))
693
if no_in_queue > 5 * 3600.0 / 162:
695
except KeyboardInterrupt:
696
logger.warning('CTRL-C assumes that all jobs are done. Continue the code')
697
self.pids = [] # avoid security 6
700
# security #6. check that queue is empty. don't
702
while len(self.pids):
704
msg, self.fail_msg = self.fail_msg, None
707
self.need_waiting = False
708
if self.lock.locked():
711
if no_in_queue == 0 :
712
logger.warning('Some jobs have been lost. Try to recover.')
713
logger.warning('Hitting ctrl-c will consider that all jobs are done and continue the code.')
715
#something very bad happens
717
update_status(len(self.waiting_submission), len(self.pids) ,
719
time.sleep(min(5*no_in_queue, 180))
721
if no_in_queue > 5 * 3600.0 / 162:
723
except KeyboardInterrupt:
726
# print a last time the status (forcing 0 for the running)
729
update_status(len(self.waiting_submission), 0, self.done)
731
# reset variable for next submission
732
self.need_waiting = False
734
while not self.lock.locked() and security < 10:
735
# check that the status is locked to avoid coincidence unlock
747
except KeyboardInterrupt:
751
msg, self.fail_msg = self.fail_msg, None
756
659
def remove(self, error=None):
757
660
"""Ensure that all thread are killed"""
758
logger.info('remove job currently running')
759
self.waiting_submission = []
662
# ensure the worker to stop
663
self.stoprequest.set()
664
if error and not self.fail_msg:
761
665
self.fail_msg = error
762
for pid in list(self.pids):
667
# cleaning the queue done_pid_queue and move them to done_pid
668
while not self.done_pid_queue.empty():
669
pid = self.done_pid_queue.get()
670
self.done_pid.append(pid)
671
# self.done_pid_queue.task_done()
673
while not self.pids.empty():
674
pid = self.pids.get()
675
self.pids.task_done()
763
676
if isinstance(pid, tuple):
678
if pid in self.done_pid:
765
680
out = os.system('CPIDS=$(pgrep -P %(pid)s); kill -15 $CPIDS > /dev/null 2>&1' \
767
682
out = os.system('kill -15 %(pid)s > /dev/null 2>&1' % {'pid':pid} )
770
self.pids.remove(pid)
773
#out = os.system('kill -9 %s &> /dev/null' % pid)
775
time.sleep(1) # waiting if some were submitting at the time of ctrl-c
776
for pid in list(self.pids):
777
if isinstance(pid, tuple):
779
out = os.system('CPIDS=$(pgrep -P %s); kill -15 $CPIDS > /dev/null 2>&1' % pid )
780
out = os.system('kill -15 %(pid)s > /dev/null 2>&1' % {'pid':pid} )
783
self.pids.remove(pid)
685
def wait(self, me_dir, update_status, update_first=None):
686
"""Waiting that all the jobs are done. This function also control that
687
the submission by packet are handle correctly (i.e. submit the function)"""
691
last_status = (0, 0, 0)
696
force_one_more_loop = False # some security
698
# Loop over the job tagged as done to check if some packet of jobs
699
# are finished in case, put the associate function in the queue
700
while self.done.qsize():
702
tag = self.done.get(True, 1)
706
if self.id_to_packet and tuple(tag) in self.id_to_packet:
707
packet = self.id_to_packet[tuple(tag)]
708
remaining = packet.remove_one()
710
# fully ensure that the packet is finished (thread safe)
712
self.submit(packet.fct, packet.args)
713
force_one_more_loop = True
715
self.done.task_done()
717
# Get from the various queue the Idle/Done/Running information
718
# Those variable should be thread safe but approximate.
719
Idle = self.queue.qsize()
720
Done = self.nb_done + self.done.qsize()
721
Running = max(0, self.submitted.qsize() - Idle - Done)
723
if Idle + Running <= 0 and not force_one_more_loop:
724
update_status(Idle, Running, Done)
725
# Going the quit since everything is done
726
# Fully Ensure that everything is indeed done.
730
if (Idle, Running, Done) != last_status:
731
if first and update_first:
732
update_first(Idle, Running, Done)
735
update_status(Idle, Running, Done)
736
last_status = (Idle, Running, Done)
738
# cleaning the queue done_pid_queue and move them to done_pid
739
while not self.done_pid_queue.empty():
740
pid = self.done_pid_queue.get()
741
self.done_pid.append(pid)
742
self.done_pid_queue.task_done()
745
# Define how to wait for the next iteration
747
# simply wait that a worker release the lock
748
use_lock = self.lock.wait(300)
750
if not use_lock and Idle > 0:
753
# to be sure that we will never fully lock at the end pass to
754
# a simple time.sleep()
755
time.sleep(sleep_time)
756
sleep_time = min(sleep_time + 2, 180)
758
update_first(Idle, Running, Done)
760
if self.stoprequest.isSet():
761
if isinstance(self.fail_msg, Exception):
763
elif isinstance(self.fail_msg, str):
764
raise Exception, self.fail_msg
766
raise self.fail_msg[0], self.fail_msg[1], self.fail_msg[2]
767
# reset variable for next submission
772
self.done = Queue.Queue()
774
self.done_pid_queue = Queue.Queue()
776
self.submitted = Queue.Queue()
777
self.pids = Queue.Queue()
778
self.stoprequest.clear()
787
780
class CondorCluster(Cluster):
788
781
"""Basic class for dealing with cluster submission"""