~ubuntu-branches/ubuntu/quantal/kyotocabinet/quantal

« back to all changes in this revision

Viewing changes to kctextdb.h

  • Committer: Package Import Robot
  • Author(s): Shawn Landden
  • Date: 2012-06-07 16:12:07 UTC
  • Revision ID: package-import@ubuntu.com-20120607161207-prbj5blqgzzfl8of
Tags: upstream-1.2.76
ImportĀ upstreamĀ versionĀ 1.2.76

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*************************************************************************************************
 
2
 * Plain text database
 
3
 *                                                               Copyright (C) 2009-2012 FAL Labs
 
4
 * This file is part of Kyoto Cabinet.
 
5
 * This program is free software: you can redistribute it and/or modify it under the terms of
 
6
 * the GNU General Public License as published by the Free Software Foundation, either version
 
7
 * 3 of the License, or any later version.
 
8
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 
9
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 
10
 * See the GNU General Public License for more details.
 
11
 * You should have received a copy of the GNU General Public License along with this program.
 
12
 * If not, see <http://www.gnu.org/licenses/>.
 
13
 *************************************************************************************************/
 
14
 
 
15
 
 
16
#ifndef _KCTEXTDB_H                      // duplication check
 
17
#define _KCTEXTDB_H
 
18
 
 
19
#include <kccommon.h>
 
20
#include <kcutil.h>
 
21
#include <kcthread.h>
 
22
#include <kcfile.h>
 
23
#include <kccompress.h>
 
24
#include <kccompare.h>
 
25
#include <kcmap.h>
 
26
#include <kcregex.h>
 
27
#include <kcdb.h>
 
28
 
 
29
namespace kyotocabinet {                 // common namespace
 
30
 
 
31
 
 
32
/**
 
33
 * Plain text database.
 
34
 * @note Although this class is designed to use a text file as a database file, not all methods
 
35
 * are implemented.  Each line in the text is treated as a record.  When storing a record, the
 
36
 * key is ignored and the value only is appended at the end of the file.  Records can be
 
37
 * retrieved only by the iterator and the cursor mechanisms.  No record can be retrieved by
 
38
 * specifying the key.  When accessing a record by the iterator, the key is given as the offset
 
39
 * from the beginning of the file for descriptive purposes.  Any existing record cannot be
 
40
 * modified and deleted.
 
41
 */
 
42
class TextDB : public BasicDB {
 
43
 public:
 
44
  class Cursor;
 
45
 private:
 
46
  class ScopedVisitor;
 
47
  /** An alias of list of cursors. */
 
48
  typedef std::list<Cursor*> CursorList;
 
49
  /** An alias of a past record. */
 
50
  typedef std::pair<int64_t, std::string> Record;
 
51
  /** The size of the IO buffer. */
 
52
  static const size_t IOBUFSIZ = 1024;
 
53
 public:
 
54
  /**
 
55
   * Cursor to indicate a record.
 
56
   */
 
57
  class Cursor : public BasicDB::Cursor {
 
58
    friend class TextDB;
 
59
   public:
 
60
    /**
 
61
     * Constructor.
 
62
     * @param db the container database object.
 
63
     */
 
64
    explicit Cursor(TextDB* db) : db_(db), off_(INT64MAX), end_(0), queue_(), line_() {
 
65
      _assert_(db);
 
66
      ScopedRWLock lock(&db_->mlock_, true);
 
67
      db_->curs_.push_back(this);
 
68
    }
 
69
    /**
 
70
     * Destructor.
 
71
     */
 
72
    virtual ~Cursor() {
 
73
      _assert_(true);
 
74
      if (!db_) return;
 
75
      ScopedRWLock lock(&db_->mlock_, true);
 
76
      db_->curs_.remove(this);
 
77
    }
 
78
    /**
 
79
     * Accept a visitor to the current record.
 
80
     * @param visitor a visitor object.
 
81
     * @param writable true for writable operation, or false for read-only operation.
 
82
     * @param step true to move the cursor to the next record, or false for no move.
 
83
     * @return true on success, or false on failure.
 
84
     * @note The key is generated from the offset of each record.  To avoid deadlock, any
 
85
     * explicit database operation must not be performed in this function.
 
86
     */
 
87
    bool accept(Visitor* visitor, bool writable = true, bool step = false) {
 
88
      _assert_(visitor);
 
89
      ScopedRWLock lock(&db_->mlock_, false);
 
90
      if (db_->omode_ == 0) {
 
91
        db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
92
        return false;
 
93
      }
 
94
      if (writable && !db_->writer_) {
 
95
        db_->set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
 
96
        return false;
 
97
      }
 
98
      bool err = false;
 
99
      if (!accept_impl(visitor, step)) err = true;
 
100
      return !err;
 
101
    }
 
102
    /**
 
103
     * Jump the cursor to the first record for forward scan.
 
104
     * @return true on success, or false on failure.
 
105
     */
 
106
    bool jump() {
 
107
      _assert_(true);
 
108
      ScopedRWLock lock(&db_->mlock_, false);
 
109
      if (db_->omode_ == 0) {
 
110
        db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
111
        return false;
 
112
      }
 
113
      off_ = 0;
 
114
      end_ = db_->file_.size();
 
115
      queue_.clear();
 
116
      line_.clear();
 
117
      if (off_ >= end_) {
 
118
        db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
 
119
        return false;
 
120
      }
 
121
      return true;
 
122
    }
 
123
    /**
 
124
     * Jump the cursor to a record for forward scan.
 
125
     * @param kbuf the pointer to the key region.
 
126
     * @param ksiz the size of the key region.
 
127
     * @return true on success, or false on failure.
 
128
     */
 
129
    bool jump(const char* kbuf, size_t ksiz) {
 
130
      _assert_(kbuf && ksiz <= MEMMAXSIZ);
 
131
      ScopedRWLock lock(&db_->mlock_, true);
 
132
      if (db_->omode_ == 0) {
 
133
        db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
134
        return false;
 
135
      }
 
136
      off_ = atoin(kbuf, ksiz);
 
137
      end_ = db_->file_.size();
 
138
      queue_.clear();
 
139
      line_.clear();
 
140
      if (off_ >= end_) {
 
141
        db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
 
142
        return false;
 
143
      }
 
144
      return true;
 
145
    }
 
146
    /**
 
147
     * Jump the cursor to a record for forward scan.
 
148
     * @note Equal to the original Cursor::jump method except that the parameter is std::string.
 
149
     */
 
150
    bool jump(const std::string& key) {
 
151
      _assert_(true);
 
152
      return jump(key.c_str(), key.size());
 
153
    }
 
154
    /**
 
155
     * Jump the cursor to the last record for backward scan.
 
156
     * @note This is a dummy implementation for compatibility.
 
157
     */
 
158
    bool jump_back() {
 
159
      _assert_(true);
 
160
      ScopedRWLock lock(&db_->mlock_, true);
 
161
      if (db_->omode_ == 0) {
 
162
        db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
163
        return false;
 
164
      }
 
165
      db_->set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
 
166
      return false;
 
167
    }
 
168
    /**
 
169
     * Jump the cursor to a record for backward scan.
 
170
     * @note This is a dummy implementation for compatibility.
 
171
     */
 
172
    bool jump_back(const char* kbuf, size_t ksiz) {
 
173
      _assert_(kbuf && ksiz <= MEMMAXSIZ);
 
174
      ScopedRWLock lock(&db_->mlock_, true);
 
175
      if (db_->omode_ == 0) {
 
176
        db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
177
        return false;
 
178
      }
 
179
      db_->set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
 
180
      return false;
 
181
    }
 
182
    /**
 
183
     * Jump the cursor to a record for backward scan.
 
184
     * @note This is a dummy implementation for compatibility.
 
185
     */
 
186
    bool jump_back(const std::string& key) {
 
187
      _assert_(true);
 
188
      return jump_back(key.c_str(), key.size());
 
189
    }
 
190
    /**
 
191
     * Step the cursor to the next record.
 
192
     * @return true on success, or false on failure.
 
193
     */
 
194
    bool step() {
 
195
      _assert_(true);
 
196
      ScopedRWLock lock(&db_->mlock_, false);
 
197
      if (db_->omode_ == 0) {
 
198
        db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
199
        return false;
 
200
      }
 
201
      if (queue_.empty() && !read_next()) return false;
 
202
      if (queue_.empty()) {
 
203
        db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
 
204
        return false;
 
205
      }
 
206
      queue_.pop_front();
 
207
      return true;
 
208
    }
 
209
    /**
 
210
     * Step the cursor to the previous record.
 
211
     * @note This is a dummy implementation for compatibility.
 
212
     */
 
213
    bool step_back() {
 
214
      _assert_(true);
 
215
      ScopedRWLock lock(&db_->mlock_, true);
 
216
      if (db_->omode_ == 0) {
 
217
        db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
218
        return false;
 
219
      }
 
220
      db_->set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
 
221
      return false;
 
222
    }
 
223
    /**
 
224
     * Get the database object.
 
225
     * @return the database object.
 
226
     */
 
227
    TextDB* db() {
 
228
      _assert_(true);
 
229
      return db_;
 
230
    }
 
231
   private:
 
232
    /**
 
233
     * Accept a visitor to the current record.
 
234
     * @param visitor a visitor object.
 
235
     * @param step true to move the cursor to the next record, or false for no move.
 
236
     * @return true on success, or false on failure.
 
237
     */
 
238
    bool accept_impl(Visitor* visitor, bool step) {
 
239
      _assert_(visitor);
 
240
      if (queue_.empty() && !read_next()) return false;
 
241
      if (queue_.empty()) {
 
242
        db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
 
243
        return false;
 
244
      }
 
245
      bool err = false;
 
246
      const Record& rec = queue_.front();
 
247
      char kbuf[NUMBUFSIZ];
 
248
      size_t ksiz = db_->write_key(kbuf, rec.first);
 
249
      size_t vsiz;
 
250
      const char* vbuf = visitor->visit_full(kbuf, ksiz,
 
251
                                             rec.second.data(), rec.second.size(), &vsiz);
 
252
      if (vbuf != Visitor::NOP && vbuf != Visitor::REMOVE) {
 
253
        char stack[IOBUFSIZ];
 
254
        size_t rsiz = vsiz + 1;
 
255
        char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack;
 
256
        std::memcpy(rbuf, vbuf, vsiz);
 
257
        rbuf[vsiz] = '\n';
 
258
        if (!db_->file_.append(rbuf, rsiz)) {
 
259
          db_->set_error(_KCCODELINE_, Error::SYSTEM, db_->file_.error());
 
260
          err = true;
 
261
        }
 
262
        if (rbuf != stack) delete[] rbuf;
 
263
        if (db_->autosync_ && !db_->file_.synchronize(true)) {
 
264
          db_->set_error(_KCCODELINE_, Error::SYSTEM, db_->file_.error());
 
265
          err = true;
 
266
        }
 
267
      }
 
268
      if (step) queue_.pop_front();
 
269
      return !err;
 
270
    }
 
271
    /**
 
272
     * Read the next record.
 
273
     * @return true on success, or false on failure.
 
274
     */
 
275
    bool read_next() {
 
276
      _assert_(true);
 
277
      while (off_ < end_) {
 
278
        char stack[IOBUFSIZ];
 
279
        int64_t rsiz = end_ - off_;
 
280
        if (rsiz > (int64_t)sizeof(stack)) rsiz = sizeof(stack);
 
281
        if (!db_->file_.read_fast(off_, stack, rsiz)) {
 
282
          db_->set_error(_KCCODELINE_, Error::SYSTEM, db_->file_.error());
 
283
          return false;
 
284
        }
 
285
        const char* rp = stack;
 
286
        const char* pv = rp;
 
287
        const char* ep = rp + rsiz;
 
288
        while (rp < ep) {
 
289
          if (*rp == '\n') {
 
290
            line_.append(pv, rp - pv);
 
291
            Record rec;
 
292
            rec.first = off_ + pv - stack;
 
293
            rec.second = line_;
 
294
            queue_.push_back(rec);
 
295
            line_.clear();
 
296
            rp++;
 
297
            pv = rp;
 
298
          } else {
 
299
            rp++;
 
300
          }
 
301
        }
 
302
        line_.append(pv, rp - pv);
 
303
        off_ += rsiz;
 
304
        if (!queue_.empty()) break;
 
305
      }
 
306
      return true;
 
307
    }
 
308
    /** Dummy constructor to forbid the use. */
 
309
    Cursor(const Cursor&);
 
310
    /** Dummy Operator to forbid the use. */
 
311
    Cursor& operator =(const Cursor&);
 
312
    /** The inner database. */
 
313
    TextDB* db_;
 
314
    /** The current offset. */
 
315
    int64_t off_;
 
316
    /** The end offset. */
 
317
    int64_t end_;
 
318
    /** The queue of read lines. */
 
319
    std::deque<Record> queue_;
 
320
    /** The current line. */
 
321
    std::string line_;
 
322
  };
 
323
  /**
 
324
   * Default constructor.
 
325
   */
 
326
  explicit TextDB() :
 
327
      error_(), logger_(NULL), logkinds_(0), mtrigger_(NULL),
 
328
      omode_(0), writer_(false), autotran_(false), autosync_(false),
 
329
      file_(), curs_(), path_("") {
 
330
    _assert_(true);
 
331
  }
 
332
  /**
 
333
   * Destructor.
 
334
   * @note If the database is not closed, it is closed implicitly.
 
335
   */
 
336
  virtual ~TextDB() {
 
337
    _assert_(true);
 
338
    if (omode_ != 0) close();
 
339
    if (!curs_.empty()) {
 
340
      CursorList::const_iterator cit = curs_.begin();
 
341
      CursorList::const_iterator citend = curs_.end();
 
342
      while (cit != citend) {
 
343
        Cursor* cur = *cit;
 
344
        cur->db_ = NULL;
 
345
        ++cit;
 
346
      }
 
347
    }
 
348
  }
 
349
  /**
 
350
   * Accept a visitor to a record.
 
351
   * @param kbuf the pointer to the key region.
 
352
   * @param ksiz the size of the key region.
 
353
   * @param visitor a visitor object.
 
354
   * @param writable true for writable operation, or false for read-only operation.
 
355
   * @return true on success, or false on failure.
 
356
   * @note No record can be retrieved by specifying the key and the Visitor::visit_empty method
 
357
   * is always called.  To avoid deadlock, any explicit database operation must not be performed
 
358
   * in this function.
 
359
   */
 
360
  bool accept(const char* kbuf, size_t ksiz, Visitor* visitor, bool writable = true) {
 
361
    _assert_(kbuf && ksiz <= MEMMAXSIZ && visitor);
 
362
    ScopedRWLock lock(&mlock_, false);
 
363
    if (omode_ == 0) {
 
364
      set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
365
      return false;
 
366
    }
 
367
    if (writable && !writer_) {
 
368
      set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
 
369
      return false;
 
370
    }
 
371
    bool err = false;
 
372
    if (!accept_impl(kbuf, ksiz, visitor)) err = true;
 
373
    return !err;
 
374
  }
 
375
  /**
 
376
   * Accept a visitor to multiple records at once.
 
377
   * @param keys specifies a string vector of the keys.
 
378
   * @param visitor a visitor object.
 
379
   * @param writable true for writable operation, or false for read-only operation.
 
380
   * @return true on success, or false on failure.
 
381
   * @note No record can be retrieved by specifying the key and the Visitor::visit_empty method
 
382
   * is always called.  To avoid deadlock, any explicit database operation must not be performed
 
383
   * in this function.
 
384
   */
 
385
  bool accept_bulk(const std::vector<std::string>& keys, Visitor* visitor,
 
386
                   bool writable = true) {
 
387
    _assert_(visitor);
 
388
    ScopedRWLock lock(&mlock_, true);
 
389
    if (omode_ == 0) {
 
390
      set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
391
      return false;
 
392
    }
 
393
    if (writable && !writer_) {
 
394
      set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
 
395
      return false;
 
396
    }
 
397
    ScopedVisitor svis(visitor);
 
398
    bool err = false;
 
399
    std::vector<std::string>::const_iterator kit = keys.begin();
 
400
    std::vector<std::string>::const_iterator kitend = keys.end();
 
401
    while (kit != kitend) {
 
402
      if (!accept_impl(kit->data(), kit->size(), visitor)) err = true;
 
403
      ++kit;
 
404
    }
 
405
    return !err;
 
406
  }
 
407
  /**
 
408
   * Iterate to accept a visitor for each record.
 
409
   * @param visitor a visitor object.
 
410
   * @param writable true for writable operation, or false for read-only operation.
 
411
   * @param checker a progress checker object.  If it is NULL, no checking is performed.
 
412
   * @return true on success, or false on failure.
 
413
   * @note The key is generated from the offset of each record.  To avoid deadlock, any explicit
 
414
   * database operation must not be performed in this function.
 
415
   */
 
416
  bool iterate(Visitor *visitor, bool writable = true, ProgressChecker* checker = NULL) {
 
417
    _assert_(visitor);
 
418
    ScopedRWLock lock(&mlock_, true);
 
419
    if (omode_ == 0) {
 
420
      set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
421
      return false;
 
422
    }
 
423
    if (writable && !writer_) {
 
424
      set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
 
425
      return false;
 
426
    }
 
427
    ScopedVisitor svis(visitor);
 
428
    bool err = false;
 
429
    if (!iterate_impl(visitor, checker)) err = true;
 
430
    trigger_meta(MetaTrigger::ITERATE, "iterate");
 
431
    return !err;
 
432
  }
 
433
  /**
 
434
   * Scan each record in parallel.
 
435
   * @param visitor a visitor object.
 
436
   * @param thnum the number of worker threads.
 
437
   * @param checker a progress checker object.  If it is NULL, no checking is performed.
 
438
   * @return true on success, or false on failure.
 
439
   * @note This function is for reading records and not for updating ones.  The return value of
 
440
   * the visitor is just ignored.  The key is generated from the offset of each record.  To
 
441
   * avoid deadlock, any explicit database operation must not be performed in this function.
 
442
   */
 
443
  bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* checker = NULL) {
 
444
    _assert_(visitor && thnum <= MEMMAXSIZ);
 
445
    ScopedRWLock lock(&mlock_, false);
 
446
    if (omode_ == 0) {
 
447
      set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
448
      return false;
 
449
    }
 
450
    if (thnum < 1) thnum = 1;
 
451
    if (thnum > (size_t)INT8MAX) thnum = INT8MAX;
 
452
    ScopedVisitor svis(visitor);
 
453
    bool err = false;
 
454
    if (!scan_parallel_impl(visitor, thnum, checker)) err = true;
 
455
    trigger_meta(MetaTrigger::ITERATE, "scan_parallel");
 
456
    return !err;
 
457
  }
 
458
  /**
 
459
   * Get the last happened error.
 
460
   * @return the last happened error.
 
461
   */
 
462
  Error error() const {
 
463
    _assert_(true);
 
464
    return error_;
 
465
  }
 
466
  /**
 
467
   * Set the error information.
 
468
   * @param file the file name of the program source code.
 
469
   * @param line the line number of the program source code.
 
470
   * @param func the function name of the program source code.
 
471
   * @param code an error code.
 
472
   * @param message a supplement message.
 
473
   */
 
474
  void set_error(const char* file, int32_t line, const char* func,
 
475
                 Error::Code code, const char* message) {
 
476
    _assert_(file && line > 0 && func && message);
 
477
    error_->set(code, message);
 
478
    if (logger_) {
 
479
      Logger::Kind kind = code == Error::BROKEN || code == Error::SYSTEM ?
 
480
          Logger::ERROR : Logger::INFO;
 
481
      if (kind & logkinds_)
 
482
        report(file, line, func, kind, "%d: %s: %s", code, Error::codename(code), message);
 
483
    }
 
484
  }
 
485
  /**
 
486
   * Open a database file.
 
487
   * @param path the path of a database file.
 
488
   * @param mode the connection mode.  TextDB::OWRITER as a writer, TextDB::OREADER as a
 
489
   * reader.  The following may be added to the writer mode by bitwise-or: TextDB::OCREATE,
 
490
   * which means it creates a new database if the file does not exist, TextDB::OTRUNCATE, which
 
491
   * means it creates a new database regardless if the file exists, TextDB::OAUTOTRAN, which
 
492
   * means each updating operation is performed in implicit transaction, TextDB::OAUTOSYNC,
 
493
   * which means each updating operation is followed by implicit synchronization with the file
 
494
   * system.  The following may be added to both of the reader mode and the writer mode by
 
495
   * bitwise-or: TextDB::ONOLOCK, which means it opens the database file without file locking,
 
496
   * TextDB::OTRYLOCK, which means locking is performed without blocking, TextDB::ONOREPAIR,
 
497
   * which means the database file is not repaired implicitly even if file destruction is
 
498
   * detected.
 
499
   * @return true on success, or false on failure.
 
500
   * @note Every opened database must be closed by the TextDB::close method when it is no
 
501
   * longer in use.  It is not allowed for two or more database objects in the same process to
 
502
   * keep their connections to the same database file at the same time.
 
503
   */
 
504
  bool open(const std::string& path, uint32_t mode = OWRITER | OCREATE) {
 
505
    _assert_(true);
 
506
    ScopedRWLock lock(&mlock_, true);
 
507
    if (omode_ != 0) {
 
508
      set_error(_KCCODELINE_, Error::INVALID, "already opened");
 
509
      return false;
 
510
    }
 
511
    report(_KCCODELINE_, Logger::DEBUG, "opening the database (path=%s)", path.c_str());
 
512
    writer_ = false;
 
513
    autotran_ = false;
 
514
    autosync_ = false;
 
515
    uint32_t fmode = File::OREADER;
 
516
    if (mode & OWRITER) {
 
517
      writer_ = true;
 
518
      fmode = File::OWRITER;
 
519
      if (mode & OCREATE) fmode |= File::OCREATE;
 
520
      if (mode & OTRUNCATE) fmode |= File::OTRUNCATE;
 
521
      if (mode & OAUTOTRAN) autotran_ = true;
 
522
      if (mode & OAUTOSYNC) autosync_ = true;
 
523
    }
 
524
    if (mode & ONOLOCK) fmode |= File::ONOLOCK;
 
525
    if (mode & OTRYLOCK) fmode |= File::OTRYLOCK;
 
526
    if (!file_.open(path, fmode, 0)) {
 
527
      const char* emsg = file_.error();
 
528
      Error::Code code = Error::SYSTEM;
 
529
      if (std::strstr(emsg, "(permission denied)") || std::strstr(emsg, "(directory)")) {
 
530
        code = Error::NOPERM;
 
531
      } else if (std::strstr(emsg, "(file not found)") || std::strstr(emsg, "(invalid path)")) {
 
532
        code = Error::NOREPOS;
 
533
      }
 
534
      set_error(_KCCODELINE_, code, emsg);
 
535
      return false;
 
536
    }
 
537
    if (autosync_ && !File::synchronize_whole()) {
 
538
      set_error(_KCCODELINE_, Error::SYSTEM, "synchronizing the file system failed");
 
539
      file_.close();
 
540
      return false;
 
541
    }
 
542
    path_.append(path);
 
543
    omode_ = mode;
 
544
    trigger_meta(MetaTrigger::OPEN, "open");
 
545
    return true;
 
546
  }
 
547
  /**
 
548
   * Close the database file.
 
549
   * @return true on success, or false on failure.
 
550
   */
 
551
  bool close() {
 
552
    _assert_(true);
 
553
    ScopedRWLock lock(&mlock_, true);
 
554
    if (omode_ == 0) {
 
555
      set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
556
      return false;
 
557
    }
 
558
    report(_KCCODELINE_, Logger::DEBUG, "closing the database (path=%s)", path_.c_str());
 
559
    bool err = false;
 
560
    disable_cursors();
 
561
    if (!file_.close()) {
 
562
      set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
 
563
      err = true;
 
564
    }
 
565
    omode_ = 0;
 
566
    path_.clear();
 
567
    trigger_meta(MetaTrigger::CLOSE, "close");
 
568
    return !err;
 
569
  }
 
570
  /**
 
571
   * Synchronize updated contents with the file and the device.
 
572
   * @param hard true for physical synchronization with the device, or false for logical
 
573
   * synchronization with the file system.
 
574
   * @param proc a postprocessor object.  If it is NULL, no postprocessing is performed.
 
575
   * @param checker a progress checker object.  If it is NULL, no checking is performed.
 
576
   * @return true on success, or false on failure.
 
577
   * @note The operation of the postprocessor is performed atomically and other threads accessing
 
578
   * the same record are blocked.  To avoid deadlock, any explicit database operation must not
 
579
   * be performed in this function.
 
580
   */
 
581
  bool synchronize(bool hard = false, FileProcessor* proc = NULL,
 
582
                   ProgressChecker* checker = NULL) {
 
583
    _assert_(true);
 
584
    ScopedRWLock lock(&mlock_, false);
 
585
    if (omode_ == 0) {
 
586
      set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
587
      return false;
 
588
    }
 
589
    bool err = false;
 
590
    if (!synchronize_impl(hard, proc, checker)) err = true;
 
591
    trigger_meta(MetaTrigger::SYNCHRONIZE, "synchronize");
 
592
    return !err;
 
593
  }
 
594
  /**
 
595
   * Occupy database by locking and do something meanwhile.
 
596
   * @param writable true to use writer lock, or false to use reader lock.
 
597
   * @param proc a processor object.  If it is NULL, no processing is performed.
 
598
   * @return true on success, or false on failure.
 
599
   * @note The operation of the processor is performed atomically and other threads accessing
 
600
   * the same record are blocked.  To avoid deadlock, any explicit database operation must not
 
601
   * be performed in this function.
 
602
   */
 
603
  bool occupy(bool writable = true, FileProcessor* proc = NULL) {
 
604
    _assert_(true);
 
605
    ScopedRWLock lock(&mlock_, writable);
 
606
    bool err = false;
 
607
    if (proc && !proc->process(path_, -1, file_.size())) {
 
608
      set_error(_KCCODELINE_, Error::LOGIC, "processing failed");
 
609
      err = true;
 
610
    }
 
611
    trigger_meta(MetaTrigger::OCCUPY, "occupy");
 
612
    return !err;
 
613
  }
 
614
  /**
 
615
   * Begin transaction.
 
616
   * @param hard true for physical synchronization with the device, or false for logical
 
617
   * synchronization with the file system.
 
618
   * @return true on success, or false on failure.
 
619
   */
 
620
  bool begin_transaction(bool hard = false) {
 
621
    _assert_(true);
 
622
    ScopedRWLock lock(&mlock_, true);
 
623
    if (omode_ == 0) {
 
624
      set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
625
      return false;
 
626
    }
 
627
    set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
 
628
    return false;
 
629
  }
 
630
  /**
 
631
   * Try to begin transaction.
 
632
   * @param hard true for physical synchronization with the device, or false for logical
 
633
   * synchronization with the file system.
 
634
   * @return true on success, or false on failure.
 
635
   */
 
636
  bool begin_transaction_try(bool hard = false) {
 
637
    _assert_(true);
 
638
    ScopedRWLock lock(&mlock_, true);
 
639
    if (omode_ == 0) {
 
640
      set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
641
      return false;
 
642
    }
 
643
    set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
 
644
    return false;
 
645
  }
 
646
  /**
 
647
   * End transaction.
 
648
   * @param commit true to commit the transaction, or false to abort the transaction.
 
649
   * @return true on success, or false on failure.
 
650
   */
 
651
  bool end_transaction(bool commit = true) {
 
652
    _assert_(true);
 
653
    ScopedRWLock lock(&mlock_, true);
 
654
    if (omode_ == 0) {
 
655
      set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
656
      return false;
 
657
    }
 
658
    set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
 
659
    return false;
 
660
  }
 
661
  /**
 
662
   * Remove all records.
 
663
   * @return true on success, or false on failure.
 
664
   */
 
665
  bool clear() {
 
666
    _assert_(true);
 
667
    ScopedRWLock lock(&mlock_, true);
 
668
    if (omode_ == 0) {
 
669
      set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
670
      return false;
 
671
    }
 
672
    if (!writer_) {
 
673
      set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
 
674
      return false;
 
675
    }
 
676
    disable_cursors();
 
677
    if (!file_.truncate(0)) {
 
678
      set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
 
679
      return false;
 
680
    }
 
681
    if (autosync_ && !file_.synchronize(true)) {
 
682
      set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
 
683
      return false;
 
684
    }
 
685
    trigger_meta(MetaTrigger::CLEAR, "clear");
 
686
    return true;
 
687
  }
 
688
  /**
 
689
   * Get the number of records.
 
690
   * @return the number of records, or -1 on failure.
 
691
   */
 
692
  int64_t count() {
 
693
    _assert_(true);
 
694
    ScopedRWLock lock(&mlock_, false);
 
695
    if (omode_ == 0) {
 
696
      set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
697
      return -1;
 
698
    }
 
699
    set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
 
700
    return -1;
 
701
  }
 
702
  /**
 
703
   * Get the size of the database file.
 
704
   * @return the size of the database file in bytes, or -1 on failure.
 
705
   */
 
706
  int64_t size() {
 
707
    _assert_(true);
 
708
    ScopedRWLock lock(&mlock_, false);
 
709
    if (omode_ == 0) {
 
710
      set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
711
      return -1;
 
712
    }
 
713
    return file_.size();
 
714
  }
 
715
  /**
 
716
   * Get the path of the database file.
 
717
   * @return the path of the database file, or an empty string on failure.
 
718
   */
 
719
  std::string path() {
 
720
    _assert_(true);
 
721
    return path_;
 
722
  }
 
723
  /**
 
724
   * Get the miscellaneous status information.
 
725
   * @param strmap a string map to contain the result.
 
726
   * @return true on success, or false on failure.
 
727
   */
 
728
  bool status(std::map<std::string, std::string>* strmap) {
 
729
    _assert_(strmap);
 
730
    ScopedRWLock lock(&mlock_, true);
 
731
    if (omode_ == 0) {
 
732
      set_error(_KCCODELINE_, Error::INVALID, "not opened");
 
733
      return false;
 
734
    }
 
735
    (*strmap)["type"] = strprintf("%u", (unsigned)TYPETEXT);
 
736
    (*strmap)["realtype"] = strprintf("%u", (unsigned)TYPETEXT);
 
737
    (*strmap)["path"] = path_;
 
738
    (*strmap)["size"] = strprintf("%lld", (long long)file_.size());
 
739
    return true;
 
740
  }
 
741
  /**
 
742
   * Create a cursor object.
 
743
   * @return the return value is the created cursor object.
 
744
   * @note Because the object of the return value is allocated by the constructor, it should be
 
745
   * released with the delete operator when it is no longer in use.
 
746
   */
 
747
  Cursor* cursor() {
 
748
    _assert_(true);
 
749
    return new Cursor(this);
 
750
  }
 
751
  /**
 
752
   * Write a log message.
 
753
   * @param file the file name of the program source code.
 
754
   * @param line the line number of the program source code.
 
755
   * @param func the function name of the program source code.
 
756
   * @param kind the kind of the event.  Logger::DEBUG for debugging, Logger::INFO for normal
 
757
   * information, Logger::WARN for warning, and Logger::ERROR for fatal error.
 
758
   * @param message the supplement message.
 
759
   */
 
760
  void log(const char* file, int32_t line, const char* func, Logger::Kind kind,
 
761
           const char* message) {
 
762
    _assert_(file && line > 0 && func && message);
 
763
    ScopedRWLock lock(&mlock_, false);
 
764
    if (!logger_) return;
 
765
    logger_->log(file, line, func, kind, message);
 
766
  }
 
767
  /**
 
768
   * Set the internal logger.
 
769
   * @param logger the logger object.
 
770
   * @param kinds kinds of logged messages by bitwise-or: Logger::DEBUG for debugging,
 
771
   * Logger::INFO for normal information, Logger::WARN for warning, and Logger::ERROR for fatal
 
772
   * error.
 
773
   * @return true on success, or false on failure.
 
774
   */
 
775
  bool tune_logger(Logger* logger, uint32_t kinds = Logger::WARN | Logger::ERROR) {
 
776
    _assert_(logger);
 
777
    ScopedRWLock lock(&mlock_, true);
 
778
    if (omode_ != 0) {
 
779
      set_error(_KCCODELINE_, Error::INVALID, "already opened");
 
780
      return false;
 
781
    }
 
782
    logger_ = logger;
 
783
    logkinds_ = kinds;
 
784
    return true;
 
785
  }
 
786
  /**
 
787
   * Set the internal meta operation trigger.
 
788
   * @param trigger the trigger object.
 
789
   * @return true on success, or false on failure.
 
790
   */
 
791
  bool tune_meta_trigger(MetaTrigger* trigger) {
 
792
    _assert_(trigger);
 
793
    ScopedRWLock lock(&mlock_, true);
 
794
    if (omode_ != 0) {
 
795
      set_error(_KCCODELINE_, Error::INVALID, "already opened");
 
796
      return false;
 
797
    }
 
798
    mtrigger_ = trigger;
 
799
    return true;
 
800
  }
 
801
 protected:
 
802
  /**
 
803
   * Report a message for debugging.
 
804
   * @param file the file name of the program source code.
 
805
   * @param line the line number of the program source code.
 
806
   * @param func the function name of the program source code.
 
807
   * @param kind the kind of the event.  Logger::DEBUG for debugging, Logger::INFO for normal
 
808
   * information, Logger::WARN for warning, and Logger::ERROR for fatal error.
 
809
   * @param format the printf-like format string.
 
810
   * @param ... used according to the format string.
 
811
   */
 
812
  void report(const char* file, int32_t line, const char* func, Logger::Kind kind,
 
813
              const char* format, ...) {
 
814
    _assert_(file && line > 0 && func && format);
 
815
    if (!logger_ || !(kind & logkinds_)) return;
 
816
    std::string message;
 
817
    strprintf(&message, "%s: ", path_.empty() ? "-" : path_.c_str());
 
818
    va_list ap;
 
819
    va_start(ap, format);
 
820
    vstrprintf(&message, format, ap);
 
821
    va_end(ap);
 
822
    logger_->log(file, line, func, kind, message.c_str());
 
823
  }
 
824
  /**
 
825
   * Report a message for debugging with variable number of arguments.
 
826
   * @param file the file name of the program source code.
 
827
   * @param line the line number of the program source code.
 
828
   * @param func the function name of the program source code.
 
829
   * @param kind the kind of the event.  Logger::DEBUG for debugging, Logger::INFO for normal
 
830
   * information, Logger::WARN for warning, and Logger::ERROR for fatal error.
 
831
   * @param format the printf-like format string.
 
832
   * @param ap used according to the format string.
 
833
   */
 
834
  void report_valist(const char* file, int32_t line, const char* func, Logger::Kind kind,
 
835
                     const char* format, va_list ap) {
 
836
    _assert_(file && line > 0 && func && format);
 
837
    if (!logger_ || !(kind & logkinds_)) return;
 
838
    std::string message;
 
839
    strprintf(&message, "%s: ", path_.empty() ? "-" : path_.c_str());
 
840
    vstrprintf(&message, format, ap);
 
841
    logger_->log(file, line, func, kind, message.c_str());
 
842
  }
 
843
  /**
 
844
   * Report the content of a binary buffer for debugging.
 
845
   * @param file the file name of the epicenter.
 
846
   * @param line the line number of the epicenter.
 
847
   * @param func the function name of the program source code.
 
848
   * @param kind the kind of the event.  Logger::DEBUG for debugging, Logger::INFO for normal
 
849
   * information, Logger::WARN for warning, and Logger::ERROR for fatal error.
 
850
   * @param name the name of the information.
 
851
   * @param buf the binary buffer.
 
852
   * @param size the size of the binary buffer
 
853
   */
 
854
  void report_binary(const char* file, int32_t line, const char* func, Logger::Kind kind,
 
855
                     const char* name, const char* buf, size_t size) {
 
856
    _assert_(file && line > 0 && func && name && buf && size <= MEMMAXSIZ);
 
857
    if (!logger_) return;
 
858
    char* hex = hexencode(buf, size);
 
859
    report(file, line, func, kind, "%s=%s", name, hex);
 
860
    delete[] hex;
 
861
  }
 
862
  /**
 
863
   * Trigger a meta database operation.
 
864
   * @param kind the kind of the event.  MetaTrigger::OPEN for opening, MetaTrigger::CLOSE for
 
865
   * closing, MetaTrigger::CLEAR for clearing, MetaTrigger::ITERATE for iteration,
 
866
   * MetaTrigger::SYNCHRONIZE for synchronization, MetaTrigger::BEGINTRAN for beginning
 
867
   * transaction, MetaTrigger::COMMITTRAN for committing transaction, MetaTrigger::ABORTTRAN
 
868
   * for aborting transaction, and MetaTrigger::MISC for miscellaneous operations.
 
869
   * @param message the supplement message.
 
870
   */
 
871
  void trigger_meta(MetaTrigger::Kind kind, const char* message) {
 
872
    _assert_(message);
 
873
    if (mtrigger_) mtrigger_->trigger(kind, message);
 
874
  }
 
875
 private:
 
876
  /**
 
877
   * Scoped visitor.
 
878
   */
 
879
  class ScopedVisitor {
 
880
   public:
 
881
    /** constructor */
 
882
    explicit ScopedVisitor(Visitor* visitor) : visitor_(visitor) {
 
883
      _assert_(visitor);
 
884
      visitor_->visit_before();
 
885
    }
 
886
    /** destructor */
 
887
    ~ScopedVisitor() {
 
888
      _assert_(true);
 
889
      visitor_->visit_after();
 
890
    }
 
891
   private:
 
892
    Visitor* visitor_;                   ///< visitor
 
893
  };
 
894
  /**
 
895
   * Accept a visitor to a record.
 
896
   * @param kbuf the pointer to the key region.
 
897
   * @param ksiz the size of the key region.
 
898
   * @param visitor a visitor object.
 
899
   * @return true on success, or false on failure.
 
900
   */
 
901
  bool accept_impl(const char* kbuf, size_t ksiz, Visitor* visitor) {
 
902
    _assert_(kbuf && ksiz <= MEMMAXSIZ && visitor);
 
903
    bool err = false;
 
904
    size_t vsiz;
 
905
    const char* vbuf = visitor->visit_empty(kbuf, ksiz, &vsiz);
 
906
    if (vbuf != Visitor::NOP && vbuf != Visitor::REMOVE) {
 
907
      size_t rsiz = vsiz + 1;
 
908
      char stack[IOBUFSIZ];
 
909
      char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack;
 
910
      std::memcpy(rbuf, vbuf, vsiz);
 
911
      rbuf[vsiz] = '\n';
 
912
      if (!file_.append(rbuf, rsiz)) {
 
913
        set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
 
914
        err = true;
 
915
      }
 
916
      if (rbuf != stack) delete[] rbuf;
 
917
      if (autosync_ && !file_.synchronize(true)) {
 
918
        set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
 
919
        err = true;
 
920
      }
 
921
    }
 
922
    return !err;
 
923
  }
 
924
  /**
 
925
   * Iterate to accept a visitor for each record.
 
926
   * @param visitor a visitor object.
 
927
   * @param checker a progress checker object.
 
928
   * @return true on success, or false on failure.
 
929
   */
 
930
  bool iterate_impl(Visitor* visitor, ProgressChecker* checker) {
 
931
    _assert_(visitor);
 
932
    if (checker && !checker->check("iterate", "beginning", 0, -1)) {
 
933
      set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
 
934
      return false;
 
935
    }
 
936
    int64_t off = 0;
 
937
    int64_t end = file_.size();
 
938
    int64_t curcnt = 0;
 
939
    std::string line;
 
940
    char stack[IOBUFSIZ*4];
 
941
    while (off < end) {
 
942
      int64_t rsiz = end - off;
 
943
      if (rsiz > (int64_t)sizeof(stack)) rsiz = sizeof(stack);
 
944
      if (!file_.read_fast(off, stack, rsiz)) {
 
945
        set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
 
946
        return false;
 
947
      }
 
948
      const char* rp = stack;
 
949
      const char* pv = rp;
 
950
      const char* ep = rp + rsiz;
 
951
      while (rp < ep) {
 
952
        if (*rp == '\n') {
 
953
          char kbuf[NUMBUFSIZ];
 
954
          size_t ksiz = write_key(kbuf, off + pv - stack);
 
955
          const char* vbuf;
 
956
          size_t vsiz;
 
957
          if (line.empty()) {
 
958
            vbuf = visitor->visit_full(kbuf, ksiz, pv, rp - pv, &vsiz);
 
959
          } else {
 
960
            line.append(pv, rp - pv);
 
961
            vbuf = visitor->visit_full(kbuf, ksiz, line.data(), line.size(), &vsiz);
 
962
            line.clear();
 
963
          }
 
964
          if (vbuf != Visitor::NOP && vbuf != Visitor::REMOVE) {
 
965
            char tstack[IOBUFSIZ];
 
966
            size_t trsiz = vsiz + 1;
 
967
            char* trbuf = trsiz > sizeof(tstack) ? new char[trsiz] : tstack;
 
968
            std::memcpy(trbuf, vbuf, vsiz);
 
969
            trbuf[vsiz] = '\n';
 
970
            if (!file_.append(trbuf, trsiz)) {
 
971
              set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
 
972
              if (trbuf != stack) delete[] trbuf;
 
973
              return false;
 
974
            }
 
975
            if (trbuf != tstack) delete[] trbuf;
 
976
          }
 
977
          curcnt++;
 
978
          if (checker && !checker->check("iterate", "processing", curcnt, -1)) {
 
979
            set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
 
980
            return false;
 
981
          }
 
982
          rp++;
 
983
          pv = rp;
 
984
        } else {
 
985
          rp++;
 
986
        }
 
987
      }
 
988
      line.append(pv, rp - pv);
 
989
      off += rsiz;
 
990
    }
 
991
    if (checker && !checker->check("iterate", "ending", -1, -1)) {
 
992
      set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
 
993
      return false;
 
994
    }
 
995
    return true;
 
996
  }
 
997
  /**
 
998
   * Scan each record in parallel.
 
999
   * @param visitor a visitor object.
 
1000
   * @param thnum the number of worker threads.
 
1001
   * @param checker a progress checker object.
 
1002
   * @return true on success, or false on failure.
 
1003
   */
 
1004
  bool scan_parallel_impl(Visitor *visitor, size_t thnum, ProgressChecker* checker) {
 
1005
    _assert_(visitor && thnum <= MEMMAXSIZ);
 
1006
    if (checker && !checker->check("scan_parallel", "beginning", -1, -1)) {
 
1007
      set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
 
1008
      return false;
 
1009
    }
 
1010
    int64_t off = 0;
 
1011
    int64_t end = file_.size();
 
1012
    int64_t gap = (end - off) / thnum;
 
1013
    std::vector<int64_t> offs;
 
1014
    while (off < end) {
 
1015
      offs.push_back(off);
 
1016
      int64_t edge = off + gap;
 
1017
      while (true) {
 
1018
        if (edge >= end) {
 
1019
          off = end;
 
1020
          break;
 
1021
        }
 
1022
        char rbuf[IOBUFSIZ];
 
1023
        int64_t rsiz = end - edge;
 
1024
        if (rsiz > (int64_t)sizeof(rbuf)) rsiz = sizeof(rbuf);
 
1025
        if (!file_.read_fast(edge, rbuf, rsiz)) {
 
1026
          set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
 
1027
          return false;
 
1028
        }
 
1029
        int64_t noff = -1;
 
1030
        const char* rp = rbuf;
 
1031
        const char* ep = rp + rsiz;
 
1032
        while (rp < ep) {
 
1033
          if (*rp == '\n') {
 
1034
            noff = edge + (rp - rbuf);
 
1035
            break;
 
1036
          }
 
1037
          rp++;
 
1038
        }
 
1039
        if (noff >= 0) {
 
1040
          off = noff + 1;
 
1041
          break;
 
1042
        }
 
1043
        edge += rsiz;
 
1044
      }
 
1045
    }
 
1046
    bool err = false;
 
1047
    size_t onum = offs.size();
 
1048
    if (onum > 0) {
 
1049
      class ThreadImpl : public Thread {
 
1050
       public:
 
1051
        explicit ThreadImpl() :
 
1052
            db_(NULL), visitor_(NULL), checker_(NULL), begoff_(0), endoff_(0), error_() {}
 
1053
        void init(TextDB* db, Visitor* visitor, ProgressChecker* checker,
 
1054
                  int64_t begoff, int64_t endoff) {
 
1055
          db_ = db;
 
1056
          visitor_ = visitor;
 
1057
          checker_ = checker;
 
1058
          begoff_ = begoff;
 
1059
          endoff_ = endoff;
 
1060
        }
 
1061
        const Error& error() {
 
1062
          return error_;
 
1063
        }
 
1064
       private:
 
1065
        void run() {
 
1066
          TextDB* db = db_;
 
1067
          File* file = &db->file_;
 
1068
          Visitor* visitor = visitor_;
 
1069
          ProgressChecker* checker = checker_;
 
1070
          int64_t off = begoff_;
 
1071
          int64_t end = endoff_;
 
1072
          std::string line;
 
1073
          char stack[IOBUFSIZ*4];
 
1074
          while (off < end) {
 
1075
            int64_t rsiz = end - off;
 
1076
            if (rsiz > (int64_t)sizeof(stack)) rsiz = sizeof(stack);
 
1077
            if (!file->read_fast(off, stack, rsiz)) {
 
1078
              db->set_error(_KCCODELINE_, Error::SYSTEM, file->error());
 
1079
              return;
 
1080
            }
 
1081
            const char* rp = stack;
 
1082
            const char* pv = rp;
 
1083
            const char* ep = rp + rsiz;
 
1084
            while (rp < ep) {
 
1085
              if (*rp == '\n') {
 
1086
                char kbuf[NUMBUFSIZ];
 
1087
                size_t ksiz = db->write_key(kbuf, off + pv - stack);
 
1088
                if (line.empty()) {
 
1089
                  size_t vsiz;
 
1090
                  visitor->visit_full(kbuf, ksiz, pv, rp - pv, &vsiz);
 
1091
                } else {
 
1092
                  line.append(pv, rp - pv);
 
1093
                  size_t vsiz;
 
1094
                  visitor->visit_full(kbuf, ksiz, line.data(), line.size(), &vsiz);
 
1095
                  line.clear();
 
1096
                }
 
1097
                if (checker && !checker->check("iterate", "processing", -1, -1)) {
 
1098
                  db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
 
1099
                  return;
 
1100
                }
 
1101
                rp++;
 
1102
                pv = rp;
 
1103
              } else {
 
1104
                rp++;
 
1105
              }
 
1106
            }
 
1107
            line.append(pv, rp - pv);
 
1108
            off += rsiz;
 
1109
          }
 
1110
        }
 
1111
        TextDB* db_;
 
1112
        Visitor* visitor_;
 
1113
        ProgressChecker* checker_;
 
1114
        int64_t begoff_;
 
1115
        int64_t endoff_;
 
1116
        Error error_;
 
1117
      };
 
1118
      ThreadImpl* threads = new ThreadImpl[onum];
 
1119
      for (size_t i = 0; i < onum; i++) {
 
1120
        int64_t begoff = offs[i];
 
1121
        int64_t endoff = i < onum - 1 ? offs[i+1] : end;
 
1122
        ThreadImpl* thread = threads + i;
 
1123
        thread->init(this, visitor, checker, begoff, endoff);
 
1124
        thread->start();
 
1125
      }
 
1126
      for (size_t i = 0; i < onum; i++) {
 
1127
        ThreadImpl* thread = threads + i;
 
1128
        thread->join();
 
1129
        if (thread->error() != Error::SUCCESS) {
 
1130
          *error_ = thread->error();
 
1131
          err = true;
 
1132
        }
 
1133
      }
 
1134
      delete[] threads;
 
1135
    }
 
1136
    if (checker && !checker->check("scan_parallel", "ending", -1, -1)) {
 
1137
      set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
 
1138
      err = true;
 
1139
    }
 
1140
    return !err;
 
1141
  }
 
1142
  /**
 
1143
   * Synchronize updated contents with the file and the device.
 
1144
   * @param hard true for physical synchronization with the device, or false for logical
 
1145
   * synchronization with the file system.
 
1146
   * @param proc a postprocessor object.
 
1147
   * @param checker a progress checker object.
 
1148
   * @return true on success, or false on failure.
 
1149
   */
 
1150
  bool synchronize_impl(bool hard, FileProcessor* proc, ProgressChecker* checker) {
 
1151
    _assert_(true);
 
1152
    bool err = false;
 
1153
    if (writer_) {
 
1154
      if (checker && !checker->check("synchronize", "synchronizing the file", -1, -1)) {
 
1155
        set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
 
1156
        return false;
 
1157
      }
 
1158
      if (!file_.synchronize(hard)) {
 
1159
        set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
 
1160
        err = true;
 
1161
      }
 
1162
    }
 
1163
    if (proc) {
 
1164
      if (checker && !checker->check("synchronize", "running the post processor", -1, -1)) {
 
1165
        set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
 
1166
        return false;
 
1167
      }
 
1168
      if (!proc->process(path_, -1, file_.size())) {
 
1169
        set_error(_KCCODELINE_, Error::LOGIC, "postprocessing failed");
 
1170
        err = true;
 
1171
      }
 
1172
    }
 
1173
    return !err;
 
1174
  }
 
1175
  /**
 
1176
   * Disable all cursors.
 
1177
   */
 
1178
  void disable_cursors() {
 
1179
    _assert_(true);
 
1180
    if (curs_.empty()) return;
 
1181
    CursorList::const_iterator cit = curs_.begin();
 
1182
    CursorList::const_iterator citend = curs_.end();
 
1183
    while (cit != citend) {
 
1184
      Cursor* cur = *cit;
 
1185
      cur->off_ = INT64MAX;
 
1186
      ++cit;
 
1187
    }
 
1188
  }
 
1189
  /**
 
1190
   * Write the key pattern into a buffer.
 
1191
   * @param kbuf the destination buffer.
 
1192
   * @param off the offset of the record.
 
1193
   * @return the size of the key pattern.
 
1194
   */
 
1195
  size_t write_key(char* kbuf, int64_t off) {
 
1196
    _assert_(kbuf && off >= 0);
 
1197
    for (size_t i = 0; i < sizeof(off); i++) {
 
1198
      uint8_t c = off >> ((sizeof(off) - 1 - i) * 8);
 
1199
      uint8_t h = c >> 4;
 
1200
      if (h < 10) {
 
1201
        *(kbuf++) = '0' + h;
 
1202
      } else {
 
1203
        *(kbuf++) = 'A' - 10 + h;
 
1204
      }
 
1205
      uint8_t l = c & 0xf;
 
1206
      if (l < 10) {
 
1207
        *(kbuf++) = '0' + l;
 
1208
      } else {
 
1209
        *(kbuf++) = 'A' - 10 + l;
 
1210
      }
 
1211
    }
 
1212
    return sizeof(off) * 2;
 
1213
  }
 
1214
  /** Dummy constructor to forbid the use. */
 
1215
  TextDB(const TextDB&);
 
1216
  /** Dummy Operator to forbid the use. */
 
1217
  TextDB& operator =(const TextDB&);
 
1218
  /** The method lock. */
 
1219
  RWLock mlock_;
 
1220
  /** The last happened error. */
 
1221
  TSD<Error> error_;
 
1222
  /** The internal logger. */
 
1223
  Logger* logger_;
 
1224
  /** The kinds of logged messages. */
 
1225
  uint32_t logkinds_;
 
1226
  /** The internal meta operation trigger. */
 
1227
  MetaTrigger* mtrigger_;
 
1228
  /** The open mode. */
 
1229
  uint32_t omode_;
 
1230
  /** The flag for writer. */
 
1231
  bool writer_;
 
1232
  /** The flag for auto transaction. */
 
1233
  bool autotran_;
 
1234
  /** The flag for auto synchronization. */
 
1235
  bool autosync_;
 
1236
  /** The file for data. */
 
1237
  File file_;
 
1238
  /** The cursor objects. */
 
1239
  CursorList curs_;
 
1240
  /** The path of the database file. */
 
1241
  std::string path_;
 
1242
};
 
1243
 
 
1244
 
 
1245
}                                        // common namespace
 
1246
 
 
1247
#endif                                   // duplication check
 
1248
 
 
1249
// END OF FILE