4
/* begin_generated_IBM_copyright_prolog */
6
/* ---------------------------------------------------------------- */
7
/* (C)Copyright IBM Corp. 2007, 2008 */
9
/* ---------------------------------------------------------------- */
11
/* end_generated_IBM_copyright_prolog */
13
* \file armci/src/armcix/dcmf/armcix_acc.c
14
* \brief DCMF ARMCI Extension for accumulate operations.
17
#include "armcix_impl.h"
29
typedef union ARMCIX_DCMF_AccInfo_t
48
ARMCIX_DCMF_AccInfo_t __attribute__ ((__aligned__ (16)));
50
typedef struct ARMCIX_DCMF_AccNbInfo_t
52
ARMCIX_DCMF_AccInfo_t info;
53
ARMCIX_DCMF_Connection_t * connection;
56
ARMCIX_DCMF_AccNbInfo_t;
58
#define ACCUMULATE( DTYPE, scale, elems, src, dst) {\
60
DTYPE *a =(DTYPE *)(dst);\
61
DTYPE *b =(DTYPE *)(src);\
62
DTYPE alpha = *(DTYPE *)(scale);\
63
for(j=0;j<(elems);j++)a[j] += alpha*b[j];\
66
#define CPL_ACCUMULATE( DTYPE, scale, elems, src, dst) {\
68
DTYPE *a =(DTYPE *)(dst);\
69
DTYPE *b =(DTYPE *)(src);\
70
DTYPE alpha = *(DTYPE *)(scale);\
71
for(j=0;j<(elems);j++){\
72
a[j].real += alpha.real*b[j].real - alpha.imag*b[j].imag;\
73
a[j].imag += alpha.imag*b[j].real + alpha.real*b[j].imag;\
77
DCMF_Protocol_t __acc_protocol;
80
* \brief DCMF ARMCI Extension receive short accumulate operation callback
82
* \see DCMF_RecvSendShort
84
void ARMCIX_DCMF_RecvAcc1 (void * clientdata,
85
const DCQuad * msginfo,
91
//ARMCIX_DCMF_Connection_t * connection = (ARMCIX_DCMF_Connection_t *) clientdata;
92
ARMCIX_DCMF_AccInfo_t * info = (ARMCIX_DCMF_AccInfo_t *) msginfo;
94
switch (info->datatype)
97
ACCUMULATE( int, &info->ival, bytes/sizeof(int), src, info->dst);
100
ACCUMULATE( double, &info->dval, bytes/sizeof(double), src, info->dst);
103
ACCUMULATE( float, &info->fval, bytes/sizeof(float), src, info->dst);
106
CPL_ACCUMULATE( complex_t, &info->cplxval, bytes/sizeof(complex_t), src, info->dst);
109
CPL_ACCUMULATE( dcomplex_t, &info->dcplxval, bytes/sizeof(dcomplex_t), src, info->dst);
112
ACCUMULATE( long, &info->lval, bytes/sizeof(long), src, info->dst);
121
void ARMCIX_DCMF_AccReceiveComplete (ARMCIX_DCMF_AccNbInfo_t * nbinfo)
123
ARMCIX_DCMF_AccInfo_t * info = (ARMCIX_DCMF_AccInfo_t *) &nbinfo->info;
125
switch (info->datatype)
128
ACCUMULATE( int, &info->ival, info->bytes/sizeof(int), nbinfo->buffer, info->dst);
131
ACCUMULATE( double, &info->dval, info->bytes/sizeof(double), nbinfo->buffer, info->dst);
134
ACCUMULATE( float, &info->fval, info->bytes/sizeof(float), nbinfo->buffer, info->dst);
137
CPL_ACCUMULATE( complex_t, &info->cplxval, info->bytes/sizeof(complex_t), nbinfo->buffer, info->dst);
140
CPL_ACCUMULATE( dcomplex_t, &info->dcplxval, info->bytes/sizeof(dcomplex_t), nbinfo->buffer, info->dst);
143
ACCUMULATE( long, &info->lval, info->bytes/sizeof(long), nbinfo->buffer, info->dst);
155
* \brief DCMF ARMCI Extension receive accumulate operation callback
159
DCMF_Request_t * ARMCIX_DCMF_RecvAcc2 (void * clientdata,
160
const DCQuad * msginfo,
166
DCMF_Callback_t * cb_done)
168
ARMCIX_DCMF_Connection_t * connection = (ARMCIX_DCMF_Connection_t *) clientdata;
170
ARMCIX_DCMF_AccNbInfo_t * nbinfo = (ARMCIX_DCMF_AccNbInfo_t *) malloc (sizeof(ARMCIX_DCMF_AccNbInfo_t) + sndlen);
172
memcpy (&nbinfo->info, msginfo, sizeof(ARMCIX_DCMF_AccInfo_t));
173
nbinfo->connection = &connection[peer];
174
nbinfo->buffer = (void *)(((char *) nbinfo) + sizeof(ARMCIX_DCMF_AccNbInfo_t));
177
*rcvbuf = nbinfo->buffer;
179
cb_done->function = (void *) ARMCIX_DCMF_AccReceiveComplete;
180
cb_done->clientdata = (void *) nbinfo;
182
return &connection[peer].request;
187
* \brief Register the DCMF ARMCI Extension accumulate operation.
189
* \param[in] connection_array Connection array
191
* \see DCMF_Send_register
193
void ARMCIX_DCMF_Acc_register (ARMCIX_DCMF_Connection_t * connection_array)
195
DCMF_CriticalSection_enter (0);
197
DCMF_Send_Configuration_t configuration = {
198
DCMF_DEFAULT_SEND_PROTOCOL,
199
DCMF_DEFAULT_NETWORK,
200
ARMCIX_DCMF_RecvAcc1,
202
ARMCIX_DCMF_RecvAcc2,
205
DCMF_Send_register (&__acc_protocol, &configuration);
207
DCMF_CriticalSection_exit (0);
211
* \brief ARMCI Extension blocking accumulate operation.
213
* \param[in] datatype accumulate datatype (operation code)
214
* \param[in] scale opaque pointer to the scaling factor for accumulate
215
* \param[in] src Source buffer on the local node
216
* \param[in] dst Destination buffer on the remote node
217
* \param[in] bytes Number of bytes to transfer
218
* \param[in] proc Remote node rank
222
int ARMCIX_Acc (int datatype, void * scale, void * src, void * dst, int bytes, int proc)
224
DCMF_CriticalSection_enter (0);
226
volatile unsigned active = 1;
227
DCMF_Callback_t cb_wait = { ARMCIX_DCMF_cb_decrement, (void *)&active };
228
DCMF_Request_t request;
230
ARMCIX_DCMF_AccInfo_t info;
233
info.datatype = datatype;
237
info.ival = *((int *)scale);
240
info.dval = *((double *)scale);
243
info.fval = *((float *)scale);
246
info.cplxval.real = ((complex_t *)scale)->real;
247
info.cplxval.imag = ((complex_t *)scale)->imag;
250
info.dcplxval.real = ((dcomplex_t *)scale)->real;
251
info.dcplxval.imag = ((dcomplex_t *)scale)->imag;
254
info.lval = *((long *)scale);
261
DCMF_Send ( &__acc_protocol,
264
DCMF_SEQUENTIAL_CONSISTENCY,
271
#ifdef BLOCKING_OPERATIONS_REQUIRE_FENCE
274
while (active) DCMF_Messager_advance ();
277
DCMF_CriticalSection_exit (0);
284
* \brief ARMCI Extension non-blocking accumulate operation.
286
* \param[in] datatype accumulate datatype (operation code)
287
* \param[in] scale opaque pointer to the scaling factor for accumulate
288
* \param[in] src Source buffer on the local node
289
* \param[in] dst Destination buffer on the remote node
290
* \param[in] bytes Number of bytes to transfer
291
* \param[in] proc Remote node rank
292
* \param[in] nb_handle ARMCI non-blocking handle
296
int ARMCIX_NbAcc (int datatype, void * scale, void * src, void * dst, int bytes, int proc, armci_ihdl_t nb_handle)
298
DCMF_CriticalSection_enter (0);
300
armcix_dcmf_opaque_t * dcmf = (armcix_dcmf_opaque_t *) &nb_handle->cmpl_info;
302
dcmf->connection = &__connection[proc];
304
__connection[proc].active++;
305
__global_connection.active++;
307
DCMF_Callback_t cb_free = { ARMCIX_DCMF_NbOp_cb_done, nb_handle };
308
ARMCIX_DCMF_Request_t * new_request = ARMCIX_DCMF_request_allocate (cb_free);
309
DCMF_Callback_t cb_done = { (void (*)(void *, DCMF_Error_t *))ARMCIX_DCMF_request_free, new_request };
311
ARMCIX_DCMF_AccInfo_t * info = (ARMCIX_DCMF_AccInfo_t *) &(new_request->quad[0]);
314
info->datatype = datatype;
318
info->ival = *((int *)scale);
321
info->dval = *((double *)scale);
324
info->fval = *((float *)scale);
327
info->cplxval.real = ((complex_t *)scale)->real;
328
info->cplxval.imag = ((complex_t *)scale)->imag;
331
info->dcplxval.real = ((dcomplex_t *)scale)->real;
332
info->dcplxval.imag = ((dcomplex_t *)scale)->imag;
335
info->lval = *((long *)scale);
342
DCMF_Send ( &__acc_protocol,
343
&(new_request->request),
345
DCMF_SEQUENTIAL_CONSISTENCY,
352
DCMF_CriticalSection_exit (0);
359
* \brief ARMCI Extension blocking vector accumulate operation.
361
* \todo Something is wrong with AccV: it should be replaceable by a combination of
362
* ARMCIX_NbAccV and ARMCIX_Wait, but that causes test-ibm.x to hang. Maybe
363
* related to interrupts?
365
* \param[in] datatype accumulate datatype (operation code)
366
* \param[in] scale opaque pointer to the scaling factor for accumulate
367
* \param[in] darr descriptor array
368
* \param[in] len length of the descriptor array
369
* \param[in] proc process(or) ID
371
int ARMCIX_AccV (int datatype, void * scale, armci_giov_t * darr, int len, int proc)
374
#error causes test-ibm.x to hang!
375
armci_ireq_t nb_request;
376
armci_ihdl_t nb_handle = (armci_ihdl_t) &nb_request;
377
ARMCIX_NbAccV (datatype, scale, darr, len, proc, nb_handle);
378
#warning remove this ARMCIX_Fence() and implement some sort of ack scheme.
380
ARMCIX_Wait (&nb_handle->cmpl_info);
382
DCMF_CriticalSection_enter (0);
384
// Calculate the number of requests
387
for (i = 0; i < len; i++)
388
for (j = 0; j < darr[i].ptr_array_len; j++)
391
volatile unsigned active = n;
392
DCMF_Callback_t cb_wait = { ARMCIX_DCMF_cb_decrement, (void *)&active };
393
DCMF_Request_t request[n];
395
ARMCIX_DCMF_AccInfo_t info;
396
info.datatype = datatype;
400
info.ival = *((int *)scale);
403
info.dval = *((double *)scale);
406
info.fval = *((float *)scale);
409
info.cplxval.real = ((complex_t *)scale)->real;
410
info.cplxval.imag = ((complex_t *)scale)->imag;
413
info.dcplxval.real = ((dcomplex_t *)scale)->real;
414
info.dcplxval.imag = ((dcomplex_t *)scale)->imag;
417
info.lval = *((long *)scale);
424
for (i = 0; i < len; i++)
426
info.bytes = darr[i].bytes;
427
for (j = 0; j < darr[i].ptr_array_len; j++)
429
info.dst = darr[i].dst_ptr_array[j];
430
DCMF_Send ( &__acc_protocol,
433
DCMF_SEQUENTIAL_CONSISTENCY,
436
(char *) darr[i].src_ptr_array[j],
442
#ifdef BLOCKING_OPERATIONS_REQUIRE_FENCE
445
// Poll until all accumulate messages have been sent.
446
while (active) DCMF_Messager_advance ();
449
DCMF_CriticalSection_exit (0);
456
* \brief ARMCI Extension non-blocking vector accumulate operation.
458
* \param[in] datatype accumulate datatype (operation code)
459
* \param[in] scale opaque pointer to the scaling factor for accumulate
460
* \param[in] darr Descriptor array
461
* \param[in] len Length of descriptor array
462
* \param[in] proc Remote process(or) ID
463
* \param[in] nb_handle ARMCI non-blocking handle
467
int ARMCIX_NbAccV (int datatype, void * scale, armci_giov_t * darr, int len, int proc, armci_ihdl_t nb_handle)
469
DCMF_CriticalSection_enter (0);
471
// Calculate the number of requests
474
for (i = 0; i < len; i++)
475
for (j = 0; j < darr[i].ptr_array_len; j++)
478
armcix_dcmf_opaque_t * dcmf = (armcix_dcmf_opaque_t *) &nb_handle->cmpl_info;
479
dcmf->connection = &__connection[proc];
482
__connection[proc].active += n;
483
__global_connection.active += n;
485
ARMCIX_DCMF_AccInfo_t info;
486
info.datatype = datatype;
490
info.ival = *((int *)scale);
493
info.dval = *((double *)scale);
496
info.fval = *((float *)scale);
499
info.cplxval.real = ((complex_t *)scale)->real;
500
info.cplxval.imag = ((complex_t *)scale)->imag;
503
info.dcplxval.real = ((dcomplex_t *)scale)->real;
504
info.dcplxval.imag = ((dcomplex_t *)scale)->imag;
507
info.lval = *((long *)scale);
515
for (i = 0; i < len; i++)
517
//info.bytes = darr[i].bytes;
518
for (j = 0; j < darr[i].ptr_array_len; j++)
520
DCMF_Callback_t cb_free = { ARMCIX_DCMF_NbOp_cb_done, nb_handle };
521
ARMCIX_DCMF_Request_t * new_request = ARMCIX_DCMF_request_allocate (cb_free);
522
DCMF_Callback_t cb_done = { (void (*)(void *, DCMF_Error_t *))ARMCIX_DCMF_request_free, new_request };
524
ARMCIX_DCMF_AccInfo_t * info = (ARMCIX_DCMF_AccInfo_t *) &(new_request->quad[0]);
526
//info->bytes = bytes;
527
info->datatype = datatype;
528
info->bytes = darr[i].bytes;
532
info->ival = *((int *)scale);
535
info->dval = *((double *)scale);
538
info->fval = *((float *)scale);
541
info->cplxval.real = ((complex_t *)scale)->real;
542
info->cplxval.imag = ((complex_t *)scale)->imag;
545
info->dcplxval.real = ((dcomplex_t *)scale)->real;
546
info->dcplxval.imag = ((dcomplex_t *)scale)->imag;
549
info->lval = *((long *)scale);
556
info->dst = darr[i].dst_ptr_array[j];
557
DCMF_Send ( &__acc_protocol,
558
&(new_request->request),
560
DCMF_SEQUENTIAL_CONSISTENCY,
563
(char *) darr[i].src_ptr_array[j],
569
DCMF_CriticalSection_exit (0);
577
unsigned ARMCIX_DCMF_AccS_recurse (int datatype, void * scale,
578
void * src_ptr, int * src_stride_arr,
579
void * dst_ptr, int * dst_stride_arr,
580
int * seg_count, int stride_levels, int proc,
581
armci_ihdl_t nb_handle)
583
unsigned num_requests = 0;
585
//fprintf (stderr, "ARMCIX_DCMF_AccS_recurse() >> \n");
587
if (stride_levels == 0)
589
DCMF_Callback_t cb_free = { ARMCIX_DCMF_NbOp_cb_done, nb_handle };
590
ARMCIX_DCMF_Request_t * new_request = ARMCIX_DCMF_request_allocate (cb_free);
591
DCMF_Callback_t cb_done = { (void (*)(void *, DCMF_Error_t *))ARMCIX_DCMF_request_free, new_request };
593
ARMCIX_DCMF_AccInfo_t * info = (ARMCIX_DCMF_AccInfo_t *) &(new_request->quad[0]);
595
//info->bytes = bytes;
596
info->datatype = datatype;
600
info->ival = *((int *)scale);
603
info->dval = *((double *)scale);
606
info->fval = *((float *)scale);
609
info->cplxval.real = ((complex_t *)scale)->real;
610
info->cplxval.imag = ((complex_t *)scale)->imag;
613
info->dcplxval.real = ((dcomplex_t *)scale)->real;
614
info->dcplxval.imag = ((dcomplex_t *)scale)->imag;
617
info->lval = *((long *)scale);
624
ARMCIX_DCMF_AccInfo_t info;
625
info.datatype = datatype;
629
info.ival = *((int *)scale);
632
info.dval = *((double *)scale);
635
info.fval = *((float *)scale);
638
info.cplxval.real = ((complex_t *)scale)->real;
639
info.cplxval.imag = ((complex_t *)scale)->imag;
642
info.dcplxval.real = ((dcomplex_t *)scale)->real;
643
info.dcplxval.imag = ((dcomplex_t *)scale)->imag;
646
info.lval = *((long *)scale);
653
DCMF_Callback_t cb_free = { ARMCIX_DCMF_NbOp_cb_done, nb_handle };
654
DCMF_Request_t * new_request = ARMCIX_DCMF_request_allocate (cb_free);
655
DCMF_Callback_t cb_done = { (void(*)(void *)) ARMCIX_DCMF_request_free, new_request };
657
info->bytes = seg_count[0];
660
DCMF_Send ( &__acc_protocol,
661
&(new_request->request),
663
DCMF_SEQUENTIAL_CONSISTENCY,
674
char * src_tmp = (char *) src_ptr;
675
char * dst_tmp = (char *) dst_ptr;
677
for (i = 0; i < seg_count[stride_levels]; i++)
679
num_requests += ARMCIX_DCMF_AccS_recurse (datatype, scale,
680
src_tmp, src_stride_arr,
681
dst_tmp, dst_stride_arr,
682
seg_count, (stride_levels-1), proc,
685
src_tmp += src_stride_arr[(stride_levels-1)];
686
dst_tmp += dst_stride_arr[(stride_levels-1)];
690
//fprintf (stderr, "ARMCIX_DCMF_AccS_recurse() << num_requests = %d\n", num_requests);
697
* \brief ARMCI Extension blocking strided accumulate operation.
699
* \param[in] datatype accumulate datatype (operation code)
700
* \param[in] scale opaque pointer to the scaling factor for accumulate
701
* \param[in] src_ptr pointer to 1st segment at source
702
* \param[in] src_stride_arr array of strides at source
703
* \param[in] dst_ptr pointer to 1st segment at destination
704
* \param[in] dst_stride_arr array of strides at destination
705
* \param[in] seg_count number of segments at each stride level; seg_count[0] is the byte count
706
* \param[in] stride_levels number of stride levels
707
* \param[in] proc remote process(or) ID
711
int ARMCIX_AccS (int datatype, void * scale,
712
void * src_ptr, int * src_stride_arr,
713
void * dst_ptr, int * dst_stride_arr,
714
int * seg_count, int stride_levels, int proc)
717
#error causes test-ibm.x to hang!
718
armci_ireq_t nb_request;
719
armci_ihdl_t nb_handle = (armci_ihdl_t) &nb_request;
720
ARMCIX_NbAccS (datatype, scale,
721
src_ptr, src_stride_arr,
722
dst_ptr, dst_stride_arr,
723
seg_count, stride_levels, proc,
725
#warning remove this ARMCIX_Fence() and implement some sort of ack scheme.
727
ARMCIX_Wait (&nb_handle->cmpl_info);
729
DCMF_CriticalSection_enter (0);
731
//fprintf (stderr, "ARMCIX_AccS() >> \n");
732
//fprintf (stderr, "ARMCIX_AccS() -- __connection[%d].sequence.origin=%d, __connection[%d].active=%d, __global_connection.active=%d\n", proc, __connection[proc].sequence.origin, proc, __connection[proc].active, __global_connection.active);
734
// Calculate the number of requests
737
for (i = 0; i < stride_levels; i++) n = n * seg_count[i+1];
739
armci_ireq_t nb_handle;
740
armcix_dcmf_opaque_t * dcmf = (armcix_dcmf_opaque_t *) &nb_handle.cmpl_info;
741
dcmf->connection = &__connection[proc];
744
__connection[proc].active += n;
745
__global_connection.active += n;
748
count = ARMCIX_DCMF_AccS_recurse (datatype, scale,
749
src_ptr, src_stride_arr,
750
dst_ptr, dst_stride_arr,
751
seg_count, stride_levels, proc,
752
(armci_ihdl_t) &nb_handle);
754
#ifdef BLOCKING_OPERATIONS_REQUIRE_FENCE
758
while (dcmf->active) DCMF_Messager_advance ();
761
//fprintf (stderr, "ARMCIX_AccS() << \n");
763
DCMF_CriticalSection_exit (0);
769
* \brief ARMCI Extension non-blocking strided accumulate operation.
771
* \param[in] datatype accumulate datatype (operation code)
772
* \param[in] scale opaque pointer to the scaling factor for accumulate
773
* \param[in] src_ptr pointer to 1st segment at source
774
* \param[in] src_stride_arr array of strides at source
775
* \param[in] dst_ptr pointer to 1st segment at destination
776
* \param[in] dst_stride_arr array of strides at destination
777
* \param[in] seg_count number of segments at each stride level; seg_count[0] is the byte count
778
* \param[in] stride_levels number of stride levels
779
* \param[in] proc remote process(or) ID
780
* \param[in] nb_handle ARMCI non-blocking handle
784
int ARMCIX_NbAccS (int datatype, void * scale,
785
void * src_ptr, int * src_stride_arr,
786
void * dst_ptr, int * dst_stride_arr,
787
int * seg_count, int stride_levels, int proc,
788
armci_ihdl_t nb_handle)
790
DCMF_CriticalSection_enter (0);
792
// Calculate the number of requests
795
for (i = 0; i < stride_levels; i++) n = n * seg_count[i+1];
797
armcix_dcmf_opaque_t * dcmf = (armcix_dcmf_opaque_t *) &nb_handle->cmpl_info;
798
dcmf->connection = &__connection[proc];
801
__connection[proc].active += n;
802
__global_connection.active += n;
805
count = ARMCIX_DCMF_AccS_recurse (datatype, scale,
806
src_ptr, src_stride_arr,
807
dst_ptr, dst_stride_arr,
808
seg_count, stride_levels, proc,
813
DCMF_CriticalSection_exit (0);