2
from scipy_base.fastumath import *
4
import sys, thread, sync
18
#I've got to clean up evaluate and initialize in population so that
20
#the incorporation of the parallel stuff is smoother.
26
import sys, thread, sync
34
for i in range(len(x.flat)):
36
y[i] = int(round(x[i]))
42
def divide_list(l,sections):
46
Nsec = float(sections)
50
div_points = array_round(arange(0,Ntot,Neach)).tolist()
52
if div_points[-1] != Ntot: div_points.append(Ntot)
58
for end in div_points[1:]:
60
sub_pops.append(l[st:end])
68
class parallel_pop_initializer:
70
def evaluate(self,pop,settings = None):
72
#only send the individuals out that need evaluation
76
Nserv = len(pop.server_list)
78
groups = divide_list(pop,Nserv)
80
sys.setcheckinterval(10)
82
finished = sync.event()
84
bar = sync.barrier(Nserv)
86
print '************',len(groups), len(pop.server_list), len(pop)
88
for i in range(len(groups)):
90
inputs = {'sub_pop':groups[i],'settings':settings, 'initializer':pop.initializer}
92
returns = ('sub_pop',)
94
code = 'initializer.evaluate(sub_pop,settings)'
96
data_pack = (inputs,returns,code)
98
server = pop.server_list[i]
100
thread.start_new_thread(remote_thread_init,(bar,finished,server,data_pack))
104
sys.setcheckinterval(10)
106
#what is this? for ind in pop: ind.evaluate(force)
112
def plen(obj):
    """Return the size, in bytes, of obj serialized by cPickle.dumps
    using pickle protocol 1."""
    serialized = cPickle.dumps(obj, 1)
    return len(serialized)
116
class parallel_pop_evaluator:
118
def evaluate(self,pop,force = 0):
122
#print '1',tree.ref()
124
#only send the individuals out that need evaluation
128
_eval_list = pop.data
132
_eval_list = filter(lambda x: not x.evaluated,pop)
134
#print '2',tree.ref()
136
eval_list = pop.clone()
138
#print '3',tree.ref()
140
eval_list.data = _eval_list
144
Nserv = len(pop.server_list)
146
groups = divide_list(eval_list,Nserv)
148
#print '4',tree.ref()
150
sys.setcheckinterval(10)
152
finished = sync.event()
154
bar = sync.barrier(Nserv)
156
#print "EVAL LENGTH!!!", plen(pop.evaluator)
160
print "GROUP LENGTH!!!", plen(groups[0]), len(gr),
162
#print "IND!!!", plen(gr[0]),plen(gr[0].root)
164
#print '4.5',tree.ref()
166
for i in range(len(groups)):
168
inputs = {'sub_pop':groups[i], 'evaluator':pop.evaluator, 'force':force}
170
returns = ('sub_pop',)
172
code = 'evaluator.evaluate(sub_pop,force)'
174
data_pack = (inputs,returns,code)
176
server = pop.server_list[i]
178
thread.start_new_thread(remote_thread_eval,(bar,finished,server,data_pack))
180
#print '7',tree.ref()
184
sys.setcheckinterval(10)
186
#what is this? for ind in pop: ind.evaluate(force)
190
def evaluate(self,pop,force = 0):
192
#only send the individuals out that need evaluation
194
_eval_list = filter(lambda x: not x.evaluated,pop)
196
eval_list = pop.clone()
198
eval_list.data = _eval_list
202
#finest grain possible
204
groups = divide_list(eval_list,len(eval_list))
206
finished = sync.event()
208
bar = sync.barrier(groups)
212
sys.setcheckinterval(10)
214
Nserv = len(pop.server_list)
218
while idx < len(groups):
220
inputs = {'sub_pop':groups[idx], 'evaluator':pop.evaluator}
222
returns = ('sub_pop',)
224
code = 'evaluator.evaluate(sub_pop)'
226
data_pack = (inputs,returns,code)
228
server = pop.server_list[i]
230
thread.start_new_thread(remote_thread_eval,(bar,finished,server,data_pack))
232
#for i in range(len(groups)):
234
# inputs = {'sub_pop':groups[i], 'evaluator':pop.evaluator}
236
# returns = ('sub_pop',)
238
# code = 'evaluator.evaluate(sub_pop)'
240
# data_pack = (inputs,returns,code)
242
# server = pop.server_list[i]
244
# thread.start_new_thread(remote_thread,(bar,finished,server,data_pack))
248
sys.setcheckinterval(10)
250
#what is this? for ind in pop: ind.evaluate(force)
256
def remote_thread_init(bar,finished,server,data_pack):
260
remote = remote_exec.remote_exec(server[0],server[1],0,1)
262
results = remote.run(data_pack)
264
#assign the results from the returned data to the local individuals
266
inputs = data_pack[0]
268
old = inputs['sub_pop']
270
new = results['sub_pop']
272
for i in range(len(old)):
274
old[i].__dict__.update(new[i].__dict__)
278
print 'error in %s,%d' % server
286
def remote_thread_eval(bar,finished,server,data_pack):
292
#print '5',tree.ref()
294
remote = remote_exec.remote_exec(server[0],server[1],0,1)
296
results = remote.run(data_pack)
298
#print '6',tree.ref()
300
#assign the results from the returned data to the local individuals
302
inputs = data_pack[0]
304
old = inputs['sub_pop']
306
new = results['sub_pop']
310
gnm.root.delete_circulars()
314
#print '6.25',tree.ref()
316
for i in range(len(old)):
318
old[i].__dict__.update(new[i].__dict__)
322
#print '6.5',tree.ref()
326
print 'error in %s,%d' % server
334
#print 'ref count',sys.getrefcount(r)
336
#print '6.75',tree.ref()
338
#Huh??? Why do I need to delete the new genomes
340
#individually here? Why aren't they garbage collected?
342
indices = range(len(new))
350
#print 'ref count',sys.getrefcount(r)
352
#print '6.8',tree.ref()
354
#r.delete_circulars()
356
#print 'ref count',sys.getrefcount(r)
358
#print '6.9',tree.ref()
362
#print '6.95',tree.ref()
372
class ga_parallel_pop(population.population):
374
parallel_evaluator = parallel_pop_evaluator()
376
parallel_initializer = parallel_pop_initializer()
378
def __init__(self,genome,size=1,server_list=None):
384
genome -- a genome object.
386
size -- number. The population size. The genome will be
388
replicated size times to fill the population.
390
server_list -- a list of tuple pairs with machine names and
392
ports listed for the available servers
394
ex: [(ee.duke.edu,8000),('elsie.ee.duke.edu',8000)]
398
population.population.__init__(self,genome,size)
402
self.server_list = server_list
404
def initialize(self,settings = None):
406
"""This method **must** be called before a genetic algorithm
408
begins evolving the population. It takes care of initializing
410
the individual genomes, evaluating them, and scaling the population.
412
It also clears and intializes the statistics for the population.
420
settings -- dictionary of genetic algorithm parameters. These
422
are passed on to the genomes for initialization.
426
self.stats = {'current':{},'initial':{},'overall':{}}
428
self.stats['ind_evals'] = 0
432
print "beigninning genome generation"
436
self.parallel_initializer.evaluate(self,settings)
440
print "finished generation: ", e-b
450
print "evaluation time: ", e-b
456
self.stats['initial']['avg'] = self.stats['current']['avg']
458
self.stats['initial']['max'] = self.stats['current']['max']
460
self.stats['initial']['min'] = self.stats['current']['min']
462
self.stats['initial']['dev'] = self.stats['current']['dev']
466
def evaluate(self, force = 0):
468
""" call the parallel_evaluator instead of the evaluator directly
472
self.selector.clear()
474
self.parallel_evaluator.evaluate(self,force)
476
#self.post_evaluate()
478
#all of the remaining should be put in post eval...
482
#this is a cluge to get eval count to work correctly
484
preval = self.stats['ind_evals']
488
self.stats['ind_evals'] = self.stats['ind_evals'] + ind.evals
492
print 'evals: ', self.stats['ind_evals'] - preval
500
########################## test stuff ############################
516
def __init__(self,wait=.01):
520
def evaluate(self,genome):
    """Sleep for self.wait seconds, then score genome as sum(genome.array()).

    Part of the test objective; the sleep presumably mimics a costly
    evaluation so parallel speedup can be observed (assumption — confirm).
    """
    time.sleep(self.wait)
    values = genome.array()
    return sum(values)
528
def test_pop(server_list,size=100,wait=.01):
530
obj = objective(wait)
532
the_gene = gene.float_gene((0,2.5))
534
genome = genome.list_genome(the_gene.replicate(5))
536
genome.evaluator = obj
538
pop = ga_parallel_pop(genome,size,server_list)
540
print '########### awaiting evaluation#############'
544
print ' evaluation done!'
546
print 'best:', pop.best()
548
print 'worst',pop.worst()
554
genome.list_genome.evaluator = objective()
556
gene = gene.float_gene((0,2.5))
558
genome = genome.list_genome(gene.replicate(5))
560
pop = ga_parallel_pop(genome,100,[(host,port),])
566
import parallel_pop,beowulf,os
570
def test_pop2(server_list,size=100,wait=.01):
574
genome = hmm_gnm.make_genome()
576
#pop = ga_parallel_pop(genome,4,server_list)
580
#genome.target = targets[0]
582
pop = ga_parallel_pop(genome,1,server_list)
584
galg = hmm_gnm.class_ga(pop)
586
galg.settings.update({ 'pop_size':6,'gens':2,'p_mutate':.03,
588
'dbase':os.environ['HOME'] + '/all_lift3', 'p_cross':0.9, 'p_replace':.6,
590
'p_deviation': -.001})
596
print '########### awaiting evaluation#############'
600
print ' evaluation done!'
602
print 'best:', pop.best()
604
print 'worst',pop.worst()
612
host = socket.gethostname()
616
server_list = [(host,port),(host,port+1)]
618
for server in server_list:
622
thread.start_new_thread(remote_exec.server,(host,port))
624
thread.start_new_thread(test_pop2,(server_list,))
628
def test2(machines=32,size=100,wait=.01):
634
#requires that servers are started on beowulf 1 and 2.
638
server_list = beowulf.beowulf.servers[:machines]
640
thread.start_new_thread(test_pop,(server_list,size,wait))
642
print 'total time:', time.time()-t1