10
import pickle as cPickle
16
from lxml import etree
19
import xml.etree.cElementTree as etree
22
import xml.etree.ElementTree as etree
24
from deap.dtm.dtmTypes import *
25
from deap.dtm.abstractCommManager import AbstractCommThread
27
_logger = logging.getLogger("dtm.communication")
29
DTM_MPI_MIN_LATENCY = 0.005
30
DTM_MPI_MAX_LATENCY = 0.01
31
DTM_CONCURRENT_RECV_LIMIT = 1000
32
DTM_CONCURRENT_SEND_LIMIT = 1000
34
class CommThread(AbstractCommThread):
36
def __init__(self, recvQ, sendQ, mainThreadEvent, exitEvent, commReadyEvent, randomGenerator, cmdlineArgs):
    """Initialize the MPI communication thread.

    Every argument is forwarded untouched to the AbstractCommThread base
    constructor; this subclass adds no construction-time state of its own.
    """
    super(CommThread, self).__init__(recvQ, sendQ, mainThreadEvent, exitEvent, commReadyEvent, randomGenerator, cmdlineArgs)
48
def isRootWorker(self):
    """Return True when this worker holds the root MPI rank (rank 0)."""
    ROOT_RANK = 0
    return self.currentId == ROOT_RANK
52
# Tell whether this process is only a launch/controller process (as opposed
# to a computing worker).
# NOTE(review): the method body is elided from this chunk; do not assume its
# return value without checking the full file.
def isLaunchProcess(self):
55
def setTraceModeOn(self, xmlLogger):
    # Route subsequent send/receive trace records into this XML node (used
    # by the etree.SubElement calls in the communication loop below).
    # NOTE(review): one statement between the def line and this assignment
    # is elided from this chunk (presumably a flag such as
    # `self.traceMode = True`) -- confirm against the full file.
    self.traceTo = xmlLogger
59
def iterOverIDs(self):
    """Return an iterable over every worker id, from 0 up to pSize - 1."""
    workerCount = self.pSize
    return range(workerCount)
63
from mpi4py import MPI

# Nested helper of the run() method (its enclosing def line is not visible
# in this chunk; `self` below is captured from run's scope): pickle `msg`
# and start a non-blocking MPI send of it to worker `dest`.
def mpiSend(msg, dest):
    # Pickle and send over MPI
    arrayBuf = array.array('b')
    # NOTE(review): array.fromstring() is deprecated (removed in Python
    # 3.9); frombytes() is the modern equivalent.
    arrayBuf.fromstring(cPickle.dumps(msg, cPickle.HIGHEST_PROTOCOL))

    # Non-blocking send: both the request object and the buffer must stay
    # alive until completion (callers keep them in lSendWaiting).
    b = MPI.COMM_WORLD.Isend([arrayBuf, MPI.CHAR], dest=dest, tag=self.msgSendTag)
    # Record the outgoing message in the XML trace.
    # NOTE(review): the tracing guard (presumably `if self.traceMode:`) and
    # the final `return b, arrayBuf` statement are elided from this chunk --
    # callers below unpack two values from mpiSend().
    etree.SubElement(self.traceTo, "msg", {"direc" : "out", "type" : str(msg.msgType), "otherWorker" : str(dest), "msgtag" : str(self.msgSendTag), "time" : repr(time.time())})
77
# ---- Start of the run() method interior (its def line is elided) ----
# mpi4py initializes MPI at import time; fail fast if that did not happen.
# NOTE(review): assert disappears under `python -O`; a raise would be sturdier.
assert MPI.Is_initialized(), "Error in MPI Init!"

# Cache the MPI world geometry: total worker count and this worker's rank.
self.pSize = MPI.COMM_WORLD.Get_size()
self.currentId = MPI.COMM_WORLD.Get_rank()

self.commReadyEvent.set() # Notify the main thread that we are ready

# The root worker warns once when mpi4py was initialized with a thread
# level above MPI_THREAD_SINGLE, which DTM does not need and which some
# MPI implementations handle poorly.
if self.currentId == 0 and MPI.Query_thread() > 0:
    _logger.warning("MPI was initialized with a thread level of %i, which is higher than MPI_THREAD_SINGLE."
    " The current MPI implementations do not always handle well the MPI_THREAD_MULTIPLE or MPI_THREAD_SERIALIZED modes."
    " As DTM was designed to work with the base, safe mode (MPI_THREAD_SINGLE), it is strongly suggested to change"
    " the 'thread_level' variable or your mpi4py settings in 'site-packages/mpi4py/rc.py', unless you have strong"
    " motivations to keep that setting. This may bring both stability and performance improvements.", MPI.Query_thread())

# Reusable Status object filled in by the Iprobe calls below.
lMessageStatus = MPI.Status()

# Receives already delivered to recvQ but not yet signalled to the main
# thread, plus the timestamp when the current batch started -- used below
# to coalesce main-thread wake-ups.
countRecvNotTransmit = 0
countRecvTimeInit = time.time()
103
# ---- Interior of the communication thread's main polling loop ----
# NOTE(review): this chunk is an extraction with lines elided and the
# indentation flattened; the enclosing `while` header, the bodies of some
# branches, and the try/except scaffolding are not visible, so the
# statements below are kept flat and annotated instead of re-nested.

# Per-pass flags; recvSomething is presumably set True when a receive
# completes (that assignment is elided from this view).
recvSomething = False
sendSomething = False

if self.exitStatus.is_set(): # Exiting
# Warning : the communication thread MUST clear the sendQ
# BEFORE leaving (the exiting orders must be send)
# NOTE(review): the body of this exit branch is elided from this chunk.

# Post one non-blocking receive per message already waiting on the wire,
# bounded by DTM_CONCURRENT_RECV_LIMIT pending receives.
while len(lRecvWaiting) < DTM_CONCURRENT_RECV_LIMIT and MPI.COMM_WORLD.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=lMessageStatus):
# We received something
# Allocate a byte buffer exactly as large as the probed message.
lBuf = array.array('b', (0,))
lBuf = lBuf * lMessageStatus.Get_elements(MPI.CHAR)

# Keep (buffer, request, tag) so the buffer stays alive until Irecv completes.
lRecvWaiting.append((lBuf, MPI.COMM_WORLD.Irecv([lBuf, MPI.CHAR], source=lMessageStatus.Get_source(), tag=lMessageStatus.Get_tag()), lMessageStatus.Get_tag()))

# Fresh Status object for the next Iprobe.
lMessageStatus = MPI.Status()

# Hand every completed receive over to the main thread's queue.
for i, reqTuple in enumerate(lRecvWaiting):
if reqTuple[1].Test():
# NOTE(review): unpickles data received from other workers -- acceptable
# only because every peer is a trusted DTM worker process.
# NOTE(review): array.tostring() is deprecated (removed in Python 3.9);
# tobytes() is the modern spelling.
dataS = cPickle.loads(reqTuple[0].tostring())
# NOTE(review): a tracing guard (presumably `if self.traceMode:`)
# appears to be elided before this SubElement call.
etree.SubElement(self.traceTo, "msg", {"direc" : "in", "type" : str(dataS.msgType), "otherWorker" : str(dataS.senderWid), "msgtag" : str(reqTuple[2]), "time" : repr(time.time())})
self.recvQ.put(dataS)
# Mark the slot for the compaction pass below.
lRecvWaiting[i] = None

# Wake up the main thread if there's a sufficient number
# of pending receives
countRecvNotTransmit += 1

# Coalesce wake-ups: signal the main thread only after 50 deliveries, or
# after 100 ms with at least one undelivered message.
if countRecvNotTransmit > 50 or (time.time() - countRecvTimeInit > 0.1 and countRecvNotTransmit > 0):
countRecvNotTransmit = 0
countRecvTimeInit = time.time()
self.wakeUpMainThread.set()

# Compact the pending-receive list (completed slots were set to None).
# NOTE(review): `not d is None` reads better as `d is not None`.
lRecvWaiting = filter(lambda d: not d is None, lRecvWaiting)
# filter() returns a lazy iterator on Python 3; materialize it so the
# len() checks above keep working.
if not isinstance(lRecvWaiting, list):
lRecvWaiting = list(lRecvWaiting)

while len(lSendWaiting) < DTM_CONCURRENT_SEND_LIMIT:
# Send all pending sends, under the limit of
# DTM_CONCURRENT_SEND_LIMIT
# NOTE(review): the try/except (presumably Queue.Empty, breaking out of
# this loop) around get_nowait() is elided from this chunk.
sendMsg = self.sendQ.get_nowait()
sendMsg.sendTime = time.time()
commA, buf1 = mpiSend(sendMsg, sendMsg.receiverWid)
lSendWaiting.append((commA, buf1))

# Forget the sends whose Test() reports completion (frees their buffers).
lSendWaiting = filter(lambda d: not d[0].Test(), lSendWaiting)
if not isinstance(lSendWaiting, list): # Python 3
lSendWaiting = list(lSendWaiting)

# Nothing received this pass: sleep a short randomized interval instead of
# busy-polling MPI.
if not recvSomething:
time.sleep(self.random.uniform(DTM_MPI_MIN_LATENCY, DTM_MPI_MAX_LATENCY))
165
# Shutdown path of run(): block until every pending non-blocking send has
# completed, so no exiting order is lost.
while len(lSendWaiting) > 0:
    # Send the lasts messages before shutdown
    # Keep only the sends whose Test() says they are still in flight;
    # materialize because filter() is lazy on Python 3.
    lSendWaiting = filter(lambda d: not d[0].Test(), lSendWaiting)
    if not isinstance(lSendWaiting, list): # Python 3
        lSendWaiting = list(lSendWaiting)
    # Randomized pause to avoid busy-waiting on MPI while draining.
    time.sleep(self.random.uniform(DTM_MPI_MIN_LATENCY, DTM_MPI_MAX_LATENCY))
b'\\ No newline at end of file'