~ubuntu-branches/ubuntu/raring/simgrid/raring

« back to all changes in this revision

Viewing changes to src/smpi/smpi_replay.c

  • Committer: Package Import Robot
  • Author(s): Martin Quinson
  • Date: 2013-01-31 00:24:51 UTC
  • mfrom: (10.1.6 sid)
  • Revision ID: package-import@ubuntu.com-20130131002451-krejhf7w7h24lpsc
Tags: 3.9~rc1-1
* New upstream release: the "Grasgory" release. Major changes:
  - Gras was completely removed from this version.
  - Documentation reorganization to ease browsing it.
  - New default value for the TCP_gamma parameter: 4MiB

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2009, 2010, 2011, 2012. The SimGrid Team.
 
2
 * All rights reserved.                                                     */
 
3
 
 
4
/* This program is free software; you can redistribute it and/or modify it
 
5
 * under the terms of the license (GNU LGPL) which comes with this package. */
 
6
 
 
7
 
 
8
#include "private.h"
 
9
#include <stdio.h>
 
10
#include <xbt.h>
 
11
#include <xbt/replay.h>
 
12
 
 
13
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
 
14
 
 
15
int communicator_size = 0;
 
16
static int active_processes = 0;
 
17
 
 
18
static void log_timed_action (const char *const *action, double clock){
 
19
  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
 
20
    char *name = xbt_str_join_array(action, " ");
 
21
    XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
 
22
    free(name);
 
23
  }
 
24
}
 
25
 
 
26
typedef struct {
 
27
  xbt_dynar_t isends; /* of MPI_Request */
 
28
  xbt_dynar_t irecvs; /* of MPI_Request */
 
29
} s_smpi_replay_globals_t, *smpi_replay_globals_t;
 
30
 
 
31
/* Helper function */
 
32
static double parse_double(const char *string)
 
33
{
 
34
  double value;
 
35
  char *endptr;
 
36
  value = strtod(string, &endptr);
 
37
  if (*endptr != '\0')
 
38
    THROWF(unknown_error, 0, "%s is not a double", string);
 
39
  return value;
 
40
}
 
41
 
 
42
static void action_init(const char *const *action)
 
43
{
 
44
  XBT_DEBUG("Initialize the counters");
 
45
  smpi_replay_globals_t globals =  xbt_new(s_smpi_replay_globals_t, 1);
 
46
  globals->isends = xbt_dynar_new(sizeof(MPI_Request),NULL);
 
47
  globals->irecvs = xbt_dynar_new(sizeof(MPI_Request),NULL);
 
48
 
 
49
  smpi_process_set_user_data((void*) globals);
 
50
 
 
51
  /* start a simulated timer */
 
52
  smpi_process_simulated_start();
 
53
  /*initialize the number of active processes */
 
54
  active_processes = smpi_process_count();
 
55
}
 
56
 
 
57
static void action_finalize(const char *const *action)
 
58
{
 
59
  smpi_replay_globals_t globals =
 
60
      (smpi_replay_globals_t) smpi_process_get_user_data();
 
61
 
 
62
  if (globals){
 
63
    XBT_DEBUG("There are %lu isends and %lu irecvs in the dynars",
 
64
         xbt_dynar_length(globals->isends),xbt_dynar_length(globals->irecvs));
 
65
    xbt_dynar_free_container(&(globals->isends));
 
66
    xbt_dynar_free_container(&(globals->irecvs));
 
67
  }
 
68
  free(globals);
 
69
}
 
70
 
 
71
static void action_comm_size(const char *const *action)
 
72
{
 
73
  double clock = smpi_process_simulated_elapsed();
 
74
 
 
75
  communicator_size = parse_double(action[2]);
 
76
  log_timed_action (action, clock);
 
77
}
 
78
 
 
79
static void action_comm_split(const char *const *action)
 
80
{
 
81
  double clock = smpi_process_simulated_elapsed();
 
82
 
 
83
  log_timed_action (action, clock);
 
84
}
 
85
 
 
86
static void action_comm_dup(const char *const *action)
 
87
{
 
88
  double clock = smpi_process_simulated_elapsed();
 
89
 
 
90
  log_timed_action (action, clock);
 
91
}
 
92
 
 
93
static void action_compute(const char *const *action)
 
94
{
 
95
  double clock = smpi_process_simulated_elapsed();
 
96
  smpi_execute_flops(parse_double(action[2]));
 
97
 
 
98
  log_timed_action (action, clock);
 
99
}
 
100
 
 
101
static void action_send(const char *const *action)
 
102
{
 
103
  int to = atoi(action[2]);
 
104
  double size=parse_double(action[3]);
 
105
  double clock = smpi_process_simulated_elapsed();
 
106
#ifdef HAVE_TRACING
 
107
  int rank = smpi_comm_rank(MPI_COMM_WORLD);
 
108
  TRACE_smpi_computing_out(rank);
 
109
  int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
 
110
  TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__);
 
111
  TRACE_smpi_send(rank, rank, dst_traced);
 
112
#endif
 
113
 
 
114
  smpi_mpi_send(NULL, size, MPI_BYTE, to , 0, MPI_COMM_WORLD);
 
115
 
 
116
  log_timed_action (action, clock);
 
117
 
 
118
  #ifdef HAVE_TRACING
 
119
  TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
 
120
  TRACE_smpi_computing_in(rank);
 
121
#endif
 
122
 
 
123
}
 
124
 
 
125
static void action_Isend(const char *const *action)
 
126
{
 
127
  int to = atoi(action[2]);
 
128
  double size=parse_double(action[3]);
 
129
  double clock = smpi_process_simulated_elapsed();
 
130
  smpi_replay_globals_t globals =
 
131
     (smpi_replay_globals_t) smpi_process_get_user_data();
 
132
#ifdef HAVE_TRACING
 
133
  int rank = smpi_comm_rank(MPI_COMM_WORLD);
 
134
  TRACE_smpi_computing_out(rank);
 
135
  int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
 
136
  TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__);
 
137
  TRACE_smpi_send(rank, rank, dst_traced);
 
138
#endif
 
139
 
 
140
  MPI_Request request = smpi_mpi_isend(NULL, size, MPI_BYTE, to, 0,
 
141
                                       MPI_COMM_WORLD);
 
142
#ifdef HAVE_TRACING
 
143
  TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
 
144
  request->send = 1;
 
145
  TRACE_smpi_computing_in(rank);
 
146
#endif
 
147
 
 
148
  xbt_dynar_push(globals->isends,&request);
 
149
 
 
150
  log_timed_action (action, clock);
 
151
}
 
152
 
 
153
static void action_recv(const char *const *action) {
 
154
  int from = atoi(action[2]);
 
155
  double size=parse_double(action[3]);
 
156
  double clock = smpi_process_simulated_elapsed();
 
157
  MPI_Status status;
 
158
#ifdef HAVE_TRACING
 
159
  int rank = smpi_comm_rank(MPI_COMM_WORLD);
 
160
  int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
 
161
  TRACE_smpi_computing_out(rank);
 
162
 
 
163
  TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__);
 
164
#endif
 
165
 
 
166
  smpi_mpi_recv(NULL, size, MPI_BYTE, from, 0, MPI_COMM_WORLD, &status);
 
167
 
 
168
#ifdef HAVE_TRACING
 
169
  TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
 
170
  TRACE_smpi_recv(rank, src_traced, rank);
 
171
  TRACE_smpi_computing_in(rank);
 
172
#endif
 
173
 
 
174
  log_timed_action (action, clock);
 
175
}
 
176
 
 
177
static void action_Irecv(const char *const *action)
 
178
{
 
179
  int from = atoi(action[2]);
 
180
  double size=parse_double(action[3]);
 
181
  double clock = smpi_process_simulated_elapsed();
 
182
  MPI_Request request;
 
183
  smpi_replay_globals_t globals =
 
184
     (smpi_replay_globals_t) smpi_process_get_user_data();
 
185
 
 
186
#ifdef HAVE_TRACING
 
187
  int rank = smpi_comm_rank(MPI_COMM_WORLD);
 
188
  int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
 
189
  TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__);
 
190
#endif
 
191
 
 
192
  request = smpi_mpi_irecv(NULL, size, MPI_BYTE, from, 0, MPI_COMM_WORLD);
 
193
#ifdef HAVE_TRACING
 
194
  TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
 
195
  request->recv = 1;
 
196
#endif
 
197
  xbt_dynar_push(globals->irecvs,&request);
 
198
 
 
199
  log_timed_action (action, clock);
 
200
}
 
201
 
 
202
static void action_wait(const char *const *action){
 
203
  double clock = smpi_process_simulated_elapsed();
 
204
  MPI_Request request;
 
205
  MPI_Status status;
 
206
  smpi_replay_globals_t globals =
 
207
      (smpi_replay_globals_t) smpi_process_get_user_data();
 
208
 
 
209
  xbt_assert(xbt_dynar_length(globals->irecvs),
 
210
      "action wait not preceded by any irecv: %s",
 
211
      xbt_str_join_array(action," "));
 
212
  request = xbt_dynar_pop_as(globals->irecvs,MPI_Request);
 
213
#ifdef HAVE_TRACING
 
214
  int rank = request && request->comm != MPI_COMM_NULL
 
215
      ? smpi_comm_rank(request->comm)
 
216
      : -1;
 
217
  TRACE_smpi_computing_out(rank);
 
218
 
 
219
  MPI_Group group = smpi_comm_group(request->comm);
 
220
  int src_traced = smpi_group_rank(group, request->src);
 
221
  int dst_traced = smpi_group_rank(group, request->dst);
 
222
  int is_wait_for_receive = request->recv;
 
223
  TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__);
 
224
#endif
 
225
  smpi_mpi_wait(&request, &status);
 
226
#ifdef HAVE_TRACING
 
227
  TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
 
228
  if (is_wait_for_receive) {
 
229
    TRACE_smpi_recv(rank, src_traced, dst_traced);
 
230
  }
 
231
  TRACE_smpi_computing_in(rank);
 
232
#endif
 
233
 
 
234
  log_timed_action (action, clock);
 
235
}
 
236
 
 
237
static void action_waitall(const char *const *action){
 
238
  double clock = smpi_process_simulated_elapsed();
 
239
//  smpi_mpi_waitall(count, requests, status);
 
240
  log_timed_action (action, clock);
 
241
}
 
242
 
 
243
static void action_barrier(const char *const *action){
 
244
  double clock = smpi_process_simulated_elapsed();
 
245
#ifdef HAVE_TRACING
 
246
  int rank = smpi_comm_rank(MPI_COMM_WORLD);
 
247
  TRACE_smpi_computing_out(rank);
 
248
  TRACE_smpi_collective_in(rank, -1, __FUNCTION__);
 
249
#endif
 
250
  smpi_mpi_barrier(MPI_COMM_WORLD);
 
251
#ifdef HAVE_TRACING
 
252
  TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
 
253
  TRACE_smpi_computing_in(rank);
 
254
#endif
 
255
 
 
256
  log_timed_action (action, clock);
 
257
}
 
258
 
 
259
static void action_bcast(const char *const *action)
 
260
{
 
261
  double size = parse_double(action[2]);
 
262
  double clock = smpi_process_simulated_elapsed();
 
263
#ifdef HAVE_TRACING
 
264
  int rank = smpi_comm_rank(MPI_COMM_WORLD);
 
265
  TRACE_smpi_computing_out(rank);
 
266
  int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
 
267
  TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__);
 
268
#endif
 
269
 
 
270
  smpi_mpi_bcast(NULL, size, MPI_BYTE, 0, MPI_COMM_WORLD);
 
271
#ifdef HAVE_TRACING
 
272
  TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
 
273
  TRACE_smpi_computing_in(rank);
 
274
#endif
 
275
 
 
276
  log_timed_action (action, clock);
 
277
}
 
278
 
 
279
static void action_reduce(const char *const *action)
 
280
{
 
281
  double size = parse_double(action[2]);
 
282
  double clock = smpi_process_simulated_elapsed();
 
283
#ifdef HAVE_TRACING
 
284
  int rank = smpi_comm_rank(MPI_COMM_WORLD);
 
285
  TRACE_smpi_computing_out(rank);
 
286
  int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
 
287
  TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__);
 
288
#endif
 
289
   smpi_mpi_reduce(NULL, NULL, size, MPI_BYTE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
 
290
#ifdef HAVE_TRACING
 
291
  TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
 
292
  TRACE_smpi_computing_in(rank);
 
293
#endif
 
294
 
 
295
  log_timed_action (action, clock);
 
296
}
 
297
 
 
298
static void action_allReduce(const char *const *action) {
 
299
  double comm_size = parse_double(action[2]);
 
300
  double comp_size = parse_double(action[3]);
 
301
  double clock = smpi_process_simulated_elapsed();
 
302
#ifdef HAVE_TRACING
 
303
  int rank = smpi_comm_rank(MPI_COMM_WORLD);
 
304
  TRACE_smpi_computing_out(rank);
 
305
  TRACE_smpi_collective_in(rank, -1, __FUNCTION__);
 
306
#endif
 
307
  smpi_mpi_reduce(NULL, NULL, comm_size, MPI_BYTE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
 
308
  smpi_execute_flops(comp_size);
 
309
  smpi_mpi_bcast(NULL, comm_size, MPI_BYTE, 0, MPI_COMM_WORLD);
 
310
#ifdef HAVE_TRACING
 
311
  TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
 
312
  TRACE_smpi_computing_in(rank);
 
313
#endif
 
314
 
 
315
  log_timed_action (action, clock);
 
316
}
 
317
 
 
318
static void action_allToAll(const char *const *action) {
 
319
  double clock = smpi_process_simulated_elapsed();
 
320
  double comm_size = smpi_comm_size(MPI_COMM_WORLD);
 
321
  double send_size = parse_double(action[2]);
 
322
  double recv_size = parse_double(action[3]);
 
323
 
 
324
#ifdef HAVE_TRACING
 
325
  int rank = smpi_process_index();
 
326
  TRACE_smpi_computing_out(rank);
 
327
  TRACE_smpi_collective_in(rank, -1, __FUNCTION__);
 
328
#endif
 
329
 
 
330
  if (send_size < 200 && comm_size > 12) {
 
331
    smpi_coll_tuned_alltoall_bruck(NULL, send_size, MPI_BYTE,
 
332
                                   NULL, recv_size, MPI_BYTE,
 
333
                                   MPI_COMM_WORLD);
 
334
  } else if (send_size < 3000) {
 
335
    smpi_coll_tuned_alltoall_basic_linear(NULL, send_size, MPI_BYTE,
 
336
                                          NULL, recv_size, MPI_BYTE,
 
337
                                          MPI_COMM_WORLD);
 
338
  } else {
 
339
    smpi_coll_tuned_alltoall_pairwise(NULL, send_size, MPI_BYTE,
 
340
                                      NULL, recv_size, MPI_BYTE,
 
341
                                      MPI_COMM_WORLD);
 
342
  }
 
343
 
 
344
#ifdef HAVE_TRACING
 
345
  TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
 
346
  TRACE_smpi_computing_in(rank);
 
347
#endif
 
348
 
 
349
  log_timed_action (action, clock);
 
350
}
 
351
 
 
352
static void action_allToAllv(const char *const *action) {
 
353
  double clock = smpi_process_simulated_elapsed();
 
354
 
 
355
  log_timed_action (action, clock);
 
356
}
 
357
 
 
358
void smpi_replay_init(int *argc, char***argv){
 
359
  PMPI_Init(argc, argv);
 
360
  if (!smpi_process_index()){
 
361
    _xbt_replay_action_init();
 
362
    xbt_replay_action_register("init",       action_init);
 
363
    xbt_replay_action_register("finalize",   action_finalize);
 
364
    xbt_replay_action_register("comm_size",  action_comm_size);
 
365
    xbt_replay_action_register("comm_split", action_comm_split);
 
366
    xbt_replay_action_register("comm_dup",   action_comm_dup);
 
367
    xbt_replay_action_register("send",       action_send);
 
368
    xbt_replay_action_register("Isend",      action_Isend);
 
369
    xbt_replay_action_register("recv",       action_recv);
 
370
    xbt_replay_action_register("Irecv",      action_Irecv);
 
371
    xbt_replay_action_register("wait",       action_wait);
 
372
    xbt_replay_action_register("waitAll",    action_waitall);
 
373
    xbt_replay_action_register("barrier",    action_barrier);
 
374
    xbt_replay_action_register("bcast",      action_bcast);
 
375
    xbt_replay_action_register("reduce",     action_reduce);
 
376
    xbt_replay_action_register("allReduce",  action_allReduce);
 
377
    xbt_replay_action_register("allToAll",   action_allToAll);
 
378
    xbt_replay_action_register("allToAllV",  action_allToAllv);
 
379
    xbt_replay_action_register("compute",    action_compute);
 
380
  }
 
381
 
 
382
  xbt_replay_action_runner(*argc, *argv);
 
383
}
 
384
 
 
385
int smpi_replay_finalize(){
 
386
  double sim_time= 1.;
 
387
  /* One active process will stop. Decrease the counter*/
 
388
  active_processes--;
 
389
 
 
390
  if(!active_processes){
 
391
    /* Last process alive speaking */
 
392
    /* end the simulated timer */
 
393
    sim_time = smpi_process_simulated_elapsed();
 
394
    XBT_INFO("Simulation time %g", sim_time);
 
395
    _xbt_replay_action_exit();
 
396
  }
 
397
  return PMPI_Finalize();
 
398
}