5
/* begin_generated_IBM_copyright_prolog */
7
/* ---------------------------------------------------------------- */
8
/* (C)Copyright IBM Corp. 2007, 2008 */
10
/* ---------------------------------------------------------------- */
12
/* end_generated_IBM_copyright_prolog */
14
* \file armci/src/armcix/dcmf/armcix_acc.c
15
* \brief DCMF ARMCI Extension for accumulate operations.
18
#include "armcix_impl.h"
30
/* Accumulate message header. The member list is not visible in this excerpt;
 * code in this file reads and writes the members datatype, bytes, dst and the
 * per-type scale values ival, dval, fval, cplxval, dcplxval and lval.
 * The typedef is declared with 16-byte alignment. */
typedef union ARMCIX_DCMF_AccInfo_t
49
ARMCIX_DCMF_AccInfo_t __attribute__ ((__aligned__ (16)));
51
/* Receive-side state allocated by ARMCIX_DCMF_RecvAcc2: a copy of the
 * accumulate header plus the connection the message arrived on.
 * A `buffer` member is also used by ARMCIX_DCMF_RecvAcc2 and
 * ARMCIX_DCMF_AccReceiveComplete; its declaration is not visible here. */
typedef struct ARMCIX_DCMF_AccNbInfo_t
53
ARMCIX_DCMF_AccInfo_t info;
54
ARMCIX_DCMF_Connection_t * connection;
57
ARMCIX_DCMF_AccNbInfo_t;
59
/* Scaled accumulate for scalar element types: with a = dst, b = src and
 * alpha = *scale (all viewed as DTYPE), performs a[j] += alpha * b[j] for
 * j in [0, elems).
 * NOTE(review): the declaration of the loop index j is not on any visible
 * line of the macro -- confirm it is declared inside the macro body. */
#define ACCUMULATE( DTYPE, scale, elems, src, dst) {\
61
DTYPE *a =(DTYPE *)(dst);\
62
DTYPE *b =(DTYPE *)(src);\
63
DTYPE alpha = *(DTYPE *)(scale);\
64
for(j=0;j<(elems);j++)a[j] += alpha*b[j];\
67
/* Scaled accumulate for complex element types that expose .real and .imag
 * members: with a = dst, b = src and alpha = *scale, adds the complex product
 * alpha * b[j] to a[j] for j in [0, elems):
 *   real += alpha.real*b.real - alpha.imag*b.imag
 *   imag += alpha.imag*b.real + alpha.real*b.imag
 * NOTE(review): the declaration of the loop index j is not on any visible
 * line of the macro -- confirm it is declared inside the macro body. */
#define CPL_ACCUMULATE( DTYPE, scale, elems, src, dst) {\
69
DTYPE *a =(DTYPE *)(dst);\
70
DTYPE *b =(DTYPE *)(src);\
71
DTYPE alpha = *(DTYPE *)(scale);\
72
for(j=0;j<(elems);j++){\
73
a[j].real += alpha.real*b[j].real - alpha.imag*b[j].imag;\
74
a[j].imag += alpha.imag*b[j].real + alpha.real*b[j].imag;\
78
/* Protocol object for accumulate messages: registered by
 * ARMCIX_DCMF_Acc_register and passed to every DCMF_Send call in this file. */
DCMF_Protocol_t __acc_protocol;
81
* \brief DCMF ARMCI Extension receive short accumulate operation callback
83
* \see DCMF_RecvSendShort
85
/* Receive handler listed in the send configuration built by
 * ARMCIX_DCMF_Acc_register. It views msginfo as an ARMCIX_DCMF_AccInfo_t
 * and, depending on info->datatype, accumulates the payload into info->dst
 * using the scale value carried in the header.
 * NOTE(review): `src` and `bytes` are presumably parameters declared on lines
 * missing from this excerpt; the case labels of the switch are missing too. */
void ARMCIX_DCMF_RecvAcc1 (void * clientdata,
86
const DCQuad * msginfo,
92
//ARMCIX_DCMF_Connection_t * connection = (ARMCIX_DCMF_Connection_t *) clientdata;
93
ARMCIX_DCMF_AccInfo_t * info = (ARMCIX_DCMF_AccInfo_t *) msginfo;
95
// One accumulate per supported element type; the element count is the
// payload size in bytes divided by the size of that element type.
switch (info->datatype)
98
ACCUMULATE( int, &info->ival, bytes/sizeof(int), src, info->dst);
101
ACCUMULATE( double, &info->dval, bytes/sizeof(double), src, info->dst);
104
ACCUMULATE( float, &info->fval, bytes/sizeof(float), src, info->dst);
107
CPL_ACCUMULATE( complex_t, &info->cplxval, bytes/sizeof(complex_t), src, info->dst);
110
CPL_ACCUMULATE( dcomplex_t, &info->dcplxval, bytes/sizeof(dcomplex_t), src, info->dst);
113
ACCUMULATE( long, &info->lval, bytes/sizeof(long), src, info->dst);
122
/* Completion callback installed by ARMCIX_DCMF_RecvAcc2 (as
 * cb_done->function, with the nbinfo allocation as client data). Applies the
 * accumulate described by nbinfo->info: the data in nbinfo->buffer
 * (info->bytes bytes) is accumulated into info->dst.
 * NOTE(review): no release of nbinfo is visible in this excerpt -- confirm
 * the allocation made in ARMCIX_DCMF_RecvAcc2 is freed after the switch. */
void ARMCIX_DCMF_AccReceiveComplete (ARMCIX_DCMF_AccNbInfo_t * nbinfo)
124
ARMCIX_DCMF_AccInfo_t * info = (ARMCIX_DCMF_AccInfo_t *) &nbinfo->info;
126
// Same per-type dispatch as ARMCIX_DCMF_RecvAcc1, but the source is the
// staged buffer and the length comes from the saved header.
switch (info->datatype)
129
ACCUMULATE( int, &info->ival, info->bytes/sizeof(int), nbinfo->buffer, info->dst);
132
ACCUMULATE( double, &info->dval, info->bytes/sizeof(double), nbinfo->buffer, info->dst);
135
ACCUMULATE( float, &info->fval, info->bytes/sizeof(float), nbinfo->buffer, info->dst);
138
CPL_ACCUMULATE( complex_t, &info->cplxval, info->bytes/sizeof(complex_t), nbinfo->buffer, info->dst);
141
CPL_ACCUMULATE( dcomplex_t, &info->dcplxval, info->bytes/sizeof(dcomplex_t), nbinfo->buffer, info->dst);
144
ACCUMULATE( long, &info->lval, info->bytes/sizeof(long), nbinfo->buffer, info->dst);
156
* \brief DCMF ARMCI Extension receive accumulate operation callback
160
/* Receive handler listed in the send configuration built by
 * ARMCIX_DCMF_Acc_register. It allocates one block holding an
 * ARMCIX_DCMF_AccNbInfo_t followed by sndlen bytes of staging space, saves a
 * copy of the header, hands the staging area back through *rcvbuf, and
 * arranges for ARMCIX_DCMF_AccReceiveComplete to run via cb_done.
 * Returns the request object of the sending peer's connection entry.
 * NOTE(review): `peer`, `sndlen` and `rcvbuf` are presumably parameters
 * declared on lines missing from this excerpt. */
DCMF_Request_t * ARMCIX_DCMF_RecvAcc2 (void * clientdata,
161
const DCQuad * msginfo,
167
DCMF_Callback_t * cb_done)
169
// clientdata is treated as the connection array, indexed by peer below.
ARMCIX_DCMF_Connection_t * connection = (ARMCIX_DCMF_Connection_t *) clientdata;
171
// NOTE(review): the malloc result is used without a NULL check on the
// visible lines.
ARMCIX_DCMF_AccNbInfo_t * nbinfo = (ARMCIX_DCMF_AccNbInfo_t *) malloc (sizeof(ARMCIX_DCMF_AccNbInfo_t) + sndlen);
173
memcpy (&nbinfo->info, msginfo, sizeof(ARMCIX_DCMF_AccInfo_t));
174
nbinfo->connection = &connection[peer];
175
// The staging buffer starts immediately after the struct in the same
// allocation.
nbinfo->buffer = (void *)(((char *) nbinfo) + sizeof(ARMCIX_DCMF_AccNbInfo_t));
178
*rcvbuf = nbinfo->buffer;
180
cb_done->function = (void *) ARMCIX_DCMF_AccReceiveComplete;
181
cb_done->clientdata = (void *) nbinfo;
183
return &connection[peer].request;
188
* \brief Register the DCMF ARMCI Extension accumulate operation.
190
* \param[in] connection_array Connection array
192
* \see DCMF_Send_register
194
/* Registers the accumulate protocol: builds a DCMF_Send_Configuration_t that
 * names ARMCIX_DCMF_RecvAcc1 and ARMCIX_DCMF_RecvAcc2 as handlers and passes
 * it to DCMF_Send_register for __acc_protocol. The registration is bracketed
 * by DCMF_CriticalSection_enter(0) / DCMF_CriticalSection_exit(0).
 * NOTE(review): the initializer entries that would use connection_array are
 * on lines missing from this excerpt. */
void ARMCIX_DCMF_Acc_register (ARMCIX_DCMF_Connection_t * connection_array)
196
DCMF_CriticalSection_enter (0);
198
DCMF_Send_Configuration_t configuration = {
199
DCMF_DEFAULT_SEND_PROTOCOL,
201
ARMCIX_DCMF_RecvAcc1,
203
ARMCIX_DCMF_RecvAcc2,
206
DCMF_Send_register (&__acc_protocol, &configuration);
208
DCMF_CriticalSection_exit (0);
212
* \brief ARMCI Extension blocking accumulate operation.
214
* \param[in] datatype accumulate datatype (operation code)
215
* \param[in] scale opaque pointer to the scaling factor for accumulate
216
* \param[in] src Source buffer on the local node
217
* \param[in] dst Destination buffer on the remote node
218
* \param[in] bytes Number of bytes to transfer
219
* \param[in] proc Remote node rank
223
/* Blocking accumulate: fills a local ARMCIX_DCMF_AccInfo_t header with the
 * datatype and scale, sends it with DCMF_Send on __acc_protocol, then polls
 * DCMF_Messager_advance until `active` becomes zero. The whole operation is
 * bracketed by DCMF_CriticalSection_enter(0) / DCMF_CriticalSection_exit(0).
 * NOTE(review): the remaining DCMF_Send arguments, the return statement and
 * the body of the BLOCKING_OPERATIONS_REQUIRE_FENCE section are on lines
 * missing from this excerpt. */
int ARMCIX_Acc (int datatype, void * scale, void * src, void * dst, int bytes, int proc)
225
DCMF_CriticalSection_enter (0);
227
// Completion flag; its address is the client data of cb_wait, which
// presumably decrements it when the send completes.
volatile unsigned active = 1;
228
DCMF_Callback_t cb_wait = { ARMCIX_DCMF_cb_decrement, (void *)&active };
229
DCMF_Request_t request;
231
ARMCIX_DCMF_AccInfo_t info;
234
info.datatype = datatype;
238
// Copy the scale value into the header member matching the datatype
// (presumably one assignment per case; the switch and case labels are not
// visible here).
info.ival = *((int *)scale);
241
info.dval = *((double *)scale);
244
info.fval = *((float *)scale);
247
info.cplxval.real = ((complex_t *)scale)->real;
248
info.cplxval.imag = ((complex_t *)scale)->imag;
251
info.dcplxval.real = ((dcomplex_t *)scale)->real;
252
info.dcplxval.imag = ((dcomplex_t *)scale)->imag;
255
info.lval = *((long *)scale);
262
DCMF_Send ( &__acc_protocol,
265
DCMF_SEQUENTIAL_CONSISTENCY,
272
#ifdef BLOCKING_OPERATIONS_REQUIRE_FENCE
275
// Drive the messager until the completion flag is cleared.
while (active) DCMF_Messager_advance ();
278
DCMF_CriticalSection_exit (0);
285
* \brief ARMCI Extension non-blocking accumulate operation.
287
* \param[in] datatype accumulate datatype (operation code)
288
* \param[in] scale opaque pointer to the scaling factor for accumulate
289
* \param[in] src Source buffer on the local node
290
* \param[in] dst Destination buffer on the remote node
291
* \param[in] bytes Number of bytes to transfer
292
* \param[in] proc Remote node rank
293
* \param[in] nb_handle ARMCI non-blocking handle
297
/* Non-blocking accumulate: records the target connection in the handle's
 * completion info, bumps the per-connection and global `active` counters,
 * allocates a request whose quad storage is used as the message header,
 * fills in datatype and scale, and issues DCMF_Send on __acc_protocol.
 * Does not poll for completion on any visible line. The operation is
 * bracketed by DCMF_CriticalSection_enter(0) / DCMF_CriticalSection_exit(0).
 * NOTE(review): the remaining DCMF_Send arguments and the return statement
 * are on lines missing from this excerpt. */
int ARMCIX_NbAcc (int datatype, void * scale, void * src, void * dst, int bytes, int proc, armci_ihdl_t nb_handle)
299
DCMF_CriticalSection_enter (0);
301
// The handle's cmpl_info storage is viewed as armcix_dcmf_opaque_t.
armcix_dcmf_opaque_t * dcmf = (armcix_dcmf_opaque_t *) &nb_handle->cmpl_info;
303
dcmf->connection = &__connection[proc];
305
// One more outstanding operation, counted per connection and globally.
__connection[proc].active++;
306
__global_connection.active++;
308
// cb_free is passed to the request allocator; cb_done hands the allocated
// request to ARMCIX_DCMF_request_free.
DCMF_Callback_t cb_free = { ARMCIX_DCMF_NbOp_cb_done, nb_handle };
309
ARMCIX_DCMF_Request_t * new_request = ARMCIX_DCMF_request_allocate (cb_free);
310
DCMF_Callback_t cb_done = { (void (*)(void *, DCMF_Error_t *))ARMCIX_DCMF_request_free, new_request };
312
// The header is built in place inside the request's quad storage.
ARMCIX_DCMF_AccInfo_t * info = (ARMCIX_DCMF_AccInfo_t *) &(new_request->quad[0]);
315
info->datatype = datatype;
319
// Scale copy, presumably one assignment per datatype case (the switch and
// case labels are not visible here).
info->ival = *((int *)scale);
322
info->dval = *((double *)scale);
325
info->fval = *((float *)scale);
328
info->cplxval.real = ((complex_t *)scale)->real;
329
info->cplxval.imag = ((complex_t *)scale)->imag;
332
info->dcplxval.real = ((dcomplex_t *)scale)->real;
333
info->dcplxval.imag = ((dcomplex_t *)scale)->imag;
336
info->lval = *((long *)scale);
343
DCMF_Send ( &__acc_protocol,
344
&(new_request->request),
346
DCMF_SEQUENTIAL_CONSISTENCY,
353
DCMF_CriticalSection_exit (0);
360
* \brief ARMCI Extension blocking vector accumulate operation.
362
* \todo something goofy with AccV .. should be able to replace with combination of
363
* ARMCIX_NbAccV and ARMCIX_Wait, but that causes test-ibm.x to hang. Maybe
364
* related to interrupts?
366
* \param[in] datatype accumulate datatype (operation code)
367
* \param[in] scale opaque pointer to the scaling factor for accumulate
368
* \param[in] darr descriptor array
369
* \param[in] len length of the descriptor array
370
* \param[in] proc process(or) ID
372
/* Blocking vector accumulate: issues one DCMF_Send per (source, destination)
 * pointer pair in the descriptor array and then polls DCMF_Messager_advance
 * until `active` becomes zero.
 * NOTE(review): the first statements (#error, ARMCIX_NbAccV + ARMCIX_Wait)
 * are presumably inside a disabled preprocessor branch whose guard is on
 * lines missing from this excerpt; the declarations of i, j and n, the
 * remaining DCMF_Send arguments and the return statement are missing too. */
int ARMCIX_AccV (int datatype, void * scale, armci_giov_t * darr, int len, int proc)
375
#error causes test-ibm.x to hang!
376
armci_ireq_t nb_request;
377
armci_ihdl_t nb_handle = (armci_ihdl_t) &nb_request;
378
ARMCIX_NbAccV (datatype, scale, darr, len, proc, nb_handle);
379
#warning remove this ARMCIX_Fence() and implement some sort of ack scheme.
381
ARMCIX_Wait (&nb_handle->cmpl_info);
383
DCMF_CriticalSection_enter (0);
385
// Calculate the number of requests
388
// The loop body is not visible; presumably n is incremented once per
// pointer pair.
for (i = 0; i < len; i++)
389
for (j = 0; j < darr[i].ptr_array_len; j++)
392
// `active` starts at n; its address is the client data of cb_wait, which
// presumably decrements it once per completed send.
volatile unsigned active = n;
393
DCMF_Callback_t cb_wait = { ARMCIX_DCMF_cb_decrement, (void *)&active };
394
// Array of n request objects with automatic storage.
DCMF_Request_t request[n];
396
ARMCIX_DCMF_AccInfo_t info;
397
info.datatype = datatype;
401
// Scale copy, presumably one assignment per datatype case (the switch and
// case labels are not visible here).
info.ival = *((int *)scale);
404
info.dval = *((double *)scale);
407
info.fval = *((float *)scale);
410
info.cplxval.real = ((complex_t *)scale)->real;
411
info.cplxval.imag = ((complex_t *)scale)->imag;
414
info.dcplxval.real = ((dcomplex_t *)scale)->real;
415
info.dcplxval.imag = ((dcomplex_t *)scale)->imag;
418
info.lval = *((long *)scale);
425
// The same header is reused for every send; bytes is set once per
// descriptor and dst once per pointer pair.
for (i = 0; i < len; i++)
427
info.bytes = darr[i].bytes;
428
for (j = 0; j < darr[i].ptr_array_len; j++)
430
info.dst = darr[i].dst_ptr_array[j];
431
DCMF_Send ( &__acc_protocol,
434
DCMF_SEQUENTIAL_CONSISTENCY,
437
(char *) darr[i].src_ptr_array[j],
443
#ifdef BLOCKING_OPERATIONS_REQUIRE_FENCE
446
// Poll until all accumulate messages have been sent.
447
while (active) DCMF_Messager_advance ();
450
DCMF_CriticalSection_exit (0);
457
* \brief ARMCI Extension non-blocking vector accumulate operation.
459
* \param[in] datatype accumulate datatype (operation code)
460
* \param[in] scale opaque pointer to the scaling factor for accumulate
461
* \param[in] darr Descriptor array
462
* \param[in] len Length of descriptor array
463
* \param[in] proc Remote process(or) ID
464
* \param[in] nb_handle ARMCI non-blocking handle
468
/* Non-blocking vector accumulate: records the target connection in the
 * handle, adds n to the per-connection and global `active` counters, then
 * for every (source, destination) pointer pair allocates a request, builds
 * the header in the request's quad storage and issues DCMF_Send.
 * Does not poll for completion on any visible line.
 * NOTE(review): both a local `info` struct and a per-request `info` pointer
 * are filled in; presumably one of the two is compiled out or shadowed by a
 * scope not visible in this excerpt. The declarations of i, j and n, the
 * remaining DCMF_Send arguments and the return statement are missing too. */
int ARMCIX_NbAccV (int datatype, void * scale, armci_giov_t * darr, int len, int proc, armci_ihdl_t nb_handle)
470
DCMF_CriticalSection_enter (0);
472
// Calculate the number of requests
475
// The loop body is not visible; presumably n is incremented once per
// pointer pair.
for (i = 0; i < len; i++)
476
for (j = 0; j < darr[i].ptr_array_len; j++)
479
// The handle's cmpl_info storage is viewed as armcix_dcmf_opaque_t.
armcix_dcmf_opaque_t * dcmf = (armcix_dcmf_opaque_t *) &nb_handle->cmpl_info;
480
dcmf->connection = &__connection[proc];
483
// Account for all n sends up front, per connection and globally.
__connection[proc].active += n;
484
__global_connection.active += n;
486
ARMCIX_DCMF_AccInfo_t info;
487
info.datatype = datatype;
491
info.ival = *((int *)scale);
494
info.dval = *((double *)scale);
497
info.fval = *((float *)scale);
500
info.cplxval.real = ((complex_t *)scale)->real;
501
info.cplxval.imag = ((complex_t *)scale)->imag;
504
info.dcplxval.real = ((dcomplex_t *)scale)->real;
505
info.dcplxval.imag = ((dcomplex_t *)scale)->imag;
508
info.lval = *((long *)scale);
516
for (i = 0; i < len; i++)
518
//info.bytes = darr[i].bytes;
519
for (j = 0; j < darr[i].ptr_array_len; j++)
521
// One request per pointer pair; the header lives in that request's quad
// storage.
DCMF_Callback_t cb_free = { ARMCIX_DCMF_NbOp_cb_done, nb_handle };
522
ARMCIX_DCMF_Request_t * new_request = ARMCIX_DCMF_request_allocate (cb_free);
523
DCMF_Callback_t cb_done = { (void (*)(void *, DCMF_Error_t *))ARMCIX_DCMF_request_free, new_request };
525
ARMCIX_DCMF_AccInfo_t * info = (ARMCIX_DCMF_AccInfo_t *) &(new_request->quad[0]);
527
//info->bytes = bytes;
528
info->datatype = datatype;
529
info->bytes = darr[i].bytes;
533
info->ival = *((int *)scale);
536
info->dval = *((double *)scale);
539
info->fval = *((float *)scale);
542
info->cplxval.real = ((complex_t *)scale)->real;
543
info->cplxval.imag = ((complex_t *)scale)->imag;
546
info->dcplxval.real = ((dcomplex_t *)scale)->real;
547
info->dcplxval.imag = ((dcomplex_t *)scale)->imag;
550
info->lval = *((long *)scale);
557
info->dst = darr[i].dst_ptr_array[j];
558
DCMF_Send ( &__acc_protocol,
559
&(new_request->request),
561
DCMF_SEQUENTIAL_CONSISTENCY,
564
(char *) darr[i].src_ptr_array[j],
570
DCMF_CriticalSection_exit (0);
578
/* Recursive worker for the strided accumulate entry points.
 * When stride_levels is 0 it sends one contiguous segment of seg_count[0]
 * bytes with DCMF_Send. Otherwise it loops seg_count[stride_levels] times,
 * recursing with stride_levels-1 and advancing the source and destination
 * pointers by src_stride_arr[stride_levels-1] and
 * dst_stride_arr[stride_levels-1] bytes after each iteration.
 * num_requests accumulates the values returned by the recursive calls;
 * presumably it is the function's return value (the return statement and
 * the base-case increment are not visible in this excerpt).
 * NOTE(review): two variants of the header setup and request allocation
 * appear below (an `info` pointer into the request vs. a local `info`
 * struct, and ARMCIX_DCMF_Request_t* vs. DCMF_Request_t* for new_request).
 * Presumably a preprocessor guard on missing lines selects one -- confirm. */
unsigned ARMCIX_DCMF_AccS_recurse (int datatype, void * scale,
579
void * src_ptr, int * src_stride_arr,
580
void * dst_ptr, int * dst_stride_arr,
581
int * seg_count, int stride_levels, int proc,
582
armci_ihdl_t nb_handle)
584
unsigned num_requests = 0;
586
//fprintf (stderr, "ARMCIX_DCMF_AccS_recurse() >> \n");
588
// Base case: a single contiguous segment.
if (stride_levels == 0)
590
DCMF_Callback_t cb_free = { ARMCIX_DCMF_NbOp_cb_done, nb_handle };
591
ARMCIX_DCMF_Request_t * new_request = ARMCIX_DCMF_request_allocate (cb_free);
592
DCMF_Callback_t cb_done = { (void (*)(void *, DCMF_Error_t *))ARMCIX_DCMF_request_free, new_request };
594
// The header is built in place inside the request's quad storage.
ARMCIX_DCMF_AccInfo_t * info = (ARMCIX_DCMF_AccInfo_t *) &(new_request->quad[0]);
596
//info->bytes = bytes;
597
info->datatype = datatype;
601
info->ival = *((int *)scale);
604
info->dval = *((double *)scale);
607
info->fval = *((float *)scale);
610
info->cplxval.real = ((complex_t *)scale)->real;
611
info->cplxval.imag = ((complex_t *)scale)->imag;
614
info->dcplxval.real = ((dcomplex_t *)scale)->real;
615
info->dcplxval.imag = ((dcomplex_t *)scale)->imag;
618
info->lval = *((long *)scale);
625
// Second variant of the header setup, using a local struct (see the review
// note in the header comment).
ARMCIX_DCMF_AccInfo_t info;
626
info.datatype = datatype;
630
info.ival = *((int *)scale);
633
info.dval = *((double *)scale);
636
info.fval = *((float *)scale);
639
info.cplxval.real = ((complex_t *)scale)->real;
640
info.cplxval.imag = ((complex_t *)scale)->imag;
643
info.dcplxval.real = ((dcomplex_t *)scale)->real;
644
info.dcplxval.imag = ((dcomplex_t *)scale)->imag;
647
info.lval = *((long *)scale);
654
DCMF_Callback_t cb_free = { ARMCIX_DCMF_NbOp_cb_done, nb_handle };
655
DCMF_Request_t * new_request = ARMCIX_DCMF_request_allocate (cb_free);
656
DCMF_Callback_t cb_done = { (void(*)(void *)) ARMCIX_DCMF_request_free, new_request };
658
// seg_count[0] is the byte length of one contiguous segment.
info->bytes = seg_count[0];
661
DCMF_Send ( &__acc_protocol,
662
&(new_request->request),
664
DCMF_SEQUENTIAL_CONSISTENCY,
675
// Recursive case: walk this stride level using byte-wise pointer
// arithmetic.
char * src_tmp = (char *) src_ptr;
676
char * dst_tmp = (char *) dst_ptr;
678
for (i = 0; i < seg_count[stride_levels]; i++)
680
num_requests += ARMCIX_DCMF_AccS_recurse (datatype, scale,
681
src_tmp, src_stride_arr,
682
dst_tmp, dst_stride_arr,
683
seg_count, (stride_levels-1), proc,
686
src_tmp += src_stride_arr[(stride_levels-1)];
687
dst_tmp += dst_stride_arr[(stride_levels-1)];
691
//fprintf (stderr, "ARMCIX_DCMF_AccS_recurse() << num_requests = %d\n", num_requests);
698
* \brief ARMCI Extension blocking strided accumulate operation.
700
* \param[in] datatype accumulate datatype (operation code)
701
* \param[in] scale opaque pointer to the scaling factor for accumulate
702
* \param[in] src_ptr pointer to 1st segment at source
703
* \param[in] src_stride_arr array of strides at source
704
* \param[in] dst_ptr pointer to 1st segment at destination
705
* \param[in] dst_stride_arr array of strides at destination
706
* \param[in] seg_count number of segments at each stride levels: count[0]=bytes
707
* \param[in] stride_levels number of stride levels
708
* \param[in] proc remote process(or) ID
712
/* Blocking strided accumulate: multiplies n by seg_count[1..stride_levels]
 * to get the number of sends, adds n to the per-connection and global
 * `active` counters, issues the sends through ARMCIX_DCMF_AccS_recurse using
 * a handle local to this function, then polls DCMF_Messager_advance while
 * dcmf->active is non-zero.
 * NOTE(review): the first statements (#error, ARMCIX_NbAccS + ARMCIX_Wait)
 * are presumably inside a disabled preprocessor branch whose guard is on
 * lines missing from this excerpt. The declarations of i, n and count, the
 * initialisation of dcmf->active and the return statement are missing too. */
int ARMCIX_AccS (int datatype, void * scale,
713
void * src_ptr, int * src_stride_arr,
714
void * dst_ptr, int * dst_stride_arr,
715
int * seg_count, int stride_levels, int proc)
718
#error causes test-ibm.x to hang!
719
armci_ireq_t nb_request;
720
armci_ihdl_t nb_handle = (armci_ihdl_t) &nb_request;
721
ARMCIX_NbAccS (datatype, scale,
722
src_ptr, src_stride_arr,
723
dst_ptr, dst_stride_arr,
724
seg_count, stride_levels, proc,
726
#warning remove this ARMCIX_Fence() and implement some sort of ack scheme.
728
ARMCIX_Wait (&nb_handle->cmpl_info);
730
DCMF_CriticalSection_enter (0);
732
//fprintf (stderr, "ARMCIX_AccS() >> \n");
733
//fprintf (stderr, "ARMCIX_AccS() -- __connection[%d].sequence.origin=%d, __connection[%d].active=%d, __global_connection.active=%d\n", proc, __connection[proc].sequence.origin, proc, __connection[proc].active, __global_connection.active);
735
// Calculate the number of requests
738
// Multiply n by the segment count of every stride level above level 0.
for (i = 0; i < stride_levels; i++) n = n * seg_count[i+1];
740
// Handle local to this call; its cmpl_info storage is viewed as
// armcix_dcmf_opaque_t.
armci_ireq_t nb_handle;
741
armcix_dcmf_opaque_t * dcmf = (armcix_dcmf_opaque_t *) &nb_handle.cmpl_info;
742
dcmf->connection = &__connection[proc];
745
__connection[proc].active += n;
746
__global_connection.active += n;
749
count = ARMCIX_DCMF_AccS_recurse (datatype, scale,
750
src_ptr, src_stride_arr,
751
dst_ptr, dst_stride_arr,
752
seg_count, stride_levels, proc,
753
(armci_ihdl_t) &nb_handle);
755
#ifdef BLOCKING_OPERATIONS_REQUIRE_FENCE
759
// Drive the messager while the local handle still reports activity.
while (dcmf->active) DCMF_Messager_advance ();
762
//fprintf (stderr, "ARMCIX_AccS() << \n");
764
DCMF_CriticalSection_exit (0);
770
* \brief ARMCI Extension non-blocking strided accumulate operation.
772
* \param[in] datatype accumulate datatype (operation code)
773
* \param[in] scale opaque pointer to the scaling factor for accumulate
774
* \param[in] src_ptr pointer to 1st segment at source
775
* \param[in] src_stride_arr array of strides at source
776
* \param[in] dst_ptr pointer to 1st segment at destination
777
* \param[in] dst_stride_arr array of strides at destination
778
* \param[in] seg_count number of segments at each stride levels: count[0]=bytes
779
* \param[in] stride_levels number of stride levels
780
* \param[in] proc remote process(or) ID
781
* \param[in] nb_handle ARMCI non-blocking handle
785
/* Non-blocking strided accumulate: multiplies n by
 * seg_count[1..stride_levels] to get the number of sends, records the target
 * connection in the caller's handle, adds n to the per-connection and global
 * `active` counters, and issues the sends through ARMCIX_DCMF_AccS_recurse.
 * Does not poll for completion on any visible line. The operation is
 * bracketed by DCMF_CriticalSection_enter(0) / DCMF_CriticalSection_exit(0).
 * NOTE(review): the declarations of i, n and count, the final argument of
 * the recursive call and the return statement are on lines missing from
 * this excerpt. */
int ARMCIX_NbAccS (int datatype, void * scale,
786
void * src_ptr, int * src_stride_arr,
787
void * dst_ptr, int * dst_stride_arr,
788
int * seg_count, int stride_levels, int proc,
789
armci_ihdl_t nb_handle)
791
DCMF_CriticalSection_enter (0);
793
// Calculate the number of requests
796
// Multiply n by the segment count of every stride level above level 0.
for (i = 0; i < stride_levels; i++) n = n * seg_count[i+1];
798
// The handle's cmpl_info storage is viewed as armcix_dcmf_opaque_t.
armcix_dcmf_opaque_t * dcmf = (armcix_dcmf_opaque_t *) &nb_handle->cmpl_info;
799
dcmf->connection = &__connection[proc];
802
__connection[proc].active += n;
803
__global_connection.active += n;
806
count = ARMCIX_DCMF_AccS_recurse (datatype, scale,
807
src_ptr, src_stride_arr,
808
dst_ptr, dst_stride_arr,
809
seg_count, stride_levels, proc,
814
DCMF_CriticalSection_exit (0);