12
#include "SharedQueue.h"
15
using namespace tascel;
16
using namespace tascel::comm;
19
// Builds the shared task queue for the calling process.
//   _tsk_size  - size of one task descriptor; presumably stored in tsk_size
//                (that initialization is not visible here -- TODO confirm)
//   _max_ntsks - queue capacity in tasks; stored in max_ntsks
// Allocates the task storage and the queue-state record through ARMCI,
// creates one ARMCI mutex, and zeroes the local head/tail/size/dirty fields.
SharedQueue::SharedQueue(int _tsk_size, int _max_ntsks)
20
: max_ntsks(_max_ntsks)
29
// One pointer slot per process (nproc() entries); the local one is q[me()].
q = new char *[nproc()];
31
// Request room for max_ntsks descriptors of tsk_size bytes each.
ARMCI_Malloc((void **)q, max_ntsks * tsk_size);
32
massertl(q[me()], error1);
33
// A single mutex (index 0) is what the Unlock calls elsewhere in this file use.
ARMCI_Create_mutexes(1);
34
sq_state = new sq_state_t*[nproc()];
35
massertl(sq_state, error1);
36
// NOTE(review): this asks for nproc()*sizeof(sq_state_t) bytes, while the
// visible code only uses one sq_state_t per process -- confirm whether
// sizeof(sq_state_t) alone would be enough.
ARMCI_Malloc((void **)sq_state, nproc()*sizeof(sq_state_t));
37
massertl(sq_state[me()], error2);
38
// Cache direct pointers to the fields of this process's own state record.
head = &sq_state[me()]->head;
39
tail = &sq_state[me()]->tail;
40
size = &sq_state[me()]->size;
41
dirty = &sq_state[me()]->dirty;
42
// Start with an empty queue and a clean dirty flag.
*head = *tail = *size = *dirty = 0;
43
//printf("(cons) %d: *head=%p *tail=%p *size=%p\n", me(), head, tail, size);
47
// Failure messages; presumably reached via the error2/error1 labels named in
// the massertl calls above (the labels themselves are not visible here).
printf("Error2 in SharedQueue constructor\n");
50
printf("Error1 in SharedQueue constructor\n");
53
printf("Error in SharedQueue constructor\n");
57
// Tears down the queue: frees this process's queue-state record and destroys
// the ARMCI mutexes.
// NOTE(review): releasing q[me()] and the q / sq_state pointer arrays is not
// visible here -- confirm they are freed as well.
SharedQueue::~SharedQueue() {
59
ARMCI_Free(sq_state[me()]);
60
ARMCI_Destroy_mutexes();
67
// Const query on the queue; going by its name and the note below it reports
// whether the queue holds no tasks (the body is not visible here -- TODO confirm).
SharedQueue::empty() const {
68
/*SK: speculative implementation. It could become empty while this
69
operation is ongoing. However, the converse is not true, since
70
only the host process calls empty() or can add tasks (thus
80
// Pops one task from this process's own queue into the caller's buffer.
//   dscr - destination buffer; receives tsk_size bytes
//   dlen - size of the destination; asserted to equal tsk_size
// The task is read from the slot just below head, i.e. the slot addTask
// filled most recently.
SharedQueue::getTask(void *dscr, int dlen) {
82
massert(dlen == tsk_size);
85
// NOTE(review): two unlocks of mutex 0 appear in this function; presumably
// this one belongs to an early-exit path and the matching lock is on a line
// not visible here -- TODO confirm.
ARMCI_Unlock(0, me());
88
// Step head back by one slot, wrapping around the max_ntsks-slot buffer.
*head = (*head - 1 + max_ntsks) % max_ntsks;
89
memcpy(dscr, &q[me()][*head*tsk_size], tsk_size);
91
ARMCI_Unlock(0, me());
100
// Pushes one task onto this process's own queue.
//   dscr - source buffer holding the task descriptor (tsk_size bytes are copied)
//   dlen - size of the source; asserted to equal tsk_size
// The descriptor is stored in the slot at head, after which head advances.
SharedQueue::addTask(void *dscr, int dlen) {
102
massert(dlen == tsk_size);
104
// printf("addTask. %d: *head=%p *tail=%p *size=%p\n", me(), head, tail, size);
105
// The queue must not already hold max_ntsks tasks.
massertl(*size != max_ntsks, error2);
106
memcpy(&q[me()][*head*tsk_size], dscr, tsk_size);
107
// Advance head by one slot, wrapping around the max_ntsks-slot buffer.
*head = (*head + 1) % max_ntsks;
109
// NOTE(review): two unlocks of mutex 0 appear here; presumably one is on the
// normal path and one on the error2 path, with the matching lock and the
// *size update on lines not visible here -- TODO confirm.
ARMCI_Unlock(0, me());
113
ARMCI_Unlock(0, me());
119
// Tries to move one task from process proc's queue into the local queue.
//   proc - rank of the process to steal from
// Visible sequence: fetch proc's queue state, take the slot just below its
// head, write the updated state (size-1, dirty=1) back, release proc's mutex,
// then add the fetched task locally via addTask.
SharedQueue::steal(int proc) {
122
// Staging buffer for one task descriptor.
// NOTE(review): no matching delete[] for buf is visible here -- confirm it is freed.
buf = new char[tsk_size];
125
//FIXME: might need a lock+unlock to ensure both size and dirty are
126
//updated atomically by any thief (say through RDMA)
127
// Advance termination detection, reporting whether the local dirty flag is clear.
td.progress(*dirty == 0);
129
if (td.hasTerminated()) {
135
// Copy proc's queue-state record into the local variable sq.
ARMCI_Get(sq_state[proc], &sq, sizeof(sq), proc);
136
// printf("%d: Locked %d for steal. size=%d\n", me(), proc, sq.size);
139
// Presumably the bail-out path when there is nothing to steal; the guarding
// condition is not visible here -- TODO confirm.
ARMCI_Unlock(0, proc);
142
sq.size = sq.size - 1;
143
sq.dirty = 1; //mark dirty for termination detection
144
// Step the copied head back by one slot, wrapping around max_ntsks.
sq.head = (sq.head - 1 + max_ntsks) % max_ntsks;
145
// Fetch the task at that slot from proc's buffer into buf.
ARMCI_Get(&q[proc][sq.head*tsk_size], buf, tsk_size, proc);
146
// Write the modified state record back to proc.
ARMCI_Put(&sq, sq_state[proc], sizeof(sq), proc);
147
//FIXME: does unlock imply an ARMCI_Fence. If not, add one
148
ARMCI_Unlock(0, proc);
150
// Enqueue the stolen task on the local queue.
addTask(buf, tsk_size);
159
// Forwards to the termination detector: returns td.hasTerminated().
SharedQueue::hasTerminated() {
160
return td.hasTerminated();
164
// Advances the termination detector, passing whether the local dirty flag is
// currently zero.
SharedQueue::td_progress() {
165
td.progress(*dirty == 0);