~ubuntu-branches/ubuntu/quantal/deap/quantal

Viewing changes to deap/dtm/commManagerMpi4py.py

  • Committer: Package Import Robot
  • Author(s): Miriam Ruiz, Jakub Wilk
  • Date: 2011-11-17 11:53:15 UTC
  • mfrom: (1.1.1)
  • Revision ID: package-import@ubuntu.com-20111117115315-k9lkwpqcbsq8n0q7
Tags: 0.7.1-1
[ Jakub Wilk ]
* Add Vcs-* fields.

[ Miriam Ruiz ]
* New Upstream Release
* Upgraded Standards-Version from 3.9.1 to 3.9.2

try:
    import Queue
except ImportError:
    import queue as Queue
import time
import threading
try:
    import cPickle
except ImportError:
    import pickle as cPickle
import array
import copy
import logging

try:
    from lxml import etree
except ImportError:
    try:
        import xml.etree.cElementTree as etree
    except ImportError:
        # Python 2.5
        import xml.etree.ElementTree as etree

from deap.dtm.dtmTypes import *
from deap.dtm.abstractCommManager import AbstractCommThread

_logger = logging.getLogger("dtm.communication")

DTM_MPI_MIN_LATENCY = 0.005
DTM_MPI_MAX_LATENCY = 0.01
DTM_CONCURRENT_RECV_LIMIT = 1000
DTM_CONCURRENT_SEND_LIMIT = 1000

class CommThread(AbstractCommThread):

    def __init__(self, recvQ, sendQ, mainThreadEvent, exitEvent, commReadyEvent, randomGenerator, cmdlineArgs):
        AbstractCommThread.__init__(self, recvQ, sendQ, mainThreadEvent, exitEvent, commReadyEvent, randomGenerator, cmdlineArgs)

    @property
    def poolSize(self):
        return self.pSize

    @property
    def workerId(self):
        return self.currentId

    @property
    def isRootWorker(self):
        return self.currentId == 0

    @property
    def isLaunchProcess(self):
        return False

    def setTraceModeOn(self, xmlLogger):
        self.traceMode = True
        self.traceTo = xmlLogger

    def iterOverIDs(self):
        return range(self.pSize)

    def run(self):
        from mpi4py import MPI

        def mpiSend(msg, dest):
            # Pickle and send over MPI
            arrayBuf = array.array('b')
            arrayBuf.fromstring(cPickle.dumps(msg, cPickle.HIGHEST_PROTOCOL))

            b = MPI.COMM_WORLD.Isend([arrayBuf, MPI.CHAR], dest=dest, tag=self.msgSendTag)
            if self.traceMode:
                etree.SubElement(self.traceTo, "msg", {"direc" : "out", "type" : str(msg.msgType), "otherWorker" : str(dest), "msgtag" : str(self.msgSendTag), "time" : repr(time.time())})

            self.msgSendTag += 1
            return b, arrayBuf

        assert MPI.Is_initialized(), "Error in MPI Init!"

        self.pSize = MPI.COMM_WORLD.Get_size()
        self.currentId = MPI.COMM_WORLD.Get_rank()

        self.commReadyEvent.set()   # Notify the main thread that we are ready

        if self.currentId == 0 and MPI.Query_thread() > 0:
            # Warn only once
            _logger.warning("MPI was initialized with a thread level of %i, which is higher than MPI_THREAD_SINGLE."
            " The current MPI implementations do not always handle well the MPI_THREAD_MULTIPLE or MPI_THREAD_SERIALIZED modes."
            " As DTM was designed to work with the base, safe mode (MPI_THREAD_SINGLE), it is strongly suggested to change"
            " the 'thread_level' variable or your mpi4py settings in 'site-packages/mpi4py/rc.py', unless you have strong"
            " motivations to keep that setting. This may bring both stability and performance improvements.", MPI.Query_thread())

        lRecvWaiting = []
        lSendWaiting = []
        countSend = 0
        countRecv = 0
        lMessageStatus = MPI.Status()
        working = True

        countRecvNotTransmit = 0
        countRecvTimeInit = time.time()

        while working:
            recvSomething = False
            sendSomething = False

            if self.exitStatus.is_set():    # Exiting
                # Warning : the communication thread MUST clear the sendQ
                # BEFORE leaving (the exit orders must be sent)
                working = False

            while len(lRecvWaiting) < DTM_CONCURRENT_RECV_LIMIT and MPI.COMM_WORLD.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=lMessageStatus):
                # We received something
                lBuf = array.array('b', (0,))
                lBuf = lBuf * lMessageStatus.Get_elements(MPI.CHAR)

                lRecvWaiting.append((lBuf, MPI.COMM_WORLD.Irecv([lBuf, MPI.CHAR], source=lMessageStatus.Get_source(), tag=lMessageStatus.Get_tag()), lMessageStatus.Get_tag()))

                lMessageStatus = MPI.Status()
                recvSomething = True

            for i, reqTuple in enumerate(lRecvWaiting):
                if reqTuple[1].Test():
                    countRecv += 1
                    dataS = cPickle.loads(reqTuple[0].tostring())
                    if self.traceMode:
                        etree.SubElement(self.traceTo, "msg", {"direc" : "in", "type" : str(dataS.msgType), "otherWorker" : str(dataS.senderWid), "msgtag" : str(reqTuple[2]), "time" : repr(time.time())})
                    self.recvQ.put(dataS)
                    lRecvWaiting[i] = None
                    recvSomething = True
                    # Wake up the main thread if there's a sufficient number
                    # of pending receives
                    countRecvNotTransmit += 1

            if countRecvNotTransmit > 50 or (time.time() - countRecvTimeInit > 0.1 and countRecvNotTransmit > 0):
                countRecvNotTransmit = 0
                countRecvTimeInit = time.time()
                self.wakeUpMainThread.set()

            lRecvWaiting = filter(lambda d: not d is None, lRecvWaiting)
            if not isinstance(lRecvWaiting, list):  # Python 3
                lRecvWaiting = list(lRecvWaiting)

            while len(lSendWaiting) < DTM_CONCURRENT_SEND_LIMIT:
                # Send all pending sends, under the limit of
                # DTM_CONCURRENT_SEND_LIMIT
                try:
                    sendMsg = self.sendQ.get_nowait()
                    countSend += 1
                    sendMsg.sendTime = time.time()
                    commA, buf1 = mpiSend(sendMsg, sendMsg.receiverWid)
                    lSendWaiting.append((commA, buf1))
                    sendSomething = True
                except Queue.Empty:
                    break

            lSendWaiting = filter(lambda d: not d[0].Test(), lSendWaiting)
            if not isinstance(lSendWaiting, list):  # Python 3
                lSendWaiting = list(lSendWaiting)

            if not recvSomething:
                time.sleep(self.random.uniform(DTM_MPI_MIN_LATENCY, DTM_MPI_MAX_LATENCY))

        while len(lSendWaiting) > 0:
            # Send the last messages before shutdown
            lSendWaiting = filter(lambda d: not d[0].Test(), lSendWaiting)
            if not isinstance(lSendWaiting, list):  # Python 3
                lSendWaiting = list(lSendWaiting)
            time.sleep(self.random.uniform(DTM_MPI_MIN_LATENCY, DTM_MPI_MAX_LATENCY))

        del lSendWaiting
        del lRecvWaiting
 
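For context, the pattern that CommThread.run() relies on above (pickle a message into a byte buffer, post a non-blocking Isend, and poll for incoming traffic with Iprobe, Irecv and Test) can be reproduced in a few lines of standalone mpi4py code. The sketch below is illustrative only and is not part of the packaged file; the script name ping_pong.py and the payload contents are our own, and it assumes mpi4py is installed and the script is launched with at least two ranks (e.g. mpiexec -n 2 python ping_pong.py).

    # ping_pong.py -- minimal sketch of the non-blocking send/receive pattern
    # used by CommThread.run(); illustrative only, not part of DEAP.
    import pickle
    import time

    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()

    if rank == 0:
        # Serialize an arbitrary Python object to bytes, as mpiSend() does with
        # cPickle + array('b'); a plain bytearray also works as an MPI buffer.
        payload = bytearray(pickle.dumps({"msgType": "demo", "senderWid": rank}))
        req = comm.Isend([payload, MPI.CHAR], dest=1, tag=1)
        req.Wait()                     # CommThread instead keeps polling with Test()
    elif rank == 1:
        status = MPI.Status()
        # Poll until a message is available, like the Iprobe loop in run().
        while not comm.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status):
            time.sleep(0.005)          # comparable to DTM_MPI_MIN_LATENCY
        # Allocate a buffer of the advertised size and post a non-blocking receive.
        buf = bytearray(status.Get_elements(MPI.CHAR))
        req = comm.Irecv([buf, MPI.CHAR], source=status.Get_source(), tag=status.Get_tag())
        while not req.Test():          # run() keeps such pending requests in lRecvWaiting
            time.sleep(0.005)
        print(pickle.loads(bytes(buf)))

Polling with Test() rather than blocking on Wait() is what lets the single communication thread in the file above multiplex many outstanding sends and receives while still honouring the DTM_CONCURRENT_RECV_LIMIT and DTM_CONCURRENT_SEND_LIMIT caps.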