~ubuntu-branches/ubuntu/karmic/python-scipy/karmic

« back to all changes in this revision

Viewing changes to Lib/ga/parallel_pop.py

  • Committer: Bazaar Package Importer
  • Author(s): Daniel T. Chen (new)
  • Date: 2005-03-16 02:15:29 UTC
  • Revision ID: james.westby@ubuntu.com-20050316021529-xrjlowsejs0cijig
Tags: upstream-0.3.2
ImportĀ upstreamĀ versionĀ 0.3.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from Numeric import *
 
2
from scipy_base.fastumath import *
 
3
 
 
4
import sys, thread, sync
 
5
 
 
6
 
 
7
 
 
8
import remote_exec      
 
9
 
 
10
import population
 
11
 
 
12
 
 
13
 
 
14
"""
 
15
 
 
16
######
 
17
 
 
18
#I've got to lean up evaluate and initial in population so that 
 
19
 
 
20
#the incorporation of the parallel stuff is smoother.
 
21
 
 
22
######
 
23
 
 
24
"""
 
25
 
 
26
import sys, thread, sync
 
27
 
 
28
 
 
29
 
 
30
def array_round(x):
 
31
 
 
32
        y = zeros(shape(x))
 
33
 
 
34
        for i in range(len(x.flat)): 
 
35
 
 
36
                y[i] = int(round(x[i]))
 
37
 
 
38
        return y
 
39
 
 
40
 
 
41
 
 
42
def divide_list(l,sections):
 
43
 
 
44
                Ntot = len(l)
 
45
 
 
46
                Nsec = float(sections)
 
47
 
 
48
                Neach = Ntot/Nsec
 
49
 
 
50
                div_points = array_round(arange(0,Ntot,Neach)).tolist()         
 
51
 
 
52
                if div_points[-1] != Ntot: div_points.append(Ntot)
 
53
 
 
54
                sub_pops = []
 
55
 
 
56
                st = div_points[0]
 
57
 
 
58
                for end in div_points[1:]:
 
59
 
 
60
                        sub_pops.append(l[st:end])
 
61
 
 
62
                        st = end
 
63
 
 
64
                return sub_pops 
 
65
 
 
66
 
 
67
 
 
68
class parallel_pop_initializer:
 
69
 
 
70
        def evaluate(self,pop,settings = None):
 
71
 
 
72
                #only send the individuals out that need evaluation
 
73
 
 
74
                if len(pop):
 
75
 
 
76
                        Nserv = len(pop.server_list)
 
77
 
 
78
                        groups = divide_list(pop,Nserv)
 
79
 
 
80
                        sys.setcheckinterval(10)
 
81
 
 
82
                        finished = sync.event()
 
83
 
 
84
                        bar = sync.barrier(Nserv)
 
85
 
 
86
                        print '************',len(groups), len(pop.server_list), len(pop)
 
87
 
 
88
                        for i in range(len(groups)):
 
89
 
 
90
                                inputs = {'sub_pop':groups[i],'settings':settings, 'initializer':pop.initializer}
 
91
 
 
92
                                returns = ('sub_pop',)
 
93
 
 
94
                                code = 'initializer.evaluate(sub_pop,settings)'
 
95
 
 
96
                                data_pack = (inputs,returns,code)
 
97
 
 
98
                                server = pop.server_list[i]
 
99
 
 
100
                                thread.start_new_thread(remote_thread_init,(bar,finished,server,data_pack))
 
101
 
 
102
                        finished.wait()
 
103
 
 
104
                        sys.setcheckinterval(10)
 
105
 
 
106
#what is this?                  for ind in pop: ind.evaluate(force)
 
107
 
 
108
 
 
109
 
 
110
import cPickle
 
111
 
 
112
def plen(obj): return len(cPickle.dumps(obj,1))
 
113
 
 
114
 
 
115
 
 
116
class parallel_pop_evaluator:
 
117
 
 
118
        def evaluate(self,pop,force = 0):
 
119
 
 
120
                import tree
 
121
 
 
122
                #print '1',tree.ref()
 
123
 
 
124
                #only send the individuals out that need evaluation
 
125
 
 
126
                if force:       
 
127
 
 
128
                        _eval_list = pop.data
 
129
 
 
130
                else:   
 
131
 
 
132
                        _eval_list = filter(lambda x: not x.evaluated,pop)
 
133
 
 
134
                #print '2',tree.ref()   
 
135
 
 
136
                eval_list = pop.clone()
 
137
 
 
138
                #print '3',tree.ref()
 
139
 
 
140
                eval_list.data = _eval_list
 
141
 
 
142
                if len(eval_list):
 
143
 
 
144
                        Nserv = len(pop.server_list)
 
145
 
 
146
                        groups = divide_list(eval_list,Nserv)
 
147
 
 
148
                        #print '4',tree.ref()
 
149
 
 
150
                        sys.setcheckinterval(10)
 
151
 
 
152
                        finished = sync.event()
 
153
 
 
154
                        bar = sync.barrier(Nserv)
 
155
 
 
156
                        #print "EVAL LENGTH!!!", plen(pop.evaluator)
 
157
 
 
158
                        gr = groups[0]
 
159
 
 
160
                        print "GROUP LENGTH!!!", plen(groups[0]), len(gr), 
 
161
 
 
162
                        #print "IND!!!", plen(gr[0]),plen(gr[0].root)
 
163
 
 
164
                        #print '4.5',tree.ref()
 
165
 
 
166
                        for i in range(len(groups)):
 
167
 
 
168
                                inputs = {'sub_pop':groups[i], 'evaluator':pop.evaluator, 'force':force}
 
169
 
 
170
                                returns = ('sub_pop',)
 
171
 
 
172
                                code = 'evaluator.evaluate(sub_pop,force)'
 
173
 
 
174
                                data_pack = (inputs,returns,code)
 
175
 
 
176
                                server = pop.server_list[i]
 
177
 
 
178
                                thread.start_new_thread(remote_thread_eval,(bar,finished,server,data_pack))
 
179
 
 
180
                        #print '7',tree.ref()   
 
181
 
 
182
                        finished.wait()
 
183
 
 
184
                        sys.setcheckinterval(10)
 
185
 
 
186
#what is this?                  for ind in pop: ind.evaluate(force)
 
187
 
 
188
        """
 
189
 
 
190
        def evaluate(self,pop,force = 0):
 
191
 
 
192
                #only send the individuals out that need evaluation
 
193
 
 
194
                _eval_list = filter(lambda x: not x.evaluated,pop)
 
195
 
 
196
                eval_list = pop.clone()
 
197
 
 
198
                eval_list.data = _eval_list
 
199
 
 
200
                if len(eval_list):
 
201
 
 
202
                        #finest grain possible
 
203
 
 
204
                        groups = divide_list(eval_list,len(eval_list))
 
205
 
 
206
                        finished = sync.event()
 
207
 
 
208
                        bar = sync.barrier(groups)
 
209
 
 
210
                        
 
211
 
 
212
                        sys.setcheckinterval(10)
 
213
 
 
214
                        Nserv = len(pop.server_list)
 
215
 
 
216
                        idx = 0
 
217
 
 
218
                        while idx < len(groups):
 
219
 
 
220
                                inputs = {'sub_pop':groups[idx], 'evaluator':pop.evaluator}
 
221
 
 
222
                                returns = ('sub_pop',)
 
223
 
 
224
                                code = 'evaluator.evaluate(sub_pop)'
 
225
 
 
226
                                data_pack = (inputs,returns,code)
 
227
 
 
228
                                server = pop.server_list[i]
 
229
 
 
230
                                thread.start_new_thread(remote_thread_eval,(bar,finished,server,data_pack))
 
231
 
 
232
                        #for i in range(len(groups)):
 
233
 
 
234
                        #       inputs = {'sub_pop':groups[i], 'evaluator':pop.evaluator}
 
235
 
 
236
                        #       returns = ('sub_pop',)
 
237
 
 
238
                        #       code = 'evaluator.evaluate(sub_pop)'
 
239
 
 
240
                        #       data_pack = (inputs,returns,code)
 
241
 
 
242
                        #       server = pop.server_list[i]
 
243
 
 
244
                        #       thread.start_new_thread(remote_thread,(bar,finished,server,data_pack))
 
245
 
 
246
                        finished.wait()
 
247
 
 
248
                        sys.setcheckinterval(10)
 
249
 
 
250
#what is this?                  for ind in pop: ind.evaluate(force)
 
251
 
 
252
        """
 
253
 
 
254
 
 
255
 
 
256
def remote_thread_init(bar,finished,server,data_pack):
 
257
 
 
258
        try:
 
259
 
 
260
                remote = remote_exec.remote_exec(server[0],server[1],0,1)
 
261
 
 
262
                results = remote.run(data_pack)
 
263
 
 
264
                #assign the results from the returned data to the local individuals
 
265
 
 
266
                inputs = data_pack[0]
 
267
 
 
268
                old = inputs['sub_pop']
 
269
 
 
270
                new = results['sub_pop']
 
271
 
 
272
                for i in range(len(old)):
 
273
 
 
274
                        old[i].__dict__.update(new[i].__dict__)
 
275
 
 
276
        except IndexError:
 
277
 
 
278
                print 'error in %s,%d' %  server
 
279
 
 
280
        bar.enter()
 
281
 
 
282
        finished.post()
 
283
 
 
284
 
 
285
 
 
286
def remote_thread_eval(bar,finished,server,data_pack):
 
287
 
 
288
        import tree
 
289
 
 
290
        try:
 
291
 
 
292
                #print '5',tree.ref()
 
293
 
 
294
                remote = remote_exec.remote_exec(server[0],server[1],0,1)
 
295
 
 
296
                results = remote.run(data_pack)
 
297
 
 
298
                #print '6',tree.ref()
 
299
 
 
300
                #assign the results from the returned data to the local individuals
 
301
 
 
302
                inputs = data_pack[0]
 
303
 
 
304
                old = inputs['sub_pop']
 
305
 
 
306
                new = results['sub_pop']
 
307
 
 
308
                for gnm in new:
 
309
 
 
310
                        gnm.root.delete_circulars()
 
311
 
 
312
                        del gnm.root
 
313
 
 
314
                #print '6.25',tree.ref()
 
315
 
 
316
                for i in range(len(old)):
 
317
 
 
318
                        old[i].__dict__.update(new[i].__dict__)
 
319
 
 
320
 
 
321
 
 
322
                #print '6.5',tree.ref()
 
323
 
 
324
        except IndexError:
 
325
 
 
326
                print 'error in %s,%d' %  server
 
327
 
 
328
        """
 
329
 
 
330
        import sys
 
331
 
 
332
        #r = new[0].root
 
333
 
 
334
        #print 'ref count',sys.getrefcount(r)
 
335
 
 
336
        #print '6.75',tree.ref()                
 
337
 
 
338
        #Huh??? Why do I need to delete the new genomes
 
339
 
 
340
        #individually here?  Why aren't they garbage collected?
 
341
 
 
342
        indices = range(len(new))
 
343
 
 
344
        indices.reverse()
 
345
 
 
346
        for i in indices:
 
347
 
 
348
                del new[i]
 
349
 
 
350
        #print 'ref count',sys.getrefcount(r)
 
351
 
 
352
        #print '6.8',tree.ref() 
 
353
 
 
354
        #r.delete_circulars()   
 
355
 
 
356
        #print 'ref count',sys.getrefcount(r)
 
357
 
 
358
        #print '6.9',tree.ref() 
 
359
 
 
360
        #del r
 
361
 
 
362
        #print '6.95',tree.ref()        
 
363
 
 
364
        """
 
365
 
 
366
        bar.enter()
 
367
 
 
368
        finished.post()
 
369
 
 
370
 
 
371
 
 
372
class ga_parallel_pop(population.population):
 
373
 
 
374
        parallel_evaluator = parallel_pop_evaluator()
 
375
 
 
376
        parallel_initializer = parallel_pop_initializer()
 
377
 
 
378
        def __init__(self,genome,size=1,server_list=None):
 
379
 
 
380
                """Arguments:
 
381
 
 
382
                
 
383
 
 
384
                   genome -- a genome object.
 
385
 
 
386
                   size -- number.  The population size.  The genome will be 
 
387
 
 
388
                           replicated size times to fill the population.
 
389
 
 
390
                   server_list -- a list of tuple pairs with machine names and
 
391
 
 
392
                                  ports listed for the available servers
 
393
 
 
394
                                  ex: [(ee.duke.edu,8000),('elsie.ee.duke.edu',8000)]        
 
395
 
 
396
                """
 
397
 
 
398
                population.population.__init__(self,genome,size)
 
399
 
 
400
                assert(server_list)
 
401
 
 
402
                self.server_list = server_list
 
403
 
 
404
        def initialize(self,settings = None):
 
405
 
 
406
                """This method **must** be called before a genetic algorithm 
 
407
 
 
408
                   begins evolving the population.  It takes care of initializing
 
409
 
 
410
                   the individual genomes, evaluating them, and scaling the population.
 
411
 
 
412
                   It also clears and intializes the statistics for the population.
 
413
 
 
414
                   
 
415
 
 
416
                   Arguments:
 
417
 
 
418
                
 
419
 
 
420
                   settings -- dictionary of genetic algorithm parameters.  These
 
421
 
 
422
                               are passed on to the genomes for initialization.
 
423
 
 
424
                """     
 
425
 
 
426
                self.stats = {'current':{},'initial':{},'overall':{}}
 
427
 
 
428
                self.stats['ind_evals'] = 0
 
429
 
 
430
 
 
431
 
 
432
                print "beigninning genome generation" 
 
433
 
 
434
                b = time.clock()        
 
435
 
 
436
                self.parallel_initializer.evaluate(self,settings)
 
437
 
 
438
                e = time.clock()        
 
439
 
 
440
                print "finished generation: ", e-b      
 
441
 
 
442
                self.touch(); 
 
443
 
 
444
                b = time.clock()        
 
445
 
 
446
                self.evaluate()
 
447
 
 
448
                e = time.clock()        
 
449
 
 
450
                print "evaluation time: ", e-b  
 
451
 
 
452
                self.scale()
 
453
 
 
454
                self.update_stats()
 
455
 
 
456
                self.stats['initial']['avg'] = self.stats['current']['avg']
 
457
 
 
458
                self.stats['initial']['max'] = self.stats['current']['max']
 
459
 
 
460
                self.stats['initial']['min'] = self.stats['current']['min']
 
461
 
 
462
                self.stats['initial']['dev'] = self.stats['current']['dev']
 
463
 
 
464
        
 
465
 
 
466
        def evaluate(self, force = 0):
 
467
 
 
468
                """ call the parallel_evaluator instead of the evaluator directly
 
469
 
 
470
                """
 
471
 
 
472
                self.selector.clear()
 
473
 
 
474
                self.parallel_evaluator.evaluate(self,force)
 
475
 
 
476
                #self.post_evaluate()
 
477
 
 
478
                #all of the remaining should be put in post eval...
 
479
 
 
480
                self.sort()
 
481
 
 
482
                #this is a cluge to get eval count to work correctly
 
483
 
 
484
                preval = self.stats['ind_evals']
 
485
 
 
486
                for ind in self:  
 
487
 
 
488
                        self.stats['ind_evals'] = self.stats['ind_evals'] + ind.evals
 
489
 
 
490
                        ind.evals = 0           
 
491
 
 
492
                print 'evals: ', self.stats['ind_evals'] - preval
 
493
 
 
494
                self.touch()
 
495
 
 
496
                self.evaluated = 1
 
497
 
 
498
 
 
499
 
 
500
########################## test stuff ############################
 
501
 
 
502
import genome
 
503
 
 
504
import gene
 
505
 
 
506
import time
 
507
 
 
508
 
 
509
 
 
510
import socket                   
 
511
 
 
512
 
 
513
 
 
514
class objective:
 
515
 
 
516
        def __init__(self,wait=.01):
 
517
 
 
518
                self.wait = wait
 
519
 
 
520
        def evaluate(self,genome): 
 
521
 
 
522
                time.sleep(self.wait)
 
523
 
 
524
                return sum(genome.array())
 
525
 
 
526
 
 
527
 
 
528
def test_pop(server_list,size=100,wait=.01):
 
529
 
 
530
        obj = objective(wait)
 
531
 
 
532
        the_gene = gene.float_gene((0,2.5))
 
533
 
 
534
        genome = genome.list_genome(the_gene.replicate(5))
 
535
 
 
536
        genome.evaluator = obj
 
537
 
 
538
        pop = ga_parallel_pop(genome,size,server_list)
 
539
 
 
540
        print  '########### awaiting evaluation#############'
 
541
 
 
542
        pop.initialize()
 
543
 
 
544
        print ' evaluation done!'
 
545
 
 
546
        print 'best:', pop.best()
 
547
 
 
548
        print 'worst',pop.worst()
 
549
 
 
550
 
 
551
 
 
552
def gen_pop():
 
553
 
 
554
        genome.list_genome.evaluator = objective()
 
555
 
 
556
        gene = gene.float_gene((0,2.5))
 
557
 
 
558
        genome = genome.list_genome(gene.replicate(5))
 
559
 
 
560
        pop = ga_parallel_pop(genome,100,[(host,port),])
 
561
 
 
562
        return pop
 
563
 
 
564
 
 
565
 
 
566
        import parallel_pop,beowulf,os
 
567
 
 
568
 
 
569
 
 
570
def test_pop2(server_list,size=100,wait=.01):
 
571
 
 
572
        import hmm_gnm,os
 
573
 
 
574
        genome = hmm_gnm.make_genome()
 
575
 
 
576
        #pop = ga_parallel_pop(genome,4,server_list)
 
577
 
 
578
        global galg
 
579
 
 
580
        #genome.target = targets[0]
 
581
 
 
582
        pop = ga_parallel_pop(genome,1,server_list)
 
583
 
 
584
        galg = hmm_gnm.class_ga(pop)
 
585
 
 
586
        galg.settings.update({ 'pop_size':6,'gens':2,'p_mutate':.03,
 
587
 
 
588
                                    'dbase':os.environ['HOME'] + '/all_lift3', 'p_cross':0.9, 'p_replace':.6,
 
589
 
 
590
                                    'p_deviation': -.001})
 
591
 
 
592
        galg.evolve()
 
593
 
 
594
 
 
595
 
 
596
        print  '########### awaiting evaluation#############'
 
597
 
 
598
        pop.initialize()
 
599
 
 
600
        print ' evaluation done!'
 
601
 
 
602
        print 'best:', pop.best()
 
603
 
 
604
        print 'worst',pop.worst()
 
605
 
 
606
        
 
607
 
 
608
import thread
 
609
 
 
610
def test():
 
611
 
 
612
        host = socket.gethostname()
 
613
 
 
614
        port = 8000
 
615
 
 
616
        server_list = [(host,port),(host,port+1)]
 
617
 
 
618
        for server in server_list:
 
619
 
 
620
                host,port = server
 
621
 
 
622
                thread.start_new_thread(remote_exec.server,(host,port))
 
623
 
 
624
        thread.start_new_thread(test_pop2,(server_list,))               
 
625
 
 
626
        
 
627
 
 
628
def test2(machines=32,size=100,wait=.01):
 
629
 
 
630
        import time
 
631
 
 
632
        t1 = time.time()
 
633
 
 
634
        #requires that servers are started on beowulf 1 and 2.
 
635
 
 
636
        import beowulf
 
637
 
 
638
        server_list = beowulf.beowulf.servers[:machines]
 
639
 
 
640
        thread.start_new_thread(test_pop,(server_list,size,wait))                       
 
641
 
 
642
        print 'total time:', time.time()-t1