~gcrosswhite/charon/trunk

Viewing changes to src/io.h

  • Committer: Gregory Crosswhite
  • Date: 2009-04-14 20:37:52 UTC
  • Revision ID: gcross@phys.washington.edu-20090414203752-aayz9j7fqmrzccun
Cleared out all but the simplest and most solid functionality.

//@+leo-ver=4-thin
//@+node:gcross.20090120225710.4:@thin io.h
//@@language cplusplus

#ifndef IO_H
#define IO_H

#include "charon.h"
#include "ampi_controller.h"
#include "io.decl.h"
#include "mpitypes.h"
#include "mpi.h"
#include "mpio.h"

extern "C" {
    void ROMIO_initialize();
    void ROMIO_initialize_thread_variable();
}

//@+others
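//
// Descriptive summary of what this header declares (see the code below):
//
//   * BlockIOSlave  -- one chare per processor; initialize() builds and commits
//                      an MPI "darray" filetype describing that processor's
//                      block of the globally distributed array.
//   * BlockIOMaster -- wraps a DistributedArray; creates the slave chare array
//                      and exposes load()/save(), which broadcast a
//                      ReadBlockIOOperation/WriteBlockIOOperation to every
//                      array segment.
//   * The operations open the file collectively, set the file view from the
//     slave's filetype, and transfer each local block with
//     PMPI_File_read_all/PMPI_File_write_all.
//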
//@+node:gcross.20090120225710.5:BlockIOSlave Declaration
//template<typename T_numtype, int N_rank> class AbstractBlockIOOperation<T_numtype,N_rank>;

template<typename T_numtype, int N_rank> class BlockIOSlave : public AMPISlave {

        typedef TinyVector<int,N_rank> Coordinates;

    public:

        Coordinates sizes, PE_dimensions;

        //static const Coordinates distributions, distribution_arguments;

        bool direct_write;

        MPI_Datatype dtype, filetype;

        //friend class AbstractBlockIOOperation<T_numtype,N_rank>;

    public:
        BlockIOSlave(const Coordinates& sizes_, const Coordinates& PE_dimensions_, bool direct_write_) :
            AMPISlave(blitz::product(PE_dimensions_)),
            sizes(sizes_), PE_dimensions(PE_dimensions_), direct_write(direct_write_)
        {
        }
        BlockIOSlave(CkMigrateMessage* msg) : AMPISlave(msg) { }

        // Downcast the base class's thread-local slave pointer; calling
        // BlockIOSlave::getThread() here would recurse forever.
        static BlockIOSlave* getThread() { return dynamic_cast<BlockIOSlave*>(AMPISlave::getThread()); }

        virtual void initialize(void) {
            ROMIO_initialize();

            Coordinates distributions = MPI_DISTRIBUTE_BLOCK,
                        distribution_arguments = MPI_DISTRIBUTE_DFLT_DARG;

            AMPISlave::initialize();

            dtype = getMPIType<T_numtype>();

            // Build a block-distributed "darray" filetype describing this
            // processor's portion of the global array, then commit it so it
            // can be used as an MPI-IO file view.
            PMPI_Type_create_darray(
                product(PE_dimensions),
                thisIndex,
                N_rank,
                sizes.data(),
                distributions.data(),
                distribution_arguments.data(),
                PE_dimensions.data(),
                MPI_ORDER_C,
                dtype,
                &filetype
            );
            MPI_Type_commit(&filetype);

        }

};

//template<typename T_numtype, int N_rank> Coordinates BlockIOSlave<T_numtype,N_rank>::distributions = MPI_DISTRIBUTE_BLOCK;
//template<typename T_numtype, int N_rank> Coordinates BlockIOSlave<T_numtype,N_rank>::distributions_arguments = MPI_DISTRIBUTE_DFLT_DARG;
//@-node:gcross.20090120225710.5:BlockIOSlave Declaration
//@+node:gcross.20090120225710.6:BlockIOMaster Declaration
template<typename T_numtype, int N_rank> class ReadBlockIOOperation;
template<typename T_numtype, int N_rank> class WriteBlockIOOperation;

template<typename T_numtype, int N_rank> class BlockIOMaster : public AMPIMaster {

        typedef DistributedArray<T_numtype,N_rank> DistributedArrayType;
        typedef ReadBlockIOOperation<T_numtype,N_rank> ReadBlockIOOperationType;
        typedef WriteBlockIOOperation<T_numtype,N_rank> WriteBlockIOOperationType;
        typedef CProxy_BlockIOSlave<T_numtype,N_rank> SlaveProxyType;

    protected:

        DistributedArray<T_numtype,N_rank>& arr;

    public:
        BlockIOMaster(DistributedArrayType& arr_) :
            AMPIMaster(),
            arr(arr_)
        {
            CkArrayOptions opts(arr.numPEs());
            arr.bind(opts);

            // If the array uses C ordering, then we can write the file data
            // directly into the array.  Otherwise, we need to make a copy
            // of the data in memory first.
            bool direct_write = true;
            for(int i = 0; i < N_rank; ++i)
                if(arr.ordering(i)!=i) {
                    direct_write = false;
                    break;
                }

            thread_array_proxy = CProxy_BlockIOSlave<T_numtype,N_rank>::ckNew(
                arr.extent(),
                arr.processor_dimensions(),
                direct_write,
                opts
            );

        }

        void load(string filename) {
            ReadBlockIOOperationType op(
                allocate_next_operation_id(),
                static_cast<SlaveProxyType&>(thread_array_proxy),
                filename
            );
            arr.broadcast_operation(op);
        }

        void save(string filename) {
            WriteBlockIOOperationType op(
                allocate_next_operation_id(),
                static_cast<SlaveProxyType&>(thread_array_proxy),
                filename
            );
            arr.broadcast_operation(op);
        }

};
//@-node:gcross.20090120225710.6:BlockIOMaster Declaration
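//
// Hypothetical usage sketch (not part of this header): assuming a
// DistributedArray<double,2> named "data" has already been constructed and
// set up elsewhere, a master could be used roughly like this:
//
//     BlockIOMaster<double,2> io(data);
//     io.save("snapshot.dat");   // collective write of every block
//     io.load("snapshot.dat");   // collective read back into the array
//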
//@+node:gcross.20090120225710.7:IO Operations
//@+node:gcross.20090120225710.8:AbstractBlockIOOperation
template<typename T_numtype, int N_rank> class AbstractBlockIOOperation : public DistributedArraySegmentOperation<T_numtype,N_rank> {

PUPable_abstract(AbstractBlockIOOperation)

protected:

    typedef DistributedArraySegmentOperation<T_numtype,N_rank> BaseType;
    typedef DistributedArraySegment<T_numtype,N_rank> ArraySegmentType;
    typedef Array<T_numtype,N_rank> ArrayType;

    typedef BlockIOSlave<T_numtype,N_rank> SlaveType;
    typedef CProxy_BlockIOSlave<T_numtype,N_rank> SlaveProxyType;

    friend class PUP::able;

    AbstractBlockIOOperation() { }

    AbstractBlockIOOperation(int operation_id_, SlaveProxyType io_proxy_, string filename_) :
        io_proxy(io_proxy_),
        operation_id(operation_id_),
        filename(filename_)
    { }

    AbstractBlockIOOperation(CkMigrateMessage* msg) { }

    SlaveProxyType io_proxy;
    int operation_id;
    string filename;

    ArraySegmentType* segment;
    int index;

    // Callback handed to perform_synchronized_operation(); runs on the slave
    // thread and forwards the segment's local data to doIO().
    static void Function(vector<Operation*>& arguments) {
        AbstractBlockIOOperation* operation = dynamic_cast<AbstractBlockIOOperation*>(arguments[0]);
        SlaveType* thread = operation->io_proxy[operation->segment->thisIndex].ckLocal();
        CkAssert(thread != NULL);
        operation->doIO(*thread,*(dynamic_cast<ArrayType*>(operation->segment)));
    }

public:

    virtual void doIO(const SlaveType& slave, ArrayType& arr) = 0;

    // Called on each array segment; defers the actual I/O to the co-located
    // BlockIOSlave thread so that it runs as a synchronized operation.
    virtual bool perform(ArraySegmentType& segment_) {
        segment = &segment_;
        SlaveType* thread = io_proxy[segment_.thisIndex].ckLocal();
        CkAssert(thread != NULL);
        thread->perform_synchronized_operation(operation_id,1,0,this,Function);
        return false;
    }

    virtual void pup(PUP::er &p) {
        BaseType::pup(p);
        p|io_proxy;
        p|operation_id;
        p|filename;
    }

};
//@-node:gcross.20090120225710.8:AbstractBlockIOOperation
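//
// The two concrete operations below share the same shape: open the file
// collectively, set the file view from the slave's committed darray filetype,
// then perform a collective read or write of the local block -- directly when
// direct_write is set, otherwise through a temporary copy in default (C)
// storage ordering.
//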
//@+node:gcross.20090120225710.12:ReadBlockIOOperation
template<typename T_numtype, int N_rank> class ReadBlockIOOperation : public AbstractBlockIOOperation<T_numtype,N_rank> {

PUPable_decl_template(ReadBlockIOOperation)

protected:

    typedef AbstractBlockIOOperation<T_numtype,N_rank> BaseType;
    typedef DistributedArraySegment<T_numtype,N_rank> ArraySegmentType;
    typedef Array<T_numtype,N_rank> ArrayType;

    typedef BlockIOSlave<T_numtype,N_rank> SlaveType;
    typedef CProxy_BlockIOSlave<T_numtype,N_rank> SlaveProxyType;

    friend class BlockIOMaster<T_numtype,N_rank>;

    ReadBlockIOOperation() { }

    ReadBlockIOOperation(int operation_id_, SlaveProxyType io_proxy_, string filename_) :
        BaseType(operation_id_,io_proxy_,filename_)
    { }

    ReadBlockIOOperation(CkMigrateMessage* msg) : BaseType(msg) { }

    using BaseType::segment;
    using BaseType::filename;

public:

    virtual void doIO(const SlaveType& slave, ArrayType& arr) {
        MPI_File fh;
        PMPI_File_open(MPI_COMM_WORLD, const_cast<char*>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
        PMPI_File_set_view(fh,0,slave.dtype,slave.filetype,"native",MPI_INFO_NULL);

        MPI_Status status;

        if(slave.direct_write) {
            PMPI_File_read_all(fh,arr.data(),arr.numElements(),slave.dtype,&status);
        } else {
            ArrayType temp_arr(arr.shape());
            PMPI_File_read_all(fh,temp_arr.data(),arr.numElements(),slave.dtype,&status);
            arr = temp_arr;
        }

        PMPI_File_close(&fh);
    }

};
//@-node:gcross.20090120225710.12:ReadBlockIOOperation
//@+node:gcross.20090120225710.13:WriteBlockIOOperation
template<typename T_numtype, int N_rank> class WriteBlockIOOperation : public AbstractBlockIOOperation<T_numtype,N_rank> {

PUPable_decl_template(WriteBlockIOOperation)

protected:

    typedef AbstractBlockIOOperation<T_numtype,N_rank> BaseType;
    typedef DistributedArraySegment<T_numtype,N_rank> ArraySegmentType;
    typedef Array<T_numtype,N_rank> ArrayType;

    typedef BlockIOSlave<T_numtype,N_rank> SlaveType;
    typedef CProxy_BlockIOSlave<T_numtype,N_rank> SlaveProxyType;

    friend class BlockIOMaster<T_numtype,N_rank>;

    WriteBlockIOOperation() { }

    WriteBlockIOOperation(int operation_id_, SlaveProxyType io_proxy_, string filename_) :
        BaseType(operation_id_,io_proxy_,filename_)
    { }

    WriteBlockIOOperation(CkMigrateMessage* msg) : BaseType(msg) { }

    using BaseType::segment;
    using BaseType::filename;

public:

    virtual void doIO(const SlaveType& slave, ArrayType& arr) {
        MPI_File fh;
        PMPI_File_open(MPI_COMM_WORLD, const_cast<char*>(filename.c_str()), MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
        PMPI_File_set_view(fh,0,slave.dtype,slave.filetype,"native",MPI_INFO_NULL);

        MPI_Status status;

        if(slave.direct_write) {
            PMPI_File_write_all(fh,arr.data(),arr.numElements(),slave.dtype,&status);
        } else {
            ArrayType temp_arr(arr.shape());
            temp_arr = arr;
            PMPI_File_write_all(fh,temp_arr.data(),temp_arr.numElements(),slave.dtype,&status);
        }

        PMPI_File_close(&fh);
    }

};
//@-node:gcross.20090120225710.13:WriteBlockIOOperation
//@-node:gcross.20090120225710.7:IO Operations
//@-others

#define CK_TEMPLATES_ONLY
#include "io.def.h"
#undef CK_TEMPLATES_ONLY

#endif
//@-node:gcross.20090120225710.4:@thin io.h
//@-leo