1
/* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2
* All rights reserved. */
4
/* This program is free software; you can redistribute it and/or modify it
5
* under the terms of the license (GNU LGPL) which comes with this package. */
8
#include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */
9
#include "xbt/sysdep.h" /* calloc, printf */
11
/* Create a log channel to have nice outputs. */
13
#include "xbt/asserts.h"
15
/* we are going to use a generated platform */
16
#include "simgrid/platf_generator.h"
17
#include "simgrid/platf_interface.h"
19
XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
20
"Messages specific for this msg example");
22
int master(int argc, char *argv[]);
23
int slave(int argc, char *argv[]);
24
void promoter_1(context_node_t node);
25
void labeler_1(context_edge_t edge);
27
#define FINALIZE ((void*)221297) /* a magic number to tell people to stop working */
29
#define TASK_COUNT_PER_HOST 5 /* Number of tasks each slave has to do */
30
#define TASK_COMP_SIZE 15000000 /* computation cost of each task */
31
#define TASK_COMM_SIZE 1200000 /* communication time of task */
34
* Just promote each node into a host, with fixed power
35
* Set one node as master
36
* Add a state trace on other nodes
38
void promoter_1(context_node_t node) {
40
s_sg_platf_host_cbarg_t host_parameters;
41
static int master_choosen = FALSE;
43
host_parameters.id = NULL;
44
host_parameters.power_peak = 25000000;
46
host_parameters.core_amount = 1;
47
host_parameters.power_scale = 1;
48
host_parameters.power_trace = NULL;
49
host_parameters.initial_state = SURF_RESOURCE_ON;
50
host_parameters.state_trace = NULL;
51
host_parameters.coord = NULL;
52
host_parameters.properties = NULL;
55
master_choosen = TRUE;
56
host_parameters.id = "host_master";
59
char* pw_date_generator_id = bprintf("pw_date_host_%ld", node->id);
60
char* pw_value_generator_id = bprintf("pw_value_host_%ld", node->id);
61
probabilist_event_generator_t pw_date_generator =
62
tmgr_event_generator_new_uniform(pw_date_generator_id, 5, 10);
63
probabilist_event_generator_t pw_value_generator =
64
tmgr_event_generator_new_uniform(pw_value_generator_id, 0.6, 1.0);
65
host_parameters.power_trace =
66
tmgr_trace_generator_value(bprintf("pw_host_%ld", node->id),
71
platf_graph_promote_to_host(node, &host_parameters);
76
* Set all links with the same bandwidth and latency
77
* Add a state trace to each link
79
void labeler_1(context_edge_t edge) {
80
s_sg_platf_link_cbarg_t link_parameters;
81
link_parameters.id = NULL;
82
link_parameters.bandwidth = 100000000;
83
link_parameters.bandwidth_trace = NULL;
84
link_parameters.latency = 0.01;
85
link_parameters.latency_trace = NULL;
86
link_parameters.state = SURF_RESOURCE_ON;
87
link_parameters.state_trace = NULL;
88
link_parameters.policy = SURF_LINK_SHARED;
89
link_parameters.properties = NULL;
91
char* avail_generator_id = bprintf("avail_link_%ld", edge->id);
92
char* unavail_generator_id = bprintf("unavail_link_%ld", edge->id);
94
probabilist_event_generator_t avail_generator =
95
tmgr_event_generator_new_exponential(avail_generator_id, 0.0001);
96
probabilist_event_generator_t unavail_generator =
97
tmgr_event_generator_new_uniform(unavail_generator_id, 10, 20);
99
link_parameters.state_trace =
100
tmgr_trace_generator_avail_unavail(bprintf("state_link_%ld", edge->id),
106
platf_graph_link_label(edge, &link_parameters);
110
int master(int argc, char *argv[])
112
int slaves_count = 0;
113
msg_host_t *slaves = NULL;
114
int number_of_tasks = 0;
115
double task_comp_size = 0;
116
double task_comm_size = 0;
119
number_of_tasks = TASK_COUNT_PER_HOST*argc;
120
task_comp_size = TASK_COMP_SIZE;
121
task_comm_size = TASK_COMM_SIZE;
123
{ /* Process organization */
125
slaves = xbt_new0(msg_host_t, slaves_count);
127
for (i = 0; i < argc; i++) {
128
slaves[i] = MSG_get_host_by_name(argv[i]);
129
if (slaves[i] == NULL) {
130
XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
136
XBT_INFO("Got %d slave(s) :", slaves_count);
137
for (i = 0; i < slaves_count; i++)
138
XBT_INFO("%s", MSG_host_get_name(slaves[i]));
140
XBT_INFO("Got %d task to process :", number_of_tasks);
142
for (i = 0; i < number_of_tasks; i++) {
143
msg_task_t task = MSG_task_create("Task", task_comp_size, task_comm_size,
144
xbt_new0(double, 1));
146
*((double *) task->data) = MSG_get_clock();
148
a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i % slaves_count]),10.0);
151
XBT_INFO("Send completed");
152
} else if (a == MSG_HOST_FAILURE) {
154
("Gloups. The cpu on which I'm running just turned off!. See you!");
156
MSG_task_destroy(task);
159
} else if (a == MSG_TRANSFER_FAILURE) {
161
("Mmh. Something went wrong with '%s'. Nevermind. Let's keep going!",
162
MSG_host_get_name(slaves[i % slaves_count]));
164
MSG_task_destroy(task);
165
} else if (a == MSG_TIMEOUT) {
167
("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!",
168
MSG_host_get_name(slaves[i % slaves_count]));
170
MSG_task_destroy(task);
172
XBT_INFO("Hey ?! What's up ? ");
173
xbt_die( "Unexpected behavior");
178
("All tasks have been dispatched. Let's tell everybody the computation is over.");
179
for (i = 0; i < slaves_count; i++) {
180
msg_task_t task = MSG_task_create("finalize", 0, 0, FINALIZE);
181
int a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i]),1.0);
184
if (a == MSG_HOST_FAILURE) {
186
("Gloups. The cpu on which I'm running just turned off!. See you!");
187
MSG_task_destroy(task);
190
} else if (a == MSG_TRANSFER_FAILURE) {
191
XBT_INFO("Mmh. Can't reach '%s'! Nevermind. Let's keep going!",
192
MSG_host_get_name(slaves[i]));
193
MSG_task_destroy(task);
194
} else if (a == MSG_TIMEOUT) {
196
("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!",
197
MSG_host_get_name(slaves[i % slaves_count]));
198
MSG_task_destroy(task);
200
XBT_INFO("Hey ?! What's up ? ");
201
xbt_die("Unexpected behavior with '%s': %d", MSG_host_get_name(slaves[i]), a);
205
XBT_INFO("Goodbye now!");
208
} /* end_of_master */
210
int slave(int argc, char *argv[])
213
msg_task_t task = NULL;
217
time1 = MSG_get_clock();
218
a = MSG_task_receive( &(task), MSG_host_get_name(MSG_host_self()) );
219
time2 = MSG_get_clock();
221
XBT_INFO("Received \"%s\"", MSG_task_get_name(task));
222
if (MSG_task_get_data(task) == FINALIZE) {
223
MSG_task_destroy(task);
226
if (time1 < *((double *) task->data))
227
time1 = *((double *) task->data);
228
XBT_INFO("Communication time : \"%f\"", time2 - time1);
229
XBT_INFO("Processing \"%s\"", MSG_task_get_name(task));
230
a = MSG_task_execute(task);
232
XBT_INFO("\"%s\" done", MSG_task_get_name(task));
234
MSG_task_destroy(task);
235
} else if (a == MSG_HOST_FAILURE) {
237
("Gloups. The cpu on which I'm running just turned off!. See you!");
240
XBT_INFO("Hey ?! What's up ? ");
241
xbt_die("Unexpected behavior");
243
} else if (a == MSG_HOST_FAILURE) {
245
("Gloups. The cpu on which I'm running just turned off!. See you!");
247
} else if (a == MSG_TRANSFER_FAILURE) {
248
XBT_INFO("Mmh. Something went wrong. Nevermind. Let's keep going!");
249
} else if (a == MSG_TIMEOUT) {
250
XBT_INFO("Mmh. Got a timeout. Nevermind. Let's keep going!");
252
XBT_INFO("Hey ?! What's up ? ");
253
xbt_die("Unexpected behavior");
256
XBT_INFO("I'm done. See you!");
261
int main(int argc, char *argv[])
263
msg_error_t res = MSG_OK;
264
unsigned long seed_platf_gen[] = {134, 233445, 865, 2634, 424242, 876543};
265
unsigned long seed_trace_gen[] = {8865244, 356772, 42, 77465, 2098754, 8725442};
270
MSG_init(&argc, argv);
272
//Set up the seed for the platform generation
273
platf_random_seed(seed_platf_gen);
275
//Set up the RngStream for trace generation
276
sg_platf_rng_stream_init(seed_trace_gen);
278
XBT_INFO("creating nodes...");
279
platf_graph_uniform(10);
283
XBT_INFO("creating links...");
284
platf_graph_clear_links();
285
platf_graph_interconnect_waxman(0.9, 0.4);
286
XBT_INFO("done. Check connectedness...");
287
connected = platf_graph_is_connected();
288
XBT_INFO("Is it connected : %s", connected ? "yes" : (max_tries ? "no, retrying" : "no"));
289
} while(!connected && max_tries);
291
if(!connected && !max_tries) {
292
xbt_die("Impossible to connect the graph, aborting.");
295
XBT_INFO("registering callbacks...");
296
platf_graph_promoter(promoter_1);
297
platf_graph_labeler(labeler_1);
299
XBT_INFO("promoting...");
302
XBT_INFO("labeling...");
305
XBT_INFO("Putting it in surf...");
308
XBT_INFO("Let's get the available hosts and dispatch work:");
311
msg_host_t host = NULL;
312
msg_host_t host_master = NULL;
313
msg_process_t process = NULL;
314
xbt_dynar_t host_dynar = MSG_hosts_as_dynar();
315
char** hostname_list =
316
xbt_malloc(sizeof(char*) * xbt_dynar_length(host_dynar));
318
xbt_dynar_foreach(host_dynar, i, host) {
319
process = MSG_process_create("slave", slave, NULL, host);
320
MSG_process_auto_restart_set(process, TRUE);
321
hostname_list[i] = (char*) MSG_host_get_name(host);
323
host_master = MSG_get_host_by_name("host_master");
324
MSG_process_create_with_arguments("master", master, NULL, host_master,
325
xbt_dynar_length(host_dynar),