4
* Created on: Mar 9, 2009
14
#include <nssi_server.h>
22
#include "aggregation.h"
27
/* Comparison functor call operator: returns true when s1 sorts before s2
 * according to strcmp(), i.e. it compares the characters of the two C strings
 * rather than their addresses.
 * NOTE(review): presumably a member of `ltstr` (used as the comparator of
 * var_map_t); the enclosing struct header is not visible in this excerpt. */
bool operator()(const char* s1, const char* s2) const
29
return strcmp(s1, s2) < 0;
40
} chunk_location_count_t;
42
typedef list<aggregation_chunk_details_t *> chunk_details_t;
43
typedef list<aggregation_chunk_details_t *>::iterator chunk_details_iterator_t;
46
aggregation_chunk_details_t *details;
47
chunk_details_t component_chunks;
48
} aggregation_chunk_t;
50
typedef list<aggregation_chunk_t *> chunks_t;
51
typedef list<aggregation_chunk_t *>::iterator chunks_iterator_t;
55
char var_path[ADIOS_PATH_MAX];
56
char var_name[ADIOS_PATH_MAX];
61
typedef map<const char *, per_var_details_t *, ltstr> var_map_t;
62
typedef map<const char *, per_var_details_t *, ltstr>::iterator var_map_iterator_t;
63
typedef pair<const char *, per_var_details_t *> var_map_pair_t;
68
write_type type; /* direct, aggregate independent, aggregate collective */
71
static map<int, file_details_t *> open_file_map;
72
typedef map<int, file_details_t *>::iterator open_file_map_iterator_t;
73
typedef pair<int, file_details_t *> open_file_map_pair_t;
82
/* Ordering predicate for two chunks; passed to list::sort() by
 * try_aggregation(fd, var_name).
 * Compares the per-dimension count[] values first and, after that loop, the
 * per-dimension offset[] values.
 * The return statements are not visible in this excerpt, so the exact
 * direction of the ordering cannot be confirmed from here.
 * NOTE(review): both loops are bounded by details1->ndims only; this assumes
 * c2 has at least as many dimensions as c1 -- TODO confirm. */
bool compare_chunks_for_aggregation(const aggregation_chunk_t* c1, const aggregation_chunk_t* c2)
84
aggregation_chunk_details_t *details1=c1->details;
85
aggregation_chunk_details_t *details2=c2->details;
87
// first key: the extent (count) in each dimension
for (int i=0;i<details1->ndims;i++) {
88
if (details1->count[i] < details2->count[i]) {
90
} else if (details1->count[i] > details2->count[i]) {
94
// second key: the starting offset in each dimension
for (int i=0;i<details1->ndims;i++) {
95
if (details1->offset[i] < details2->offset[i]) {
97
} else if (details1->offset[i] > details2->offset[i]) {
104
/* Ordering predicate for two chunks; passed to list::sort() by both
 * get_chunks() overloads.
 * Compares only the per-dimension offset[] values (counts are not consulted).
 * The return statements are not visible in this excerpt.
 * NOTE(review): the loop is bounded by details1->ndims only; assumes c2 has
 * at least as many dimensions -- TODO confirm. */
bool compare_chunks_for_caching(const aggregation_chunk_t* c1, const aggregation_chunk_t* c2)
106
aggregation_chunk_details_t *details1=c1->details;
107
aggregation_chunk_details_t *details2=c2->details;
109
for (int i=0;i<details1->ndims;i++) {
110
if (details1->offset[i] < details2->offset[i]) {
112
} else if (details1->offset[i] > details2->offset[i]) {
119
/* Allocates a fresh file_details_t with `new`.
 * add_file() stores the returned pointer in open_file_map under `fd`.
 * Any field initialisation and the return statement are not visible in this
 * excerpt; how `fd` is used inside the function cannot be confirmed here. */
file_details_t *new_open_file(const int fd)
121
file_details_t *details=NULL;
123
details = new file_details_t;
129
/* Tests whether the write type recorded for `fd` is one of the two
 * aggregation modes (WRITE_AGGREGATE_INDEPENDENT or
 * WRITE_AGGREGATE_COLLECTIVE). The returned values are not visible in this
 * excerpt.
 * NOTE(review): std::map::operator[] inserts a NULL entry when `fd` is not
 * already present, and `details` is dereferenced on the very next statement
 * with no NULL check in view -- an fd that was never passed to add_file()
 * would crash here. Consider find() plus a NULL guard. */
int use_aggregation(const int fd)
131
file_details_t *details=NULL;
133
details = open_file_map[fd];
134
if ((details->type == WRITE_AGGREGATE_INDEPENDENT) ||
135
(details->type == WRITE_AGGREGATE_COLLECTIVE)) {
142
/* Tests whether the write type recorded for `fd` is one of the two caching
 * modes (WRITE_CACHING_INDEPENDENT or WRITE_CACHING_COLLECTIVE). The returned
 * values are not visible in this excerpt.
 * NOTE(review): std::map::operator[] inserts a NULL entry when `fd` is not
 * already present, and `details` is dereferenced immediately afterwards with
 * no NULL check in view. */
int use_caching(const int fd)
144
file_details_t *details=NULL;
146
details = open_file_map[fd];
147
if ((details->type == WRITE_CACHING_INDEPENDENT) ||
148
(details->type == WRITE_CACHING_COLLECTIVE)) {
155
/* Tests whether the write type recorded for `fd` is a collective mode
 * (WRITE_AGGREGATE_COLLECTIVE or WRITE_CACHING_COLLECTIVE). The returned
 * values are not visible in this excerpt.
 * NOTE(review): std::map::operator[] inserts a NULL entry when `fd` is not
 * already present, and `details` is dereferenced immediately afterwards with
 * no NULL check in view. */
int use_collective(const int fd)
157
file_details_t *details=NULL;
159
details = open_file_map[fd];
160
if ((details->type == WRITE_AGGREGATE_COLLECTIVE) ||
161
(details->type == WRITE_CACHING_COLLECTIVE)) {
168
/* Tests whether the write type recorded for `fd` is an independent mode
 * (WRITE_AGGREGATE_INDEPENDENT or WRITE_CACHING_INDEPENDENT). The returned
 * values are not visible in this excerpt.
 * NOTE(review): std::map::operator[] inserts a NULL entry when `fd` is not
 * already present, and `details` is dereferenced immediately afterwards with
 * no NULL check in view. */
int use_independent(const int fd)
170
file_details_t *details=NULL;
172
details = open_file_map[fd];
173
if ((details->type == WRITE_AGGREGATE_INDEPENDENT) ||
174
(details->type == WRITE_CACHING_INDEPENDENT)) {
181
/* Tests whether the write type recorded for `fd` is WRITE_DIRECT. The
 * returned values are not visible in this excerpt.
 * NOTE(review): std::map::operator[] inserts a NULL entry when `fd` is not
 * already present, and `details` is dereferenced immediately afterwards with
 * no NULL check in view. */
int use_direct(const int fd)
183
file_details_t *details=NULL;
185
details = open_file_map[fd];
186
if (details->type == WRITE_DIRECT) {
194
/* Fragment of a function that switches over an ADIOS_DATATYPES value.
 * NOTE(review): the first line of the signature is not visible in this
 * excerpt; this is presumably getTypeSize(), which add_chunk() calls with
 * (atype, buf) -- TODO confirm.
 * Only some case labels are visible. One branch returns strlen() of the
 * value pointer treated as a C string, so for that branch the value must be
 * NUL-terminated; which case label it belongs to is not visible here. */
enum ADIOS_DATATYPES type,
200
case adios_unsigned_byte:
204
return strlen ((char *) val);
207
case adios_unsigned_short:
211
case adios_unsigned_integer:
218
case adios_unsigned_long:
224
case adios_long_double:
230
case adios_double_complex:
238
/* Registers `fd` in open_file_map (if it is not already there) and records
 * the write mode to use for it.
 *   fd         - file descriptor used as the map key
 *   write_type - mode stored in the entry's `type` field; an existing entry
 *                has its type overwritten. */
void add_file(const int fd,
239
const write_type write_type)
241
file_details_t *details=NULL;
243
// operator[] yields NULL (and creates the slot) for an fd seen for the first time
details = open_file_map[fd];
244
if (details == NULL) {
245
details=new_open_file(fd);
246
open_file_map[fd]=details;
249
details->type = write_type;
252
/* Records one incoming chunk under its file (chunk_details->fd) and variable
 * (chunk_details->var_name).
 * Steps visible here:
 *   1. compute atype_size from the chunk's type and buffer via getTypeSize();
 *   2. when len and num_elements are both positive, report a mismatch between
 *      len/num_elements and atype_size;
 *   3. look up the file entry; a missing entry is reported as a failure;
 *   4. look up (or create) the per-variable record;
 *   5. wrap the details in a new aggregation_chunk_t, append it to the
 *      variable's chunk list and bump chunks_received.
 * The chunk list stores the caller's chunk_details pointer as-is (no copy). */
void add_chunk(aggregation_chunk_details_t *chunk_details)
254
file_details_t *file_details=NULL;
256
if (DEBUG > 3) printf("adding chunk: fd(%d) var_name(%s)\n", chunk_details->fd, chunk_details->var_name);
258
chunk_details->atype_size = getTypeSize(chunk_details->atype, chunk_details->buf);
259
// the num_elements > 0 guard also protects the divisions below
if ((chunk_details->len > 0) && (chunk_details->num_elements > 0)) {
260
if ((chunk_details->len/chunk_details->num_elements) != chunk_details->atype_size) {
261
printf("datatype size conflict: (%lu/%d)==%lu is not equal to %d\n",
262
chunk_details->len, chunk_details->num_elements, chunk_details->len/chunk_details->num_elements, chunk_details->atype_size);
263
print_chunk(chunk_details);
267
// NOTE(review): operator[] inserts a NULL entry for an unknown fd
file_details = open_file_map[chunk_details->fd];
268
if (file_details == NULL) {
269
printf("failed to add chunk. cannot aggregate.\n");
272
per_var_details_t *var_details = file_details->vars[chunk_details->var_name];
273
if (var_details == NULL) {
274
// if (DEBUG > 3) printf("var_details don't exist for %s\n", chunk_details->var_name);
275
var_details=new per_var_details_t;
276
// NOTE(review): unbounded strcpy into ADIOS_PATH_MAX-sized arrays; assumes the
// source strings fit -- TODO confirm or bound the copy.
strcpy(var_details->var_path, chunk_details->var_path);
277
strcpy(var_details->var_name, chunk_details->var_name);
278
var_details->chunks = new chunks_t;
279
var_details->chunks_received=0;
280
// NOTE(review): the map key is a const char* pointing at chunk_details->var_name,
// so the key's lifetime is tied to this chunk rather than to var_details. If the
// chunk's storage is released while the entry remains, the key dangles; keying on
// var_details->var_name would avoid that -- verify against chunk ownership.
file_details->vars[chunk_details->var_name]=var_details;
282
// if (DEBUG > 3) printf("var_details already exist for %s\n", chunk_details->var_name);
284
aggregation_chunk_t *chunk=new aggregation_chunk_t;
285
chunk->details = chunk_details;
286
var_details->chunks->push_back(chunk);
287
var_details->chunks_received++;
292
/* Releases the heap storage hanging off a chunk-details record: the offset
 * and count arrays, every per-dimension path/name string, and then the four
 * string-pointer arrays themselves. Everything is released with free(), so
 * these members must have come from malloc/calloc/strdup.
 * NOTE(review): whether details->buf and the `details` object itself are also
 * released is not visible in this excerpt (only a commented-out debug line
 * mentions buf) -- check before relying on either. */
void destroy_chunk(aggregation_chunk_details_t *details)
294
free(details->offset);
295
free(details->count);
296
// free the strings first; the arrays that hold them are freed after the loop
for (int i=0;i<details->ndims;i++) {
297
free(details->offset_path[i]);
298
free(details->offset_name[i]);
299
free(details->count_path[i]);
300
free(details->count_name[i]);
302
free(details->offset_path);
303
free(details->offset_name);
304
free(details->count_path);
305
free(details->count_name);
306
// if (DEBUG > 3) printf("freeing details->buf(%p)\n", details->buf);
311
/* Discards every chunk of every variable recorded for `fd`.
 * For each entry in the file's variable map: destroy each chunk's component
 * chunks and its own details via destroy_chunk(), clear the chunk list, reset
 * chunks_received, then erase the map entry.
 * NOTE(review): no `delete` of the aggregation_chunk_t wrappers, of
 * var_details->chunks or of var_details is visible in this excerpt -- confirm
 * they are released on lines not shown here, otherwise they leak. */
void cleanup_aggregation_chunks(const int fd)
313
file_details_t *details=NULL;
314
var_map_iterator_t var_iter;
315
per_var_details_t *var_details=NULL;
316
aggregation_chunk_t *chunk=NULL;
317
chunks_iterator_t chunks_iter;
318
chunk_details_iterator_t component_iter;
320
// if (DEBUG > 3) printf("entered cleanup_aggregation_chunks\n");
321
// if (DEBUG > 3) printf("cleaning up - fd(%d)\n", fd);
323
details = open_file_map[fd];
324
if (details == NULL) {
327
// var_iter = details->vars.begin();
328
// for (; var_iter != details->vars.end(); ++var_iter) {
329
// var_details = var_iter->second;
330
// if (var_details != NULL) {
331
// if (DEBUG > 3) printf("var_details first(%p) second(%s)\n", var_iter->first, var_details->var_name);
333
// if (DEBUG > 3) printf("var_details is NULL\n");
336
var_iter = details->vars.begin();
337
// no increment in the loop header: the iterator is advanced by the erase below
for (; var_iter != details->vars.end();) {
338
var_details = var_iter->second;
339
if (var_details != NULL) {
340
// cleanup_aggregation_chunks(fd, var_details->var_name);
341
chunks_iter = var_details->chunks->begin();
342
for (;chunks_iter != var_details->chunks->end(); ++chunks_iter) {
343
chunk = *chunks_iter;
344
component_iter = chunk->component_chunks.begin();
345
for (;component_iter != chunk->component_chunks.end(); ++component_iter) {
346
// if (DEBUG > 3) printf("cleanup - destroying component\n");
347
destroy_chunk(*component_iter);
349
chunk->component_chunks.clear();
350
// if (DEBUG > 3) printf("cleanup - destroying details\n");
351
destroy_chunk(chunk->details);
354
var_details->chunks->clear();
355
var_details->chunks_received=0;
358
// if (DEBUG > 3) printf("cannot cleanup - var_details is NULL\n");
360
// post-increment hands erase() the current position while var_iter already
// points at the next entry, so the loop iterator stays valid
details->vars.erase(var_iter++);
362
// details->vars.clear();
364
// NOTE(review): this debug walk runs after the loop above has erased every
// entry, so it presumably never executes its body -- confirm and remove if so.
for(var_map_iterator_t vars_iter=details->vars.begin(); vars_iter!=details->vars.end(); ++vars_iter) {
365
per_var_details_t *pvd=vars_iter->second;
367
if (DEBUG > 3) printf("var_details first(%p) second(%s)\n", vars_iter->first, vars_iter->second->var_name);
369
if (DEBUG > 3) printf("var_details is NULL\n");
374
/* Discards every chunk recorded for one variable (`var_name`) of file `fd`:
 * destroys each chunk's component chunks and details via destroy_chunk(),
 * clears the chunk list, resets chunks_received, and finally erases the
 * variable's entry from the file's variable map.
 * NOTE(review): no `delete` of the aggregation_chunk_t wrappers, of the chunk
 * list or of var_details is visible in this excerpt -- confirm they are
 * released on lines not shown here. */
void cleanup_aggregation_chunks(const int fd, const char *var_name)
376
file_details_t *details=NULL;
377
aggregation_chunk_t *chunk=NULL;
378
chunks_iterator_t chunks_iter;
379
chunk_details_iterator_t component_iter;
380
var_map_iterator_t vars_iter;
382
// if (DEBUG > 3) printf("cleaning up - fd(%d) var_name(%s)\n", fd, var_name);
384
// for each variable, iterate over the chunks and destroy them
386
// NOTE(review): unlike the fd-only overload, no `details == NULL` test is visible
// before details->vars is used below; operator[] returns NULL for an unknown fd.
details = open_file_map[fd];
388
// operator[] creates a NULL entry when var_name is absent; the find()/erase()
// at the end of the function removes it again
per_var_details_t *var_details = details->vars[var_name];
389
if (var_details != NULL) {
390
chunks_iter = var_details->chunks->begin();
391
for (;chunks_iter != var_details->chunks->end(); ++chunks_iter) {
392
chunk = *chunks_iter;
393
component_iter = chunk->component_chunks.begin();
394
for (;component_iter != chunk->component_chunks.end(); ++component_iter) {
395
// if (DEBUG > 3) printf("cleanup - destroying component\n");
396
destroy_chunk(*component_iter);
398
chunk->component_chunks.clear();
399
// if (DEBUG > 3) printf("cleanup - destroying details\n");
400
destroy_chunk(chunk->details);
403
var_details->chunks->clear();
404
var_details->chunks_received=0;
407
// if (DEBUG > 3) printf("cleanup failed - var_details is NULL (%s)\n", var_name);
410
var_map_iterator_t iter=details->vars.find(var_name);
411
if (iter != details->vars.end()) {
412
// if (DEBUG > 3) printf("erasing var_details with iter\n");
413
details->vars.erase(iter);
415
// if (DEBUG > 3) printf("cannot erase var_details with iter. var_details not found.\n");
420
/* Debug helper that walks a chunk's buffer dimension by dimension and prints
 * the elements of the innermost dimension as one comma-separated line (the
 * final printf is gated by DEBUG > 3).
 *   details     - chunk whose buf is printed
 *   offset      - byte offset accumulated by the outer dimensions
 *   index       - per-dimension running byte position (advanced by atype_size)
 *   current_dim - dimension handled by this call
 * For every dimension but the last it recurses; for the last it formats each
 * element according to details->atype.
 * Declarations of my_offset, tmp_str, out_str and remaining are not visible
 * in this excerpt. */
static void recursive_print_chunk(aggregation_chunk_details_t *details, int offset, int *index, int current_dim)
427
if (current_dim < details->ndims-1) {
428
for (int i=0;i<details->count[current_dim];i++) {
429
my_offset = index[current_dim];
430
// NOTE(review): this inner `i` shadows the loop variable of the enclosing loop
for (int i=current_dim+1;i<details->ndims;i++) {
431
my_offset *= details->count[i];
434
index[current_dim+1]=0;
435
recursive_print_chunk(details, offset+my_offset, index, current_dim+1);
436
index[current_dim] += details->atype_size;
438
//if (DEBUG > 3) printf("-----------------------------\n");
440
if (details->buf == NULL) {
441
if (DEBUG > 3) printf("details->buf == NULL\n");
444
for (int i=0;i<details->count[current_dim];i++) {
445
my_offset = offset+index[current_dim];
447
// if (i==0) if (DEBUG > 3) printf("[%d][%d][%d] (my_offset==%d)\n", index[0], index[1], index[2], my_offset);
448
if ((details->atype == adios_byte) || (details->atype == adios_unsigned_byte)) {
449
sprintf(tmp_str, "%c, ", *(char *)(((char *)details->buf) + my_offset));
451
else if (details->atype == adios_short || details->atype == adios_unsigned_short) {
452
sprintf(tmp_str, "%hx, ", *(short *)(((char *)details->buf) + my_offset));
454
else if (details->atype == adios_integer || details->atype == adios_unsigned_integer) {
455
sprintf(tmp_str, "%x, ", *(int *)(((char *)details->buf) + my_offset));
457
else if (details->atype == adios_long || details->atype == adios_unsigned_long) {
458
// NOTE(review): "%lx" expects an unsigned long argument but the value is read
// through an int pointer -- a format/argument mismatch that also reads only
// sizeof(int) bytes of the element. Presumably this should read a long.
sprintf(tmp_str, "%lx, ", *(int *)(((char *)details->buf) + my_offset));
460
else if (details->atype == adios_real) {
461
sprintf(tmp_str, "%f, ", *(float *)(((char *)details->buf) + my_offset));
463
else if (details->atype == adios_double) {
464
sprintf(tmp_str, "%f, ", *(double *)(((char *)details->buf) + my_offset));
466
// NOTE(review): tmp_str is not reassigned for an atype that matches none of the
// branches above, so the previous contents would be appended again.
strncat(out_str, tmp_str, remaining);
467
// NOTE(review): subtracts the length of the whole accumulated out_str on every
// iteration; print_chunk() subtracts strlen(tmp_str) in the same pattern, which
// is presumably what is intended here as well.
remaining -= strlen(out_str);
469
index[current_dim] += details->atype_size;
471
// if (DEBUG > 3) printf("[%d][%d][%d] (my_offset==%d)\n", index[0], index[1], index[2], my_offset);
472
if (DEBUG > 3) printf("%s\n", out_str);
477
/* Debug dump of one chunk-details record: fd, variable path/name, ndims, len,
 * num_elements, the offset[] and count[] vectors and the buffer address.
 * Every printf is gated by DEBUG > 3; the call that would dump the buffer
 * contents (recursive_print_chunk) is commented out.
 * NOTE(review): `index` is calloc'ed on the first line; the matching free() is
 * not visible in this excerpt -- confirm it exists. Declarations of tmp_str,
 * out_str and remaining are also not visible. */
void print_chunk(aggregation_chunk_details_t *details)
479
int *index=(int *)calloc(details->ndims, sizeof(int));
484
if (DEBUG > 3) printf("+++++++++++++++++++++++++++++\n");
486
if (DEBUG > 3) printf("fd==%d\n", details->fd);
487
if (DEBUG > 3) printf("var_path==%s\n", details->var_path);
488
if (DEBUG > 3) printf("var_name==%s\n", details->var_name);
489
if (DEBUG > 3) printf("ndims==%d\n", details->ndims);
490
if (DEBUG > 3) printf("len==%ld\n", details->len);
491
if (DEBUG > 3) printf("num_elements==%d\n", details->num_elements);
494
// build "o0,o1,..." while space remains in out_str
for (int i=0;(i<details->ndims) && (remaining>0);i++) {
495
// NOTE(review): aggregate_chunks() allocates offset[]/count[] as uint64_t, so
// "%ld" is a signed/unsigned (and possibly width) mismatch -- PRIu64 would match.
sprintf(tmp_str, "%ld,", details->offset[i]);
496
strncat(out_str, tmp_str, remaining);
497
remaining -= strlen(tmp_str);
499
if (DEBUG > 3) printf("offset[]==%s\n", out_str);
502
for (int i=0;(i<details->ndims) && (remaining>0);i++) {
503
sprintf(tmp_str, "%ld,", details->count[i]);
504
strncat(out_str, tmp_str, remaining);
505
remaining -= strlen(tmp_str);
507
if (DEBUG > 3) printf("count[]==%s\n", out_str);
508
if (DEBUG > 3) printf("buf==%p\n", details->buf);
511
// int current_dim=0;
512
// recursive_print_chunk(details, offset, index, current_dim);
513
if (DEBUG > 3) printf("+++++++++++++++++++++++++++++\n");
518
/* Convenience overload: prints the details record of an aggregation chunk.
 * A chunk whose `details` pointer is NULL is reported (at DEBUG > 3) instead
 * of being dumped. */
void print_chunk(aggregation_chunk_t *c)
520
if (c->details == NULL) {
521
if (DEBUG > 3) printf("chunk has no details. perhaps it was aggregated into another chunk.\n");
524
print_chunk(c->details);
527
/* Copies the contents of src->buf into the matching region of dst->buf,
 * recursing over dimensions.
 * The full parameter list is not visible in this excerpt; from the recursive
 * call it takes (src, dst, src_offset, dst_offset, src_index, dst_index,
 * current_dim), where the offsets are byte offsets accumulated so far and the
 * index arrays hold per-dimension running byte positions.
 * For every dimension but the last, the function computes the byte offsets of
 * each slice and recurses; in the last dimension it copies one contiguous run
 * of src->count[current_dim] elements with memcpy.
 * The destination position is shifted by the difference between src's and
 * dst's offset in the current dimension, scaled by the element size. */
static void recursive_copy_chunk(aggregation_chunk_details_t *src,
528
aggregation_chunk_details_t *dst,
538
if (current_dim < src->ndims-1) {
539
for (int i=0;i<src->count[current_dim];i++) {
540
my_src_offset = src_index[current_dim];
541
my_dst_offset = dst_index[current_dim];
542
// if (DEBUG > 3) printf("join_offset(%d) offset_diff[%d](%d)\n",
543
// join_offset, current_dim, src->offset[current_dim] - dst->offset[current_dim]);
544
// place src relative to dst's origin in this dimension
my_dst_offset += ((src->offset[current_dim] - dst->offset[current_dim]) * src->atype_size);
545
// scale by the extents of all inner dimensions (src and dst extents may differ)
for (int j=current_dim+1;j<src->ndims;j++) {
546
my_src_offset *= src->count[j];
547
my_dst_offset *= dst->count[j];
550
src_index[current_dim+1]=0;
551
dst_index[current_dim+1]=0;
552
recursive_copy_chunk(src, dst, src_offset+my_src_offset, dst_offset+my_dst_offset,
553
src_index, dst_index, current_dim+1);
554
src_index[current_dim] += src->atype_size;
555
dst_index[current_dim] += dst->atype_size;
558
// innermost dimension: one contiguous run of elements
dst_offset += ((src->offset[current_dim] - dst->offset[current_dim]) * src->atype_size);
559
// NOTE(review): no check that dst->buf is non-NULL or large enough is visible here;
// callers are presumably responsible -- confirm.
memcpy(((char *)dst->buf) + dst_offset,
560
((char *)src->buf) + src_offset,
561
src->count[current_dim]*src->atype_size);
565
/* Copies the buffers of two source chunks into the buffer of `dst` by calling
 * recursive_copy_chunk() once per source, with freshly zeroed index arrays
 * each time.
 * The only call to this function visible in this excerpt (in
 * aggregate_chunks) is commented out.
 * NOTE(review): the index arrays are calloc'ed, so the first pair of memset
 * calls is redundant. The matching free() calls and the declarations of
 * src_offset, dst_offset and current_dim are not visible in this excerpt. */
static void recursive_aggregate_chunks(aggregation_chunk_t *src1,
566
aggregation_chunk_t *src2,
567
aggregation_chunk_t *dst)
569
int *src_index=(int *)calloc(src1->details->ndims, sizeof(int));
570
int *dst_index=(int *)calloc(dst->details->ndims, sizeof(int));
575
memset(src_index, 0, src1->details->ndims*sizeof(int));
576
memset(dst_index, 0, dst->details->ndims*sizeof(int));
577
recursive_copy_chunk(src1->details, dst->details, src_offset, dst_offset, src_index, dst_index, current_dim);
578
// reset the running indices before copying the second source
// NOTE(review): src_index was sized from src1's ndims but is cleared using src2's ndims
memset(src_index, 0, src2->details->ndims*sizeof(int));
579
memset(dst_index, 0, dst->details->ndims*sizeof(int));
580
recursive_copy_chunk(src2->details, dst->details, src_offset, dst_offset, src_index, dst_index, current_dim);
586
/* Copies the buffer of `src` into its place inside the buffer of `dst` by
 * delegating to recursive_copy_chunk() with zeroed per-dimension indices.
 * Called by try_aggregation() once per component chunk after the destination
 * buffer has been allocated.
 * NOTE(review): the memset calls are redundant after calloc. The matching
 * free() calls and the declarations of src_offset, dst_offset and current_dim
 * are not visible in this excerpt. */
static void copy_chunk(aggregation_chunk_details_t *src,
587
aggregation_chunk_details_t *dst)
589
int *src_index=(int *)calloc(src->ndims, sizeof(int));
590
int *dst_index=(int *)calloc(dst->ndims, sizeof(int));
595
memset(src_index, 0, src->ndims*sizeof(int));
596
memset(dst_index, 0, dst->ndims*sizeof(int));
597
recursive_copy_chunk(src, dst, src_offset, dst_offset, src_index, dst_index, current_dim);
603
/* Builds a new chunk describing the union of c1 and c2 along one dimension.
 * The third parameter is not visible in this excerpt; the body uses it as
 * `join_dim`, the dimension along which the two chunks are joined.
 * The new details record copies fd, variable path/name, ndims, type, offsets
 * and per-dimension name strings from c1; len and num_elements are the sums
 * of both inputs; count[join_dim] is the sum of both counts.
 * No data is copied here: buf is left NULL and the inputs are remembered in
 * component_chunks so that try_aggregation() can allocate and fill the buffer
 * later.
 * Requires c1 and c2 to have the same ndims (asserted). */
aggregation_chunk_t *aggregate_chunks(aggregation_chunk_t *c1,
604
aggregation_chunk_t *c2,
607
aggregation_chunk_t *out=new aggregation_chunk_t;
609
//if (DEBUG > 3) printf("entered aggregate_chunks\n");
611
assert(c1->details->ndims == c2->details->ndims);
614
out->details = new aggregation_chunk_details_t;
616
out->details->fd = c1->details->fd;
617
strcpy(out->details->var_path, c1->details->var_path);
618
strcpy(out->details->var_name, c1->details->var_name);
619
out->details->ndims = c1->details->ndims;
620
// out->details->buf = calloc(c1->details->len+c2->details->len, c1->details->atype_size);
621
// buffer allocation is deferred; try_aggregation() allocates it when buf is NULL
out->details->buf = NULL;
622
out->details->atype = c1->details->atype;
623
out->details->len = c1->details->len+c2->details->len;
624
// NOTE(review): duplicate of the atype assignment two statements above
out->details->atype = c1->details->atype;
625
out->details->num_elements = c1->details->num_elements+c2->details->num_elements;
626
out->details->atype_size = c1->details->atype_size;
627
out->details->offset_path = (char **)calloc(c1->details->ndims, sizeof(char *));
628
out->details->offset_name = (char **)calloc(c1->details->ndims, sizeof(char *));
629
out->details->offset = (uint64_t *)calloc(c1->details->ndims, sizeof(uint64_t));
630
out->details->count_path = (char **)calloc(c1->details->ndims, sizeof(char *));
631
out->details->count_name = (char **)calloc(c1->details->ndims, sizeof(char *));
632
out->details->count = (uint64_t *)calloc(c1->details->ndims, sizeof(uint64_t));
634
// deep-copy the per-dimension strings so `out` can be destroyed independently of c1
for (int i=0;i<c1->details->ndims;i++) {
635
out->details->offset_path[i] = strdup(c1->details->offset_path[i]);
636
out->details->offset_name[i] = strdup(c1->details->offset_name[i]);
637
out->details->count_path[i] = strdup(c1->details->count_path[i]);
638
out->details->count_name[i] = strdup(c1->details->count_name[i]);
640
// start from c1's origin and extents ...
memcpy(out->details->offset, c1->details->offset, c1->details->ndims*sizeof(uint64_t));
641
memcpy(out->details->count, c1->details->count, c1->details->ndims*sizeof(uint64_t));
642
// ... then grow the joined dimension by c2's extent
out->details->count[join_dim] += c2->details->count[join_dim];
644
// recursive_aggregate_chunks(c1, c2, out);
646
// If c1 is itself an aggregate, adopt its components and discard its details;
// the push_back further down is presumably the else-branch for a plain chunk
// (the branch structure is not visible in this excerpt).
if (c1->component_chunks.size() > 0) {
647
// NOTE(review): std::list::merge expects both lists to be sorted (here by
// pointer value); splice() moves all elements without that precondition.
// merge already empties its argument, so the following clear() is a no-op.
out->component_chunks.merge(c1->component_chunks);
648
c1->component_chunks.clear();
649
destroy_chunk(c1->details);
651
out->component_chunks.push_back(c1->details);
654
if (c2->component_chunks.size() > 0) {
655
out->component_chunks.merge(c2->component_chunks);
656
c2->component_chunks.clear();
657
destroy_chunk(c2->details);
659
out->component_chunks.push_back(c2->details);
665
//if (DEBUG > 3) printf("finished\n");
671
* Aggregate a particular variable in the file.
674
* - dimension count must be equal
675
* - strides must be equal
676
* - counts on matching faces must be equal
680
/* Attempts to merge adjacent chunks of one variable (`var_name`) of file `fd`.
 * Returns aggregation_success, which starts as FALSE and is set to TRUE once
 * at least one pair of chunks has been merged. FALSE is returned early when
 * the file or variable is unknown or fewer than two chunks are held.
 *
 * Outline of the visible logic:
 *   1. sort the chunk list with compare_chunks_for_aggregation;
 *   2. repeat while a pass merges something: for each base chunk, scan
 *      candidate chunks, compute the per-dimension offset difference and
 *      classify each dimension as ahead/behind/same/no-match;
 *   3. when exactly one dimension is ahead (or behind) and all others are
 *      equal, call aggregate_chunks() in the matching order, remove both
 *      inputs from the list and queue the result in agg_chunks;
 *   4. append the queued results to the chunk list;
 *   5. finally, for every chunk whose buf is still NULL, allocate the buffer,
 *      copy each component chunk into it, and destroy the components.
 *
 * Many lines (braces, `continue`/`break` targets, several declarations and
 * preprocessor guards) are not visible in this excerpt, so the notes below
 * are limited to what the visible statements show. */
int try_aggregation(const int fd, const char *var_name)
682
int aggregation_success=FALSE;
684
file_details_t *file_details=NULL;
685
per_var_details_t *var_details=NULL;
686
aggregation_chunk_t *base_chunk=NULL;
687
aggregation_chunk_t *candidate_chunk=NULL;
688
aggregation_chunk_t *new_chunk=NULL;
689
chunks_iterator_t base_iter, candidate_iter;
691
chunk_location_count_t chunk_location_count;
692
int dim_with_movement=-1;
698
// operator[] inserts a NULL entry for an unknown fd / var_name; both are checked below
file_details = open_file_map[fd];
699
if (file_details == NULL) {
700
// if (DEBUG > 3) printf("agg failed for %s: file_details==NULL\n", var_name);
701
return(aggregation_success);
703
var_details = file_details->vars[var_name];
704
if (var_details == NULL) {
705
// if (DEBUG > 3) printf("agg failed for %s: var_details==NULL\n", var_name);
706
return(aggregation_success);
708
// nothing to merge with fewer than two chunks
if (var_details->chunks->size() < 2) {
709
// if (DEBUG > 3) printf("returning with chunk count(%d)\n", var_details->chunks->size());
710
return(aggregation_success);
712
// NOTE(review): size() returns an unsigned size type; "%d" expects int ("%zu" would match).
if (DEBUG > 3) printf("chunk count(%d)\n", var_details->chunks->size());
715
if (DEBUG > 3) printf("trying aggregation - fd(%d) var_name(%s)\n", fd, var_name);
717
var_details->chunks->sort(compare_chunks_for_aggregation);
720
// NOTE(review): the listing below prints unconditionally as shown and re-sorts the
// list through get_chunks(); presumably it sits inside a disabled or debug-only
// region whose guard is not visible here -- confirm. The array returned by
// get_chunks() is malloc'ed and no free() is visible.
printf("*****************\n");
721
printf("start aggregation (begin list)\n");
722
printf("*****************\n");
724
aggregation_chunk_details_t **chunks = get_chunks(fd, var_name, &chunk_count);
725
for (int i=0;i<chunk_count;i++) {
726
print_chunk(chunks[i]);
729
printf("*****************\n");
730
printf("start aggregation (end list)\n");
731
printf("*****************\n");
734
int success_this_pass=TRUE;
735
// keep making passes until one completes without merging anything
while (success_this_pass==TRUE) {
736
success_this_pass=FALSE;
738
// if (DEBUG > 3) printf("top: while loop\n");
741
base_iter = var_details->chunks->begin();
742
base_chunk = *base_iter;
743
// scratch array of per-dimension offset differences, released by the delete[] below
offset_diff=new int[base_chunk->details->ndims];
744
for (;base_iter != var_details->chunks->end(); ++base_iter) {
745
// if (DEBUG > 3) printf("top: base_iter loop\n");
747
base_chunk = *base_iter;
749
//if (base_chunk != NULL) print_chunk(base_chunk);
751
// look for a chunk that can be aggregated to the base chunk
752
candidate_iter = base_iter;
754
for (;candidate_iter != var_details->chunks->end(); ++candidate_iter) {
755
// if (DEBUG > 3) printf("top: candidate_iter loop\n");
757
candidate_chunk = *candidate_iter;
759
//if (candidate_chunk != NULL) print_chunk(candidate_chunk);
763
if (base_chunk->details->ndims != candidate_chunk->details->ndims) {
766
// if (candidate_chunk->details->offset[0] != base_chunk->details->offset[0]) {
769
for (int i=0; i<base_chunk->details->ndims; i++) {
770
// NOTE(review): offsets are uint64_t (see aggregate_chunks) and the difference is
// narrowed to int; very large offsets would not be represented correctly.
offset_diff[i] = candidate_chunk->details->offset[i] - base_chunk->details->offset[i];
772
if (failed) continue;
774
chunk_location_count.ahead_count=0;
775
chunk_location_count.behind_count=0;
776
chunk_location_count.same_count=0;
777
chunk_location_count.no_match_count=0;
778
int agg_dims=base_chunk->details->ndims; /* the number of dimensions to aggregate */
779
// classify the candidate's position relative to the base in every dimension
for (int i=0; i<agg_dims; i++) {
780
if ((offset_diff[i] < 0) && (-offset_diff[i] == candidate_chunk->details->count[i])) {
781
// the candidate is "behind/below" and touching the base chunk in this dimension
782
chunk_location_count.behind_count++;
784
} else if ((offset_diff[i] > 0) && (offset_diff[i] == base_chunk->details->count[i])) {
785
// the candidate is "ahead of/above" and touching the base chunk in this dimension
786
chunk_location_count.ahead_count++;
788
} else if (offset_diff[i] == 0) {
789
// the candidate is "equal to" the base chunk in this dimension
790
chunk_location_count.same_count++;
792
// the candidate and the base chunk don't match in this dimension
793
chunk_location_count.no_match_count++;
799
* These tests can be interesting, but are not required to get the job done.
801
if (chunk_location_count.no_match_count > 0) {
802
// no matching face found. can't aggregate.
806
// NOTE(review): aggregation_chunk_t as declared in this file has `details` and
// `component_chunks` but no `ndims` member, so this line presumably lives in a
// disabled region (guard not visible) -- confirm; otherwise it should read
// base_chunk->details->ndims.
if (chunk_location_count.same_count == base_chunk->ndims) {
807
// base and candidate have same offset. bad? can't aggregate.
811
if (chunk_location_count.ahead_count > 1) {
812
// movement in more than one direction
815
if (chunk_location_count.behind_count > 1) {
816
// movement in more than one direction
819
if ((chunk_location_count.ahead_count > 0) &&
820
(chunk_location_count.behind_count > 0)) {
821
// movement in more than one direction
825
if ((chunk_location_count.ahead_count == 0) &&
826
(chunk_location_count.behind_count == 0)) {
827
// possible movement, but the chunks don't touch
832
// check that the matching faces have the same dimensions
833
for (int i=0; i<base_chunk->details->ndims; i++) {
834
if ((i != dim_with_movement) &&
835
(base_chunk->details->count[i] != candidate_chunk->details->count[i])) {
840
if (failed) continue;
843
* Do NOT uncomment these print_chunk() lines in production code.
844
* They are *very* slow even if the debug level is set low and
845
* nothing is being logged.
847
// netcdf_debug_level=LOG_ALL;
848
// if (DEBUG > 3) printf("*****************\n");
849
// if (DEBUG > 3) printf("base chunk\n");
850
// if (DEBUG > 3) printf("*****************\n");
851
// if (base_chunk != NULL) print_chunk(base_chunk);
852
// if (DEBUG > 3) printf("*****************\n");
853
// if (DEBUG > 3) printf("candidate chunk\n");
854
// if (DEBUG > 3) printf("*****************\n");
855
// if (candidate_chunk != NULL) print_chunk(candidate_chunk);
856
// netcdf_debug_level=old;
858
// exactly one dimension differs and the chunks touch in it: merge in offset order
if ((chunk_location_count.ahead_count == 1) &&
859
(chunk_location_count.behind_count == 0) &&
860
(chunk_location_count.same_count == agg_dims-1)) {
861
// aggregation is base + candidate
862
new_chunk = aggregate_chunks(base_chunk, candidate_chunk, dim_with_movement);
863
} else if ((chunk_location_count.ahead_count == 0) &&
864
(chunk_location_count.behind_count == 1) &&
865
(chunk_location_count.same_count == agg_dims-1)) {
866
// aggregation is candidate + base
867
new_chunk = aggregate_chunks(candidate_chunk, base_chunk, dim_with_movement);
869
// chunks aren't aligned
870
//if (DEBUG > 3) printf("**********\nchunks are not aligned\n**********\n");
874
assert(new_chunk != NULL);
877
* Do NOT uncomment these print_chunk() lines in production code.
878
* They are *very* slow even if the debug level is set low and
879
* nothing is being logged.
881
// netcdf_debug_level=LOG_ALL;
882
// if (DEBUG > 3) printf("*****************\n");
883
// if (DEBUG > 3) printf("new chunk\n");
884
// if (DEBUG > 3) printf("*****************\n");
885
// if (new_chunk != NULL) print_chunk(new_chunk);
886
// netcdf_debug_level=old;
888
// NOTE(review): removing elements invalidates base_iter/candidate_iter; the
// `break` further down presumably leaves both loops before they are advanced
// again -- confirm against the lines not visible here.
var_details->chunks->remove(base_chunk);
889
var_details->chunks->remove(candidate_chunk);
891
delete candidate_chunk;
893
agg_chunks.push_back(new_chunk);
895
aggregation_success = TRUE;
896
success_this_pass = TRUE;
900
if (success_this_pass == TRUE) break;
902
// move the chunks produced in this pass back into the variable's chunk list
chunks_iterator_t agg_iter = agg_chunks.begin();
903
for (;agg_iter != agg_chunks.end();agg_iter++) {
904
var_details->chunks->push_back(*agg_iter);
908
delete[] offset_diff;
912
printf("*****************\n");
913
printf("end aggregation (begin list)\n");
914
printf("*****************\n");
916
aggregation_chunk_details_t **chunks = get_chunks(fd, var_name, &chunk_count);
917
for (int i=0;i<chunk_count;i++) {
918
print_chunk(chunks[i]);
921
printf("*****************\n");
922
printf("end aggregation (end list)\n");
923
printf("*****************\n");
926
// netcdf_debug_level=LOG_ALL;
927
// materialise the merged data: allocate each aggregate's buffer and copy its components in
chunks_iterator_t dst_iter=var_details->chunks->begin();
928
for(;dst_iter != var_details->chunks->end();dst_iter++) {
929
chunk_details_iterator_t component_iter=(*dst_iter)->component_chunks.begin();
930
if (((*dst_iter)->details->buf == NULL) && ((*dst_iter)->details->len > 0)) {
931
// NOTE(review): no check of the malloc result is visible; copy_chunk() would
// write through a NULL buf if the allocation failed.
(*dst_iter)->details->buf = (char *)malloc((*dst_iter)->details->len);
932
// if (DEBUG > 3) printf("allocated dst_iter->details->buf(%p), len(%ld)\n",
933
// (*dst_iter)->details->buf,
934
// (*dst_iter)->details->len);
936
// if (DEBUG > 3) printf("did not allocate dst_iter->details->buf(%p)\n", (*dst_iter)->details->buf);
938
for(;component_iter != (*dst_iter)->component_chunks.end();component_iter++) {
939
//if (DEBUG > 3) printf("copying component\n");
940
copy_chunk(*component_iter, (*dst_iter)->details);
941
//if (DEBUG > 3) printf("destroying component\n");
942
destroy_chunk(*component_iter);
944
(*dst_iter)->component_chunks.clear();
946
// netcdf_debug_level=old;
948
// netcdf_debug_level=LOG_ALL;
949
//if (DEBUG > 3) printf("*****************\n");
950
//if (DEBUG > 3) printf("chunks after aggregation\n");
951
//if (DEBUG > 3) printf("*****************\n");
952
// base_iter = var_details->chunks->begin();
953
// for (;base_iter != var_details->chunks->end(); ++base_iter) {
954
// base_chunk = *base_iter;
955
// if (base_chunk != NULL)
956
// print_chunk(base_chunk);
958
// netcdf_debug_level=old;
960
return(aggregation_success);
964
* Aggregate all variables in the file.
967
/* Runs aggregation over every variable recorded for `fd`.
 * For each non-NULL variable entry, the per-variable overload is called
 * repeatedly until it stops returning TRUE.
 * Returns aggregation_success: FALSE when the file is unknown; it is set to
 * TRUE near the end of the function, but the condition under which that
 * assignment runs is not visible in this excerpt. */
int try_aggregation(const int fd)
969
int aggregation_success=FALSE;
971
file_details_t *file_details=NULL;
972
var_map_iterator_t var_iter;
973
per_var_details_t *var_details=NULL;
975
if (DEBUG > 3) printf("entered try_aggregation - fd(%d)\n", fd);
977
file_details = open_file_map[fd];
978
if (file_details == NULL) {
979
return(aggregation_success);
981
var_iter = file_details->vars.begin();
982
for (; var_iter != file_details->vars.end(); var_iter++) {
983
var_details = var_iter->second;
984
// NULL entries exist because map operator[] lookups elsewhere in this file insert them
if (var_details == NULL) {
985
// if (DEBUG > 3) printf("var_details==NULL. continuing\n");
988
// if (DEBUG > 3) printf("aggregating var_name(%s)\n", var_details->var_name);
989
// empty loop body: keep aggregating this variable until a call makes no progress
while(try_aggregation(fd, var_details->var_name) == TRUE);
993
aggregation_success = TRUE;
995
return(aggregation_success);
998
/* Reports whether the aggregated data of `var_name` in file `fd` is ready to
 * be written.
 * Every visible statement is commented out: the disabled logic compared the
 * variable's chunks_received with the number of participants. The value
 * actually returned is not visible in this excerpt. */
int aggregate_data_ready_to_write(const int fd, const char *var_name)
1000
// file_details_t *details = open_file_map[fd];
1001
// int chunks_needed=0;
1003
// if (details->num_participants > 0) {
1004
// chunks_needed = details->num_participants;
1006
// chunks_needed = details->participants->size();
1009
// if (details->vars[var_name]->chunks_received == chunks_needed) {
1016
/* Reports whether the cached data of `var_name` in file `fd` is ready to be
 * written.
 * Every visible statement is commented out: the disabled logic compared the
 * variable's chunks_received with the number of participants. The value
 * actually returned is not visible in this excerpt. */
int cache_data_ready_to_write(const int fd, const char *var_name)
1018
// file_details_t *details = open_file_map[fd];
1019
// int chunks_needed=0;
1021
// if (details->num_participants > 0) {
1022
// chunks_needed = details->num_participants;
1024
// chunks_needed = details->participants->size();
1027
// if (details->vars[var_name]->chunks_received == chunks_needed) {
1034
/* Returns the chunk-details pointers of one variable as a malloc'ed array.
 *   fd          - file whose variable is queried
 *   var_name    - variable to look up
 *   chunk_count - out: number of entries in the returned array
 * The array holds pointers to the stored details records (not copies),
 * ordered by compare_chunks_for_caching.
 * Side effect: the variable's chunk list itself is re-sorted.
 * NOTE(review): the array is allocated with malloc and no free() is visible
 * in this file's callers; the caller presumably owns it -- confirm. What is
 * returned (and what *chunk_count holds) on the early-exit paths is not
 * visible in this excerpt. */
aggregation_chunk_details_t **get_chunks(const int fd, const char *var_name, int *chunk_count)
1036
file_details_t *details=NULL;
1037
per_var_details_t *var_details=NULL;
1038
aggregation_chunk_details_t **chunks=NULL;
1039
chunks_iterator_t iter;
1041
if (DEBUG > 3) printf("entered get_chunks - fd(%d) var_name(%s)\n", fd, var_name);
1045
details = open_file_map[fd];
1046
if (details == NULL) {
1049
var_details = details->vars[var_name];
1050
if (var_details == NULL) {
1054
// NOTE(review): repeats the map lookup already held in var_details
*chunk_count = details->vars[var_name]->chunks->size();
1056
if (DEBUG > 3) printf("found %d chunks to return\n", *chunk_count);
1058
if (*chunk_count == 0) {
1061
// NOTE(review): malloc result is not checked in the visible lines
chunks = (aggregation_chunk_details_t **)malloc(*chunk_count*sizeof(aggregation_chunk_details_t *));
1063
var_details->chunks->sort(compare_chunks_for_caching);
1065
iter = var_details->chunks->begin();
1066
for (int i=0;iter != var_details->chunks->end(); ++iter,i++) {
1067
chunks[i] = (*iter)->details;
1068
// print_chunk(chunks[i]);
1071
//if (DEBUG > 3) printf("finished\n");
1076
/* Returns the chunk-details pointers of every variable of file `fd` as one
 * malloc'ed array.
 *   fd          - file to query
 *   chunk_count - out: total number of entries in the returned array
 * A first pass over the variable map adds up the list sizes; a second pass
 * sorts each variable's list with compare_chunks_for_caching and appends its
 * details pointers to the array (variables in map order, chunks in sorted
 * order within each variable).
 * Side effect: every variable's chunk list is re-sorted.
 * NOTE(review): *chunk_count is accumulated with +=; its initialisation and
 * the declaration of `i` are not visible in this excerpt -- confirm both are
 * zeroed. The returned array is malloc'ed; the caller presumably frees it. */
aggregation_chunk_details_t **get_chunks(const int fd, int *chunk_count)
1078
file_details_t *details=NULL;
1079
var_map_iterator_t var_iter;
1080
per_var_details_t *var_details=NULL;
1081
aggregation_chunk_details_t **chunks=NULL;
1082
chunks_iterator_t chunks_iter;
1084
if (DEBUG > 3) printf("entered get_chunks - fd(%d)\n", fd);
1088
details = open_file_map[fd];
1089
if (details == NULL) {
1092
var_iter = details->vars.begin();
1093
for (; var_iter != details->vars.end(); ++var_iter) {
1094
var_details = var_iter->second;
1095
// NOTE(review): var_details is dereferenced with no NULL check, yet map operator[]
// lookups elsewhere in this file can leave NULL entries in `vars`
// (try_aggregation(fd) guards against exactly that). Same concern in the second loop.
*chunk_count += var_details->chunks->size();
1098
if (DEBUG > 3) printf("found %d chunks to return\n", *chunk_count);
1100
if (*chunk_count == 0) {
1103
// NOTE(review): malloc result is not checked in the visible lines
chunks = (aggregation_chunk_details_t **)malloc(*chunk_count*sizeof(aggregation_chunk_details_t *));
1106
var_iter = details->vars.begin();
1107
for (; var_iter != details->vars.end(); var_iter++) {
1108
var_details = var_iter->second;
1109
var_details->chunks->sort(compare_chunks_for_caching);
1110
chunks_iter = var_details->chunks->begin();
1111
// `i` keeps counting across variables so the array is filled contiguously
for (;chunks_iter != var_details->chunks->end(); ++chunks_iter,i++) {
1112
chunks[i] = (*chunks_iter)->details;
1113
// print_chunk(chunks[i]);
1117
//if (DEBUG > 3) printf("finished\n");