5
/** $Id: aggregate.c,v 1.6 2003-10-22 22:12:14 d3h325 Exp $
6
* Aggregate Put/Get requests
11
# include <string.h> /* memcpy */
17
/* Compile-time limits shared by the aggregation buffers declared in this file. */
#define _MAX_AGG_BUFFERS 32 /* Maximum # of aggregation buffers available (first dimension of agg_buf) */
18
#define _MAX_AGG_BUFSIZE 2048 /* size of each buffer in bytes. should be < 2^15, since positions inside a buffer are kept in a "short int" (buf_pos_end) */
19
#define _MAX_PTRS 256 /* pointer slots per buffer; < 2^15, as it is "short int" in agg_req_t */
20
#define _MAX_AGG_HANDLE _MAX_AGG_BUFFERS /* Max # of aggregation handles: one handle per buffer */
22
/* aggregate request handle: bookkeeping header kept for one aggregation
   buffer.
   NOTE(review): the struct/typedef opener and closer are not visible in this
   excerpt; the fields below are accessed elsewhere in the file through
   agg_req_t pointers (aggr[index]->...). */
24
unsigned int tag; /* non-blocking request tag; compared with nb_handle->tag to find the buffer of a handle */
25
short int proc; /* remote process id; compared with nb_handle->proc together with tag */
26
short int request_len ; /* number of requests, i.e. giov descriptors currently stored in darr */
27
short int ptr_array_len; /* pointer length for this request: pointer slots already handed out from this buffer's src/dst pointer rows */
28
short int buf_pos_end; /* position of buffer (from right end): starts at _MAX_AGG_BUFSIZE and is lowered when registered put data is copied in */
29
armci_giov_t *darr; /* giov vectors; set to point just past this header inside the same buffer */
31
static agg_req_t *aggr[_MAX_AGG_HANDLE]; /* aggregate request handle, one per buffer index; entries are pointed at the start of agg_buf[index] when a buffer is first set up */
34
/* data structure for dynamic buffer management: an array-backed list of
   buffer indices.
   NOTE(review): the struct/typedef opener and closer are not visible in this
   excerpt; the type is used below under the name agg_list_t. */
36
int size; /* represents the size of the list (not linked list): number of valid entries in index[] */
37
int index[_MAX_AGG_HANDLE]; /* buffer indices; entries are appended at index[size] and removed by decrementing size */
39
static agg_list_t ulist, alist;/*in-use & available aggr buffer index list*/
42
/* aggregation buffer: one fixed-size byte buffer per aggregation handle.
   Code later in this file places an agg_req_t header at the start of a row,
   the giov descriptor array (darr) right after that header, and copies of
   registered put data at positions counted down from the end of the row. */
43
static char agg_buf[_MAX_AGG_BUFFERS][_MAX_AGG_BUFSIZE];
44
/* aggregation buffer to store the pointers: row [index] supplies the
   src_ptr_array / dst_ptr_array storage for the descriptors of buffer index */
45
static void* agg_src_ptr[_MAX_AGG_BUFFERS][_MAX_PTRS];
46
static void* agg_dst_ptr[_MAX_AGG_BUFFERS][_MAX_PTRS];
49
* ---------------------------------------------------------------------
50
* fill descriptor from this side (left to right)
52
* _______________________________________________
53
* | | | |. . . . . . . . . . | | | |
54
* |__|__|__|_____________________________|__|__|__|
57
* fill src and dst pointer (arrays) in this direction
60
* Once they are about to cross each other (implies buffer is full),
61
* complete the data transfer.
62
* ---------------------------------------------------------------------
65
/*
 * AGG_INIT_NB_HANDLE(op_type, p, nb_handle)
 *
 * Prepare a non-blocking handle for use as an aggregation handle.
 *
 *   op_type    operation the caller is about to aggregate
 *   p          remote process the caller is about to aggregate to
 *   nb_handle  pointer to the handle (fields tag, op, proc, bufid are used)
 *
 * A handle whose proc is negative is treated as fresh: it receives a new
 * tag from GET_NEXT_NBTAG(), and op, proc and bufid are filled in.
 * A handle that is already in use must carry the same operation and the
 * same remote process; otherwise armci_die() is called with the offending
 * value (the handle's op, or p).
 *
 * The expansion is a single statement (do { ... } while (0)) so the macro is
 * safe under an unbraced if/else at the call site, and every argument is
 * parenthesised. Arguments are expanded more than once, so they must be
 * free of side effects. The call site supplies the trailing semicolon.
 */
#define AGG_INIT_NB_HANDLE(op_type, p, nb_handle)                            \
    do {                                                                     \
        if ((nb_handle)->proc < 0) {                                         \
            (nb_handle)->tag   = GET_NEXT_NBTAG();                           \
            (nb_handle)->op    = (op_type);                                  \
            (nb_handle)->proc  = (p);                                        \
            (nb_handle)->bufid = NB_NONE;                                    \
        } else if ((nb_handle)->op != (op_type)) {                           \
            armci_die("ARMCI_NbXXX: AGG_INIT_NB_HANDLE(): Aggregate Failed, Invalid non-blocking handle", (nb_handle)->op); \
        } else if ((nb_handle)->proc != (p)) {                               \
            armci_die("ARMCI_NbXXX: AGG_INIT_NB_HANDLE(): Aggregate Failed, Invalid non-blocking handle", (p)); \
        }                                                                    \
    } while (0)
78
/*
 * _armci_agg_set_buffer(index, tag, proc, len)
 *
 * initialize/set the fields in the buffer: stamp aggregation buffer `index`
 * with the owning handle's tag and remote process, set its request count to
 * `len`, and append the buffer index to the in-use list (ulist), growing
 * that list by one.
 *
 * The macro parameters carry a trailing underscore on purpose: the previous
 * spelling (index, tag, proc) matched the member names used in the body
 * (->tag, ->proc, .index), so the preprocessor rewrote those member names
 * as well and the macro only worked when the arguments happened to be
 * variables with exactly those names. The buffer index is now evaluated
 * once, and the body is a single statement (do { ... } while (0)); the call
 * site supplies the trailing semicolon.
 */
#define _armci_agg_set_buffer(idx_, tag_, proc_, len_)                       \
    do {                                                                     \
        const int agg_set_buffer_slot_ = (idx_);                             \
        aggr[agg_set_buffer_slot_]->tag         = (tag_);                    \
        aggr[agg_set_buffer_slot_]->proc        = (proc_);                   \
        aggr[agg_set_buffer_slot_]->request_len = (len_);                    \
        /* add the new index to the in-use list and increment its size */    \
        ulist.index[ulist.size++] = agg_set_buffer_slot_;                    \
    } while (0)
86
/* get the index of the aggregation buffer to be used
 *
 * nb_handle: handle whose tag and proc identify the aggregation request.
 *
 * The in-use list is scanned from its last entry backwards for a buffer
 * whose tag and proc both match the handle. When nothing matches, a buffer
 * is claimed: armci_die() is called if the in-use list already holds
 * _MAX_AGG_BUFFERS entries and the available list is empty; otherwise an
 * index is popped from the available list when it is non-empty. The
 * surviving statements after the "else {" line overlay an agg_req_t header
 * on the start of agg_buf[index], zero the counters, set buf_pos_end to the
 * end of the buffer and place darr immediately after the header. Finally
 * the buffer is stamped with tag/proc and appended to the in-use list.
 *
 * NOTE(review): this excerpt is missing lines: the statement executed when
 * a match is found, how index is chosen in the else-branch, where that
 * branch closes, and the function tail (presumably returning index).
 * Confirm those parts against the full file.
 * NOTE(review): the armci_die() message names "_armci_agg_get_index", not
 * this function.
 * NOTE(review): agg_buf is a char array that is accessed through an
 * agg_req_t pointer; nothing visible here guarantees suitable alignment --
 * confirm.
 */
87
static int _armci_agg_get_bufferid(armci_ihdl_t nb_handle) {
88
int i, index, proc = nb_handle->proc;
89
unsigned int tag = nb_handle->tag;
91
/* check if there is an entry for this handle in the existing list*/
92
for(i=ulist.size-1; i>=0; i--) {
93
index = ulist.index[i];
94
if(aggr[index]->tag == tag && aggr[index]->proc == proc)
98
/* else it is a new handle, so get a aggr buffer from either
99
of the lists. ???? don't throw exception here */
100
if(ulist.size >= _MAX_AGG_BUFFERS && alist.size == 0)
101
armci_die("_armci_agg_get_index: Too many outstanding aggregation requests\n", ulist.size);
103
/*If there is a buffer in readily available list,use it*/
104
if(alist.size > 0) index = alist.index[--alist.size];
105
else { /* else use/get a buffer from the main list */
108
/* allocate memory for aggregate request handle: no heap allocation, the
   header simply aliases the first bytes of the static buffer */
109
aggr[index] = (agg_req_t *)agg_buf[index];
111
aggr[index]->request_len = 0;
112
aggr[index]->ptr_array_len = 0;
113
aggr[index]->buf_pos_end = _MAX_AGG_BUFSIZE;
115
/* allocate memory for giov vector field in aggregate request handler:
   the descriptor array starts right after the header in the same buffer */
116
aggr[index]->darr = (armci_giov_t *)(agg_buf[index]+sizeof(agg_req_t));
119
_armci_agg_set_buffer(index, tag, proc, 0);
123
/* Release aggregation buffer `index`: take it out of the in-use list
 * (ulist) and append it to the available list (alist) so it can be reused.
 *
 * NOTE(review): this excerpt is missing lines: the declaration of i, the
 * rest of the matching branch (presumably shrinking ulist and leaving the
 * loop) and the function tail. Confirm against the full file.
 */
static void _armci_agg_update_lists(int index) {
125
/* remove that index from the in-use list and bring the last element
126
in the in-use list to the position of the removed one. */
127
for(i=0; i<ulist.size; i++)
128
if(ulist.index[i] == index) {
129
ulist.index[i] = ulist.index[ulist.size-1];
134
/* and add the removed index to the available list and increment */
135
alist.index[alist.size++] = index;
139
/* replace with macro later */
/* Reserve room in the handle's aggregation buffer for a request and return
 * the giov descriptor the caller should fill in.
 *
 * ptr_array_len       in: number of pointer pairs the caller wants to add;
 *                     out: may be reduced to _MAX_PTRS after a flush.
 * bytes               transfer size of each pointer pair.
 * nb_handle           aggregation handle; selects the buffer.
 * is_registered_put   non-zero: `bytes` bytes are copied from
 *                     *registered_put_data into the buffer tail and
 *                     *registered_put_data is redirected to that copy.
 * registered_put_data address of the source pointer (only dereferenced when
 *                     is_registered_put is non-zero).
 *
 * Returns the address of the descriptor in darr: the most recent one when
 * its byte size equals `bytes`, otherwise a new one. The buffer-wide
 * ptr_array_len is advanced by *ptr_array_len before returning; the
 * descriptor's own ptr_array_len is left for the caller to update.
 *
 * NOTE(review): this excerpt is missing lines: the return type, the
 * declaration of bytes_remaining, the condition guarding the "new
 * descriptor" statements (presumably get_new_descr) and several closing
 * braces. Confirm against the full file.
 * NOTE(review): bytes_needed is an unsigned short while bytes is an int, so
 * a large `bytes` would be truncated before the space check -- confirm that
 * callers bound bytes.
 * NOTE(review): after the flush below only *ptr_array_len is clamped;
 * nothing visible re-checks that `bytes` of registered put data fit into an
 * empty buffer -- confirm.
 */
141
_armci_agg_get_descriptor(int *ptr_array_len,int bytes,armci_ihdl_t nb_handle,
142
int is_registered_put, void **registered_put_data) {
144
short unsigned int get_new_descr=0, bytes_needed=0, rid;
146
int index = _armci_agg_get_bufferid(nb_handle);
148
rid = aggr[index]->request_len; /* index of giov descriptor */
149
/* free space = current right-hand end minus the header and the
   descriptors already stored at the left-hand side */
bytes_remaining = aggr[index]->buf_pos_end -
150
(sizeof(agg_req_t) + aggr[index]->request_len*sizeof(armci_giov_t));
152
/* extra bytes required to store registered put data */
153
if(is_registered_put) bytes_needed = bytes;
155
/* if (byte-)sizes are equal, use previously created descriptor
156
else get a new descriptor */
157
if( rid && bytes==aggr[index]->darr[rid-1].bytes) --rid;
158
else { get_new_descr=1; bytes_needed += sizeof(armci_giov_t); }
160
/* If buffer is full, then complete data transfer. After completion,
161
if still ptr array_len is greater than maximum limit(_MAX_PTRS),
162
then do it by parts. Determine new ptr_array_len that fits buffer */
163
if( (bytes_needed > bytes_remaining) ||
164
(_MAX_PTRS - aggr[index]->ptr_array_len < *ptr_array_len)) {
165
armci_agg_complete(nb_handle, SET);
166
rid = 0; get_new_descr=1;
167
if(*ptr_array_len > _MAX_PTRS) *ptr_array_len = _MAX_PTRS;
170
/* if new descriptor, allocate memory for src_ptr & dst_ptr arrays: the
   arrays are slices of this buffer's pointer rows starting at the first
   unused slot */
172
int i = aggr[index]->ptr_array_len;
173
aggr[index]->darr[rid].src_ptr_array = (void **)&agg_src_ptr[index][i];
174
aggr[index]->darr[rid].dst_ptr_array = (void **)&agg_dst_ptr[index][i];
175
aggr[index]->darr[rid].ptr_array_len = 0;
176
aggr[index]->request_len++;
179
/* store registered put data: copy it to the tail of the buffer and hand
   the address of the copy back through registered_put_data */
180
if(is_registered_put) {
181
aggr[index]->buf_pos_end -= bytes;
182
memcpy(&((char *)aggr[index])[aggr[index]->buf_pos_end],
183
*((char **)registered_put_data), bytes);
184
*(char **)registered_put_data = (char *)&((char *)aggr[index])[aggr[index]->buf_pos_end];
187
aggr[index]->ptr_array_len += *ptr_array_len;
188
return (&aggr[index]->darr[rid]);
191
/* Add a single contiguous transfer (src -> dst, `bytes` bytes) to the
 * aggregation request owned by nb_handle.
 *
 * src, dst           source and destination addresses of the transfer.
 * bytes              size of the transfer.
 * proc, op           remote process and operation; checked against or
 *                    stored into the handle by AGG_INIT_NB_HANDLE.
 * is_registered_put  forwarded to _armci_agg_get_descriptor(); when
 *                    non-zero that call copies the source data into the
 *                    aggregation buffer and redirects src to the copy,
 *                    which is why &src is passed.
 * nb_handle          aggregation handle.
 *
 * NOTE(review): the declarations of darr, idx and one (presumably an int
 * holding 1) and the function tail, including the return value, are not
 * visible in this excerpt. Confirm against the full file.
 */
int armci_agg_save_descriptor(void *src, void *dst, int bytes, int proc, int op,
192
int is_registered_put, armci_ihdl_t nb_handle) {
197
/* set up the handle if it is a new aggregation request */
198
AGG_INIT_NB_HANDLE(op, proc, nb_handle);
200
darr = _armci_agg_get_descriptor(&one, bytes, nb_handle,
201
is_registered_put, &src);
202
/* append the pointer pair at the descriptor's next free slot */
idx = darr->ptr_array_len;
204
darr->src_ptr_array[idx] = src;
205
darr->dst_ptr_array[idx] = dst;
207
darr->ptr_array_len += 1;
214
/* Add the pointer pairs of `len` giov descriptors (dscr[0..len-1]) to the
 * aggregation request owned by nb_handle.
 *
 * dscr       caller's descriptors; bytes, ptr_array_len and the src/dst
 *            pointer arrays of each entry are read.
 * len        number of entries in dscr.
 * proc, op   remote process and operation; checked against or stored into
 *            the handle by AGG_INIT_NB_HANDLE.
 * nb_handle  aggregation handle.
 *
 * For each input descriptor, _armci_agg_get_descriptor() may reduce
 * ptr_array_len, in which case only that many pairs are copied in one
 * pass; the closing "while" shows the copy is repeated in a loop.
 *
 * NOTE(review): this excerpt is missing lines: the declaration of darr,
 * the initialisation of k, the loop opener matching "} while", and the
 * function tail with the return value. Confirm against the full file.
 * NOTE(review): the loop condition reads darr[i].ptr_array_len, i.e. it
 * indexes the aggregation-buffer descriptor by the input index i; the
 * input descriptor dscr[i].ptr_array_len looks like what was intended --
 * confirm.
 * NOTE(review): the remaining count is computed as the descriptor's total
 * minus the size of the last pass, not minus k; this looks correct only
 * when at most two passes are needed -- confirm.
 */
int armci_agg_save_giov_descriptor(armci_giov_t dscr[], int len, int proc,
215
int op, armci_ihdl_t nb_handle) {
216
int i, j, k, idx, bytes, ptr_array_len;
219
/* set up the handle if it is a new aggregation request */
220
AGG_INIT_NB_HANDLE(op, proc, nb_handle);
222
for(i=0; i<len; i++) {
224
bytes = dscr[i].bytes;
225
ptr_array_len = dscr[i].ptr_array_len;
227
darr=_armci_agg_get_descriptor(&ptr_array_len,bytes,nb_handle,0,0);
228
idx = darr->ptr_array_len;
230
/* j walks the destination slots, k the caller's pointer arrays */
for(j=idx; j<idx+ptr_array_len; j++, k++) {
231
darr->src_ptr_array[j] = dscr[i].src_ptr_array[k];
232
darr->dst_ptr_array[j] = dscr[i].dst_ptr_array[k];
234
darr->bytes = dscr[i].bytes;
235
darr->ptr_array_len += ptr_array_len;
237
ptr_array_len = dscr[i].ptr_array_len - ptr_array_len;
238
if(ptr_array_len <0) armci_die("agg_save_giov_descr failed", 0L);
239
} while(k < darr[i].ptr_array_len);
244
/* Add a strided transfer to the aggregation request owned by nb_handle by
 * expanding it into pointer pairs of a giov descriptor.
 *
 * src_ptr, dst_ptr                 base addresses of source and destination.
 * src_stride_arr, dst_stride_arr   per-level strides, applied as byte
 *                                  offsets to the base addresses.
 * count                            count[0] is used as the byte size of
 *                                  each pair; count[j] for j >= 1 is used
 *                                  to decide when position counters wrap.
 * stride_levels                    number of stride levels.
 * proc, op                         remote process and operation; checked
 *                                  against or stored into the handle by
 *                                  AGG_INIT_NB_HANDLE.
 * nb_handle                        aggregation handle.
 *
 * Each generated pair is base + sum over j of stride[j]*factor[j], where
 * factor[] holds the current position in every stride level.
 *
 * NOTE(review): this excerpt is missing many lines: the declaration of
 * darr, the body of the first loop (presumably computing total1D from
 * count[] and clearing factor[]), the loop opener matching "} while", the
 * statements that advance num1D and factor[], and the function tail with
 * the return value. Confirm against the full file.
 */
int armci_agg_save_strided_descriptor(void *src_ptr, int src_stride_arr[],
245
void* dst_ptr, int dst_stride_arr[],
246
int count[], int stride_levels, int proc,
247
int op, armci_ihdl_t nb_handle) {
249
int i, j, k, idx, ptr_array_len=1, total1D=1, num1D=0;
250
int offset1, offset2, factor[MAX_STRIDE_LEVEL];
253
/* set up the handle if it is a new aggregation request */
254
AGG_INIT_NB_HANDLE(op, proc, nb_handle);
256
for(i=1; i<=stride_levels; i++) {
260
ptr_array_len = total1D;
263
/* may reduce ptr_array_len to what fits into the buffer */
darr=_armci_agg_get_descriptor(&ptr_array_len,count[0],nb_handle,0,0);
264
idx = darr->ptr_array_len;
266
/* converting stride into giov vector */
267
for(i=idx; i<idx+ptr_array_len; i++) {
268
for(j=0, offset1=0, offset2=0; j<stride_levels; j++) {
269
offset1 += src_stride_arr[j]*factor[j];
270
offset2 += dst_stride_arr[j]*factor[j];
272
darr->src_ptr_array[i] = (char *)src_ptr + offset1;
273
darr->dst_ptr_array[i] = (char *)dst_ptr + offset2;
276
for(j=1; j<stride_levels; j++)
277
if(num1D%count[j]==0) {
279
for(k=0; k<j;k++) factor[k]=0;
283
darr->bytes = count[0];
284
darr->ptr_array_len += ptr_array_len;
285
ptr_array_len = total1D - ptr_array_len;
286
if(ptr_array_len <0) armci_die("agg_save_strided_descr failed", 0L);
287
} while(num1D < total1D);
293
/* Flush the aggregation buffer that belongs to nb_handle: issue the
 * accumulated vector put/get and reset the buffer.
 *
 * nb_handle  aggregation handle; its tag and proc select the buffer, its op
 *            selects between the put and get variants.
 * condition  UNSET additionally marks the handle as unused (proc = -1) and
 *            returns the buffer to the available list. Elsewhere in this
 *            file the function is called with SET when a buffer fills up.
 *
 * Returns without doing anything when no in-use buffer matches the handle.
 *
 * NOTE(review): this excerpt is missing lines: the local declarations
 * (i, index, rc, usr_hdl), the statement that leaves the search loop on a
 * match, the case labels of the switch, any preprocessor guards around the
 * printf and around the non-blocking versus blocking variants, the
 * remaining arguments of the PARMCI_PutV / PARMCI_GetV calls, and all
 * closing braces. Confirm against the full file.
 */
void armci_agg_complete(armci_ihdl_t nb_handle, int condition) {
296
/* get the buffer index for this handle */
297
for(i=ulist.size-1; i>=0; i--) {
298
index = ulist.index[i];
299
if(aggr[index]->tag == nb_handle->tag &&
300
aggr[index]->proc == nb_handle->proc)
303
if(i<0) return; /* implies this handle has no requests at all */
306
printf("%d: Aggregation Complete to remote process %d (%d:%d requests)\n",
307
armci_me, nb_handle->proc, index, aggr[index]->request_len);
310
/* complete the data transfer. NOTE: in LAPI, Non-blocking calls
311
(followed by wait) performs better than blocking put/get */
312
if(aggr[index]->request_len) {
313
switch(nb_handle->op) {
317
/* non-blocking vector put on a private handle, waited for immediately */
ARMCI_INIT_HANDLE(&usr_hdl);
318
if((rc=PARMCI_NbPutV(aggr[index]->darr, aggr[index]->request_len,
319
nb_handle->proc, (armci_hdl_t*)&usr_hdl)))
320
ARMCI_Error("armci_agg_complete: nbputv failed",rc);
321
PARMCI_Wait((armci_hdl_t*)&usr_hdl);
324
/* non-blocking vector get on a private handle, waited for immediately */
ARMCI_INIT_HANDLE(&usr_hdl);
325
if((rc=PARMCI_NbGetV(aggr[index]->darr, aggr[index]->request_len,
326
nb_handle->proc, (armci_hdl_t*)&usr_hdl)))
327
ARMCI_Error("armci_agg_complete: nbgetv failed",rc);
328
PARMCI_Wait((armci_hdl_t*)&usr_hdl);
332
/* blocking variants of the same two transfers */
if((rc=PARMCI_PutV(aggr[index]->darr, aggr[index]->request_len,
334
ARMCI_Error("armci_agg_complete: putv failed",rc);
337
if((rc=PARMCI_GetV(aggr[index]->darr, aggr[index]->request_len,
339
ARMCI_Error("armci_agg_complete: getv failed",rc);
345
/* setting request length to zero, as the requests are completed */
346
aggr[index]->request_len = 0;
347
aggr[index]->ptr_array_len = 0;
348
aggr[index]->buf_pos_end = _MAX_AGG_BUFSIZE;
350
/* If armci_agg_complete() is called PARMCI_Wait(), then unset nb_handle*/
/* NOTE(review): presumably "called from PARMCI_Wait()" -- confirm */
351
if(condition==UNSET) {
352
nb_handle->proc = -1;
353
_armci_agg_update_lists(index);