16
16
// along with BOINC. If not, see <http://www.gnu.org/licenses/>.
18
18
// Example multi-thread BOINC application.
19
// It does 64 "units" of computation, where each unit is about 1 GFLOP.
19
// This app defines its own classes (THREAD, THREAD_SET) for managing threads.
20
// You can also use libraries such as OpenMP.
21
// Just make sure you call boinc_init_parallel().
23
// This app does 64 "units" of computation, where each unit is about 1 GFLOP.
20
24
// It divides this among N "worker" threads.
21
25
// N is passed in the command line, and defaults to 1.
23
// The main issue is how to suspend/resume the threads.
24
// The standard BOINC API doesn't work - it assumes that
25
// the initial thread is the only one.
26
// On Linux, there's no API to suspend/resume threads.
27
// All you can do is SIGSTOP/SIGCONT, which affects the whole process.
28
// So we use the following process/thread structure:
32
// - launches worker threads,
33
// - in polling loop, checks for suspend/resume messages
34
// from the BOINC client, and handles them itself.
37
// - forks worker process
38
// - in polling loop, checks for worker process completion
39
// - doesn't send status msgs
42
// - forks worker threads, waits for them to finish, exits
43
// - uses BOINC runtime to send status messages (frac done, CPU time)
45
27
// Doesn't do checkpointing.
32
#include "boinc_win.h"
51
34
#include <sys/types.h>
52
35
#include <sys/wait.h>
100
fprintf(stderr, "Can't start thread\n");
85
fprintf(stderr, "%s Can't start thread\n",
86
boinc_msg_prefix(buf, sizeof(buf))
105
92
retval = pthread_create(&id, 0, func, (void*)this);
107
fprintf(stderr, "can't start thread\n");
94
fprintf(stderr, "%s can't start thread\n",
95
boinc_msg_prefix(buf, sizeof(buf))
114
void suspend(bool if_susp) {
124
103
struct THREAD_SET {
125
104
vector<THREAD*> threads;
127
void suspend(bool if_susp) {
128
for (unsigned int i=0; i<threads.size(); i++) {
129
THREAD* t = threads[i];
130
if (t->id != THREAD_ID_NULL) t->suspend(if_susp);
132
fprintf(stderr, "suspended all\n");
135
105
bool all_done() {
136
106
for (unsigned int i=0; i<threads.size(); i++) {
137
107
if (threads[i]->id != THREAD_ID_NULL) return false;
168
138
void* worker(void* p) {
170
141
THREAD* t = (THREAD*)p;
171
142
for (int i=0; i<units_per_thread; i++) {
172
143
double x = do_a_giga_flop(i);
174
fprintf(stderr, "thread %d finished %d: %f\n", t->index, i, x);
145
fprintf(stderr, "%s thread %d finished %d: %f\n",
146
boinc_msg_prefix(buf, sizeof(buf)), t->index, i, x
176
149
t->id = THREAD_ID_NULL;
182
void main_thread(int nthreads) {
185
static BOINC_STATUS status;
155
int main(int argc, char** argv) {
156
int i, nthreads = DEFAULT_NTHREADS;
157
double start_time = dtime();
160
boinc_init_parallel();
162
for (i=1; i<argc; i++) {
163
if (!strcmp(argv[i], "--nthreads")) {
164
nthreads = atoi(argv[++i]);
166
fprintf(stderr, "%s unrecognized arg: %s\n",
167
boinc_msg_prefix(buf, sizeof(buf)), argv[i]
172
units_per_thread = TOTAL_UNITS/nthreads;
187
174
THREAD_SET thread_set;
188
175
for (i=0; i<nthreads; i++) {
189
176
thread_set.threads.push_back(new THREAD(worker, i));
192
179
double f = thread_set.units_done()/((double)TOTAL_UNITS);
193
180
boinc_fraction_done(f);
194
181
if (thread_set.all_done()) break;
196
int old_susp = status.suspended;
197
boinc_get_status(&status);
198
if (status.quit_request || status.abort_request || status.no_heartbeat) {
201
if (status.suspended != old_susp) {
202
thread_set.suspend(status.suspended != 0);
206
182
boinc_sleep(1.0);
211
int main(int argc, char** argv) {
212
BOINC_OPTIONS options;
213
int nthreads = DEFAULT_NTHREADS;
214
double start_time = dtime();
216
boinc_options_defaults(options);
217
options.direct_process_action = 0;
219
for (int i=1; i<argc; i++) {
220
if (!strcmp(argv[i], "--nthreads")) {
221
nthreads = atoi(argv[++i]);
223
fprintf(stderr, "unrecognized arg: %s\n", argv[i]);
227
units_per_thread = TOTAL_UNITS/nthreads;
230
boinc_init_options(&options);
231
main_thread(nthreads);
233
options.send_status_msgs = 0;
234
boinc_init_options(&options);
238
boinc_get_status(&status);
241
bool old_susp = status.suspended;
242
boinc_get_status(&status);
243
if (status.quit_request || status.abort_request || status.no_heartbeat) {
247
if (status.suspended != old_susp) {
248
kill(pid, status.suspended?SIGSTOP:SIGCONT);
250
if (waitpid(pid, &exit_status, WNOHANG) == pid) {
255
} else { // child (worker)
256
memset(&options, 0, sizeof(options));
257
options.send_status_msgs = 1;
258
boinc_init_options(&options);
259
main_thread(nthreads);
263
185
double elapsed_time = dtime()-start_time;
265
"All done. Used %d threads. Elapsed time %f\n",
266
nthreads, elapsed_time
187
"%s All done. Used %d threads. Elapsed time %f\n",
188
boinc_msg_prefix(buf, sizeof(buf)), nthreads, elapsed_time