~ubuntu-branches/ubuntu/raring/simgrid/raring

« back to all changes in this revision

Viewing changes to src/smpi/smpi_base.c

  • Committer: Package Import Robot
  • Author(s): Martin Quinson
  • Date: 2013-01-31 00:24:51 UTC
  • mfrom: (10.1.6 sid)
  • Revision ID: package-import@ubuntu.com-20130131002451-krejhf7w7h24lpsc
Tags: 3.9~rc1-1
* New upstream release: the "Grasgory" release. Major changes:
  - Gras was completely removed from this version.
  - Documentation reorganization to ease browsing it.
  - New default value for the TCP_gamma parameter: 4MiB

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
 * All rights reserved.                                                     */
3
3
 
4
4
/* This program is free software; you can redistribute it and/or modify it
5
 
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
5
 * under the terms of the license (GNU LGPL) which comes with this package. */
6
6
 
7
7
#include "private.h"
8
 
#include "xbt/time.h"
 
8
#include "xbt/virtu.h"
9
9
#include "mc/mc.h"
10
 
 
11
 
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
12
 
                                "Logging specific to SMPI (base)");
 
10
#include "xbt/replay.h"
 
11
#include <errno.h>
 
12
#include "simix/smx_private.h"
 
13
#include "surf/surf.h"
 
14
#include "simgrid/sg_config.h"
 
15
 
 
16
 
 
17
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
 
18
 
13
19
 
14
20
static int match_recv(void* a, void* b, smx_action_t ignored) {
15
21
   MPI_Request ref = (MPI_Request)a;
16
22
   MPI_Request req = (MPI_Request)b;
 
23
   XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
17
24
 
18
 
   xbt_assert(ref, "Cannot match recv against null reference");
19
 
   xbt_assert(req, "Cannot match recv against null request");
20
 
   return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
21
 
          && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
 
25
  xbt_assert(ref, "Cannot match recv against null reference");
 
26
  xbt_assert(req, "Cannot match recv against null request");
 
27
  if((ref->src == MPI_ANY_SOURCE || req->src == ref->src)
 
28
    && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag)){
 
29
    //we match, we can transfer some values
 
30
    // FIXME : move this to the copy function ?
 
31
    if(ref->src == MPI_ANY_SOURCE)ref->real_src = req->src;
 
32
    if(ref->tag == MPI_ANY_TAG)ref->real_tag = req->tag;
 
33
    if(ref->real_size < req->real_size) ref->truncated = 1;
 
34
    if(req->detached==1){
 
35
        ref->detached_sender=req; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
 
36
    }
 
37
    return 1;
 
38
  }else return 0;
22
39
}
23
40
 
24
41
static int match_send(void* a, void* b,smx_action_t ignored) {
25
42
   MPI_Request ref = (MPI_Request)a;
26
43
   MPI_Request req = (MPI_Request)b;
27
 
 
 
44
   XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
28
45
   xbt_assert(ref, "Cannot match send against null reference");
29
46
   xbt_assert(req, "Cannot match send against null request");
30
 
   return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
31
 
          && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
 
47
 
 
48
   if((req->src == MPI_ANY_SOURCE || req->src == ref->src)
 
49
             && (req->tag == MPI_ANY_TAG || req->tag == ref->tag))
 
50
   {
 
51
     if(req->src == MPI_ANY_SOURCE)req->real_src = ref->src;
 
52
     if(req->tag == MPI_ANY_TAG)req->real_tag = ref->tag;
 
53
     if(req->real_size < ref->real_size) req->truncated = 1;
 
54
     if(ref->detached==1){
 
55
         req->detached_sender=ref; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
 
56
     }
 
57
 
 
58
     return 1;
 
59
   } else return 0;
 
60
}
 
61
 
 
62
 
 
63
typedef struct s_smpi_factor *smpi_factor_t;
 
64
typedef struct s_smpi_factor {
 
65
  long factor;
 
66
  int nb_values;
 
67
  double values[4];//arbitrary set to 4
 
68
} s_smpi_factor_t;
 
69
xbt_dynar_t smpi_os_values = NULL;
 
70
xbt_dynar_t smpi_or_values = NULL;
 
71
 
 
72
 
 
73
// Methods used to parse and store the values for timing injections in smpi
 
74
// These are taken from surf/network.c and generalized to have more factors
 
75
// These methods should be merged with those in surf/network.c (moved somewhere in xbt ?)
 
76
 
 
77
static int factor_cmp(const void *pa, const void *pb)
 
78
{
 
79
  return (((s_smpi_factor_t*)pa)->factor > ((s_smpi_factor_t*)pb)->factor);
 
80
}
 
81
 
 
82
 
 
83
static xbt_dynar_t parse_factor(const char *smpi_coef_string)
 
84
{
 
85
  char *value = NULL;
 
86
  unsigned int iter = 0;
 
87
  s_smpi_factor_t fact;
 
88
  int i=0;
 
89
  xbt_dynar_t smpi_factor, radical_elements, radical_elements2 = NULL;
 
90
 
 
91
  smpi_factor = xbt_dynar_new(sizeof(s_smpi_factor_t), NULL);
 
92
  radical_elements = xbt_str_split(smpi_coef_string, ";");
 
93
  xbt_dynar_foreach(radical_elements, iter, value) {
 
94
    fact.nb_values=0;
 
95
    radical_elements2 = xbt_str_split(value, ":");
 
96
    if (xbt_dynar_length(radical_elements2) <2 || xbt_dynar_length(radical_elements2) > 5)
 
97
      xbt_die("Malformed radical for smpi factor!");
 
98
    for(i =0; i<xbt_dynar_length(radical_elements2);i++ ){
 
99
        if (i==0){
 
100
           fact.factor = atol(xbt_dynar_get_as(radical_elements2, i, char *));
 
101
        }else{
 
102
           fact.values[fact.nb_values] = atof(xbt_dynar_get_as(radical_elements2, i, char *));
 
103
           fact.nb_values++;
 
104
        }
 
105
    }
 
106
 
 
107
    xbt_dynar_push_as(smpi_factor, s_smpi_factor_t, fact);
 
108
    XBT_DEBUG("smpi_factor:\t%ld : %d values, first: %f", fact.factor, fact.nb_values ,fact.values[0]);
 
109
    xbt_dynar_free(&radical_elements2);
 
110
  }
 
111
  xbt_dynar_free(&radical_elements);
 
112
  iter=0;
 
113
  xbt_dynar_sort(smpi_factor, &factor_cmp);
 
114
  xbt_dynar_foreach(smpi_factor, iter, fact) {
 
115
    XBT_DEBUG("smpi_factor:\t%ld : %d values, first: %f", fact.factor, fact.nb_values ,fact.values[0]);
 
116
  }
 
117
  return smpi_factor;
 
118
}
 
119
 
 
120
static double smpi_os(double size)
 
121
{
 
122
  if (!smpi_os_values)
 
123
    smpi_os_values =
 
124
        parse_factor(sg_cfg_get_string("smpi/os"));
 
125
 
 
126
  unsigned int iter = 0;
 
127
  s_smpi_factor_t fact;
 
128
  double current=0.0;
 
129
  xbt_dynar_foreach(smpi_os_values, iter, fact) {
 
130
    if (size <= fact.factor) {
 
131
        XBT_DEBUG("os : %lf <= %ld return %f", size, fact.factor, current);
 
132
      return current;
 
133
    }else{
 
134
      current=fact.values[0]+fact.values[1]*size;
 
135
    }
 
136
  }
 
137
  XBT_DEBUG("os : %lf > %ld return %f", size, fact.factor, current);
 
138
 
 
139
  return current;
 
140
}
 
141
 
 
142
static double smpi_or(double size)
 
143
{
 
144
  if (!smpi_or_values)
 
145
    smpi_or_values =
 
146
        parse_factor(sg_cfg_get_string("smpi/or"));
 
147
 
 
148
  unsigned int iter = 0;
 
149
  s_smpi_factor_t fact;
 
150
  double current=0.0;
 
151
  xbt_dynar_foreach(smpi_or_values, iter, fact) {
 
152
    if (size <= fact.factor) {
 
153
        XBT_DEBUG("or : %lf <= %ld return %f", size, fact.factor, current);
 
154
      return current;
 
155
    }else
 
156
      current=fact.values[0]+fact.values[1]*size;
 
157
  }
 
158
  XBT_DEBUG("or : %lf > %ld return %f", size, fact.factor, current);
 
159
 
 
160
  return current;
32
161
}
33
162
 
34
163
static MPI_Request build_request(void *buf, int count,
37
166
{
38
167
  MPI_Request request;
39
168
 
 
169
  void *old_buf = NULL;
 
170
 
40
171
  request = xbt_new(s_smpi_mpi_request_t, 1);
 
172
 
 
173
  s_smpi_subtype_t *subtype = datatype->substruct;
 
174
 
 
175
  if(datatype->has_subtype == 1){
 
176
    // This part handles the problem of non-contiguous memory
 
177
    old_buf = buf;
 
178
    buf = xbt_malloc(count*smpi_datatype_size(datatype));
 
179
    if (flags & SEND) {
 
180
      subtype->serialize(old_buf, buf, count, datatype->substruct);
 
181
    }
 
182
  }
 
183
 
41
184
  request->buf = buf;
42
 
  // FIXME: this will have to be changed to support non-contiguous datatypes
 
185
  // This part handles the problem of non-contiguous memory (for the
 
186
  // unserialisation at the reception)
 
187
  request->old_buf = old_buf;
 
188
  request->old_type = datatype;
 
189
 
43
190
  request->size = smpi_datatype_size(datatype) * count;
44
191
  request->src = src;
45
192
  request->dst = dst;
47
194
  request->comm = comm;
48
195
  request->action = NULL;
49
196
  request->flags = flags;
 
197
  request->detached = 0;
 
198
  request->detached_sender = NULL;
 
199
 
 
200
  request->truncated = 0;
 
201
  request->real_size = 0;
 
202
  request->real_tag = 0;
 
203
 
 
204
  request->refcount=1;
50
205
#ifdef HAVE_TRACING
51
206
  request->send = 0;
52
207
  request->recv = 0;
53
208
#endif
 
209
  if (flags & SEND) smpi_datatype_unuse(datatype);
 
210
 
54
211
  return request;
55
212
}
56
213
 
 
214
 
 
215
void smpi_empty_status(MPI_Status * status) {
 
216
  if(status != MPI_STATUS_IGNORE) {
 
217
      status->MPI_SOURCE=MPI_ANY_SOURCE;
 
218
      status->MPI_TAG=MPI_ANY_TAG;
 
219
      status->count=0;
 
220
  }
 
221
}
 
222
 
 
223
void smpi_action_trace_run(char *path)
 
224
{
 
225
  char *name;
 
226
  xbt_dynar_t todo;
 
227
  xbt_dict_cursor_t cursor;
 
228
 
 
229
  action_fp=NULL;
 
230
  if (path) {
 
231
    action_fp = fopen(path, "r");
 
232
    xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
 
233
               strerror(errno));
 
234
  }
 
235
 
 
236
  if (!xbt_dict_is_empty(action_queues)) {
 
237
    XBT_WARN
 
238
      ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
 
239
 
 
240
 
 
241
    xbt_dict_foreach(action_queues, cursor, name, todo) {
 
242
      XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
 
243
    }
 
244
  }
 
245
 
 
246
  if (path)
 
247
    fclose(action_fp);
 
248
  xbt_dict_free(&action_queues);
 
249
  action_queues = xbt_dict_new_homogeneous(NULL);
 
250
}
 
251
 
57
252
static void smpi_mpi_request_free_voidp(void* request)
58
253
{
59
254
  MPI_Request req = request;
65
260
                               int dst, int tag, MPI_Comm comm)
66
261
{
67
262
  MPI_Request request =
68
 
      build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
69
 
                    comm, PERSISTENT | SEND);
70
 
 
 
263
    build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
 
264
                  comm, PERSISTENT | SEND);
 
265
  request->refcount++;
71
266
  return request;
72
267
}
73
268
 
75
270
                               int src, int tag, MPI_Comm comm)
76
271
{
77
272
  MPI_Request request =
78
 
      build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
79
 
                    comm, PERSISTENT | RECV);
80
 
 
 
273
    build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
 
274
                  comm, PERSISTENT | RECV);
 
275
  request->refcount++;
81
276
  return request;
82
277
}
83
278
 
84
279
void smpi_mpi_start(MPI_Request request)
85
280
{
86
281
  smx_rdv_t mailbox;
87
 
  int detached = 0;
88
282
 
89
283
  xbt_assert(!request->action,
90
 
              "Cannot (re)start a non-finished communication");
 
284
             "Cannot (re)start a non-finished communication");
91
285
  if(request->flags & RECV) {
92
286
    print_request("New recv", request);
93
 
    mailbox = smpi_process_mailbox();
94
 
    // FIXME: SIMIX does not yet support non-contiguous datatypes
95
 
    request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
 
287
    if (request->size < sg_cfg_get_int("smpi/async_small_thres"))
 
288
      mailbox = smpi_process_mailbox_small();
 
289
    else
 
290
      mailbox = smpi_process_mailbox();
 
291
    // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
 
292
    request->real_size=request->size;
 
293
    smpi_datatype_use(request->old_type);
 
294
    request->action = simcall_comm_irecv(mailbox, request->buf, &request->real_size, &match_recv, request);
 
295
 
 
296
    double sleeptime = smpi_or(request->size);
 
297
    if(sleeptime!=0.0){
 
298
        simcall_process_sleep(sleeptime);
 
299
        XBT_DEBUG("receiving size of %zu : sleep %lf ", request->size, smpi_or(request->size));
 
300
    }
 
301
 
96
302
  } else {
 
303
 
 
304
    int receiver = smpi_group_index(smpi_comm_group(request->comm), request->dst);
 
305
/*    if(receiver == MPI_UNDEFINED) {*/
 
306
/*      XBT_WARN("Trying to send a message to a wrong rank");*/
 
307
/*      return;*/
 
308
/*    }*/
97
309
    print_request("New send", request);
98
 
    mailbox = smpi_process_remote_mailbox(
99
 
                          smpi_group_index(smpi_comm_group(request->comm), request->dst));
100
 
    // FIXME: SIMIX does not yet support non-contiguous datatypes
101
 
 
102
 
    if (request->size < 64*1024 ) { // eager mode => detached send (FIXME: this limit should be configurable)
103
 
        void *oldbuf = request->buf;
104
 
        detached = 1;
105
 
        request->buf = malloc(request->size);
106
 
        memcpy(request->buf,oldbuf,request->size);
107
 
        XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
108
 
    } else {
109
 
        XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
110
 
    }
111
 
    request->action = 
112
 
                simcall_comm_isend(mailbox, request->size, -1.0,
113
 
                                    request->buf, request->size,
114
 
                                    &match_send,
115
 
                                    &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
116
 
                                    request,
117
 
                                    // detach if msg size < eager/rdv switch limit
118
 
                                    detached);
 
310
    if (request->size < sg_cfg_get_int("smpi/async_small_thres")) { // eager mode
 
311
      mailbox = smpi_process_remote_mailbox_small(receiver);
 
312
    }else{
 
313
      XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
 
314
      mailbox = smpi_process_remote_mailbox(receiver);
 
315
    }
 
316
    if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
 
317
      void *oldbuf = NULL;
 
318
      request->detached = 1;
 
319
      request->refcount++;
 
320
      if(request->old_type->has_subtype == 0){
 
321
        oldbuf = request->buf;
 
322
        if (oldbuf){
 
323
          request->buf = xbt_malloc(request->size);
 
324
          memcpy(request->buf,oldbuf,request->size);
 
325
        }
 
326
      }
 
327
      XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
 
328
    }
 
329
    // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
 
330
    request->real_size=request->size;
 
331
    smpi_datatype_use(request->old_type);
 
332
    double sleeptime = smpi_os(request->size);
 
333
    if(sleeptime!=0.0){
 
334
        simcall_process_sleep(sleeptime);
 
335
        XBT_DEBUG("sending size of %zu : sleep %lf ", request->size, smpi_os(request->size));
 
336
    }
 
337
    request->action =
 
338
      simcall_comm_isend(mailbox, request->size, -1.0,
 
339
                         request->buf, request->real_size,
 
340
                         &match_send,
 
341
                         &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
 
342
                         request,
 
343
                         // detach if msg size < eager/rdv switch limit
 
344
                         request->detached);
119
345
 
120
346
#ifdef HAVE_TRACING
121
347
    /* FIXME: detached sends are not traceable (request->action == NULL) */
122
348
    if (request->action)
123
349
      simcall_set_category(request->action, TRACE_internal_smpi_get_category());
124
350
#endif
 
351
 
125
352
  }
 
353
 
126
354
}
127
355
 
128
356
void smpi_mpi_startall(int count, MPI_Request * requests)
129
357
{
130
 
          int i;
 
358
  int i;
131
359
 
132
360
  for(i = 0; i < count; i++) {
133
361
    smpi_mpi_start(requests[i]);
136
364
 
137
365
void smpi_mpi_request_free(MPI_Request * request)
138
366
{
139
 
  xbt_free(*request);
140
 
  *request = MPI_REQUEST_NULL;
 
367
 
 
368
  if((*request) != MPI_REQUEST_NULL){
 
369
    (*request)->refcount--;
 
370
    if((*request)->refcount<0) xbt_die("wrong refcount");
 
371
 
 
372
    if((*request)->refcount==0){
 
373
        xbt_free(*request);
 
374
        *request = MPI_REQUEST_NULL;
 
375
    }
 
376
  }else{
 
377
      xbt_die("freeing an already free request");
 
378
  }
141
379
}
142
380
 
143
381
MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
144
382
                            int dst, int tag, MPI_Comm comm)
145
383
{
146
384
  MPI_Request request =
147
 
      build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
148
 
                    comm, NON_PERSISTENT | SEND);
 
385
    build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
 
386
                  comm, NON_PERSISTENT | SEND);
149
387
 
150
388
  return request;
151
389
}
154
392
                           int dst, int tag, MPI_Comm comm)
155
393
{
156
394
  MPI_Request request =
157
 
      smpi_isend_init(buf, count, datatype, dst, tag, comm);
 
395
      build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
 
396
                    comm, NON_PERSISTENT | SEND);
158
397
 
159
398
  smpi_mpi_start(request);
160
399
  return request;
164
403
                            int src, int tag, MPI_Comm comm)
165
404
{
166
405
  MPI_Request request =
167
 
      build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
168
 
                    comm, NON_PERSISTENT | RECV);
169
 
 
 
406
    build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
 
407
                  comm, NON_PERSISTENT | RECV);
170
408
  return request;
171
409
}
172
410
 
174
412
                           int src, int tag, MPI_Comm comm)
175
413
{
176
414
  MPI_Request request =
177
 
      smpi_irecv_init(buf, count, datatype, src, tag, comm);
 
415
      build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
 
416
                    comm, NON_PERSISTENT | RECV);
178
417
 
179
418
  smpi_mpi_start(request);
180
419
  return request;
184
423
                   int tag, MPI_Comm comm, MPI_Status * status)
185
424
{
186
425
  MPI_Request request;
187
 
 
188
426
  request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
189
427
  smpi_mpi_wait(&request, status);
190
428
}
191
429
 
 
430
 
 
431
 
192
432
void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
193
433
                   int tag, MPI_Comm comm)
194
434
{
195
435
  MPI_Request request;
196
 
 
197
436
  request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
198
437
  smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
 
438
 
199
439
}
200
440
 
201
441
void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
207
447
  MPI_Status stats[2];
208
448
 
209
449
  requests[0] =
210
 
      smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
 
450
    smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
211
451
  requests[1] =
212
 
      smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
 
452
    smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
213
453
  smpi_mpi_startall(2, requests);
214
454
  smpi_mpi_waitall(2, requests, stats);
215
455
  if(status != MPI_STATUS_IGNORE) {
226
466
static void finish_wait(MPI_Request * request, MPI_Status * status)
227
467
{
228
468
  MPI_Request req = *request;
229
 
 
230
 
  if(status != MPI_STATUS_IGNORE) {
231
 
    status->MPI_SOURCE = req->src;
232
 
    status->MPI_TAG = req->tag;
233
 
    status->MPI_ERROR = MPI_SUCCESS;
234
 
    // FIXME: really this should just contain the count of receive-type blocks,
235
 
    // right?
236
 
    status->count = req->size;
237
 
  }
238
 
  print_request("Finishing", req);
 
469
  if(!(req->detached && req->flags & SEND)){
 
470
    if(status != MPI_STATUS_IGNORE) {
 
471
      status->MPI_SOURCE = req->src == MPI_ANY_SOURCE ? req->real_src : req->src;
 
472
      status->MPI_TAG = req->tag == MPI_ANY_TAG ? req->real_tag : req->tag;
 
473
      if(req->truncated)
 
474
      status->MPI_ERROR = MPI_ERR_TRUNCATE;
 
475
      else status->MPI_ERROR = MPI_SUCCESS ;
 
476
      // this handles the case were size in receive differs from size in send
 
477
      // FIXME: really this should just contain the count of receive-type blocks,
 
478
      // right?
 
479
      status->count = req->real_size;
 
480
    }
 
481
 
 
482
    print_request("Finishing", req);
 
483
    MPI_Datatype datatype = req->old_type;
 
484
 
 
485
    if(datatype->has_subtype == 1){
 
486
        // This part handles the problem of non-contignous memory
 
487
        // the unserialization at the reception
 
488
      s_smpi_subtype_t *subtype = datatype->substruct;
 
489
      if(req->flags & RECV) {
 
490
        subtype->unserialize(req->buf, req->old_buf, req->real_size/smpi_datatype_size(datatype) , datatype->substruct);
 
491
      }
 
492
      if(req->detached == 0) free(req->buf);
 
493
    }
 
494
    smpi_datatype_unuse(datatype);
 
495
  }
 
496
 
 
497
  if(req->detached_sender!=NULL){
 
498
    smpi_mpi_request_free(&(req->detached_sender));
 
499
  }
 
500
 
239
501
  if(req->flags & NON_PERSISTENT) {
240
502
    smpi_mpi_request_free(request);
241
503
  } else {
244
506
}
245
507
 
246
508
int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
247
 
int flag;
 
509
  int flag;
248
510
 
249
 
   if ((*request)->action == NULL)
250
 
        flag = 1;
251
 
   else 
 
511
  //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or smpi_mpi_testall before)
 
512
  if ((*request)->action == NULL)
 
513
    flag = 1;
 
514
  else
252
515
    flag = simcall_comm_test((*request)->action);
253
 
   if(flag) {
254
 
                    smpi_mpi_wait(request, status);
255
 
          }
256
 
          return flag;
 
516
  if(flag) {
 
517
    (*request)->refcount++;
 
518
    finish_wait(request, status);
 
519
  }else{
 
520
    smpi_empty_status(status);
 
521
  }
 
522
  return flag;
257
523
}
258
524
 
259
525
int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
270
536
    map = xbt_new(int, count);
271
537
    size = 0;
272
538
    for(i = 0; i < count; i++) {
273
 
      if(requests[i]->action) {
 
539
      if((requests[i]!=MPI_REQUEST_NULL) && requests[i]->action) {
274
540
         xbt_dynar_push(comms, &requests[i]->action);
275
541
         map[size] = i;
276
542
         size++;
278
544
    }
279
545
    if(size > 0) {
280
546
      i = simcall_comm_testany(comms);
281
 
      // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
282
 
      if(i != MPI_UNDEFINED) {
 
547
      // not MPI_UNDEFINED, as this is a simix return code
 
548
      if(i != -1) {
283
549
        *index = map[i];
284
 
        smpi_mpi_wait(&requests[*index], status);
 
550
        finish_wait(&requests[*index], status);
285
551
        flag = 1;
286
552
      }
 
553
    }else{
 
554
        //all requests are null or inactive, return true
 
555
        flag=1;
 
556
        smpi_empty_status(status);
287
557
    }
288
558
    xbt_free(map);
289
559
    xbt_dynar_free(&comms);
290
560
  }
291
 
  return flag;
 
561
 
 
562
  return flag;
 
563
}
 
564
 
 
565
 
 
566
int smpi_mpi_testall(int count, MPI_Request requests[],
 
567
                     MPI_Status status[])
 
568
{
 
569
  MPI_Status stat;
 
570
  MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
 
571
  int flag=1;
 
572
  int i;
 
573
  for(i=0; i<count; i++){
 
574
    if(requests[i]!= MPI_REQUEST_NULL){
 
575
      if (smpi_mpi_test(&requests[i], pstat)!=1){
 
576
        flag=0;
 
577
      }
 
578
    }else{
 
579
      smpi_empty_status(pstat);
 
580
    }
 
581
    if(status != MPI_STATUSES_IGNORE) {
 
582
      memcpy(&status[i], pstat, sizeof(*pstat));
 
583
    }
 
584
  }
 
585
  return flag;
 
586
}
 
587
 
 
588
void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
 
589
  int flag=0;
 
590
  //FIXME find another wait to avoid busy waiting ?
 
591
  // the issue here is that we have to wait on a nonexistent comm
 
592
  while(flag==0){
 
593
    smpi_mpi_iprobe(source, tag, comm, &flag, status);
 
594
    XBT_DEBUG("Busy Waiting on probing : %d", flag);
 
595
    if(!flag) {
 
596
      simcall_process_sleep(0.0001);
 
597
    }
 
598
  }
 
599
}
 
600
 
 
601
void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
 
602
  MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
 
603
            comm, NON_PERSISTENT | RECV);
 
604
 
 
605
  // behave like a receive, but don't do it
 
606
  smx_rdv_t mailbox;
 
607
 
 
608
  print_request("New iprobe", request);
 
609
  // We have to test both mailboxes as we don't know if we will receive one one or another
 
610
    if (sg_cfg_get_int("smpi/async_small_thres")>0){
 
611
        mailbox = smpi_process_mailbox_small();
 
612
        XBT_DEBUG("trying to probe the perm recv mailbox");
 
613
        request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
 
614
    }
 
615
    if (request->action==NULL){
 
616
        mailbox = smpi_process_mailbox();
 
617
        XBT_DEBUG("trying to probe the other mailbox");
 
618
        request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
 
619
    }
 
620
 
 
621
  if(request->action){
 
622
    MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
 
623
    *flag = 1;
 
624
    if(status != MPI_STATUS_IGNORE) {
 
625
      status->MPI_SOURCE = req->src;
 
626
      status->MPI_TAG = req->tag;
 
627
      status->MPI_ERROR = MPI_SUCCESS;
 
628
      status->count = req->real_size;
 
629
    }
 
630
  }
 
631
  else *flag = 0;
 
632
  smpi_mpi_request_free(&request);
 
633
 
 
634
  return;
292
635
}
293
636
 
294
637
void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
296
639
  print_request("Waiting", *request);
297
640
  if ((*request)->action != NULL) { // this is not a detached send
298
641
    simcall_comm_wait((*request)->action, -1.0);
299
 
    finish_wait(request, status);
300
642
  }
 
643
  finish_wait(request, status);
 
644
 
301
645
  // FIXME for a detached send, finish_wait is not called:
302
646
}
303
647
 
314
658
    comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
315
659
    map = xbt_new(int, count);
316
660
    size = 0;
317
 
    XBT_DEBUG("Wait for one of");
 
661
    XBT_DEBUG("Wait for one of %d", count);
318
662
    for(i = 0; i < count; i++) {
319
 
      if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
320
 
        print_request("   ", requests[i]);
321
 
        xbt_dynar_push(comms, &requests[i]->action);
322
 
        map[size] = i;
323
 
        size++;
 
663
      if(requests[i] != MPI_REQUEST_NULL) {
 
664
        if (requests[i]->action != NULL) {
 
665
          XBT_DEBUG("Waiting any %p ", requests[i]);
 
666
          xbt_dynar_push(comms, &requests[i]->action);
 
667
          map[size] = i;
 
668
          size++;
 
669
        }else{
 
670
         //This is a finished detached request, let's return this one
 
671
         size=0;//so we free the dynar but don't do the waitany call
 
672
         index=i;
 
673
         finish_wait(&requests[i], status);//cleanup if refcount = 0
 
674
         requests[i]=MPI_REQUEST_NULL;//set to null
 
675
         break;
 
676
         }
324
677
      }
325
678
    }
326
679
    if(size > 0) {
327
680
      i = simcall_comm_waitany(comms);
328
 
      // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
329
 
      if (i != MPI_UNDEFINED) {
 
681
 
 
682
      // not MPI_UNDEFINED, as this is a simix return code
 
683
      if (i != -1) {
330
684
        index = map[i];
331
685
        finish_wait(&requests[index], status);
332
686
      }
334
688
    xbt_free(map);
335
689
    xbt_dynar_free(&comms);
336
690
  }
 
691
 
 
692
  if (index==MPI_UNDEFINED)
 
693
    smpi_empty_status(status);
 
694
 
337
695
  return index;
338
696
}
339
697
 
340
 
void smpi_mpi_waitall(int count, MPI_Request requests[],
 
698
int smpi_mpi_waitall(int count, MPI_Request requests[],
341
699
                      MPI_Status status[])
342
700
{
343
 
  int index, c;
 
701
  int  index, c;
344
702
  MPI_Status stat;
345
 
  MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
346
 
 
347
 
  for(c = 0; c < count; c++) {
348
 
    if(MC_IS_ENABLED) {
349
 
      smpi_mpi_wait(&requests[c], pstat);
350
 
      index = c;
351
 
    } else {
352
 
      index = smpi_mpi_waitany(count, requests, pstat);
353
 
      if(index == MPI_UNDEFINED) {
354
 
        break;
355
 
      }
356
 
    }
357
 
    if(status != MPI_STATUS_IGNORE) {
358
 
      memcpy(&status[index], pstat, sizeof(*pstat));
359
 
    }
360
 
  }
 
703
  MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
 
704
  int retvalue=MPI_SUCCESS;
 
705
  //tag invalid requests in the set
 
706
  for(c = 0; c < count; c++) {
 
707
    if(requests[c]==MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL ){
 
708
      if(status != MPI_STATUSES_IGNORE)
 
709
        smpi_empty_status(&status[c]);
 
710
    }else if(requests[c]->src == MPI_PROC_NULL ){
 
711
      if(status != MPI_STATUSES_IGNORE) {
 
712
        smpi_empty_status(&status[c]);
 
713
        status[c].MPI_SOURCE=MPI_PROC_NULL;
 
714
      }
 
715
    }
 
716
  }
 
717
  for(c = 0; c < count; c++) {
 
718
      if(MC_is_active()) {
 
719
        smpi_mpi_wait(&requests[c], pstat);
 
720
        index = c;
 
721
      } else {
 
722
        index = smpi_mpi_waitany(count, requests, pstat);
 
723
        if(index == MPI_UNDEFINED) {
 
724
          break;
 
725
       }
 
726
      if(status != MPI_STATUSES_IGNORE) {
 
727
        memcpy(&status[index], pstat, sizeof(*pstat));
 
728
        if(status[index].MPI_ERROR==MPI_ERR_TRUNCATE)retvalue=MPI_ERR_IN_STATUS;
 
729
 
 
730
      }
 
731
    }
 
732
  }
 
733
 
 
734
  return retvalue;
361
735
}
362
736
 
363
737
int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
364
738
                      MPI_Status status[])
365
739
{
366
740
  int i, count, index;
367
 
 
368
 
  count = 0;
 
741
  MPI_Status stat;
 
742
  MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
 
743
 
 
744
  count = 0;
 
745
  for(i = 0; i < incount; i++)
 
746
  {
 
747
    index=smpi_mpi_waitany(incount, requests, pstat);
 
748
    if(index!=MPI_UNDEFINED){
 
749
      indices[count] = index;
 
750
      count++;
 
751
      if(status != MPI_STATUSES_IGNORE) {
 
752
        memcpy(&status[index], pstat, sizeof(*pstat));
 
753
      }
 
754
    }else{
 
755
      return MPI_UNDEFINED;
 
756
    }
 
757
  }
 
758
  return count;
 
759
}
 
760
 
 
761
int smpi_mpi_testsome(int incount, MPI_Request requests[], int *indices,
 
762
                      MPI_Status status[])
 
763
{
 
764
  int i, count, count_dead;
 
765
  MPI_Status stat;
 
766
  MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
 
767
 
 
768
  count = 0;
 
769
  count_dead = 0;
369
770
  for(i = 0; i < incount; i++) {
370
 
     if(smpi_mpi_testany(incount, requests, &index, status)) {
371
 
       indices[count] = index;
372
 
       count++;
373
 
     }
 
771
    if((requests[i] != MPI_REQUEST_NULL)) {
 
772
      if(smpi_mpi_test(&requests[i], pstat)) {
 
773
         indices[count] = i;
 
774
         count++;
 
775
         if(status != MPI_STATUSES_IGNORE) {
 
776
            memcpy(&status[i], pstat, sizeof(*pstat));
 
777
         }
 
778
      }
 
779
    }else{
 
780
      count_dead++;
 
781
    }
374
782
  }
375
 
  return count;
 
783
  if(count_dead==incount)return MPI_UNDEFINED;
 
784
  else return count;
376
785
}
377
786
 
378
787
void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
406
815
    // FIXME: check for errors
407
816
    smpi_datatype_extent(recvtype, &lb, &recvext);
408
817
    // Local copy from root
409
 
    smpi_datatype_copy(sendbuf, sendcount, sendtype, 
410
 
        (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
 
818
    smpi_datatype_copy(sendbuf, sendcount, sendtype,
 
819
                       (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
411
820
    // Receive buffers from senders
412
821
    requests = xbt_new(MPI_Request, size - 1);
413
822
    index = 0;
414
823
    for(src = 0; src < size; src++) {
415
824
      if(src != root) {
416
 
        requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext, 
417
 
                                          recvcount, recvtype, 
 
825
        requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
 
826
                                          recvcount, recvtype,
418
827
                                          src, system_tag, comm);
419
828
        index++;
420
829
      }
444
853
    // FIXME: check for errors
445
854
    smpi_datatype_extent(recvtype, &lb, &recvext);
446
855
    // Local copy from root
447
 
    smpi_datatype_copy(sendbuf, sendcount, sendtype, 
448
 
                       (char *)recvbuf + displs[root] * recvext, 
 
856
    smpi_datatype_copy(sendbuf, sendcount, sendtype,
 
857
                       (char *)recvbuf + displs[root] * recvext,
449
858
                       recvcounts[root], recvtype);
450
859
    // Receive buffers from senders
451
860
    requests = xbt_new(MPI_Request, size - 1);
453
862
    for(src = 0; src < size; src++) {
454
863
      if(src != root) {
455
864
        requests[index] =
456
 
            smpi_irecv_init((char *)recvbuf + displs[src] * recvext, 
457
 
                            recvcounts[src], recvtype, src, system_tag, comm);
 
865
          smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
 
866
                          recvcounts[src], recvtype, src, system_tag, comm);
458
867
        index++;
459
868
      }
460
869
    }
480
889
  // FIXME: check for errors
481
890
  smpi_datatype_extent(recvtype, &lb, &recvext);
482
891
  // Local copy from self
483
 
  smpi_datatype_copy(sendbuf, sendcount, sendtype, 
484
 
                     (char *)recvbuf + rank * recvcount * recvext, recvcount, 
 
892
  smpi_datatype_copy(sendbuf, sendcount, sendtype,
 
893
                     (char *)recvbuf + rank * recvcount * recvext, recvcount,
485
894
                     recvtype);
486
895
  // Send/Recv buffers to/from others;
487
896
  requests = xbt_new(MPI_Request, 2 * (size - 1));
489
898
  for(other = 0; other < size; other++) {
490
899
    if(other != rank) {
491
900
      requests[index] =
492
 
          smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
493
 
                          comm);
 
901
        smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
 
902
                        comm);
494
903
      index++;
495
 
      requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext, 
496
 
                                        recvcount, recvtype, other, 
 
904
      requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
 
905
                                        recvcount, recvtype, other,
497
906
                                        system_tag, comm);
498
907
      index++;
499
908
    }
519
928
  // FIXME: check for errors
520
929
  smpi_datatype_extent(recvtype, &lb, &recvext);
521
930
  // Local copy from self
522
 
  smpi_datatype_copy(sendbuf, sendcount, sendtype, 
523
 
                     (char *)recvbuf + displs[rank] * recvext, 
 
931
  smpi_datatype_copy(sendbuf, sendcount, sendtype,
 
932
                     (char *)recvbuf + displs[rank] * recvext,
524
933
                     recvcounts[rank], recvtype);
525
934
  // Send buffers to others;
526
935
  requests = xbt_new(MPI_Request, 2 * (size - 1));
528
937
  for(other = 0; other < size; other++) {
529
938
    if(other != rank) {
530
939
      requests[index] =
531
 
          smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
532
 
                          comm);
 
940
        smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
 
941
                        comm);
533
942
      index++;
534
943
      requests[index] =
535
 
          smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
536
 
                          recvtype, other, system_tag, comm);
 
944
        smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
 
945
                        recvtype, other, system_tag, comm);
537
946
      index++;
538
947
    }
539
948
  }
563
972
    smpi_datatype_extent(sendtype, &lb, &sendext);
564
973
    // Local copy from root
565
974
    smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
566
 
      sendcount, sendtype, recvbuf, recvcount, recvtype);
 
975
                       sendcount, sendtype, recvbuf, recvcount, recvtype);
567
976
    // Send buffers to receivers
568
977
    requests = xbt_new(MPI_Request, size - 1);
569
978
    index = 0;
570
979
    for(dst = 0; dst < size; dst++) {
571
980
      if(dst != root) {
572
 
        requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext, 
 
981
        requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
573
982
                                          sendcount, sendtype, dst,
574
983
                                          system_tag, comm);
575
984
        index++;
601
1010
    // FIXME: check for errors
602
1011
    smpi_datatype_extent(sendtype, &lb, &sendext);
603
1012
    // Local copy from root
604
 
    smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root], 
 
1013
    smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
605
1014
                       sendtype, recvbuf, recvcount, recvtype);
606
1015
    // Send buffers to receivers
607
1016
    requests = xbt_new(MPI_Request, size - 1);
609
1018
    for(dst = 0; dst < size; dst++) {
610
1019
      if(dst != root) {
611
1020
        requests[index] =
612
 
            smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst], 
613
 
                            sendtype, dst, system_tag, comm);
 
1021
          smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
 
1022
                          sendtype, dst, system_tag, comm);
614
1023
        index++;
615
1024
      }
616
1025
    }
640
1049
    // FIXME: check for errors
641
1050
    smpi_datatype_extent(datatype, &lb, &dataext);
642
1051
    // Local copy from root
643
 
    smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
 
1052
    if (sendbuf && recvbuf)
 
1053
      smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
644
1054
    // Receive buffers from senders
645
1055
    //TODO: make a MPI_barrier here ?
646
1056
    requests = xbt_new(MPI_Request, size - 1);
652
1062
        //  mapping...
653
1063
        tmpbufs[index] = xbt_malloc(count * dataext);
654
1064
        requests[index] =
655
 
            smpi_irecv_init(tmpbufs[index], count, datatype, src,
656
 
                            system_tag, comm);
 
1065
          smpi_irecv_init(tmpbufs[index], count, datatype, src,
 
1066
                          system_tag, comm);
657
1067
        index++;
658
1068
      }
659
1069
    }
661
1071
    smpi_mpi_startall(size - 1, requests);
662
1072
    for(src = 0; src < size - 1; src++) {
663
1073
      index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
 
1074
      XBT_DEBUG("finished waiting any request with index %d", index);
664
1075
      if(index == MPI_UNDEFINED) {
665
1076
        break;
666
1077
      }
667
 
      smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
 
1078
      if(op) /* op can be MPI_OP_NULL that does nothing */
 
1079
        smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
668
1080
    }
669
1081
    for(index = 0; index < size - 1; index++) {
670
1082
      xbt_free(tmpbufs[index]);
704
1116
  tmpbufs = xbt_new(void *, rank);
705
1117
  index = 0;
706
1118
  for(other = 0; other < rank; other++) {
707
 
    // FIXME: possibly overkill we we have contiguous/noncontiguous data 
 
1119
    // FIXME: possibly overkill we we have contiguous/noncontiguous data
708
1120
    // mapping...
709
1121
    tmpbufs[index] = xbt_malloc(count * dataext);
710
1122
    requests[index] =
711
 
        smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
712
 
                        comm);
 
1123
      smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
 
1124
                      comm);
713
1125
    index++;
714
1126
  }
715
1127
  for(other = rank + 1; other < size; other++) {
716
1128
    requests[index] =
717
 
        smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
 
1129
      smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
718
1130
    index++;
719
1131
  }
720
1132
  // Wait for completion of all comms.