9
/* $Id: mutex.c,v 1.24.10.1 2006-12-21 23:50:48 manoj Exp $ */
16
/* Largest mutex count accepted by PARMCI_Create_mutexes(). */
#define MAX_LOCKS 32768
19
#if defined(LAPI) || defined(GM)
23
/* Accumulator written by the delay loop in armci_generic_lock(). */
double _dummy_work_=0.;
24
#ifdef LAPI /* fix for the case where the cmpl handler for a pending unlock runs after destroy */
26
/* Accumulator written by the wait loop in PARMCI_Destroy_mutexes(). */
double _dummy_server_work_=0.;
28
/* num_mutexes: zero until mutexes have been created.
 * tickets: array of saved ticket values; PARMCI_Create_mutexes() hands each
 * process entry in glob_mutex a slice of it. */
static int num_mutexes=0, *tickets;
37
/* data structure to store info about blocked (waiting) process for mutex */
38
/* Allocated with armci_nproc entries by armci_serv_mutex_create() and
 * indexed by the requesting process in armci_server_lock_mutex(). */
static waiting_list_t* blocked=(waiting_list_t*)0;
49
/* One entry per process (token/turn/count/tickets), filled in by
 * PARMCI_Create_mutexes(). */
mutex_entry_t *glob_mutex;
52
/*
 * Create `num` mutexes on the calling process and build the glob_mutex table.
 *
 * Returns FAIL when num is outside [0, MAX_LOCKS] and 0 in the single-process
 * case; the return on the normal multi-process path is not visible in this
 * view. Calls armci_die() if mutexes were already created or a malloc fails.
 */
int PARMCI_Create_mutexes(int num)
55
/* Scratch array with one counter per process; defaults to the internal buffer. */
int *mutex_count = (int*)armci_internal_buffer;
57
/* Fall back to the heap when the internal buffer cannot hold armci_nproc ints. */
if((sizeof(int)*armci_nproc) > armci_getbufsize()){
58
/* NOTE(review): the cast is (double *) but mutex_count is declared int * --
 * looks like it should be (int *); confirm. No NULL check or matching free
 * is visible in this view. */
mutex_count = (double *)malloc(sizeof(int)*armci_nproc);
60
/* NOTE(review): this return and the single-process return below both come
 * after the possible malloc above -- confirm the buffer is not leaked. */
if (num < 0 || num > MAX_LOCKS) return(FAIL);
61
if(num_mutexes) armci_die("mutexes already created",num_mutexes);
63
/* Single-process case: only the flag is set; none of the arrays below are allocated. */
if(armci_nproc == 1){ num_mutexes=1; return(0); }
65
/* local memory allocation for mutex arrays*/
66
mutex_mem_ar = (void*) malloc(armci_nproc*sizeof(void*));
67
if(!mutex_mem_ar) armci_die("ARMCI_Create_mutexes: malloc failed",0);
68
glob_mutex = (void*)malloc(armci_nproc*sizeof(mutex_entry_t));
71
armci_die("ARMCI_Create_mutexes: malloc 2 failed",0);
75
/* bzero(mutex_count,armci_nproc*sizeof(int));*/
76
bzero((char*)mutex_count,sizeof(int)*armci_nproc);
78
/* find out how many mutexes everybody allocated */
79
/* Only our own slot is non-zero before the "+" reduction over the array. */
mutex_count[armci_me]=num;
80
armci_msg_igop(mutex_count, armci_nproc, "+");
81
for(p=totcount=0; p< armci_nproc; p++)totcount+=mutex_count[p];
83
/* One zero-initialised ticket slot per mutex across all processes.
 * NOTE(review): no check of the calloc result is visible in this view. */
tickets = calloc(totcount,sizeof(int));
90
/* we need memory for token and turn - 2 ints */
91
rc = PARMCI_Malloc(mutex_mem_ar,2*num*sizeof(int));
99
if(num)bzero((char*)mutex_mem_ar[armci_me],2*num*sizeof(int));
101
/* setup global mutex array */
102
/* totcount is reused here as the running offset into tickets. */
for(p=totcount=0; p< armci_nproc; p++){
103
glob_mutex[p].token = mutex_mem_ar[p];
104
/* The turn slots start right after the mutex_count[p] token slots. */
glob_mutex[p].turn = glob_mutex[p].token + mutex_count[p];
105
glob_mutex[p].count = mutex_count[p];
106
glob_mutex[p].tickets = tickets + totcount;
107
totcount += mutex_count[p];
110
/* After the loop totcount is the total over all processes. */
num_mutexes= totcount;
117
fprintf(stderr,"%d created (%d,%d) mutexes\n",armci_me,num,totcount);
123
/*
 * Allocate the `blocked` waiting list with one waiting_list_t per process;
 * calls armci_die() if the allocation fails.
 *
 * NOTE(review): no initialisation of the entries is visible here, while
 * armci_server_unlock_mutex() compares blocked[i].mutex and blocked[i].turn
 * for every i -- confirm entries cannot be read before they are written.
 */
void armci_serv_mutex_create()
125
int mem = armci_nproc*sizeof(waiting_list_t);
126
blocked = (waiting_list_t*)malloc(mem);
127
if(!blocked) armci_die("armci server:error allocating mutex memory ",0);
131
/*
 * Release the `blocked` waiting list, if one was allocated, and reset the
 * pointer to null so a later lock request allocates it again.
 */
void armci_serv_mutex_close()
133
if(blocked) free(blocked );
134
blocked = (waiting_list_t*)0;
138
/*
 * Tear down the mutexes created by PARMCI_Create_mutexes().
 *
 * Calls armci_die() if no mutexes exist and returns 0 immediately in the
 * single-process case. Under LAPI it first waits for every mutex on every
 * process to become free, then frees this process's token/turn memory.
 */
int PARMCI_Destroy_mutexes()
140
#ifdef LAPI /* fix for the case where the cmpl handler for a pending unlock runs after destroy */
141
int proc, mutex, i,factor=0;
143
if(num_mutexes==0)armci_die("armci_destroy_mutexes: not created",0);
144
/* NOTE(review): no reset of num_mutexes is visible before this return,
 * although PARMCI_Create_mutexes() sets it to 1 for a single process --
 * confirm that create after destroy is expected to work in that case. */
if(armci_nproc == 1) return(0);
148
#ifdef LAPI /* fix for the case where the cmpl handler for a pending unlock runs after destroy */
149
for(proc=0;proc<armci_nproc;proc++){
150
for(mutex=0;mutex<glob_mutex[proc].count;mutex++){
151
_dummy_server_work_ = 0.; /* must be global to fool the compiler */
152
/* Spin until the mutex is reported free. */
while(!armci_mutex_free(mutex,proc)){
153
/* NOTE(review): factor starts at 0 and no update is visible in this view. */
for(i=0; i< SPINMAX *factor; i++) _dummy_server_work_ += 1.;
161
# if defined(SERVER_LOCK)
162
armci_serv_mutex_close();
165
/* Only free the token/turn memory if this process created any mutexes. */
if(glob_mutex[armci_me].count)PARMCI_Free(glob_mutex[armci_me].token);
175
/*
 * Take a ticket for mutex `id` on process `proc` by doing a fetch-and-add
 * of 1 on that mutex's token slot. Callers use the return value as their
 * ticket; the return statement itself is not visible in this view.
 */
static int register_in_mutex_queue(int id, int proc)
177
int *mutex_entry, ticket;
179
/* NOTE(review): id == count passes this check although the slot used below
 * is token + id; if ids are zero-based the test should probably be <= --
 * confirm. */
if(glob_mutex[proc].count < id)
180
armci_die2("armci:invalid mutex id",id, glob_mutex[proc].count);
181
mutex_entry = glob_mutex[proc].token + id;
182
PARMCI_Rmw(ARMCI_FETCH_AND_ADD, &ticket, mutex_entry, 1, proc);
188
/*
 * armci_mutex_free(mutex, proc): returns 1 when the turn slot and the token
 * slot of mutex `mutex` on process `proc` hold the same value. Both slots are
 * read directly through volatile pointers. The result for the unequal case is
 * not visible in this view.
 */
/*\ check if mutex is available by comparing turn and token
189
* can only be called by a process on the same SMP node as proc
191
static int armci_mutex_free(int mutex, int proc)
193
volatile int *mutex_ticket=glob_mutex[proc].turn + mutex;
194
volatile int *turn = glob_mutex[proc].token +mutex;
196
/* here we will put code to check if other processes on the node
197
* are waiting for this mutex
198
* lockinfo_node[me].ticket = mutex_ticket;
199
* lockinfo_node[me].mutex = mutex;
202
if(*mutex_ticket == *turn) return 1;
208
/*
 * Acquire mutex `mutex` on process `proc` from the application side.
 *
 * Takes a ticket with register_in_mutex_queue(), then repeatedly fetches the
 * mutex's turn value with PARMCI_Get() until it equals that ticket. The
 * ticket is saved in glob_mutex[proc].tickets[mutex] once the lock is held.
 */
static void armci_generic_lock(int mutex, int proc)
210
int i, myturn, factor=0, len=sizeof(int);
211
int *mutex_ticket, next_in_line;
213
mutex_ticket= glob_mutex[proc].turn + mutex;
214
myturn = register_in_mutex_queue(mutex, proc);
216
/* code to reduce cost of unlocking mutex on the same SMP node goes here
217
* lockinfo_node[me].ticket = mutex_ticket;
218
* lockinfo_node[me].mutex = mutex;
221
_dummy_work_ = 0.; /* must be global to fool the compiler */
224
PARMCI_Get(mutex_ticket, &next_in_line, len, proc);
225
/* A turn value already past our ticket is treated as fatal. */
if(next_in_line > myturn)
226
armci_die2("armci: problem with tickets",myturn,next_in_line);
228
/* apply a linear backoff delay before retrying */
229
/* NOTE(review): factor starts at 0 and no update is visible in this view. */
for(i=0; i< SPINMAX * factor; i++) _dummy_work_ += 1.;
233
}while (myturn != next_in_line);
235
glob_mutex[proc].tickets[mutex] = myturn; /* save ticket value */
239
/*
 * Release mutex `mutex` on process `proc` from the application side by
 * writing the value stored in glob_mutex[proc].tickets[mutex] into the
 * mutex's turn slot with PARMCI_Put().
 *
 * NOTE(review): the declaration of len and the statement that advances the
 * saved ticket are not visible in this view.
 */
static void armci_generic_unlock(int mutex, int proc)
241
int *mutex_ticket= glob_mutex[proc].turn + mutex;
242
int *newval = glob_mutex[proc].tickets +mutex;
245
/* update ticket for next process requesting this mutex */
248
/* write new ticket value stored previously in tickets */
249
PARMCI_Put(newval, mutex_ticket, len, proc);
254
/*
 * armci_server_lock_mutex(mutex, proc, tag): server-side lock request made
 * on behalf of process `proc` for a mutex owned by this process (armci_me).
 *
 * Takes a ticket on the owner's mutex and copies the current turn value
 * locally. A turn value greater than the ticket is fatal. If it is not yet
 * the requester's turn, the mutex, ticket and tag are recorded in
 * blocked[proc], allocating the list on first use. The return statements are
 * not visible in this view.
 */
/*\ Acquire mutex for "proc"
255
* -must be executed in hrecv/AM handler thread
256
* -application thread must use generic_lock routine
258
int armci_server_lock_mutex(int mutex, int proc, msg_tag_t tag)
261
int *mutex_ticket, next_in_line, len=sizeof(int);
262
int owner = armci_me;
265
if(DEBUG)fprintf(stderr,"SLOCK=%d owner=%d p=%d m=%d\n",
266
armci_me,owner, proc,mutex);
268
mutex_ticket= glob_mutex[owner].turn + mutex;
269
myturn = register_in_mutex_queue(mutex, owner);
271
armci_copy(mutex_ticket, &next_in_line, len);
273
if(next_in_line > myturn)
274
armci_die2("armci-s: problem with tickets",myturn,next_in_line);
276
if(next_in_line != myturn){
277
if(!blocked)armci_serv_mutex_create();
278
blocked[proc].mutex = mutex;
279
blocked[proc].turn = myturn;
280
blocked[proc].tag = tag;
281
if(DEBUG) fprintf(stderr,"SLOCK=%d proc=%d blocked (%d,%d)\n",
282
armci_me, proc, next_in_line,myturn);
287
if(DEBUG) fprintf(stderr,"SLOCK=%d proc=%d sending ticket (%d)\n",
288
armci_me, proc, myturn);
290
/* send ticket to requesting node */
291
/* GA_SEND_REPLY(tag, &myturn, sizeof(int), proc); */
298
/*
 * armci_server_unlock_mutex(mutex, proc, Ticket, ptag): server-side release
 * of a mutex owned by this process (armci_me).
 *
 * Copies Ticket into the mutex's turn slot, then returns -1 if the mutex is
 * now free or if no entry in `blocked` is waiting with that ticket. When a
 * waiter is found its tag is stored in *ptag; the return value on that path
 * is not visible in this view.
 *
 * NOTE(review): the turn slot written is glob_mutex[owner], but the free
 * check is called with `proc` rather than `owner` -- confirm this is intended.
 * NOTE(review): the declaration of len is not visible in this view.
 */
/*\ Release mutex "id" held by proc
299
* called from hrecv/AM handler AND application thread
301
int armci_server_unlock_mutex(int mutex, int proc, int Ticket, msg_tag_t* ptag)
304
int owner = armci_me;
305
int i, p=NOBODY, *mutex_ticket= glob_mutex[owner].turn + mutex;
308
if(DEBUG) fprintf(stderr,"SUNLOCK=%d node=%d mutex=%d ticket=%d\n",
309
armci_me,proc,mutex,Ticket);
312
armci_copy(&Ticket, mutex_ticket, len);
314
/* if mutex is free then nobody is registered in queue */
315
if(armci_mutex_free(mutex, proc)) return -1;
317
/* search for the next process in queue waiting for this mutex */
318
for(i=0; i< armci_nproc; i++){
319
if(!blocked)break; /* not allocated yet - nobody is waiting */
320
if(DEBUG)fprintf(stderr,"SUNLOCK=%d node=%d list=(%d,%d)\n",
321
armci_me, i, blocked[i].mutex, blocked[i].turn);
322
if((blocked[i].mutex == mutex) && (blocked[i].turn == Ticket)){
328
/* send Ticket to a process waiting for mutex */
330
if(p == armci_me)armci_die("server_unlock: cannot unlock self",0);
333
if(DEBUG)fprintf(stderr,"SUNLOCK=%d node=%d unlock ticket=%d go=%d\n",
334
armci_me, proc, Ticket, p);
336
/* GA_SEND_REPLY(blocked[p].tag, &Ticket, sizeof(int), p); */
337
*ptag = blocked[p].tag;
343
return -1; /* nobody is waiting */
348
/*
 * Acquire mutex `mutex` on process `proc`.
 *
 * Calls armci_die() if no mutexes were created and armci_die2() if the mutex
 * index fails the range check; does nothing more when there is one process.
 * Otherwise the lock is taken through armci_rem_lock() or
 * armci_generic_lock(); the condition choosing between them is not visible
 * in this view.
 */
void PARMCI_Lock(int mutex, int proc)
350
#if defined(SERVER_LOCK)
354
if(DEBUG)fprintf(stderr,"%d enter lock\n",armci_me);
356
if(!num_mutexes) armci_die("armci_lock: create mutexes first",0);
358
/* NOTE(review): mutex == count passes this check; if indices are zero-based
 * the test should probably be >= -- confirm.
 * NOTE(review): glob_mutex is read here before the single-process return
 * below, yet PARMCI_Create_mutexes() returns before allocating glob_mutex
 * when armci_nproc == 1 -- confirm this cannot dereference a null pointer. */
if(mutex > glob_mutex[proc].count)
359
armci_die2("armci_lock: mutex not allocated", mutex,
360
glob_mutex[proc].count);
362
if(armci_nproc == 1) return;
364
# if defined(SERVER_LOCK)
365
direct=SAMECLUSNODE(proc);
367
/* The saved-ticket slot for this mutex is passed as the third argument. */
armci_rem_lock(mutex,proc, glob_mutex[proc].tickets + mutex );
370
armci_generic_lock(mutex,proc);
372
if(DEBUG)fprintf(stderr,"%d leave lock\n",armci_me);
377
/*
 * Release mutex `mutex` on process `proc`.
 *
 * Performs the same precondition checks as the lock path and does nothing
 * more when there is one process. Otherwise the release goes through
 * armci_rem_unlock(), through armci_server_unlock_mutex() followed by
 * armci_unlock_waiting_process(), or through armci_generic_unlock(); the
 * conditions choosing between them are not visible in this view.
 */
void PARMCI_Unlock(int mutex, int proc)
379
if(DEBUG)fprintf(stderr,"%d enter unlock\n",armci_me);
381
/* NOTE(review): this message and the one below say "armci_lock" although
 * this is the unlock path -- confirm whether that is intended. */
if(!num_mutexes) armci_die("armci_lock: create mutexes first",0);
383
/* NOTE(review): mutex == count passes this check, and glob_mutex is read
 * before the single-process return below although it is not allocated when
 * armci_nproc == 1 -- confirm both. */
if(mutex > glob_mutex[proc].count)
384
armci_die2("armci_lock: mutex not allocated", mutex,
385
glob_mutex[proc].count);
387
if(armci_nproc == 1) return;
389
# if defined(SERVER_LOCK)
392
armci_rem_unlock(mutex, proc, glob_mutex[proc].tickets[mutex]);
394
int ticket = glob_mutex[proc].tickets[mutex];
398
waiting = armci_server_unlock_mutex(mutex, proc, ticket, &tag);
400
/* The ticket is incremented before being passed on with the tag and the
 * value returned by armci_server_unlock_mutex(). */
armci_unlock_waiting_process(tag, waiting, ++ticket);
405
armci_generic_unlock(mutex, proc);
407
if(DEBUG)fprintf(stderr,"%d leave unlock\n",armci_me);