~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <ndb_global.h>
 
17
 
 
18
#include "Ndbfs.hpp"
 
19
#include "AsyncFile.hpp"
 
20
#include "Filename.hpp"
 
21
 
 
22
#include <signaldata/FsOpenReq.hpp>
 
23
#include <signaldata/FsCloseReq.hpp>
 
24
#include <signaldata/FsReadWriteReq.hpp>
 
25
#include <signaldata/FsAppendReq.hpp>
 
26
#include <signaldata/FsRemoveReq.hpp>
 
27
#include <signaldata/FsConf.hpp>
 
28
#include <signaldata/FsRef.hpp>
 
29
#include <signaldata/NdbfsContinueB.hpp>
 
30
#include <signaldata/DumpStateOrd.hpp>
 
31
 
 
32
#include <RefConvert.hpp>
 
33
#include <NdbSleep.h>
 
34
#include <NdbOut.hpp>
 
35
#include <Configuration.hpp>
 
36
 
 
37
#define DEBUG(x) { ndbout << "FS::" << x << endl; }
 
38
 
 
39
inline
 
40
int pageSize( const NewVARIABLE* baseAddrRef )
 
41
{
 
42
   int log_psize;
 
43
   int log_qsize = baseAddrRef->bits.q;
 
44
   int log_vsize = baseAddrRef->bits.v;
 
45
   if (log_vsize < 3)
 
46
      log_vsize = 3;
 
47
   log_psize = log_qsize + log_vsize - 3;
 
48
   return (1 << log_psize);
 
49
}
 
50
 
 
51
 
 
52
Ndbfs::Ndbfs(Block_context& ctx) :
 
53
  SimulatedBlock(NDBFS, ctx),
 
54
  scanningInProgress(false),
 
55
  theLastId(0),
 
56
  theRequestPool(0),
 
57
  m_maxOpenedFiles(0)
 
58
{
 
59
  BLOCK_CONSTRUCTOR(Ndbfs);
 
60
 
 
61
  // Set received signals
 
62
  addRecSignal(GSN_READ_CONFIG_REQ, &Ndbfs::execREAD_CONFIG_REQ);
 
63
  addRecSignal(GSN_DUMP_STATE_ORD,  &Ndbfs::execDUMP_STATE_ORD);
 
64
  addRecSignal(GSN_STTOR,  &Ndbfs::execSTTOR);
 
65
  addRecSignal(GSN_FSOPENREQ, &Ndbfs::execFSOPENREQ);
 
66
  addRecSignal(GSN_FSCLOSEREQ, &Ndbfs::execFSCLOSEREQ);
 
67
  addRecSignal(GSN_FSWRITEREQ, &Ndbfs::execFSWRITEREQ);
 
68
  addRecSignal(GSN_FSREADREQ, &Ndbfs::execFSREADREQ);
 
69
  addRecSignal(GSN_FSSYNCREQ, &Ndbfs::execFSSYNCREQ);
 
70
  addRecSignal(GSN_CONTINUEB, &Ndbfs::execCONTINUEB);
 
71
  addRecSignal(GSN_FSAPPENDREQ, &Ndbfs::execFSAPPENDREQ);
 
72
  addRecSignal(GSN_FSREMOVEREQ, &Ndbfs::execFSREMOVEREQ);
 
73
   // Set send signals
 
74
}
 
75
 
 
76
Ndbfs::~Ndbfs()
 
77
{
 
78
  // Delete all files
 
79
  // AsyncFile destuctor will take care of deleting
 
80
  // the thread it has created
 
81
  for (unsigned i = 0; i < theFiles.size(); i++){
 
82
    AsyncFile* file = theFiles[i];
 
83
    delete file; 
 
84
    theFiles[i] = NULL;
 
85
  }//for
 
86
  theFiles.clear();
 
87
  if (theRequestPool)
 
88
    delete theRequestPool;
 
89
}
 
90
 
 
91
void 
 
92
Ndbfs::execREAD_CONFIG_REQ(Signal* signal)
 
93
{
 
94
  const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
 
95
 
 
96
  Uint32 ref = req->senderRef;
 
97
  Uint32 senderData = req->senderData;
 
98
 
 
99
  const ndb_mgm_configuration_iterator * p = 
 
100
    m_ctx.m_config.getOwnConfigIterator();
 
101
  ndbrequire(p != 0);
 
102
  theFileSystemPath.assfmt("%sndb_%u_fs%s", m_ctx.m_config.fileSystemPath(),
 
103
                           getOwnNodeId(), DIR_SEPARATOR);
 
104
  theBackupFilePath.assign(m_ctx.m_config.backupFilePath());
 
105
 
 
106
  theRequestPool = new Pool<Request>;
 
107
 
 
108
  m_maxFiles = 0;
 
109
  ndb_mgm_get_int_parameter(p, CFG_DB_MAX_OPEN_FILES, &m_maxFiles);
 
110
  Uint32 noIdleFiles = 27;
 
111
  ndb_mgm_get_int_parameter(p, CFG_DB_INITIAL_OPEN_FILES, &noIdleFiles);
 
112
  if (noIdleFiles > m_maxFiles && m_maxFiles != 0)
 
113
    m_maxFiles = noIdleFiles;
 
114
  // Create idle AsyncFiles
 
115
  for (Uint32 i = 0; i < noIdleFiles; i++){
 
116
    theIdleFiles.push_back(createAsyncFile());
 
117
  }
 
118
 
 
119
  ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
 
120
  conf->senderRef = reference();
 
121
  conf->senderData = senderData;
 
122
  sendSignal(ref, GSN_READ_CONFIG_CONF, signal, 
 
123
             ReadConfigConf::SignalLength, JBB);
 
124
}
 
125
 
 
126
/* Received a restart signal.
 
127
 * Answer it like any other block
 
128
 * PR0  : StartCase
 
129
 * DR0  : StartPhase
 
130
 * DR1  : ?
 
131
 * DR2  : ?
 
132
 * DR3  : ?
 
133
 * DR4  : ?
 
134
 * DR5  : SignalKey
 
135
 */
 
136
void
 
137
Ndbfs::execSTTOR(Signal* signal)
 
138
{
 
139
  jamEntry();
 
140
  
 
141
  if(signal->theData[1] == 0){ // StartPhase 0
 
142
    jam();
 
143
    
 
144
    {
 
145
#ifdef NDB_WIN32
 
146
      CreateDirectory(theFileSystemPath.c_str(), 0);
 
147
#else
 
148
      mkdir(theFileSystemPath.c_str(),
 
149
            S_IRUSR | S_IWUSR | S_IXUSR | S_IXGRP | S_IRGRP);
 
150
#endif
 
151
    }      
 
152
    
 
153
    cownref = NDBFS_REF;
 
154
    // close all open files
 
155
    ndbrequire(theOpenFiles.size() == 0);
 
156
    
 
157
    scanningInProgress = false;
 
158
    
 
159
    signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
 
160
    sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 1);
 
161
 
 
162
    signal->theData[3] = 255;
 
163
    sendSignal(NDBCNTR_REF, GSN_STTORRY, signal,4, JBB);
 
164
    return;
 
165
  }
 
166
  ndbrequire(0);
 
167
}
 
168
 
 
169
int 
 
170
Ndbfs::forward( AsyncFile * file, Request* request)
 
171
{
 
172
  jam();
 
173
  file->execute(request);
 
174
  return 1;
 
175
}
 
176
 
 
177
void 
 
178
Ndbfs::execFSOPENREQ(Signal* signal)
 
179
{
 
180
  jamEntry();
 
181
  const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0];
 
182
  const BlockReference userRef = fsOpenReq->userReference;
 
183
  AsyncFile* file = getIdleFile();
 
184
  ndbrequire(file != NULL);
 
185
  Filename::NameSpec spec(theFileSystemPath, theBackupFilePath);
 
186
 
 
187
  Uint32 userPointer = fsOpenReq->userPointer;
 
188
  
 
189
  if(fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
 
190
  {
 
191
    Ptr<GlobalPage> page_ptr;
 
192
    if(m_global_page_pool.seize(page_ptr) == false)
 
193
    {
 
194
      FsRef * const fsRef = (FsRef *)&signal->theData[0];
 
195
      fsRef->userPointer  = userPointer; 
 
196
      fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
 
197
      fsRef->osErrorCode  = ~0; // Indicate local error
 
198
      sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
 
199
      return;
 
200
    }
 
201
    file->m_page_ptr = page_ptr;
 
202
  } 
 
203
  else
 
204
  {
 
205
    ndbassert(file->m_page_ptr.isNull());
 
206
    file->m_page_ptr.setNull();
 
207
  }
 
208
  
 
209
  if(signal->getNoOfSections() == 0){
 
210
    jam();
 
211
    file->theFileName.set(spec, userRef, fsOpenReq->fileNumber);
 
212
  } else {
 
213
    jam();
 
214
    SegmentedSectionPtr ptr;
 
215
    signal->getSection(ptr, FsOpenReq::FILENAME);
 
216
    file->theFileName.set(spec, ptr, g_sectionSegmentPool);
 
217
    releaseSections(signal);
 
218
  }
 
219
  file->reportTo(&theFromThreads);
 
220
  if (getenv("NDB_TRACE_OPEN"))
 
221
    ndbout_c("open(%s)", file->theFileName.c_str());
 
222
  
 
223
  Request* request = theRequestPool->get();
 
224
  request->action = Request::open;
 
225
  request->error = 0;
 
226
  request->set(userRef, userPointer, newId() );
 
227
  request->file = file;
 
228
  request->theTrace = signal->getTrace();
 
229
  request->par.open.flags = fsOpenReq->fileFlags;
 
230
  request->par.open.page_size = fsOpenReq->page_size;
 
231
  request->par.open.file_size = fsOpenReq->file_size_hi;
 
232
  request->par.open.file_size <<= 32;
 
233
  request->par.open.file_size |= fsOpenReq->file_size_lo;
 
234
  request->par.open.auto_sync_size = fsOpenReq->auto_sync_size;
 
235
  
 
236
  ndbrequire(forward(file, request));
 
237
}
 
238
 
 
239
void 
 
240
Ndbfs::execFSREMOVEREQ(Signal* signal)
 
241
{
 
242
  jamEntry();
 
243
  const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr();
 
244
  const BlockReference userRef = req->userReference;
 
245
  AsyncFile* file = getIdleFile();
 
246
  ndbrequire(file != NULL);
 
247
 
 
248
  Filename::NameSpec spec(theFileSystemPath, theBackupFilePath);
 
249
  file->theFileName.set(spec, userRef, req->fileNumber, req->directory);
 
250
  file->reportTo(&theFromThreads);
 
251
  
 
252
  Request* request = theRequestPool->get();
 
253
  request->action = Request::rmrf;
 
254
  request->par.rmrf.directory = req->directory;
 
255
  request->par.rmrf.own_directory = req->ownDirectory;
 
256
  request->error = 0;
 
257
  request->set(userRef, req->userPointer, newId() );
 
258
  request->file = file;
 
259
  request->theTrace = signal->getTrace();
 
260
  
 
261
  ndbrequire(forward(file, request));
 
262
}
 
263
 
 
264
/*
 
265
 * PR0: File Pointer DR0: User reference DR1: User Pointer DR2: Flag bit 0= 1
 
266
 * remove file
 
267
 */
 
268
void 
 
269
Ndbfs::execFSCLOSEREQ(Signal * signal)
 
270
{
 
271
  jamEntry();
 
272
  const FsCloseReq * const fsCloseReq = (FsCloseReq *)&signal->theData[0];
 
273
  const BlockReference userRef = fsCloseReq->userReference;
 
274
  const Uint16 filePointer = (Uint16)fsCloseReq->filePointer;
 
275
  const UintR userPointer = fsCloseReq->userPointer; 
 
276
 
 
277
  AsyncFile* openFile = theOpenFiles.find(filePointer);
 
278
  if (openFile == NULL) {
 
279
    // The file was not open, send error back to sender
 
280
    jam();    
 
281
    // Initialise FsRef signal
 
282
    FsRef * const fsRef = (FsRef *)&signal->theData[0];
 
283
    fsRef->userPointer  = userPointer; 
 
284
    fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
 
285
    fsRef->osErrorCode  = ~0; // Indicate local error
 
286
    sendSignal(userRef, GSN_FSCLOSEREF, signal, 3, JBB);
 
287
    return;
 
288
  }
 
289
 
 
290
  Request *request = theRequestPool->get();
 
291
  if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) {
 
292
     jam();
 
293
     request->action = Request::closeRemove;
 
294
  } else {
 
295
     jam();
 
296
     request->action = Request::close;
 
297
  }
 
298
  request->set(userRef, fsCloseReq->userPointer, filePointer);
 
299
  request->file = openFile;
 
300
  request->error = 0;
 
301
  request->theTrace = signal->getTrace();
 
302
 
 
303
  ndbrequire(forward(openFile, request));
 
304
}
 
305
 
 
306
void 
 
307
Ndbfs::readWriteRequest(int action, Signal * signal)
 
308
{
 
309
  const FsReadWriteReq * const fsRWReq = (FsReadWriteReq *)&signal->theData[0];
 
310
  Uint16 filePointer =  (Uint16)fsRWReq->filePointer;
 
311
  const UintR userPointer = fsRWReq->userPointer; 
 
312
  const BlockReference userRef = fsRWReq->userReference;
 
313
  const BlockNumber blockNumber = refToBlock(userRef);
 
314
 
 
315
  AsyncFile* openFile = theOpenFiles.find(filePointer);
 
316
 
 
317
  const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsRWReq->varIndex];
 
318
  UintPtr tPageSize;
 
319
  UintPtr tClusterSize;
 
320
  UintPtr tNRR;
 
321
  UintPtr tPageOffset;
 
322
  char*        tWA;
 
323
  FsRef::NdbfsErrorCodeType errorCode;
 
324
 
 
325
  Request *request = theRequestPool->get();
 
326
  request->error = 0;
 
327
  request->set(userRef, userPointer, filePointer);
 
328
  request->file = openFile;
 
329
  request->action = (Request::Action) action;
 
330
  request->theTrace = signal->getTrace();
 
331
 
 
332
  Uint32 format = fsRWReq->getFormatFlag(fsRWReq->operationFlag);
 
333
 
 
334
  if (fsRWReq->numberOfPages == 0) { //Zero pages not allowed
 
335
    jam();
 
336
    errorCode = FsRef::fsErrInvalidParameters;
 
337
    goto error;
 
338
  }
 
339
 
 
340
  if(format != FsReadWriteReq::fsFormatGlobalPage &&
 
341
     format != FsReadWriteReq::fsFormatSharedPage)
 
342
  {     
 
343
    if (fsRWReq->varIndex >= getBatSize(blockNumber)) {
 
344
      jam();// Ensure that a valid variable is used    
 
345
      errorCode = FsRef::fsErrInvalidParameters;
 
346
      goto error;
 
347
    }
 
348
    if (myBaseAddrRef == NULL) {
 
349
      jam(); // Ensure that a valid variable is used
 
350
      errorCode = FsRef::fsErrInvalidParameters;
 
351
      goto error;
 
352
    }
 
353
    if (openFile == NULL) {
 
354
      jam(); //file not open
 
355
      errorCode = FsRef::fsErrFileDoesNotExist;
 
356
      goto error;
 
357
    }
 
358
    tPageSize = pageSize(myBaseAddrRef);
 
359
    tClusterSize = myBaseAddrRef->ClusterSize;
 
360
    tNRR = myBaseAddrRef->nrr;
 
361
    tWA = (char*)myBaseAddrRef->WA;
 
362
    
 
363
    switch (format) {
 
364
      
 
365
      // List of memory and file pages pairs
 
366
    case FsReadWriteReq::fsFormatListOfPairs: { 
 
367
      jam();
 
368
      for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {
 
369
        jam();
 
370
        const UintPtr varIndex = fsRWReq->data.listOfPair[i].varIndex;
 
371
        const UintPtr fileOffset = fsRWReq->data.listOfPair[i].fileOffset;
 
372
        if (varIndex >= tNRR) {
 
373
          jam();
 
374
          errorCode = FsRef::fsErrInvalidParameters;
 
375
          goto error;
 
376
        }//if
 
377
        request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];
 
378
        request->par.readWrite.pages[i].size = tPageSize;
 
379
        request->par.readWrite.pages[i].offset = fileOffset * tPageSize;
 
380
      }//for
 
381
      request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
 
382
      break;
 
383
    }//case
 
384
      
 
385
      // Range of memory page with one file page
 
386
    case FsReadWriteReq::fsFormatArrayOfPages: { 
 
387
      if ((fsRWReq->numberOfPages + fsRWReq->data.arrayOfPages.varIndex) > tNRR) {
 
388
        jam();
 
389
        errorCode = FsRef::fsErrInvalidParameters;
 
390
        goto error;
 
391
      }//if
 
392
      const UintPtr varIndex = fsRWReq->data.arrayOfPages.varIndex;
 
393
      const UintPtr fileOffset = fsRWReq->data.arrayOfPages.fileOffset;
 
394
      
 
395
      request->par.readWrite.pages[0].offset = fileOffset * tPageSize;
 
396
      request->par.readWrite.pages[0].size = tPageSize * fsRWReq->numberOfPages;
 
397
      request->par.readWrite.numberOfPages = 1;
 
398
      request->par.readWrite.pages[0].buf = &tWA[varIndex * tPageSize];
 
399
      break;
 
400
    }//case
 
401
      
 
402
      // List of memory pages followed by one file page
 
403
    case FsReadWriteReq::fsFormatListOfMemPages: { 
 
404
      
 
405
      tPageOffset = fsRWReq->data.listOfMemPages.varIndex[fsRWReq->numberOfPages];
 
406
      tPageOffset *= tPageSize;
 
407
      
 
408
      for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {
 
409
        jam();
 
410
        UintPtr varIndex = fsRWReq->data.listOfMemPages.varIndex[i];
 
411
        
 
412
        if (varIndex >= tNRR) {
 
413
          jam();
 
414
          errorCode = FsRef::fsErrInvalidParameters;
 
415
          goto error;
 
416
        }//if
 
417
        request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];
 
418
        request->par.readWrite.pages[i].size = tPageSize;
 
419
        request->par.readWrite.pages[i].offset = tPageOffset + (i*tPageSize);
 
420
      }//for
 
421
      request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
 
422
      break;
 
423
      // make it a writev or readv
 
424
    }//case
 
425
      
 
426
    default: {
 
427
      jam();
 
428
      errorCode = FsRef::fsErrInvalidParameters;
 
429
      goto error;
 
430
    }//default
 
431
    }//switch
 
432
  } 
 
433
  else if (format == FsReadWriteReq::fsFormatGlobalPage)
 
434
  {
 
435
    Ptr<GlobalPage> ptr;
 
436
    m_global_page_pool.getPtr(ptr, fsRWReq->data.pageData[0]);
 
437
    request->par.readWrite.pages[0].buf = (char*)ptr.p;
 
438
    request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages;
 
439
    request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex;
 
440
    request->par.readWrite.numberOfPages = 1;
 
441
  }
 
442
  else
 
443
  {
 
444
    ndbrequire(format == FsReadWriteReq::fsFormatSharedPage);
 
445
    Ptr<GlobalPage> ptr;
 
446
    m_shared_page_pool.getPtr(ptr, fsRWReq->data.pageData[0]);
 
447
    request->par.readWrite.pages[0].buf = (char*)ptr.p;
 
448
    request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages;
 
449
    request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex;
 
450
    request->par.readWrite.numberOfPages = 1;
 
451
  }
 
452
  
 
453
  ndbrequire(forward(openFile, request));
 
454
  return;
 
455
  
 
456
error:
 
457
  theRequestPool->put(request);
 
458
  FsRef * const fsRef = (FsRef *)&signal->theData[0];
 
459
  fsRef->userPointer = userPointer;
 
460
  fsRef->setErrorCode(fsRef->errorCode, errorCode);
 
461
  fsRef->osErrorCode = ~0; // Indicate local error
 
462
  switch (action) {
 
463
  case Request:: write:
 
464
  case Request:: writeSync: {
 
465
    jam();
 
466
    sendSignal(userRef, GSN_FSWRITEREF, signal, 3, JBB);
 
467
    break;
 
468
  }//case
 
469
  case Request:: readPartial: 
 
470
  case Request:: read: {
 
471
    jam();
 
472
    sendSignal(userRef, GSN_FSREADREF, signal, 3, JBB);
 
473
  }//case
 
474
  }//switch
 
475
  return;
 
476
}
 
477
 
 
478
/*
 
479
    PR0: File Pointer , theData[0]
 
480
    DR0: User reference, theData[1]
 
481
    DR1: User Pointer,   etc.
 
482
    DR2: Flag
 
483
    DR3: Var number
 
484
    DR4: amount of pages
 
485
    DR5->: Memory Page id and File page id according to Flag
 
486
*/
 
487
void 
 
488
Ndbfs::execFSWRITEREQ(Signal* signal)
 
489
{
 
490
  jamEntry();
 
491
  const FsReadWriteReq * const fsWriteReq = (FsReadWriteReq *)&signal->theData[0];
 
492
  
 
493
  if (fsWriteReq->getSyncFlag(fsWriteReq->operationFlag) == true){
 
494
    jam();
 
495
    readWriteRequest( Request::writeSync, signal );
 
496
  } else {
 
497
    jam();
 
498
    readWriteRequest( Request::write, signal );
 
499
  }
 
500
}
 
501
 
 
502
/*
 
503
    PR0: File Pointer
 
504
    DR0: User reference
 
505
    DR1: User Pointer
 
506
    DR2: Flag
 
507
    DR3: Var number
 
508
    DR4: amount of pages
 
509
    DR5->: Memory Page id and File page id according to Flag
 
510
*/
 
511
void 
 
512
Ndbfs::execFSREADREQ(Signal* signal)
 
513
{
 
514
  jamEntry();
 
515
  FsReadWriteReq * req = (FsReadWriteReq *)signal->getDataPtr();
 
516
  if (FsReadWriteReq::getPartialReadFlag(req->operationFlag))
 
517
    readWriteRequest( Request::readPartial, signal );
 
518
  else
 
519
    readWriteRequest( Request::read, signal );
 
520
}
 
521
 
 
522
/*
 
523
 * PR0: File Pointer DR0: User reference DR1: User Pointer
 
524
 */
 
525
void
 
526
Ndbfs::execFSSYNCREQ(Signal * signal)
 
527
{
 
528
  jamEntry();
 
529
  Uint16 filePointer =  (Uint16)signal->theData[0];
 
530
  BlockReference userRef = signal->theData[1];
 
531
  const UintR userPointer = signal->theData[2]; 
 
532
  AsyncFile* openFile = theOpenFiles.find(filePointer);
 
533
 
 
534
  if (openFile == NULL) {
 
535
     jam(); //file not open
 
536
     FsRef * const fsRef = (FsRef *)&signal->theData[0];
 
537
     fsRef->userPointer = userPointer;
 
538
     fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
 
539
     fsRef->osErrorCode = ~0; // Indicate local error
 
540
     sendSignal(userRef, GSN_FSSYNCREF, signal, 3, JBB);
 
541
     return;
 
542
  }
 
543
  
 
544
  Request *request = theRequestPool->get();
 
545
  request->error = 0;
 
546
  request->action = Request::sync;
 
547
  request->set(userRef, userPointer, filePointer);
 
548
  request->file = openFile;
 
549
  request->theTrace = signal->getTrace();
 
550
  
 
551
  ndbrequire(forward(openFile,request));
 
552
}
 
553
 
 
554
void 
 
555
Ndbfs::execFSAPPENDREQ(Signal * signal)
 
556
{
 
557
  const FsAppendReq * const fsReq = (FsAppendReq *)&signal->theData[0];
 
558
  const Uint16 filePointer =  (Uint16)fsReq->filePointer;
 
559
  const UintR userPointer = fsReq->userPointer; 
 
560
  const BlockReference userRef = fsReq->userReference;
 
561
  const BlockNumber blockNumber = refToBlock(userRef);
 
562
 
 
563
  FsRef::NdbfsErrorCodeType errorCode;
 
564
 
 
565
  AsyncFile* openFile = theOpenFiles.find(filePointer);
 
566
  const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsReq->varIndex];
 
567
 
 
568
  const Uint32* tWA   = (const Uint32*)myBaseAddrRef->WA;
 
569
  const Uint32  tSz   = myBaseAddrRef->nrr;
 
570
  const Uint32 offset = fsReq->offset;
 
571
  const Uint32 size   = fsReq->size;
 
572
  const Uint32 synch_flag = fsReq->synch_flag;
 
573
  Request *request = theRequestPool->get();
 
574
 
 
575
  if (openFile == NULL) {
 
576
    jam();
 
577
    errorCode = FsRef::fsErrFileDoesNotExist;
 
578
    goto error;
 
579
  }
 
580
 
 
581
  if (myBaseAddrRef == NULL) {
 
582
    jam(); // Ensure that a valid variable is used
 
583
    errorCode = FsRef::fsErrInvalidParameters;
 
584
    goto error;
 
585
  }
 
586
  
 
587
  if (fsReq->varIndex >= getBatSize(blockNumber)) {
 
588
    jam();// Ensure that a valid variable is used    
 
589
    errorCode = FsRef::fsErrInvalidParameters;
 
590
    goto error;
 
591
  }
 
592
  
 
593
  if(offset + size > tSz){
 
594
    jam(); // Ensure that a valid variable is used
 
595
    errorCode = FsRef::fsErrInvalidParameters;
 
596
    goto error;
 
597
  }
 
598
 
 
599
  request->error = 0;
 
600
  request->set(userRef, userPointer, filePointer);
 
601
  request->file = openFile;
 
602
  request->theTrace = signal->getTrace();
 
603
  
 
604
  request->par.append.buf = (const char *)(tWA + offset);
 
605
  request->par.append.size = size << 2;
 
606
 
 
607
  if (!synch_flag)
 
608
    request->action = Request::append;
 
609
  else
 
610
    request->action = Request::append_synch;
 
611
  ndbrequire(forward(openFile, request));
 
612
  return;
 
613
  
 
614
error:
 
615
  jam();
 
616
  theRequestPool->put(request);
 
617
  FsRef * const fsRef = (FsRef *)&signal->theData[0];
 
618
  fsRef->userPointer = userPointer;
 
619
  fsRef->setErrorCode(fsRef->errorCode, errorCode);
 
620
  fsRef->osErrorCode = ~0; // Indicate local error
 
621
 
 
622
  jam();
 
623
  sendSignal(userRef, GSN_FSAPPENDREF, signal, 3, JBB);
 
624
  return;
 
625
}
 
626
 
 
627
Uint16
 
628
Ndbfs::newId()
 
629
{
 
630
  // finds a new key, eg a new filepointer
 
631
  for (int i = 1; i < SHRT_MAX; i++) 
 
632
  {
 
633
    if (theLastId == SHRT_MAX) {
 
634
      jam();
 
635
      theLastId = 1;
 
636
    } else {
 
637
      jam();
 
638
      theLastId++;
 
639
    }
 
640
      
 
641
    if(theOpenFiles.find(theLastId) == NULL) {
 
642
      jam();
 
643
      return theLastId;
 
644
    }
 
645
  }  
 
646
  ndbrequire(1 == 0);
 
647
  // The program will not reach this point
 
648
  return 0;
 
649
}
 
650
 
 
651
AsyncFile*
 
652
Ndbfs::createAsyncFile(){
 
653
 
 
654
  // Check limit of open files
 
655
  if (m_maxFiles !=0 && theFiles.size() ==  m_maxFiles) {
 
656
    // Print info about all open files
 
657
    for (unsigned i = 0; i < theFiles.size(); i++){
 
658
      AsyncFile* file = theFiles[i];
 
659
      ndbout_c("%2d (0x%lx): %s", i, (long) file, file->isOpen()?"OPEN":"CLOSED");
 
660
    }
 
661
    ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile");
 
662
  }
 
663
 
 
664
  AsyncFile* file = new AsyncFile(* this);
 
665
  file->doStart();
 
666
 
 
667
  // Put the file in list of all files
 
668
  theFiles.push_back(file);
 
669
 
 
670
#ifdef VM_TRACE
 
671
  infoEvent("NDBFS: Created new file thread %d", theFiles.size());
 
672
#endif
 
673
  
 
674
  return file;
 
675
}
 
676
 
 
677
AsyncFile*
 
678
Ndbfs::getIdleFile(){
 
679
  AsyncFile* file;
 
680
  if (theIdleFiles.size() > 0){
 
681
    file = theIdleFiles[0];
 
682
    theIdleFiles.erase(0);
 
683
  } else {
 
684
    file = createAsyncFile();
 
685
  } 
 
686
  return file;
 
687
}
 
688
 
 
689
 
 
690
 
 
691
void
 
692
Ndbfs::report(Request * request, Signal* signal)
 
693
{
 
694
  const Uint32 orgTrace = signal->getTrace();
 
695
  signal->setTrace(request->theTrace);
 
696
  const BlockReference ref = request->theUserReference;
 
697
 
 
698
  if(!request->file->m_page_ptr.isNull())
 
699
  {
 
700
    m_global_page_pool.release(request->file->m_page_ptr);
 
701
    request->file->m_page_ptr.setNull();
 
702
  }
 
703
  
 
704
  if (request->error) {
 
705
    jam();
 
706
    // Initialise FsRef signal
 
707
    FsRef * const fsRef = (FsRef *)&signal->theData[0];
 
708
    fsRef->userPointer = request->theUserPointer;
 
709
    if(request->error & FsRef::FS_ERR_BIT)
 
710
    {
 
711
      fsRef->errorCode = request->error;
 
712
      fsRef->osErrorCode = 0;
 
713
    }
 
714
    else 
 
715
    {
 
716
      fsRef->setErrorCode(fsRef->errorCode, translateErrno(request->error));
 
717
      fsRef->osErrorCode = request->error; 
 
718
    }
 
719
    switch (request->action) {
 
720
    case Request:: open: {
 
721
      jam();
 
722
      // Put the file back in idle files list
 
723
      theIdleFiles.push_back(request->file);  
 
724
      sendSignal(ref, GSN_FSOPENREF, signal, FsRef::SignalLength, JBB);
 
725
      break;
 
726
    }
 
727
    case Request:: closeRemove:
 
728
    case Request:: close: {
 
729
      jam();
 
730
      sendSignal(ref, GSN_FSCLOSEREF, signal, FsRef::SignalLength, JBB);
 
731
      break;
 
732
    }
 
733
    case Request:: writeSync:
 
734
    case Request:: writevSync:
 
735
    case Request:: write:
 
736
    case Request:: writev: {
 
737
      jam();
 
738
      sendSignal(ref, GSN_FSWRITEREF, signal, FsRef::SignalLength, JBB);
 
739
      break;
 
740
    }
 
741
    case Request:: read: 
 
742
    case Request:: readPartial:
 
743
    case Request:: readv: {
 
744
      jam();
 
745
      sendSignal(ref, GSN_FSREADREF, signal, FsRef::SignalLength, JBB);
 
746
      break;
 
747
    }
 
748
    case Request:: sync: {
 
749
      jam();
 
750
      sendSignal(ref, GSN_FSSYNCREF, signal, FsRef::SignalLength, JBB);
 
751
      break;
 
752
    }
 
753
    case Request::append:
 
754
    case Request::append_synch:
 
755
    {
 
756
      jam();
 
757
      sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB);
 
758
      break;
 
759
    }
 
760
    case Request::rmrf: {
 
761
      jam();
 
762
      // Put the file back in idle files list
 
763
      theIdleFiles.push_back(request->file);  
 
764
      sendSignal(ref, GSN_FSREMOVEREF, signal, FsRef::SignalLength, JBB);
 
765
      break;
 
766
    }
 
767
    
 
768
    case Request:: end: {
 
769
      // Report nothing
 
770
      break;
 
771
    }
 
772
    }//switch
 
773
  } else {
 
774
    jam();
 
775
    FsConf * const fsConf = (FsConf *)&signal->theData[0];
 
776
    fsConf->userPointer = request->theUserPointer;
 
777
    switch (request->action) {
 
778
    case Request:: open: {
 
779
      jam();
 
780
      theOpenFiles.insert(request->file, request->theFilePointer);
 
781
 
 
782
      // Keep track on max number of opened files
 
783
      if (theOpenFiles.size() > m_maxOpenedFiles)
 
784
        m_maxOpenedFiles = theOpenFiles.size();
 
785
 
 
786
      fsConf->filePointer = request->theFilePointer;
 
787
      sendSignal(ref, GSN_FSOPENCONF, signal, 3, JBB);
 
788
      break;
 
789
    }
 
790
    case Request:: closeRemove:
 
791
    case Request:: close: {
 
792
      jam();
 
793
      // removes the file from OpenFiles list
 
794
      theOpenFiles.erase(request->theFilePointer); 
 
795
      // Put the file in idle files list
 
796
      theIdleFiles.push_back(request->file); 
 
797
      sendSignal(ref, GSN_FSCLOSECONF, signal, 1, JBB);
 
798
      break;
 
799
    }
 
800
    case Request:: writeSync:
 
801
    case Request:: writevSync:
 
802
    case Request:: write:
 
803
    case Request:: writev: {
 
804
      jam();
 
805
      sendSignal(ref, GSN_FSWRITECONF, signal, 1, JBB);
 
806
      break;
 
807
    }
 
808
    case Request:: read:
 
809
    case Request:: readv: {
 
810
      jam();
 
811
      sendSignal(ref, GSN_FSREADCONF, signal, 1, JBB);
 
812
      break;
 
813
    }
 
814
    case Request:: readPartial: {
 
815
      jam();
 
816
      fsConf->bytes_read = request->par.readWrite.pages[0].size;
 
817
      sendSignal(ref, GSN_FSREADCONF, signal, 2, JBB);
 
818
      break;
 
819
    }
 
820
    case Request:: sync: {
 
821
      jam();
 
822
      sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBB);
 
823
      break;
 
824
    }//case
 
825
    case Request::append:
 
826
    case Request::append_synch:
 
827
    {
 
828
      jam();
 
829
      signal->theData[1] = request->par.append.size;
 
830
      sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBB);
 
831
      break;
 
832
    }
 
833
    case Request::rmrf: {
 
834
      jam();
 
835
      // Put the file in idle files list
 
836
      theIdleFiles.push_back(request->file);            
 
837
      sendSignal(ref, GSN_FSREMOVECONF, signal, 1, JBB);
 
838
      break;
 
839
    }
 
840
    case Request:: end: {
 
841
      // Report nothing
 
842
      break;
 
843
    }
 
844
    }    
 
845
  }//if
 
846
  signal->setTrace(orgTrace);
 
847
}
 
848
 
 
849
 
 
850
bool
 
851
Ndbfs::scanIPC(Signal* signal)
 
852
{
 
853
   Request* request = theFromThreads.tryReadChannel();
 
854
   jam();
 
855
   if (request) {
 
856
      jam();
 
857
      report(request, signal);
 
858
      theRequestPool->put(request);
 
859
      return true;
 
860
   }
 
861
   return false;
 
862
}
 
863
 
 
864
#if defined NDB_WIN32
 
865
Uint32 Ndbfs::translateErrno(int aErrno)
 
866
{
 
867
  switch (aErrno)
 
868
    {
 
869
      //permission denied
 
870
    case ERROR_ACCESS_DENIED:
 
871
 
 
872
      return FsRef::fsErrPermissionDenied;
 
873
      //temporary not accessible
 
874
    case ERROR_PATH_BUSY:
 
875
    case ERROR_NO_MORE_SEARCH_HANDLES:
 
876
 
 
877
      return FsRef::fsErrTemporaryNotAccessible;
 
878
      //no space left on device
 
879
    case ERROR_HANDLE_DISK_FULL:
 
880
    case ERROR_DISK_FULL:
 
881
 
 
882
      return FsRef::fsErrNoSpaceLeftOnDevice;
 
883
      //none valid parameters
 
884
    case ERROR_INVALID_HANDLE:
 
885
    case ERROR_INVALID_DRIVE:
 
886
    case ERROR_INVALID_ACCESS:
 
887
    case ERROR_HANDLE_EOF:
 
888
    case ERROR_BUFFER_OVERFLOW:
 
889
 
 
890
      return FsRef::fsErrInvalidParameters;
 
891
      //environment error
 
892
    case ERROR_CRC:
 
893
    case ERROR_ARENA_TRASHED:
 
894
    case ERROR_BAD_ENVIRONMENT:
 
895
    case ERROR_INVALID_BLOCK:
 
896
    case ERROR_WRITE_FAULT:
 
897
    case ERROR_READ_FAULT:
 
898
    case ERROR_OPEN_FAILED:
 
899
 
 
900
      return FsRef::fsErrEnvironmentError;
 
901
 
 
902
      //no more process resources
 
903
    case ERROR_TOO_MANY_OPEN_FILES:
 
904
    case ERROR_NOT_ENOUGH_MEMORY:
 
905
    case ERROR_OUTOFMEMORY:
 
906
      return FsRef::fsErrNoMoreResources;
 
907
      //no file
 
908
    case ERROR_FILE_NOT_FOUND:
 
909
      return FsRef::fsErrFileDoesNotExist;
 
910
 
 
911
    case ERR_ReadUnderflow:
 
912
      return FsRef::fsErrReadUnderflow;
 
913
 
 
914
    default:
 
915
      return FsRef::fsErrUnknown;
 
916
    }
 
917
}
 
918
#else
 
919
Uint32 Ndbfs::translateErrno(int aErrno)
 
920
{
 
921
  switch (aErrno)
 
922
    {
 
923
      //permission denied
 
924
    case EACCES:
 
925
    case EROFS:
 
926
    case ENXIO:
 
927
      return FsRef::fsErrPermissionDenied;
 
928
      //temporary not accessible
 
929
    case EAGAIN:
 
930
    case ETIMEDOUT:
 
931
    case ENOLCK:
 
932
    case EINTR:
 
933
    case EIO:
 
934
      return FsRef::fsErrTemporaryNotAccessible;
 
935
      //no space left on device
 
936
    case ENFILE:
 
937
    case EDQUOT:
 
938
#ifdef ENOSR
 
939
    case ENOSR:
 
940
#endif
 
941
    case ENOSPC:
 
942
    case EFBIG:
 
943
      return FsRef::fsErrNoSpaceLeftOnDevice;
 
944
      //none valid parameters
 
945
    case EINVAL:
 
946
    case EBADF:
 
947
    case ENAMETOOLONG:
 
948
    case EFAULT:
 
949
    case EISDIR:
 
950
    case ENOTDIR:
 
951
    case EEXIST:
 
952
    case ETXTBSY:
 
953
      return FsRef::fsErrInvalidParameters;
 
954
      //environment error
 
955
    case ELOOP:
 
956
#ifdef ENOLINK
 
957
    case ENOLINK:
 
958
#endif
 
959
#ifdef EMULTIHOP
 
960
    case EMULTIHOP:
 
961
#endif
 
962
#ifdef EOPNOTSUPP
 
963
    case EOPNOTSUPP:
 
964
#endif
 
965
#ifdef ESPIPE
 
966
    case ESPIPE:
 
967
#endif
 
968
    case EPIPE:
 
969
      return FsRef::fsErrEnvironmentError;
 
970
 
 
971
      //no more process resources
 
972
    case EMFILE:
 
973
    case ENOMEM:
 
974
      return FsRef::fsErrNoMoreResources;
 
975
      //no file
 
976
    case ENOENT:
 
977
      return FsRef::fsErrFileDoesNotExist;
 
978
 
 
979
    case ERR_ReadUnderflow:
 
980
      return FsRef::fsErrReadUnderflow;
 
981
      
 
982
    default:
 
983
      return FsRef::fsErrUnknown;
 
984
    }
 
985
}
 
986
#endif
 
987
 
 
988
 
 
989
 
 
990
void 
 
991
Ndbfs::execCONTINUEB(Signal* signal)
 
992
{
 
993
  jamEntry();
 
994
  if (signal->theData[0] == NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY) {
 
995
    jam();
 
996
 
 
997
    // Also send CONTINUEB to ourself in order to scan for 
 
998
    // incoming answers from AsyncFile on MemoryChannel theFromThreads
 
999
    signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
 
1000
    sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1);
 
1001
    if (scanningInProgress == true) {
 
1002
      jam();
 
1003
      return;
 
1004
    }
 
1005
  }
 
1006
  if (scanIPC(signal)) {
 
1007
    jam();
 
1008
    scanningInProgress = true;
 
1009
    signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_NO_DELAY;    
 
1010
    sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
 
1011
   } else {
 
1012
    jam();
 
1013
    scanningInProgress = false;
 
1014
   }
 
1015
   return;
 
1016
}
 
1017
 
 
1018
void
 
1019
Ndbfs::execDUMP_STATE_ORD(Signal* signal)
 
1020
{
 
1021
  if(signal->theData[0] == 19){
 
1022
    return;
 
1023
  }
 
1024
  if(signal->theData[0] == DumpStateOrd::NdbfsDumpFileStat){
 
1025
    infoEvent("NDBFS: Files: %d Open files: %d",
 
1026
              theFiles.size(),
 
1027
              theOpenFiles.size());
 
1028
    infoEvent(" Idle files: %d Max opened files: %d",
 
1029
               theIdleFiles.size(),
 
1030
               m_maxOpenedFiles);
 
1031
    infoEvent(" Max files: %d",
 
1032
              m_maxFiles);
 
1033
    infoEvent(" Requests: %d",
 
1034
              theRequestPool->size());
 
1035
 
 
1036
    return;
 
1037
  }
 
1038
  if(signal->theData[0] == DumpStateOrd::NdbfsDumpOpenFiles){
 
1039
    infoEvent("NDBFS: Dump open files: %d", theOpenFiles.size());
 
1040
    
 
1041
    for (unsigned i = 0; i < theOpenFiles.size(); i++){
 
1042
      AsyncFile* file = theOpenFiles.getFile(i);
 
1043
      infoEvent("%2d (0x%x): %s", i,file, file->theFileName.c_str());
 
1044
    }
 
1045
    return;
 
1046
  }
 
1047
  if(signal->theData[0] == DumpStateOrd::NdbfsDumpAllFiles){
 
1048
    infoEvent("NDBFS: Dump all files: %d", theFiles.size());
 
1049
    
 
1050
    for (unsigned i = 0; i < theFiles.size(); i++){
 
1051
      AsyncFile* file = theFiles[i];
 
1052
      infoEvent("%2d (0x%x): %s", i,file, file->isOpen()?"OPEN":"CLOSED");
 
1053
    }
 
1054
    return;
 
1055
  }
 
1056
  if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){
 
1057
    infoEvent("NDBFS: Dump idle files: %d", theIdleFiles.size());
 
1058
    
 
1059
    for (unsigned i = 0; i < theIdleFiles.size(); i++){
 
1060
      AsyncFile* file = theIdleFiles[i];
 
1061
      infoEvent("%2d (0x%x): %s", i,file, file->isOpen()?"OPEN":"CLOSED");
 
1062
    }
 
1063
    return;
 
1064
  }
 
1065
 
 
1066
  if(signal->theData[0] == 404)
 
1067
  {
 
1068
    ndbrequire(signal->getLength() == 2);
 
1069
    Uint32 file= signal->theData[1];
 
1070
    AsyncFile* openFile = theOpenFiles.find(file);
 
1071
    ndbrequire(openFile != 0);
 
1072
    ndbout_c("File: %s %p", openFile->theFileName.c_str(), openFile);
 
1073
    Request* curr = openFile->m_current_request;
 
1074
    Request* last = openFile->m_last_request;
 
1075
    if(curr)
 
1076
      ndbout << "Current request: " << *curr << endl;
 
1077
    if(last)
 
1078
       ndbout << "Last request: " << *last << endl;
 
1079
 
 
1080
    ndbout << "theReportTo " << *openFile->theReportTo << endl;
 
1081
    ndbout << "theMemoryChannelPtr" << *openFile->theMemoryChannelPtr << endl;
 
1082
 
 
1083
    ndbout << "All files: " << endl;
 
1084
    for (unsigned i = 0; i < theFiles.size(); i++){
 
1085
      AsyncFile* file = theFiles[i];
 
1086
      ndbout_c("%2d (0x%lx): %s", i, (long) file, file->isOpen()?"OPEN":"CLOSED");
 
1087
    }
 
1088
  }
 
1089
}//Ndbfs::execDUMP_STATE_ORD()
 
1090
 
 
1091
const char*
 
1092
Ndbfs::get_filename(Uint32 fd) const
 
1093
{
 
1094
  jamEntry();
 
1095
  const AsyncFile* openFile = theOpenFiles.find(fd);
 
1096
  if(openFile)
 
1097
    return openFile->theFileName.get_base_name();
 
1098
  return "";
 
1099
}
 
1100
 
 
1101
 
 
1102
BLOCK_FUNCTIONS(Ndbfs)
 
1103
 
 
1104
template class Vector<AsyncFile*>;
 
1105
template class Vector<OpenFiles::OpenFileItem>;
 
1106
template class MemoryChannel<Request>;
 
1107
template class Pool<Request>;
 
1108
template NdbOut& operator<<(NdbOut&, const MemoryChannel<Request>&);