~ubuntu-branches/ubuntu/precise/boinc/precise

« back to all changes in this revision

Viewing changes to samples/multi_thread/multi_thread.cpp

Tags: 6.12.8+dfsg-1
* New upstream release.
* Simplified debian/rules

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
// along with BOINC.  If not, see <http://www.gnu.org/licenses/>.
17
17
 
18
18
// Example multi-thread BOINC application.
19
 
// It does 64 "units" of computation, where each units is about 1 GFLOP.
 
19
// This app defines its own classes (THREAD, THREAD_SET) for managing threads.
 
20
// You can also use libraries such as OpenMP.
 
21
// Just make sure you call boinc_init_parallel().
 
22
//
 
23
// This app does 64 "units" of computation, where each units is about 1 GFLOP.
20
24
// It divides this among N "worker" threads.
21
25
// N is passed in the command line, and defaults to 1.
22
26
//
23
 
// The main issue is how to suspend/resume the threads.
24
 
// The standard BOINC API doesn't work - it assumes that
25
 
// the initial thread is the only one.
26
 
// On Linux, there's no API to suspend/resume threads.
27
 
// All you can do is SIGSTOP/SIGCONT, which affects the whole process.
28
 
// So we use the following process/thread structure:
29
 
//
30
 
// Windows:
31
 
// Initial thread:
32
 
//  - launches worker threads,
33
 
//  - in polling loop, checks for suspend/resume messages
34
 
//    from the BOINC client, and handles them itself.
35
 
// Unix:
36
 
//  Initial process
37
 
//    - forks worker process
38
 
//    - in polling loop, checks for worker process completion
39
 
//    - doesn't send status msgs
40
 
//  Worker process
41
 
//    Initial thread:
42
 
//    - forks worker threads, wait for them to finish, exit
43
 
//    - uses BOINC runtime to send status messages (frac done, CPU time)
44
 
//
45
27
// Doesn't do checkpointing.
46
28
 
47
29
#include <stdio.h>
48
30
#include <vector>
49
31
#ifdef _WIN32
 
32
#include "boinc_win.h"
50
33
#else
51
34
#include <sys/types.h>
52
35
#include <sys/wait.h>
60
43
 
61
44
using std::vector;
62
45
 
63
 
#define DEFAULT_NTHREADS 1
64
 
#define TOTAL_UNITS 64
 
46
#define DEFAULT_NTHREADS 4
 
47
#define TOTAL_UNITS 16
65
48
 
66
49
int units_per_thread;
67
50
 
85
68
    int units_done;
86
69
 
87
70
    THREAD(THREAD_FUNC func, int i) {
 
71
        char buf[256];
 
72
 
88
73
        index = i;
89
74
        units_done = 0;
90
75
#ifdef _WIN32
97
82
            NULL
98
83
        );
99
84
        if (!id) {
100
 
            fprintf(stderr, "Can't start thread\n");
 
85
            fprintf(stderr, "%s Can't start thread\n",
 
86
                boinc_msg_prefix(buf, sizeof(buf))
 
87
            );
101
88
            exit(1);
102
89
        }
103
90
#else
104
91
        int retval;
105
92
        retval = pthread_create(&id, 0, func, (void*)this);
106
93
        if (retval) {
107
 
            fprintf(stderr, "can't start thread\n");
 
94
            fprintf(stderr, "%s can't start thread\n",
 
95
                boinc_msg_prefix(buf, sizeof(buf))
 
96
            );
108
97
            exit(1);
109
98
        }
110
99
#endif
111
100
    }
112
 
 
113
 
#ifdef _WIN32
114
 
    void suspend(bool if_susp) {
115
 
        if (if_susp) {
116
 
            SuspendThread(id);
117
 
        } else {
118
 
            ResumeThread(id);
119
 
        }
120
 
    }
121
 
#endif
122
101
};
123
102
 
124
103
struct THREAD_SET {
125
104
    vector<THREAD*> threads;
126
 
#ifdef _WIN32
127
 
    void suspend(bool if_susp) {
128
 
        for (unsigned int i=0; i<threads.size(); i++) {
129
 
            THREAD* t = threads[i];
130
 
            if (t->id != THREAD_ID_NULL) t->suspend(if_susp);
131
 
        }
132
 
        fprintf(stderr, "suspended all\n");
133
 
    }
134
 
#endif
135
105
    bool all_done() {
136
106
        for (unsigned int i=0; i<threads.size(); i++) {
137
107
            if (threads[i]->id != THREAD_ID_NULL) return false;
167
137
#else
168
138
void* worker(void* p) {
169
139
#endif
 
140
    char buf[256];
170
141
    THREAD* t = (THREAD*)p;
171
142
    for (int i=0; i<units_per_thread; i++) {
172
143
        double x = do_a_giga_flop(i);
173
144
        t->units_done++;
174
 
        fprintf(stderr, "thread %d finished %d: %f\n", t->index, i, x);
 
145
        fprintf(stderr, "%s thread %d finished %d: %f\n",
 
146
            boinc_msg_prefix(buf, sizeof(buf)), t->index, i, x
 
147
        );
175
148
    }
176
149
    t->id = THREAD_ID_NULL;
177
150
#ifdef _WIN32
179
152
#endif
180
153
}
181
154
 
182
 
void main_thread(int nthreads) {
183
 
    int i;
184
 
#ifdef _WIN32
185
 
    static BOINC_STATUS status;
186
 
#endif
 
155
int main(int argc, char** argv) {
 
156
    int i, nthreads = DEFAULT_NTHREADS;
 
157
    double start_time = dtime();
 
158
    char buf[256];
 
159
 
 
160
    boinc_init_parallel();
 
161
 
 
162
    for (i=1; i<argc; i++) {
 
163
        if (!strcmp(argv[i], "--nthreads")) {
 
164
            nthreads = atoi(argv[++i]);
 
165
        } else {
 
166
            fprintf(stderr, "%s unrecognized arg: %s\n",
 
167
                boinc_msg_prefix(buf, sizeof(buf)), argv[i]
 
168
            );
 
169
        }
 
170
    }
 
171
 
 
172
    units_per_thread = TOTAL_UNITS/nthreads;
 
173
 
187
174
    THREAD_SET thread_set;
188
175
    for (i=0; i<nthreads; i++) {
189
176
        thread_set.threads.push_back(new THREAD(worker, i));
192
179
        double f = thread_set.units_done()/((double)TOTAL_UNITS);
193
180
        boinc_fraction_done(f);
194
181
        if (thread_set.all_done()) break;
195
 
#ifdef _WIN32
196
 
        int old_susp = status.suspended;
197
 
        boinc_get_status(&status);
198
 
        if (status.quit_request || status.abort_request || status.no_heartbeat) {
199
 
            exit(0);
200
 
        }
201
 
        if (status.suspended != old_susp) {
202
 
            thread_set.suspend(status.suspended != 0);
203
 
        }
204
 
        boinc_sleep(0.1);
205
 
#else
206
182
        boinc_sleep(1.0);
207
 
#endif
208
 
    }
209
 
}
210
 
 
211
 
int main(int argc, char** argv) {
212
 
    BOINC_OPTIONS options;
213
 
    int nthreads = DEFAULT_NTHREADS;
214
 
    double start_time = dtime();
215
 
 
216
 
    boinc_options_defaults(options);
217
 
    options.direct_process_action = 0;
218
 
 
219
 
    for (int i=1; i<argc; i++) {
220
 
        if (!strcmp(argv[i], "--nthreads")) {
221
 
            nthreads = atoi(argv[++i]);
222
 
        } else {
223
 
            fprintf(stderr, "unrecognized arg: %s\n", argv[i]);
224
 
        }
225
 
    }
226
 
 
227
 
    units_per_thread = TOTAL_UNITS/nthreads;
228
 
 
229
 
#ifdef _WIN32
230
 
    boinc_init_options(&options);
231
 
    main_thread(nthreads);
232
 
#else
233
 
    options.send_status_msgs = 0;
234
 
    boinc_init_options(&options);
235
 
    int pid = fork();
236
 
    if (pid) {          // parent
237
 
        BOINC_STATUS status;
238
 
        boinc_get_status(&status);
239
 
        int exit_status;
240
 
        while (1) {
241
 
            bool old_susp = status.suspended;
242
 
            boinc_get_status(&status);
243
 
            if (status.quit_request || status.abort_request || status.no_heartbeat) {
244
 
                kill(pid, SIGKILL);
245
 
                exit(0);
246
 
            }
247
 
            if (status.suspended != old_susp) {
248
 
                kill(pid, status.suspended?SIGSTOP:SIGCONT);
249
 
            }
250
 
            if (waitpid(pid, &exit_status, WNOHANG) == pid) {
251
 
                break;
252
 
            }
253
 
            boinc_sleep(0.1);
254
 
        }
255
 
    } else {            // child (worker)
256
 
        memset(&options, 0, sizeof(options));
257
 
        options.send_status_msgs = 1;
258
 
        boinc_init_options(&options);
259
 
        main_thread(nthreads);
260
 
    }
261
 
#endif
 
183
    }
262
184
 
263
185
    double elapsed_time = dtime()-start_time;
264
186
    fprintf(stderr,
265
 
        "All done.  Used %d threads.  Elapsed time %f\n",
266
 
        nthreads, elapsed_time
 
187
        "%s All done.  Used %d threads.  Elapsed time %f\n",
 
188
        boinc_msg_prefix(buf, sizeof(buf)), nthreads, elapsed_time
267
189
    );
268
190
    boinc_finish(0);
269
191
}