5
/** $Id: aggregate.c,v 1.6 2003-10-22 22:12:14 d3h325 Exp $
6
* Aggregate Put/Get requests
10
#include <string.h> /* memcpy */
13
#define _MAX_AGG_BUFFERS 32 /* Maximum # of aggregation buffers available*/
14
#define _MAX_AGG_BUFSIZE 2048 /* size of each buffer. should be < 2^15 */
15
#define _MAX_PTRS 256 /* < 2^15, as it is "short int" in agg_req_t */
16
#define _MAX_AGG_HANDLE _MAX_AGG_BUFFERS /* Max # of aggregation handles */
18
/* aggregate request handle */
20
unsigned int tag; /* non-blocking request tag */
21
short int proc; /* remote process id */
22
short int request_len ; /* number of requests */
23
short int ptr_array_len; /* pointer length for this request */
24
short int buf_pos_end; /* position of buffer (from right end) */
25
armci_giov_t *darr; /* giov vectors */
27
static agg_req_t *aggr[_MAX_AGG_HANDLE]; /* aggregate request handle */
30
/* data structure for dynamic buffer management */
32
int size; /* represents the size of the list (not linked list) */
33
int index[_MAX_AGG_HANDLE];
35
static agg_list_t ulist, alist;/*in-use & available aggr buffer index list*/
38
/* aggregation buffer */
39
static char agg_buf[_MAX_AGG_BUFFERS][_MAX_AGG_BUFSIZE];
40
/* aggregation buffer to store the pointers */
41
static void* agg_src_ptr[_MAX_AGG_BUFFERS][_MAX_PTRS];
42
static void* agg_dst_ptr[_MAX_AGG_BUFFERS][_MAX_PTRS];
45
* ---------------------------------------------------------------------
46
* fill descriptor from this side (left to right)
48
* _______________________________________________
49
* | | | |. . . . . . . . . . | | | |
50
* |__|__|__|_____________________________|__|__|__|
53
* fill src and dst pointer (arrays) in this direction
56
* Once they are about to cross each other (implies buffer is full),
57
* complete the data transfer.
58
* ---------------------------------------------------------------------
61
#define AGG_INIT_NB_HANDLE(op_type, p, nb_handle) \
62
if(nb_handle->proc < 0) { \
63
nb_handle->tag = GET_NEXT_NBTAG(); \
64
nb_handle->op = op_type; \
65
nb_handle->proc = p; \
66
nb_handle->bufid= NB_NONE; \
68
else if(nb_handle->op != op_type) \
69
armci_die("ARMCI_NbXXX: AGG_INIT_NB_HANDLE(): Aggregate Failed, Invalid non-blocking handle", nb_handle->op); \
70
else if(nb_handle->proc != p) \
71
armci_die("ARMCI_NbXXX: AGG_INIT_NB_HANDLE(): Aggregate Failed, Invalid non-blocking handle", p)
74
/* initialize/set the fields in the buffer*/
75
#define _armci_agg_set_buffer(index, tag, proc, len) { \
76
aggr[(index)]->tag = (tag); \
77
aggr[(index)]->proc = (proc); \
78
aggr[(index)]->request_len = (len); \
79
ulist.index[ulist.size++] = (index);/* add the new index to the in-use list and increment it's size*/ \
82
/* get the index of the aggregation buffer to be used */
83
static int _armci_agg_get_bufferid(armci_ihdl_t nb_handle) {
84
int i, index, tag = nb_handle->tag, proc = nb_handle->proc;
86
/* check if there is an entry for this handle in the existing list*/
87
for(i=ulist.size-1; i>=0; i--) {
88
index = ulist.index[i];
89
if(aggr[index]->tag == tag && aggr[index]->proc == proc)
93
/* else it is a new handle, so get a aggr buffer from either
94
of the lists. ???? don't throw exception here */
95
if(ulist.size >= _MAX_AGG_BUFFERS && alist.size == 0)
96
armci_die("_armci_agg_get_index: Too many outstanding aggregation requests\n", ulist.size);
98
/*If there is a buffer in readily available list,use it*/
99
if(alist.size > 0) index = alist.index[--alist.size];
100
else { /* else use/get a buffer from the main list */
103
/* allocate memory for aggregate request handle */
104
aggr[index] = (agg_req_t *)agg_buf[index];
106
aggr[index]->request_len = 0;
107
aggr[index]->ptr_array_len = 0;
108
aggr[index]->buf_pos_end = _MAX_AGG_BUFSIZE;
110
/* allocate memory for giov vector field in aggregate request handler */
111
aggr[index]->darr = (armci_giov_t *)(agg_buf[index]+sizeof(agg_req_t));
114
_armci_agg_set_buffer(index, tag, proc, 0);
118
static void _armci_agg_update_lists(int index) {
120
/* remove that index from the in-use list and bring the last element
121
in the in-use list to the position of the removed one. */
122
for(i=0; i<ulist.size; i++)
123
if(ulist.index[i] == index) {
124
ulist.index[i] = ulist.index[ulist.size-1];
129
/* and add the removed index to the available list and increment */
130
alist.index[alist.size++] = index;
134
/* replace with macro later */
136
_armci_agg_get_descriptor(int *ptr_array_len,int bytes,armci_ihdl_t nb_handle,
137
int is_registered_put, void **registered_put_data) {
139
short unsigned int get_new_descr=0, bytes_needed=0, rid;
141
int index = _armci_agg_get_bufferid(nb_handle);
143
rid = aggr[index]->request_len; /* index of giov descriptor */
144
bytes_remaining = aggr[index]->buf_pos_end -
145
(sizeof(agg_req_t) + aggr[index]->request_len*sizeof(armci_giov_t));
147
/* extra bytes required to store registered put data */
148
if(is_registered_put) bytes_needed = bytes;
150
/* if (byte-)sizes are equal, use previously created descriptor
151
else get a new descriptor */
152
if( rid && bytes==aggr[index]->darr[rid-1].bytes) --rid;
153
else { get_new_descr=1; bytes_needed += sizeof(armci_giov_t); }
155
/* If buffer is full, then complete data transfer. After completion,
156
if still ptr array_len is greater than maximum limit(_MAX_PTRS),
157
then do it by parts. Determine new ptr_array_len that fits buffer */
158
if( (bytes_needed > bytes_remaining) ||
159
(_MAX_PTRS - aggr[index]->ptr_array_len < *ptr_array_len)) {
160
armci_agg_complete(nb_handle, SET);
161
rid = 0; get_new_descr=1;
162
if(*ptr_array_len > _MAX_PTRS) *ptr_array_len = _MAX_PTRS;
165
/* if new descriptor, allocate memory for src_ptr & dst_ptr arrays */
167
int i = aggr[index]->ptr_array_len;
168
aggr[index]->darr[rid].src_ptr_array = (void **)&agg_src_ptr[index][i];
169
aggr[index]->darr[rid].dst_ptr_array = (void **)&agg_dst_ptr[index][i];
170
aggr[index]->darr[rid].ptr_array_len = 0;
171
aggr[index]->request_len++;
174
/* store registered put data */
175
if(is_registered_put) {
176
aggr[index]->buf_pos_end -= bytes;
177
memcpy(&((char *)aggr[index])[aggr[index]->buf_pos_end],
178
*((char **)registered_put_data), bytes);
179
*(char **)registered_put_data = (char *)&((char *)aggr[index])[aggr[index]->buf_pos_end];
182
aggr[index]->ptr_array_len += *ptr_array_len;
183
return (&aggr[index]->darr[rid]);
186
int armci_agg_save_descriptor(void *src, void *dst, int bytes, int proc, int op,
187
int is_registered_put, armci_ihdl_t nb_handle) {
192
/* set up the handle if it is a new aggregation request */
193
AGG_INIT_NB_HANDLE(op, proc, nb_handle);
195
darr = _armci_agg_get_descriptor(&one, bytes, nb_handle,
196
is_registered_put, &src);
197
idx = darr->ptr_array_len;
199
darr->src_ptr_array[idx] = src;
200
darr->dst_ptr_array[idx] = dst;
202
darr->ptr_array_len += 1;
209
int armci_agg_save_giov_descriptor(armci_giov_t dscr[], int len, int proc,
210
int op, armci_ihdl_t nb_handle) {
211
int i, j, k, idx, bytes, ptr_array_len;
214
/* set up the handle if it is a new aggregation request */
215
AGG_INIT_NB_HANDLE(op, proc, nb_handle);
217
for(i=0; i<len; i++) {
219
bytes = dscr[i].bytes;
220
ptr_array_len = dscr[i].ptr_array_len;
222
darr=_armci_agg_get_descriptor(&ptr_array_len,bytes,nb_handle,0,0);
223
idx = darr->ptr_array_len;
225
for(j=idx; j<idx+ptr_array_len; j++, k++) {
226
darr->src_ptr_array[j] = dscr[i].src_ptr_array[k];
227
darr->dst_ptr_array[j] = dscr[i].dst_ptr_array[k];
229
darr->bytes = dscr[i].bytes;
230
darr->ptr_array_len += ptr_array_len;
232
ptr_array_len = dscr[i].ptr_array_len - ptr_array_len;
233
if(ptr_array_len <0) armci_die("agg_save_giov_descr failed", 0L);
234
} while(k < darr[i].ptr_array_len);
239
int armci_agg_save_strided_descriptor(void *src_ptr, int src_stride_arr[],
240
void* dst_ptr, int dst_stride_arr[],
241
int count[], int stride_levels, int proc,
242
int op, armci_ihdl_t nb_handle) {
244
int i, j, k, idx, ptr_array_len=1, total1D=1, num1D=0;
245
int offset1, offset2, factor[MAX_STRIDE_LEVEL];
248
/* set up the handle if it is a new aggregation request */
249
AGG_INIT_NB_HANDLE(op, proc, nb_handle);
251
for(i=1; i<=stride_levels; i++) {
255
ptr_array_len = total1D;
258
darr=_armci_agg_get_descriptor(&ptr_array_len,count[0],nb_handle,0,0);
259
idx = darr->ptr_array_len;
261
/* converting stride into giov vector */
262
for(i=idx; i<idx+ptr_array_len; i++) {
263
for(j=0, offset1=0, offset2=0; j<stride_levels; j++) {
264
offset1 += src_stride_arr[j]*factor[j];
265
offset2 += dst_stride_arr[j]*factor[j];
267
darr->src_ptr_array[i] = (char *)src_ptr + offset1;
268
darr->dst_ptr_array[i] = (char *)dst_ptr + offset2;
271
for(j=1; j<stride_levels; j++)
272
if(num1D%count[j]==0) {
274
for(k=0; k<j;k++) factor[k]=0;
278
darr->bytes = count[0];
279
darr->ptr_array_len += ptr_array_len;
280
ptr_array_len = total1D - ptr_array_len;
281
if(ptr_array_len <0) armci_die("agg_save_strided_descr failed", 0L);
282
} while(num1D < total1D);
288
void armci_agg_complete(armci_ihdl_t nb_handle, int condition) {
291
/* get the buffer index for this handle */
292
for(i=ulist.size-1; i>=0; i--) {
293
index = ulist.index[i];
294
if(aggr[index]->tag == nb_handle->tag &&
295
aggr[index]->proc == nb_handle->proc)
298
if(i<0) return; /* implies this handle has no requests at all */
301
printf("%d: Aggregation Complete to remote process %d (%d:%d requests)\n",
302
armci_me, nb_handle->proc, index, aggr[index]->request_len);
305
/* complete the data transfer. NOTE: in LAPI, Non-blocking calls
306
(followed by wait) performs better than blocking put/get */
307
if(aggr[index]->request_len) {
308
switch(nb_handle->op) {
312
ARMCI_INIT_HANDLE(&usr_hdl);
313
if((rc=PARMCI_NbPutV(aggr[index]->darr, aggr[index]->request_len,
314
nb_handle->proc, (armci_hdl_t*)&usr_hdl)))
315
ARMCI_Error("armci_agg_complete: nbputv failed",rc);
316
PARMCI_Wait((armci_hdl_t*)&usr_hdl);
319
ARMCI_INIT_HANDLE(&usr_hdl);
320
if((rc=PARMCI_NbGetV(aggr[index]->darr, aggr[index]->request_len,
321
nb_handle->proc, (armci_hdl_t*)&usr_hdl)))
322
ARMCI_Error("armci_agg_complete: nbgetv failed",rc);
323
PARMCI_Wait((armci_hdl_t*)&usr_hdl);
327
if((rc=PARMCI_PutV(aggr[index]->darr, aggr[index]->request_len,
329
ARMCI_Error("armci_agg_complete: putv failed",rc);
332
if((rc=PARMCI_GetV(aggr[index]->darr, aggr[index]->request_len,
334
ARMCI_Error("armci_agg_complete: getv failed",rc);
340
/* setting request length to zero, as the requests are completed */
341
aggr[index]->request_len = 0;
342
aggr[index]->ptr_array_len = 0;
343
aggr[index]->buf_pos_end = _MAX_AGG_BUFSIZE;
345
/* If armci_agg_complete() is called PARMCI_Wait(), then unset nb_handle*/
346
if(condition==UNSET) {
347
nb_handle->proc = -1;
348
_armci_agg_update_lists(index);