2
2
* All rights reserved. */
4
4
/* This program is free software; you can redistribute it and/or modify it
5
* under the terms of the license (GNU LGPL) which comes with this package. */
5
* under the terms of the license (GNU LGPL) which comes with this package. */
11
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
12
"Logging specific to SMPI (base)");
10
#include "xbt/replay.h"
12
#include "simix/smx_private.h"
13
#include "surf/surf.h"
14
#include "simgrid/sg_config.h"
17
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
14
20
static int match_recv(void* a, void* b, smx_action_t ignored) {
15
21
MPI_Request ref = (MPI_Request)a;
16
22
MPI_Request req = (MPI_Request)b;
23
XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
18
xbt_assert(ref, "Cannot match recv against null reference");
19
xbt_assert(req, "Cannot match recv against null request");
20
return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
21
&& (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
25
xbt_assert(ref, "Cannot match recv against null reference");
26
xbt_assert(req, "Cannot match recv against null request");
27
if((ref->src == MPI_ANY_SOURCE || req->src == ref->src)
28
&& (ref->tag == MPI_ANY_TAG || req->tag == ref->tag)){
29
//we match, we can transfer some values
30
// FIXME : move this to the copy function ?
31
if(ref->src == MPI_ANY_SOURCE)ref->real_src = req->src;
32
if(ref->tag == MPI_ANY_TAG)ref->real_tag = req->tag;
33
if(ref->real_size < req->real_size) ref->truncated = 1;
35
ref->detached_sender=req; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
24
41
static int match_send(void* a, void* b,smx_action_t ignored) {
25
42
MPI_Request ref = (MPI_Request)a;
26
43
MPI_Request req = (MPI_Request)b;
44
XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
28
45
xbt_assert(ref, "Cannot match send against null reference");
29
46
xbt_assert(req, "Cannot match send against null request");
30
return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
31
&& (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
48
if((req->src == MPI_ANY_SOURCE || req->src == ref->src)
49
&& (req->tag == MPI_ANY_TAG || req->tag == ref->tag))
51
if(req->src == MPI_ANY_SOURCE)req->real_src = ref->src;
52
if(req->tag == MPI_ANY_TAG)req->real_tag = ref->tag;
53
if(req->real_size < ref->real_size) req->truncated = 1;
55
req->detached_sender=ref; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
63
typedef struct s_smpi_factor *smpi_factor_t;
64
typedef struct s_smpi_factor {
67
double values[4];//arbitrary set to 4
69
xbt_dynar_t smpi_os_values = NULL;
70
xbt_dynar_t smpi_or_values = NULL;
73
// Methods used to parse and store the values for timing injections in smpi
74
// These are taken from surf/network.c and generalized to have more factors
75
// These methods should be merged with those in surf/network.c (moved somewhere in xbt ?)
77
static int factor_cmp(const void *pa, const void *pb)
79
return (((s_smpi_factor_t*)pa)->factor > ((s_smpi_factor_t*)pb)->factor);
83
static xbt_dynar_t parse_factor(const char *smpi_coef_string)
86
unsigned int iter = 0;
89
xbt_dynar_t smpi_factor, radical_elements, radical_elements2 = NULL;
91
smpi_factor = xbt_dynar_new(sizeof(s_smpi_factor_t), NULL);
92
radical_elements = xbt_str_split(smpi_coef_string, ";");
93
xbt_dynar_foreach(radical_elements, iter, value) {
95
radical_elements2 = xbt_str_split(value, ":");
96
if (xbt_dynar_length(radical_elements2) <2 || xbt_dynar_length(radical_elements2) > 5)
97
xbt_die("Malformed radical for smpi factor!");
98
for(i =0; i<xbt_dynar_length(radical_elements2);i++ ){
100
fact.factor = atol(xbt_dynar_get_as(radical_elements2, i, char *));
102
fact.values[fact.nb_values] = atof(xbt_dynar_get_as(radical_elements2, i, char *));
107
xbt_dynar_push_as(smpi_factor, s_smpi_factor_t, fact);
108
XBT_DEBUG("smpi_factor:\t%ld : %d values, first: %f", fact.factor, fact.nb_values ,fact.values[0]);
109
xbt_dynar_free(&radical_elements2);
111
xbt_dynar_free(&radical_elements);
113
xbt_dynar_sort(smpi_factor, &factor_cmp);
114
xbt_dynar_foreach(smpi_factor, iter, fact) {
115
XBT_DEBUG("smpi_factor:\t%ld : %d values, first: %f", fact.factor, fact.nb_values ,fact.values[0]);
120
static double smpi_os(double size)
124
parse_factor(sg_cfg_get_string("smpi/os"));
126
unsigned int iter = 0;
127
s_smpi_factor_t fact;
129
xbt_dynar_foreach(smpi_os_values, iter, fact) {
130
if (size <= fact.factor) {
131
XBT_DEBUG("os : %lf <= %ld return %f", size, fact.factor, current);
134
current=fact.values[0]+fact.values[1]*size;
137
XBT_DEBUG("os : %lf > %ld return %f", size, fact.factor, current);
142
static double smpi_or(double size)
146
parse_factor(sg_cfg_get_string("smpi/or"));
148
unsigned int iter = 0;
149
s_smpi_factor_t fact;
151
xbt_dynar_foreach(smpi_or_values, iter, fact) {
152
if (size <= fact.factor) {
153
XBT_DEBUG("or : %lf <= %ld return %f", size, fact.factor, current);
156
current=fact.values[0]+fact.values[1]*size;
158
XBT_DEBUG("or : %lf > %ld return %f", size, fact.factor, current);
34
163
static MPI_Request build_request(void *buf, int count,
47
194
request->comm = comm;
48
195
request->action = NULL;
49
196
request->flags = flags;
197
request->detached = 0;
198
request->detached_sender = NULL;
200
request->truncated = 0;
201
request->real_size = 0;
202
request->real_tag = 0;
50
205
#ifdef HAVE_TRACING
51
206
request->send = 0;
52
207
request->recv = 0;
209
if (flags & SEND) smpi_datatype_unuse(datatype);
215
void smpi_empty_status(MPI_Status * status) {
216
if(status != MPI_STATUS_IGNORE) {
217
status->MPI_SOURCE=MPI_ANY_SOURCE;
218
status->MPI_TAG=MPI_ANY_TAG;
223
void smpi_action_trace_run(char *path)
227
xbt_dict_cursor_t cursor;
231
action_fp = fopen(path, "r");
232
xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
236
if (!xbt_dict_is_empty(action_queues)) {
238
("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
241
xbt_dict_foreach(action_queues, cursor, name, todo) {
242
XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
248
xbt_dict_free(&action_queues);
249
action_queues = xbt_dict_new_homogeneous(NULL);
57
252
static void smpi_mpi_request_free_voidp(void* request)
59
254
MPI_Request req = request;
75
270
int src, int tag, MPI_Comm comm)
77
272
MPI_Request request =
78
build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
79
comm, PERSISTENT | RECV);
273
build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
274
comm, PERSISTENT | RECV);
84
279
void smpi_mpi_start(MPI_Request request)
86
281
smx_rdv_t mailbox;
89
283
xbt_assert(!request->action,
90
"Cannot (re)start a non-finished communication");
284
"Cannot (re)start a non-finished communication");
91
285
if(request->flags & RECV) {
92
286
print_request("New recv", request);
93
mailbox = smpi_process_mailbox();
94
// FIXME: SIMIX does not yet support non-contiguous datatypes
95
request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
287
if (request->size < sg_cfg_get_int("smpi/async_small_thres"))
288
mailbox = smpi_process_mailbox_small();
290
mailbox = smpi_process_mailbox();
291
// we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
292
request->real_size=request->size;
293
smpi_datatype_use(request->old_type);
294
request->action = simcall_comm_irecv(mailbox, request->buf, &request->real_size, &match_recv, request);
296
double sleeptime = smpi_or(request->size);
298
simcall_process_sleep(sleeptime);
299
XBT_DEBUG("receiving size of %zu : sleep %lf ", request->size, smpi_or(request->size));
304
int receiver = smpi_group_index(smpi_comm_group(request->comm), request->dst);
305
/* if(receiver == MPI_UNDEFINED) {*/
306
/* XBT_WARN("Trying to send a message to a wrong rank");*/
97
309
print_request("New send", request);
98
mailbox = smpi_process_remote_mailbox(
99
smpi_group_index(smpi_comm_group(request->comm), request->dst));
100
// FIXME: SIMIX does not yet support non-contiguous datatypes
102
if (request->size < 64*1024 ) { // eager mode => detached send (FIXME: this limit should be configurable)
103
void *oldbuf = request->buf;
105
request->buf = malloc(request->size);
106
memcpy(request->buf,oldbuf,request->size);
107
XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
109
XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
112
simcall_comm_isend(mailbox, request->size, -1.0,
113
request->buf, request->size,
115
&smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
117
// detach if msg size < eager/rdv switch limit
310
if (request->size < sg_cfg_get_int("smpi/async_small_thres")) { // eager mode
311
mailbox = smpi_process_remote_mailbox_small(receiver);
313
XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
314
mailbox = smpi_process_remote_mailbox(receiver);
316
if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
318
request->detached = 1;
320
if(request->old_type->has_subtype == 0){
321
oldbuf = request->buf;
323
request->buf = xbt_malloc(request->size);
324
memcpy(request->buf,oldbuf,request->size);
327
XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
329
// we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
330
request->real_size=request->size;
331
smpi_datatype_use(request->old_type);
332
double sleeptime = smpi_os(request->size);
334
simcall_process_sleep(sleeptime);
335
XBT_DEBUG("sending size of %zu : sleep %lf ", request->size, smpi_os(request->size));
338
simcall_comm_isend(mailbox, request->size, -1.0,
339
request->buf, request->real_size,
341
&smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
343
// detach if msg size < eager/rdv switch limit
120
346
#ifdef HAVE_TRACING
121
347
/* FIXME: detached sends are not traceable (request->action == NULL) */
122
348
if (request->action)
123
349
simcall_set_category(request->action, TRACE_internal_smpi_get_category());
128
356
void smpi_mpi_startall(int count, MPI_Request * requests)
132
360
for(i = 0; i < count; i++) {
133
361
smpi_mpi_start(requests[i]);
137
365
void smpi_mpi_request_free(MPI_Request * request)
140
*request = MPI_REQUEST_NULL;
368
if((*request) != MPI_REQUEST_NULL){
369
(*request)->refcount--;
370
if((*request)->refcount<0) xbt_die("wrong refcount");
372
if((*request)->refcount==0){
374
*request = MPI_REQUEST_NULL;
377
xbt_die("freeing an already free request");
143
381
MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
144
382
int dst, int tag, MPI_Comm comm)
146
384
MPI_Request request =
147
build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
148
comm, NON_PERSISTENT | SEND);
385
build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
386
comm, NON_PERSISTENT | SEND);
226
466
static void finish_wait(MPI_Request * request, MPI_Status * status)
228
468
MPI_Request req = *request;
230
if(status != MPI_STATUS_IGNORE) {
231
status->MPI_SOURCE = req->src;
232
status->MPI_TAG = req->tag;
233
status->MPI_ERROR = MPI_SUCCESS;
234
// FIXME: really this should just contain the count of receive-type blocks,
236
status->count = req->size;
238
print_request("Finishing", req);
469
if(!(req->detached && req->flags & SEND)){
470
if(status != MPI_STATUS_IGNORE) {
471
status->MPI_SOURCE = req->src == MPI_ANY_SOURCE ? req->real_src : req->src;
472
status->MPI_TAG = req->tag == MPI_ANY_TAG ? req->real_tag : req->tag;
474
status->MPI_ERROR = MPI_ERR_TRUNCATE;
475
else status->MPI_ERROR = MPI_SUCCESS ;
476
// this handles the case where the size in the receive differs from the size in the send
477
// FIXME: really this should just contain the count of receive-type blocks,
479
status->count = req->real_size;
482
print_request("Finishing", req);
483
MPI_Datatype datatype = req->old_type;
485
if(datatype->has_subtype == 1){
486
// This part handles the problem of non-contiguous memory
487
// the unserialization at the reception
488
s_smpi_subtype_t *subtype = datatype->substruct;
489
if(req->flags & RECV) {
490
subtype->unserialize(req->buf, req->old_buf, req->real_size/smpi_datatype_size(datatype) , datatype->substruct);
492
if(req->detached == 0) free(req->buf);
494
smpi_datatype_unuse(datatype);
497
if(req->detached_sender!=NULL){
498
smpi_mpi_request_free(&(req->detached_sender));
239
501
if(req->flags & NON_PERSISTENT) {
240
502
smpi_mpi_request_free(request);
280
546
i = simcall_comm_testany(comms);
281
// FIXME: MPI_UNDEFINED or does SIMIX have a return code?
282
if(i != MPI_UNDEFINED) {
547
// not MPI_UNDEFINED, as this is a simix return code
284
smpi_mpi_wait(&requests[*index], status);
550
finish_wait(&requests[*index], status);
554
//all requests are null or inactive, return true
556
smpi_empty_status(status);
289
559
xbt_dynar_free(&comms);
566
int smpi_mpi_testall(int count, MPI_Request requests[],
570
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
573
for(i=0; i<count; i++){
574
if(requests[i]!= MPI_REQUEST_NULL){
575
if (smpi_mpi_test(&requests[i], pstat)!=1){
579
smpi_empty_status(pstat);
581
if(status != MPI_STATUSES_IGNORE) {
582
memcpy(&status[i], pstat, sizeof(*pstat));
588
void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
590
//FIXME find another way to avoid busy waiting ?
591
// the issue here is that we have to wait on a nonexistent comm
593
smpi_mpi_iprobe(source, tag, comm, &flag, status);
594
XBT_DEBUG("Busy Waiting on probing : %d", flag);
596
simcall_process_sleep(0.0001);
601
void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
602
MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
603
comm, NON_PERSISTENT | RECV);
605
// behave like a receive, but don't do it
608
print_request("New iprobe", request);
609
// We have to test both mailboxes as we don't know if we will receive on one or the other
610
if (sg_cfg_get_int("smpi/async_small_thres")>0){
611
mailbox = smpi_process_mailbox_small();
612
XBT_DEBUG("trying to probe the perm recv mailbox");
613
request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
615
if (request->action==NULL){
616
mailbox = smpi_process_mailbox();
617
XBT_DEBUG("trying to probe the other mailbox");
618
request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
622
MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
624
if(status != MPI_STATUS_IGNORE) {
625
status->MPI_SOURCE = req->src;
626
status->MPI_TAG = req->tag;
627
status->MPI_ERROR = MPI_SUCCESS;
628
status->count = req->real_size;
632
smpi_mpi_request_free(&request);
294
637
void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
314
658
comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
315
659
map = xbt_new(int, count);
317
XBT_DEBUG("Wait for one of");
661
XBT_DEBUG("Wait for one of %d", count);
318
662
for(i = 0; i < count; i++) {
319
if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
320
print_request(" ", requests[i]);
321
xbt_dynar_push(comms, &requests[i]->action);
663
if(requests[i] != MPI_REQUEST_NULL) {
664
if (requests[i]->action != NULL) {
665
XBT_DEBUG("Waiting any %p ", requests[i]);
666
xbt_dynar_push(comms, &requests[i]->action);
670
//This is a finished detached request, let's return this one
671
size=0;//so we free the dynar but don't do the waitany call
673
finish_wait(&requests[i], status);//cleanup if refcount = 0
674
requests[i]=MPI_REQUEST_NULL;//set to null
327
680
i = simcall_comm_waitany(comms);
328
// FIXME: MPI_UNDEFINED or does SIMIX have a return code?
329
if (i != MPI_UNDEFINED) {
682
// not MPI_UNDEFINED, as this is a simix return code
331
685
finish_wait(&requests[index], status);
335
689
xbt_dynar_free(&comms);
692
if (index==MPI_UNDEFINED)
693
smpi_empty_status(status);
340
void smpi_mpi_waitall(int count, MPI_Request requests[],
698
int smpi_mpi_waitall(int count, MPI_Request requests[],
341
699
MPI_Status status[])
345
MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
347
for(c = 0; c < count; c++) {
349
smpi_mpi_wait(&requests[c], pstat);
352
index = smpi_mpi_waitany(count, requests, pstat);
353
if(index == MPI_UNDEFINED) {
357
if(status != MPI_STATUS_IGNORE) {
358
memcpy(&status[index], pstat, sizeof(*pstat));
703
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
704
int retvalue=MPI_SUCCESS;
705
//tag invalid requests in the set
706
for(c = 0; c < count; c++) {
707
if(requests[c]==MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL ){
708
if(status != MPI_STATUSES_IGNORE)
709
smpi_empty_status(&status[c]);
710
}else if(requests[c]->src == MPI_PROC_NULL ){
711
if(status != MPI_STATUSES_IGNORE) {
712
smpi_empty_status(&status[c]);
713
status[c].MPI_SOURCE=MPI_PROC_NULL;
717
for(c = 0; c < count; c++) {
719
smpi_mpi_wait(&requests[c], pstat);
722
index = smpi_mpi_waitany(count, requests, pstat);
723
if(index == MPI_UNDEFINED) {
726
if(status != MPI_STATUSES_IGNORE) {
727
memcpy(&status[index], pstat, sizeof(*pstat));
728
if(status[index].MPI_ERROR==MPI_ERR_TRUNCATE)retvalue=MPI_ERR_IN_STATUS;
363
737
int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
364
738
MPI_Status status[])
366
740
int i, count, index;
742
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
745
for(i = 0; i < incount; i++)
747
index=smpi_mpi_waitany(incount, requests, pstat);
748
if(index!=MPI_UNDEFINED){
749
indices[count] = index;
751
if(status != MPI_STATUSES_IGNORE) {
752
memcpy(&status[index], pstat, sizeof(*pstat));
755
return MPI_UNDEFINED;
761
int smpi_mpi_testsome(int incount, MPI_Request requests[], int *indices,
764
int i, count, count_dead;
766
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
369
770
for(i = 0; i < incount; i++) {
370
if(smpi_mpi_testany(incount, requests, &index, status)) {
371
indices[count] = index;
771
if((requests[i] != MPI_REQUEST_NULL)) {
772
if(smpi_mpi_test(&requests[i], pstat)) {
775
if(status != MPI_STATUSES_IGNORE) {
776
memcpy(&status[i], pstat, sizeof(*pstat));
783
if(count_dead==incount)return MPI_UNDEFINED;
378
787
void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,