//@+node:gcross.20090120225710.4:@thin io.h

#include <string>
#include <vector>

#include "ampi_controller.h"

using std::string;
using std::vector;

void ROMIO_initialize();
void ROMIO_initialize_thread_variable();
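// These two hooks initialize ROMIO (the MPI-IO layer); presumably the first is
// called once at program startup and the second once per AMPI worker thread,
// though the call sites live outside this header.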
//@+node:gcross.20090120225710.5:BlockIOSlave Declaration
//template<typename T_numtype, int N_rank> class AbstractBlockIOOperation;
template<typename T_numtype, int N_rank> class BlockIOSlave : public AMPISlave {
public:
    typedef TinyVector<int,N_rank> Coordinates;

    Coordinates sizes, PE_dimensions;
    bool direct_write;
    //static const Coordinates distributions, distribution_arguments;

    MPI_Datatype dtype, filetype;

    //friend class AbstractBlockIOOperation<T_numtype,N_rank>;
    BlockIOSlave(const Coordinates& sizes_, const Coordinates& PE_dimensions_, bool direct_write_) :
        AMPISlave(blitz::product(PE_dimensions_)),
        sizes(sizes_), PE_dimensions(PE_dimensions_), direct_write(direct_write_)
    { }

    BlockIOSlave(CkMigrateMessage* msg) : AMPISlave(msg) { }

    // Downcast the base class's thread lookup; calling BlockIOSlave::getThread()
    // here would recurse forever.
    static BlockIOSlave* getThread() { return dynamic_cast<BlockIOSlave*>(AMPISlave::getThread()); }
    virtual void initialize(void) {
        Coordinates distributions = MPI_DISTRIBUTE_BLOCK,
                    distribution_arguments = MPI_DISTRIBUTE_DFLT_DARG;

        AMPISlave::initialize();

        dtype = getMPIType<T_numtype>();

        // Build this thread's file view; argument order follows the standard
        // MPI_Type_create_darray signature (elided arguments reconstructed,
        // rank lookup assumed).
        int rank;
        PMPI_Comm_rank(MPI_COMM_WORLD,&rank);
        PMPI_Type_create_darray(
            product(PE_dimensions),         // size of the thread grid
            rank,                           // this thread's rank
            N_rank,                         // number of array dimensions
            sizes.data(),                   // global array sizes
            distributions.data(),           // MPI_DISTRIBUTE_BLOCK in every dimension
            distribution_arguments.data(),  // default block sizes
            PE_dimensions.data(),           // thread grid dimensions
            MPI_ORDER_C,                    // assumed, matching the direct-write check below
            dtype,
            &filetype
        );
        MPI_Type_commit(&filetype);
    }
};
//template<typename T_numtype, int N_rank> Coordinates BlockIOSlave<T_numtype,N_rank>::distributions = MPI_DISTRIBUTE_BLOCK;
//template<typename T_numtype, int N_rank> Coordinates BlockIOSlave<T_numtype,N_rank>::distribution_arguments = MPI_DISTRIBUTE_DFLT_DARG;
//@-node:gcross.20090120225710.5:BlockIOSlave Declaration
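// Illustration of the file view built above: for a 4x4 global array
// block-distributed over a 2x2 thread grid, each thread's filetype exposes
// only its own 2x2 block, e.g. thread (0,0) sees elements (0,0),(0,1),(1,0),(1,1)
// and thread (0,1) sees (0,2),(0,3),(1,2),(1,3), so a single collective read or
// write moves every block to or from the right thread in one call.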
//@+node:gcross.20090120225710.6:BlockIOMaster Declaration
template<typename T_numtype, int N_rank> class ReadBlockIOOperation;
template<typename T_numtype, int N_rank> class WriteBlockIOOperation;

template<typename T_numtype, int N_rank> class BlockIOMaster : public AMPIMaster {
public:
    typedef DistributedArray<T_numtype,N_rank> DistributedArrayType;
    typedef ReadBlockIOOperation<T_numtype,N_rank> ReadBlockIOOperationType;
    typedef WriteBlockIOOperation<T_numtype,N_rank> WriteBlockIOOperationType;
    typedef CProxy_BlockIOSlave<T_numtype,N_rank> SlaveProxyType;

    DistributedArrayType& arr;
    BlockIOMaster(DistributedArrayType& arr_) :
        arr(arr_)
    {
        CkArrayOptions opts(arr.numPEs());

        // If the array uses C ordering, then we can write the file data
        // directly into the array.  Otherwise, we need to make a copy
        // of the data in memory first.
        bool direct_write = true;
        for(int i = 0; i < N_rank; ++i)
            if(arr.ordering(i)!=i) {
                direct_write = false;
                break;
            }

        thread_array_proxy = CProxy_BlockIOSlave<T_numtype,N_rank>::ckNew(
            arr.shape(),                  // global sizes (accessor name assumed)
            arr.processor_dimensions(),   // thread grid dimensions
            direct_write,
            opts
        );
    }
    void load(string filename) {
        ReadBlockIOOperationType op(
            allocate_next_operation_id(),
            static_cast<SlaveProxyType&>(thread_array_proxy),
            filename
        );
        arr.broadcast_operation(op);
    }

    void save(string filename) {
        WriteBlockIOOperationType op(
            allocate_next_operation_id(),
            static_cast<SlaveProxyType&>(thread_array_proxy),
            filename
        );
        arr.broadcast_operation(op);
    }
};
//@-node:gcross.20090120225710.6:BlockIOMaster Declaration
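// Usage sketch (illustrative; assumes a DistributedArray<double,2> named
// "data" has been constructed elsewhere):
//
//     BlockIOMaster<double,2> io(data);
//     io.save("field.dat");   // slaves write their blocks collectively
//     io.load("field.dat");   // and read them back the same way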
//@+node:gcross.20090120225710.7:IO Operations
//@+node:gcross.20090120225710.8:AbstractBlockIOOperation
template<typename T_numtype, int N_rank> class AbstractBlockIOOperation : public DistributedArraySegmentOperation<T_numtype,N_rank> {
public:
    PUPable_abstract(AbstractBlockIOOperation)

    typedef DistributedArraySegmentOperation<T_numtype,N_rank> BaseType;
    typedef DistributedArraySegment<T_numtype,N_rank> ArraySegmentType;
    typedef Array<T_numtype,N_rank> ArrayType;
    typedef BlockIOSlave<T_numtype,N_rank> SlaveType;
    typedef CProxy_BlockIOSlave<T_numtype,N_rank> SlaveProxyType;

    friend class PUP::able;
    AbstractBlockIOOperation() { }

    AbstractBlockIOOperation(int operation_id_, SlaveProxyType io_proxy_, string filename_) :
        operation_id(operation_id_),
        io_proxy(io_proxy_),
        filename(filename_)
    { }

    AbstractBlockIOOperation(CkMigrateMessage* msg) { }

    int operation_id;
    SlaveProxyType io_proxy;
    string filename;

    ArraySegmentType* segment;
    // Runs on the I/O thread: recover the concrete operation and hand it the
    // slave thread plus the segment's local array.
    static void Function(vector<Operation*>& arguments) {
        AbstractBlockIOOperation* operation = dynamic_cast<AbstractBlockIOOperation*>(arguments[0]);
        SlaveType* thread = operation->io_proxy[operation->segment->thisIndex].ckLocal();
        CkAssert(thread != NULL);
        operation->doIO(*thread,*(dynamic_cast<ArrayType*>(operation->segment)));
    }

    virtual void doIO(const SlaveType& slave, ArrayType& arr) = 0;

    virtual bool perform(ArraySegmentType& segment_) {
        segment = &segment_;  // assumed: Function above reads this field
        SlaveType* thread = io_proxy[segment_.thisIndex].ckLocal();
        CkAssert(thread != NULL);
        thread->perform_synchronized_operation(operation_id,1,0,this,Function);
        return true;  // return value assumed
    }

    virtual void pup(PUP::er &p) {
        // assumed: serialize the fields a migrated copy needs
        BaseType::pup(p);
        p | operation_id;
        p | io_proxy;
        p | filename;
    }
};
//@-node:gcross.20090120225710.8:AbstractBlockIOOperation
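// Flow of control, as far as the declarations above show: the master
// broadcasts an operation to every array segment; each segment's perform()
// records itself, finds the co-located BlockIOSlave thread, and queues the
// operation with Function as its callback; the slave thread later invokes
// Function, which downcasts back to the concrete operation and calls doIO()
// with the slave's datatype and file view in scope.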
//@+node:gcross.20090120225710.12:ReadBlockIOOperation
template<typename T_numtype, int N_rank> class ReadBlockIOOperation : public AbstractBlockIOOperation<T_numtype,N_rank> {
public:
    PUPable_decl_template(ReadBlockIOOperation)

    typedef AbstractBlockIOOperation<T_numtype,N_rank> BaseType;
    typedef DistributedArraySegment<T_numtype,N_rank> ArraySegmentType;
    typedef Array<T_numtype,N_rank> ArrayType;
    typedef BlockIOSlave<T_numtype,N_rank> SlaveType;
    typedef CProxy_BlockIOSlave<T_numtype,N_rank> SlaveProxyType;

    friend class BlockIOMaster<T_numtype,N_rank>;
    ReadBlockIOOperation() { }

    ReadBlockIOOperation(int operation_id_, SlaveProxyType io_proxy_, string filename_) :
        BaseType(operation_id_,io_proxy_,filename_)
    { }

    ReadBlockIOOperation(CkMigrateMessage* msg) : BaseType(msg) { }

    using BaseType::segment;
    using BaseType::filename;
    virtual void doIO(const SlaveType& slave, ArrayType& arr) {
        MPI_File fh;
        MPI_Status status;
        PMPI_File_open(MPI_COMM_WORLD, const_cast<char*>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
        PMPI_File_set_view(fh,0,slave.dtype,slave.filetype,"native",MPI_INFO_NULL);

        if(slave.direct_write) {
            // C-ordered storage: read straight into the array's buffer.
            PMPI_File_read_all(fh,arr.data(),arr.numElements(),slave.dtype,&status);
        } else {
            // Non-C ordering: read into a C-ordered scratch array, then copy.
            ArrayType temp_arr(arr.shape());
            PMPI_File_read_all(fh,temp_arr.data(),temp_arr.numElements(),slave.dtype,&status);
            arr = temp_arr;  // assumed copy-back into the array's own ordering
        }

        PMPI_File_close(&fh);
    }
};
//@-node:gcross.20090120225710.12:ReadBlockIOOperation
//@+node:gcross.20090120225710.13:WriteBlockIOOperation
template<typename T_numtype, int N_rank> class WriteBlockIOOperation : public AbstractBlockIOOperation<T_numtype,N_rank> {
public:
    PUPable_decl_template(WriteBlockIOOperation)

    typedef AbstractBlockIOOperation<T_numtype,N_rank> BaseType;
    typedef DistributedArraySegment<T_numtype,N_rank> ArraySegmentType;
    typedef Array<T_numtype,N_rank> ArrayType;
    typedef BlockIOSlave<T_numtype,N_rank> SlaveType;
    typedef CProxy_BlockIOSlave<T_numtype,N_rank> SlaveProxyType;

    friend class BlockIOMaster<T_numtype,N_rank>;
    WriteBlockIOOperation() { }

    WriteBlockIOOperation(int operation_id_, SlaveProxyType io_proxy_, string filename_) :
        BaseType(operation_id_,io_proxy_,filename_)
    { }

    WriteBlockIOOperation(CkMigrateMessage* msg) : BaseType(msg) { }

    using BaseType::segment;
    using BaseType::filename;
    virtual void doIO(const SlaveType& slave, ArrayType& arr) {
        MPI_File fh;
        MPI_Status status;
        PMPI_File_open(MPI_COMM_WORLD, const_cast<char*>(filename.c_str()), MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
        PMPI_File_set_view(fh,0,slave.dtype,slave.filetype,"native",MPI_INFO_NULL);

        if(slave.direct_write) {
            // C-ordered storage: write straight from the array's buffer.
            PMPI_File_write_all(fh,arr.data(),arr.numElements(),slave.dtype,&status);
        } else {
            // Non-C ordering: repack into a C-ordered scratch array first.
            ArrayType temp_arr(arr.shape());
            temp_arr = arr;  // assumed repacking copy
            PMPI_File_write_all(fh,temp_arr.data(),temp_arr.numElements(),slave.dtype,&status);
        }

        PMPI_File_close(&fh);
    }
};
//@-node:gcross.20090120225710.13:WriteBlockIOOperation
//@-node:gcross.20090120225710.7:IO Operations
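// ReadBlockIOOperation and WriteBlockIOOperation use PUPable_decl_template,
// so each instantiation presumably also needs runtime registration (e.g. via
// PUPable_def_template in the module's implementation file); the
// CK_TEMPLATES_ONLY include below pulls in the generated template code.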
#define CK_TEMPLATES_ONLY
#include "io.def.h"  // generated-file name assumed from this header
#undef CK_TEMPLATES_ONLY
//@-node:gcross.20090120225710.4:@thin io.h