1
/*************************************************************************************************
3
* Copyright (C) 2009-2012 FAL Labs
4
* This file is part of Kyoto Cabinet.
5
* This program is free software: you can redistribute it and/or modify it under the terms of
6
* the GNU General Public License as published by the Free Software Foundation, either version
7
* 3 of the License, or any later version.
8
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
9
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10
* See the GNU General Public License for more details.
11
* You should have received a copy of the GNU General Public License along with this program.
12
* If not, see <http://www.gnu.org/licenses/>.
13
*************************************************************************************************/
16
#ifndef _KCTEXTDB_H // duplication check
23
#include <kccompress.h>
24
#include <kccompare.h>
29
namespace kyotocabinet { // common namespace
33
* Plain text database.
34
* @note Although this class is designed to use a text file as a database file, not all methods
35
* are implemented. Each line in the text is treated as a record. When storing a record, the
36
* key is ignored and the value only is appended at the end of the file. Records can be
37
* retrieved only by the iterator and the cursor mechanisms. No record can be retrieved by
38
* specifying the key. When accessing a record by the iterator, the key is given as the offset
39
* from the beginning of the file for descriptive purposes. No existing record can be
40
* modified or deleted.
42
class TextDB : public BasicDB {
47
/**
 * An alias of list of cursors.
 * Cursor objects are pushed onto and removed from the database's list of this type.
 */
48
typedef std::list<Cursor*> CursorList;
49
/**
 * An alias of a past record.
 * The first member holds the offset of the line in the file and the second member holds the
 * content of the line.
 */
50
typedef std::pair<int64_t, std::string> Record;
51
/**
 * The size of the IO buffer.
 * Stack buffers used for reading the file and for staging appended values are sized from it.
 */
52
static const size_t IOBUFSIZ = 1024;
55
* Cursor to indicate a record.
57
class Cursor : public BasicDB::Cursor {
62
* @param db the container database object.
64
explicit Cursor(TextDB* db) : db_(db), off_(INT64MAX), end_(0), queue_(), line_() {
66
ScopedRWLock lock(&db_->mlock_, true);
67
db_->curs_.push_back(this);
75
ScopedRWLock lock(&db_->mlock_, true);
76
db_->curs_.remove(this);
79
* Accept a visitor to the current record.
80
* @param visitor a visitor object.
81
* @param writable true for writable operation, or false for read-only operation.
82
* @param step true to move the cursor to the next record, or false for no move.
83
* @return true on success, or false on failure.
84
* @note The key is generated from the offset of each record. To avoid deadlock, any
85
* explicit database operation must not be performed in this function.
87
bool accept(Visitor* visitor, bool writable = true, bool step = false) {
89
ScopedRWLock lock(&db_->mlock_, false);
90
if (db_->omode_ == 0) {
91
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
94
if (writable && !db_->writer_) {
95
db_->set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
99
if (!accept_impl(visitor, step)) err = true;
103
* Jump the cursor to the first record for forward scan.
104
* @return true on success, or false on failure.
108
ScopedRWLock lock(&db_->mlock_, false);
109
if (db_->omode_ == 0) {
110
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
114
end_ = db_->file_.size();
118
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
124
* Jump the cursor to a record for forward scan.
125
* @param kbuf the pointer to the key region.
126
* @param ksiz the size of the key region.
127
* @return true on success, or false on failure.
129
bool jump(const char* kbuf, size_t ksiz) {
130
_assert_(kbuf && ksiz <= MEMMAXSIZ);
131
ScopedRWLock lock(&db_->mlock_, true);
132
if (db_->omode_ == 0) {
133
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
136
off_ = atoin(kbuf, ksiz);
137
end_ = db_->file_.size();
141
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
147
* Jump the cursor to a record for forward scan.
148
* @note Equal to the original Cursor::jump method except that the parameter is std::string.
150
bool jump(const std::string& key) {
152
return jump(key.c_str(), key.size());
155
* Jump the cursor to the last record for backward scan.
156
* @note This is a dummy implementation for compatibility.
160
ScopedRWLock lock(&db_->mlock_, true);
161
if (db_->omode_ == 0) {
162
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
165
db_->set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
169
* Jump the cursor to a record for backward scan.
170
* @note This is a dummy implementation for compatibility.
172
bool jump_back(const char* kbuf, size_t ksiz) {
173
_assert_(kbuf && ksiz <= MEMMAXSIZ);
174
ScopedRWLock lock(&db_->mlock_, true);
175
if (db_->omode_ == 0) {
176
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
179
db_->set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
183
* Jump the cursor to a record for backward scan.
184
* @note This is a dummy implementation for compatibility.
186
bool jump_back(const std::string& key) {
188
return jump_back(key.c_str(), key.size());
191
* Step the cursor to the next record.
192
* @return true on success, or false on failure.
196
ScopedRWLock lock(&db_->mlock_, false);
197
if (db_->omode_ == 0) {
198
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
201
if (queue_.empty() && !read_next()) return false;
202
if (queue_.empty()) {
203
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
210
* Step the cursor to the previous record.
211
* @note This is a dummy implementation for compatibility.
215
ScopedRWLock lock(&db_->mlock_, true);
216
if (db_->omode_ == 0) {
217
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
220
db_->set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
224
* Get the database object.
225
* @return the database object.
233
* Accept a visitor to the current record.
234
* @param visitor a visitor object.
235
* @param step true to move the cursor to the next record, or false for no move.
236
* @return true on success, or false on failure.
238
bool accept_impl(Visitor* visitor, bool step) {
240
if (queue_.empty() && !read_next()) return false;
241
if (queue_.empty()) {
242
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
246
const Record& rec = queue_.front();
247
char kbuf[NUMBUFSIZ];
248
size_t ksiz = db_->write_key(kbuf, rec.first);
250
const char* vbuf = visitor->visit_full(kbuf, ksiz,
251
rec.second.data(), rec.second.size(), &vsiz);
252
if (vbuf != Visitor::NOP && vbuf != Visitor::REMOVE) {
253
char stack[IOBUFSIZ];
254
size_t rsiz = vsiz + 1;
255
char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack;
256
std::memcpy(rbuf, vbuf, vsiz);
258
if (!db_->file_.append(rbuf, rsiz)) {
259
db_->set_error(_KCCODELINE_, Error::SYSTEM, db_->file_.error());
262
if (rbuf != stack) delete[] rbuf;
263
if (db_->autosync_ && !db_->file_.synchronize(true)) {
264
db_->set_error(_KCCODELINE_, Error::SYSTEM, db_->file_.error());
268
if (step) queue_.pop_front();
272
* Read the next record.
273
* @return true on success, or false on failure.
277
while (off_ < end_) {
278
char stack[IOBUFSIZ];
279
int64_t rsiz = end_ - off_;
280
if (rsiz > (int64_t)sizeof(stack)) rsiz = sizeof(stack);
281
if (!db_->file_.read_fast(off_, stack, rsiz)) {
282
db_->set_error(_KCCODELINE_, Error::SYSTEM, db_->file_.error());
285
const char* rp = stack;
287
const char* ep = rp + rsiz;
290
line_.append(pv, rp - pv);
292
rec.first = off_ + pv - stack;
294
queue_.push_back(rec);
302
line_.append(pv, rp - pv);
304
if (!queue_.empty()) break;
308
/**
 * Dummy constructor to forbid the use.
 * Only declared here, without a body: cursor objects are not meant to be copy-constructed.
 */
309
Cursor(const Cursor&);
310
/**
 * Dummy operator to forbid the use.
 * Only declared here, without a body: cursor objects are not meant to be copy-assigned.
 */
311
Cursor& operator =(const Cursor&);
312
/** The inner database. */
314
/** The current offset. */
316
/** The end offset. */
318
/** The queue of read lines. */
319
std::deque<Record> queue_;
320
/** The current line. */
324
* Default constructor.
327
error_(), logger_(NULL), logkinds_(0), mtrigger_(NULL),
328
omode_(0), writer_(false), autotran_(false), autosync_(false),
329
file_(), curs_(), path_("") {
334
* @note If the database is not closed, it is closed implicitly.
338
if (omode_ != 0) close();
339
if (!curs_.empty()) {
340
CursorList::const_iterator cit = curs_.begin();
341
CursorList::const_iterator citend = curs_.end();
342
while (cit != citend) {
350
* Accept a visitor to a record.
351
* @param kbuf the pointer to the key region.
352
* @param ksiz the size of the key region.
353
* @param visitor a visitor object.
354
* @param writable true for writable operation, or false for read-only operation.
355
* @return true on success, or false on failure.
356
* @note No record can be retrieved by specifying the key and the Visitor::visit_empty method
357
* is always called. To avoid deadlock, any explicit database operation must not be performed
360
bool accept(const char* kbuf, size_t ksiz, Visitor* visitor, bool writable = true) {
361
_assert_(kbuf && ksiz <= MEMMAXSIZ && visitor);
362
ScopedRWLock lock(&mlock_, false);
364
set_error(_KCCODELINE_, Error::INVALID, "not opened");
367
if (writable && !writer_) {
368
set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
372
if (!accept_impl(kbuf, ksiz, visitor)) err = true;
376
* Accept a visitor to multiple records at once.
377
* @param keys specifies a string vector of the keys.
378
* @param visitor a visitor object.
379
* @param writable true for writable operation, or false for read-only operation.
380
* @return true on success, or false on failure.
381
* @note No record can be retrieved by specifying the key and the Visitor::visit_empty method
382
* is always called. To avoid deadlock, any explicit database operation must not be performed
385
bool accept_bulk(const std::vector<std::string>& keys, Visitor* visitor,
386
bool writable = true) {
388
ScopedRWLock lock(&mlock_, true);
390
set_error(_KCCODELINE_, Error::INVALID, "not opened");
393
if (writable && !writer_) {
394
set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
397
ScopedVisitor svis(visitor);
399
std::vector<std::string>::const_iterator kit = keys.begin();
400
std::vector<std::string>::const_iterator kitend = keys.end();
401
while (kit != kitend) {
402
if (!accept_impl(kit->data(), kit->size(), visitor)) err = true;
408
* Iterate to accept a visitor for each record.
409
* @param visitor a visitor object.
410
* @param writable true for writable operation, or false for read-only operation.
411
* @param checker a progress checker object. If it is NULL, no checking is performed.
412
* @return true on success, or false on failure.
413
* @note The key is generated from the offset of each record. To avoid deadlock, any explicit
414
* database operation must not be performed in this function.
416
bool iterate(Visitor *visitor, bool writable = true, ProgressChecker* checker = NULL) {
418
ScopedRWLock lock(&mlock_, true);
420
set_error(_KCCODELINE_, Error::INVALID, "not opened");
423
if (writable && !writer_) {
424
set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
427
ScopedVisitor svis(visitor);
429
if (!iterate_impl(visitor, checker)) err = true;
430
trigger_meta(MetaTrigger::ITERATE, "iterate");
434
* Scan each record in parallel.
435
* @param visitor a visitor object.
436
* @param thnum the number of worker threads.
437
* @param checker a progress checker object. If it is NULL, no checking is performed.
438
* @return true on success, or false on failure.
439
* @note This function is for reading records and not for updating ones. The return value of
440
* the visitor is just ignored. The key is generated from the offset of each record. To
441
* avoid deadlock, any explicit database operation must not be performed in this function.
443
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* checker = NULL) {
444
_assert_(visitor && thnum <= MEMMAXSIZ);
445
ScopedRWLock lock(&mlock_, false);
447
set_error(_KCCODELINE_, Error::INVALID, "not opened");
450
if (thnum < 1) thnum = 1;
451
if (thnum > (size_t)INT8MAX) thnum = INT8MAX;
452
ScopedVisitor svis(visitor);
454
if (!scan_parallel_impl(visitor, thnum, checker)) err = true;
455
trigger_meta(MetaTrigger::ITERATE, "scan_parallel");
459
* Get the last happened error.
460
* @return the last happened error.
462
Error error() const {
467
* Set the error information.
468
* @param file the file name of the program source code.
469
* @param line the line number of the program source code.
470
* @param func the function name of the program source code.
471
* @param code an error code.
472
* @param message a supplement message.
474
void set_error(const char* file, int32_t line, const char* func,
475
Error::Code code, const char* message) {
476
_assert_(file && line > 0 && func && message);
477
error_->set(code, message);
479
Logger::Kind kind = code == Error::BROKEN || code == Error::SYSTEM ?
480
Logger::ERROR : Logger::INFO;
481
if (kind & logkinds_)
482
report(file, line, func, kind, "%d: %s: %s", code, Error::codename(code), message);
486
* Open a database file.
487
* @param path the path of a database file.
488
* @param mode the connection mode. TextDB::OWRITER as a writer, TextDB::OREADER as a
489
* reader. The following may be added to the writer mode by bitwise-or: TextDB::OCREATE,
490
* which means it creates a new database if the file does not exist, TextDB::OTRUNCATE, which
491
* means it creates a new database regardless of whether the file exists, TextDB::OAUTOTRAN, which
492
* means each updating operation is performed in implicit transaction, TextDB::OAUTOSYNC,
493
* which means each updating operation is followed by implicit synchronization with the file
494
* system. The following may be added to both of the reader mode and the writer mode by
495
* bitwise-or: TextDB::ONOLOCK, which means it opens the database file without file locking,
496
* TextDB::OTRYLOCK, which means locking is performed without blocking, TextDB::ONOREPAIR,
497
* which means the database file is not repaired implicitly even if file destruction is
499
* @return true on success, or false on failure.
500
* @note Every opened database must be closed by the TextDB::close method when it is no
501
* longer in use. It is not allowed for two or more database objects in the same process to
502
* keep their connections to the same database file at the same time.
504
bool open(const std::string& path, uint32_t mode = OWRITER | OCREATE) {
506
ScopedRWLock lock(&mlock_, true);
508
set_error(_KCCODELINE_, Error::INVALID, "already opened");
511
report(_KCCODELINE_, Logger::DEBUG, "opening the database (path=%s)", path.c_str());
515
uint32_t fmode = File::OREADER;
516
if (mode & OWRITER) {
518
fmode = File::OWRITER;
519
if (mode & OCREATE) fmode |= File::OCREATE;
520
if (mode & OTRUNCATE) fmode |= File::OTRUNCATE;
521
if (mode & OAUTOTRAN) autotran_ = true;
522
if (mode & OAUTOSYNC) autosync_ = true;
524
if (mode & ONOLOCK) fmode |= File::ONOLOCK;
525
if (mode & OTRYLOCK) fmode |= File::OTRYLOCK;
526
if (!file_.open(path, fmode, 0)) {
527
const char* emsg = file_.error();
528
Error::Code code = Error::SYSTEM;
529
if (std::strstr(emsg, "(permission denied)") || std::strstr(emsg, "(directory)")) {
530
code = Error::NOPERM;
531
} else if (std::strstr(emsg, "(file not found)") || std::strstr(emsg, "(invalid path)")) {
532
code = Error::NOREPOS;
534
set_error(_KCCODELINE_, code, emsg);
537
if (autosync_ && !File::synchronize_whole()) {
538
set_error(_KCCODELINE_, Error::SYSTEM, "synchronizing the file system failed");
544
trigger_meta(MetaTrigger::OPEN, "open");
548
* Close the database file.
549
* @return true on success, or false on failure.
553
ScopedRWLock lock(&mlock_, true);
555
set_error(_KCCODELINE_, Error::INVALID, "not opened");
558
report(_KCCODELINE_, Logger::DEBUG, "closing the database (path=%s)", path_.c_str());
561
if (!file_.close()) {
562
set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
567
trigger_meta(MetaTrigger::CLOSE, "close");
571
* Synchronize updated contents with the file and the device.
572
* @param hard true for physical synchronization with the device, or false for logical
573
* synchronization with the file system.
574
* @param proc a postprocessor object. If it is NULL, no postprocessing is performed.
575
* @param checker a progress checker object. If it is NULL, no checking is performed.
576
* @return true on success, or false on failure.
577
* @note The operation of the postprocessor is performed atomically and other threads accessing
578
* the same record are blocked. To avoid deadlock, any explicit database operation must not
579
* be performed in this function.
581
bool synchronize(bool hard = false, FileProcessor* proc = NULL,
582
ProgressChecker* checker = NULL) {
584
ScopedRWLock lock(&mlock_, false);
586
set_error(_KCCODELINE_, Error::INVALID, "not opened");
590
if (!synchronize_impl(hard, proc, checker)) err = true;
591
trigger_meta(MetaTrigger::SYNCHRONIZE, "synchronize");
595
* Occupy database by locking and do something meanwhile.
596
* @param writable true to use writer lock, or false to use reader lock.
597
* @param proc a processor object. If it is NULL, no processing is performed.
598
* @return true on success, or false on failure.
599
* @note The operation of the processor is performed atomically and other threads accessing
600
* the same record are blocked. To avoid deadlock, any explicit database operation must not
601
* be performed in this function.
603
bool occupy(bool writable = true, FileProcessor* proc = NULL) {
605
ScopedRWLock lock(&mlock_, writable);
607
if (proc && !proc->process(path_, -1, file_.size())) {
608
set_error(_KCCODELINE_, Error::LOGIC, "processing failed");
611
trigger_meta(MetaTrigger::OCCUPY, "occupy");
616
* @param hard true for physical synchronization with the device, or false for logical
617
* synchronization with the file system.
618
* @return true on success, or false on failure.
620
bool begin_transaction(bool hard = false) {
622
ScopedRWLock lock(&mlock_, true);
624
set_error(_KCCODELINE_, Error::INVALID, "not opened");
627
set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
631
* Try to begin transaction.
632
* @param hard true for physical synchronization with the device, or false for logical
633
* synchronization with the file system.
634
* @return true on success, or false on failure.
636
bool begin_transaction_try(bool hard = false) {
638
ScopedRWLock lock(&mlock_, true);
640
set_error(_KCCODELINE_, Error::INVALID, "not opened");
643
set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
648
* @param commit true to commit the transaction, or false to abort the transaction.
649
* @return true on success, or false on failure.
651
bool end_transaction(bool commit = true) {
653
ScopedRWLock lock(&mlock_, true);
655
set_error(_KCCODELINE_, Error::INVALID, "not opened");
658
set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
662
* Remove all records.
663
* @return true on success, or false on failure.
667
ScopedRWLock lock(&mlock_, true);
669
set_error(_KCCODELINE_, Error::INVALID, "not opened");
673
set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
677
if (!file_.truncate(0)) {
678
set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
681
if (autosync_ && !file_.synchronize(true)) {
682
set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
685
trigger_meta(MetaTrigger::CLEAR, "clear");
689
* Get the number of records.
690
* @return the number of records, or -1 on failure.
694
ScopedRWLock lock(&mlock_, false);
696
set_error(_KCCODELINE_, Error::INVALID, "not opened");
699
set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
703
* Get the size of the database file.
704
* @return the size of the database file in bytes, or -1 on failure.
708
ScopedRWLock lock(&mlock_, false);
710
set_error(_KCCODELINE_, Error::INVALID, "not opened");
716
* Get the path of the database file.
717
* @return the path of the database file, or an empty string on failure.
724
* Get the miscellaneous status information.
725
* @param strmap a string map to contain the result.
726
* @return true on success, or false on failure.
728
bool status(std::map<std::string, std::string>* strmap) {
730
ScopedRWLock lock(&mlock_, true);
732
set_error(_KCCODELINE_, Error::INVALID, "not opened");
735
(*strmap)["type"] = strprintf("%u", (unsigned)TYPETEXT);
736
(*strmap)["realtype"] = strprintf("%u", (unsigned)TYPETEXT);
737
(*strmap)["path"] = path_;
738
(*strmap)["size"] = strprintf("%lld", (long long)file_.size());
742
* Create a cursor object.
743
* @return the return value is the created cursor object.
744
* @note Because the object of the return value is allocated by the constructor, it should be
745
* released with the delete operator when it is no longer in use.
749
return new Cursor(this);
752
* Write a log message.
753
* @param file the file name of the program source code.
754
* @param line the line number of the program source code.
755
* @param func the function name of the program source code.
756
* @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal
757
* information, Logger::WARN for warning, and Logger::ERROR for fatal error.
758
* @param message the supplement message.
760
void log(const char* file, int32_t line, const char* func, Logger::Kind kind,
761
const char* message) {
762
_assert_(file && line > 0 && func && message);
763
ScopedRWLock lock(&mlock_, false);
764
if (!logger_) return;
765
logger_->log(file, line, func, kind, message);
768
* Set the internal logger.
769
* @param logger the logger object.
770
* @param kinds kinds of logged messages by bitwise-or: Logger::DEBUG for debugging,
771
* Logger::INFO for normal information, Logger::WARN for warning, and Logger::ERROR for fatal
773
* @return true on success, or false on failure.
775
bool tune_logger(Logger* logger, uint32_t kinds = Logger::WARN | Logger::ERROR) {
777
ScopedRWLock lock(&mlock_, true);
779
set_error(_KCCODELINE_, Error::INVALID, "already opened");
787
* Set the internal meta operation trigger.
788
* @param trigger the trigger object.
789
* @return true on success, or false on failure.
791
bool tune_meta_trigger(MetaTrigger* trigger) {
793
ScopedRWLock lock(&mlock_, true);
795
set_error(_KCCODELINE_, Error::INVALID, "already opened");
803
* Report a message for debugging.
804
* @param file the file name of the program source code.
805
* @param line the line number of the program source code.
806
* @param func the function name of the program source code.
807
* @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal
808
* information, Logger::WARN for warning, and Logger::ERROR for fatal error.
809
* @param format the printf-like format string.
810
* @param ... used according to the format string.
812
void report(const char* file, int32_t line, const char* func, Logger::Kind kind,
813
const char* format, ...) {
814
_assert_(file && line > 0 && func && format);
815
if (!logger_ || !(kind & logkinds_)) return;
817
strprintf(&message, "%s: ", path_.empty() ? "-" : path_.c_str());
819
va_start(ap, format);
820
vstrprintf(&message, format, ap);
822
logger_->log(file, line, func, kind, message.c_str());
825
* Report a message for debugging with variable number of arguments.
826
* @param file the file name of the program source code.
827
* @param line the line number of the program source code.
828
* @param func the function name of the program source code.
829
* @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal
830
* information, Logger::WARN for warning, and Logger::ERROR for fatal error.
831
* @param format the printf-like format string.
832
* @param ap used according to the format string.
834
void report_valist(const char* file, int32_t line, const char* func, Logger::Kind kind,
835
const char* format, va_list ap) {
836
_assert_(file && line > 0 && func && format);
837
if (!logger_ || !(kind & logkinds_)) return;
839
strprintf(&message, "%s: ", path_.empty() ? "-" : path_.c_str());
840
vstrprintf(&message, format, ap);
841
logger_->log(file, line, func, kind, message.c_str());
844
* Report the content of a binary buffer for debugging.
845
* @param file the file name of the program source code.
846
* @param line the line number of the program source code.
847
* @param func the function name of the program source code.
848
* @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal
849
* information, Logger::WARN for warning, and Logger::ERROR for fatal error.
850
* @param name the name of the information.
851
* @param buf the binary buffer.
852
* @param size the size of the binary buffer.
854
void report_binary(const char* file, int32_t line, const char* func, Logger::Kind kind,
855
const char* name, const char* buf, size_t size) {
856
_assert_(file && line > 0 && func && name && buf && size <= MEMMAXSIZ);
857
if (!logger_) return;
858
char* hex = hexencode(buf, size);
859
report(file, line, func, kind, "%s=%s", name, hex);
863
* Trigger a meta database operation.
864
* @param kind the kind of the event. MetaTrigger::OPEN for opening, MetaTrigger::CLOSE for
865
* closing, MetaTrigger::CLEAR for clearing, MetaTrigger::ITERATE for iteration,
866
* MetaTrigger::SYNCHRONIZE for synchronization, MetaTrigger::BEGINTRAN for beginning
867
* transaction, MetaTrigger::COMMITTRAN for committing transaction, MetaTrigger::ABORTTRAN
868
* for aborting transaction, and MetaTrigger::MISC for miscellaneous operations.
869
* @param message the supplement message.
871
void trigger_meta(MetaTrigger::Kind kind, const char* message) {
873
if (mtrigger_) mtrigger_->trigger(kind, message);
879
class ScopedVisitor {
882
explicit ScopedVisitor(Visitor* visitor) : visitor_(visitor) {
884
visitor_->visit_before();
889
visitor_->visit_after();
892
Visitor* visitor_; ///< visitor
895
* Accept a visitor to a record.
896
* @param kbuf the pointer to the key region.
897
* @param ksiz the size of the key region.
898
* @param visitor a visitor object.
899
* @return true on success, or false on failure.
901
bool accept_impl(const char* kbuf, size_t ksiz, Visitor* visitor) {
902
_assert_(kbuf && ksiz <= MEMMAXSIZ && visitor);
905
const char* vbuf = visitor->visit_empty(kbuf, ksiz, &vsiz);
906
if (vbuf != Visitor::NOP && vbuf != Visitor::REMOVE) {
907
size_t rsiz = vsiz + 1;
908
char stack[IOBUFSIZ];
909
char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack;
910
std::memcpy(rbuf, vbuf, vsiz);
912
if (!file_.append(rbuf, rsiz)) {
913
set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
916
if (rbuf != stack) delete[] rbuf;
917
if (autosync_ && !file_.synchronize(true)) {
918
set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
925
* Iterate to accept a visitor for each record.
926
* @param visitor a visitor object.
927
* @param checker a progress checker object.
928
* @return true on success, or false on failure.
930
bool iterate_impl(Visitor* visitor, ProgressChecker* checker) {
932
if (checker && !checker->check("iterate", "beginning", 0, -1)) {
933
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
937
int64_t end = file_.size();
940
char stack[IOBUFSIZ*4];
942
int64_t rsiz = end - off;
943
if (rsiz > (int64_t)sizeof(stack)) rsiz = sizeof(stack);
944
if (!file_.read_fast(off, stack, rsiz)) {
945
set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
948
const char* rp = stack;
950
const char* ep = rp + rsiz;
953
char kbuf[NUMBUFSIZ];
954
size_t ksiz = write_key(kbuf, off + pv - stack);
958
vbuf = visitor->visit_full(kbuf, ksiz, pv, rp - pv, &vsiz);
960
line.append(pv, rp - pv);
961
vbuf = visitor->visit_full(kbuf, ksiz, line.data(), line.size(), &vsiz);
964
if (vbuf != Visitor::NOP && vbuf != Visitor::REMOVE) {
965
char tstack[IOBUFSIZ];
966
size_t trsiz = vsiz + 1;
967
char* trbuf = trsiz > sizeof(tstack) ? new char[trsiz] : tstack;
968
std::memcpy(trbuf, vbuf, vsiz);
970
if (!file_.append(trbuf, trsiz)) {
971
set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
972
// On append failure, release the staging buffer only when it was heap-allocated.  It must be
// compared with tstack (its own stack fallback), not with the read buffer `stack`; otherwise a
// stack-backed staging buffer would be passed to delete[].
if (trbuf != tstack) delete[] trbuf;
975
if (trbuf != tstack) delete[] trbuf;
978
if (checker && !checker->check("iterate", "processing", curcnt, -1)) {
979
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
988
line.append(pv, rp - pv);
991
if (checker && !checker->check("iterate", "ending", -1, -1)) {
992
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
998
* Scan each record in parallel.
999
* @param visitor a visitor object.
1000
* @param thnum the number of worker threads.
1001
* @param checker a progress checker object.
1002
* @return true on success, or false on failure.
1004
bool scan_parallel_impl(Visitor *visitor, size_t thnum, ProgressChecker* checker) {
1005
_assert_(visitor && thnum <= MEMMAXSIZ);
1006
if (checker && !checker->check("scan_parallel", "beginning", -1, -1)) {
1007
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
1011
int64_t end = file_.size();
1012
int64_t gap = (end - off) / thnum;
1013
std::vector<int64_t> offs;
1015
offs.push_back(off);
1016
int64_t edge = off + gap;
1022
char rbuf[IOBUFSIZ];
1023
int64_t rsiz = end - edge;
1024
if (rsiz > (int64_t)sizeof(rbuf)) rsiz = sizeof(rbuf);
1025
if (!file_.read_fast(edge, rbuf, rsiz)) {
1026
set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
1030
const char* rp = rbuf;
1031
const char* ep = rp + rsiz;
1034
noff = edge + (rp - rbuf);
1047
size_t onum = offs.size();
1049
class ThreadImpl : public Thread {
1051
explicit ThreadImpl() :
1052
db_(NULL), visitor_(NULL), checker_(NULL), begoff_(0), endoff_(0), error_() {}
1053
void init(TextDB* db, Visitor* visitor, ProgressChecker* checker,
1054
int64_t begoff, int64_t endoff) {
1061
const Error& error() {
1067
File* file = &db->file_;
1068
Visitor* visitor = visitor_;
1069
ProgressChecker* checker = checker_;
1070
int64_t off = begoff_;
1071
int64_t end = endoff_;
1073
char stack[IOBUFSIZ*4];
1075
int64_t rsiz = end - off;
1076
if (rsiz > (int64_t)sizeof(stack)) rsiz = sizeof(stack);
1077
if (!file->read_fast(off, stack, rsiz)) {
1078
db->set_error(_KCCODELINE_, Error::SYSTEM, file->error());
1081
const char* rp = stack;
1082
const char* pv = rp;
1083
const char* ep = rp + rsiz;
1086
char kbuf[NUMBUFSIZ];
1087
size_t ksiz = db->write_key(kbuf, off + pv - stack);
1090
visitor->visit_full(kbuf, ksiz, pv, rp - pv, &vsiz);
1092
line.append(pv, rp - pv);
1094
visitor->visit_full(kbuf, ksiz, line.data(), line.size(), &vsiz);
1097
// Report progress under the same process name as the "beginning" and "ending" checks of the
// parallel scan, so a checker sees one consistent name for the whole operation.
if (checker && !checker->check("scan_parallel", "processing", -1, -1)) {
1098
db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
1107
line.append(pv, rp - pv);
1113
ProgressChecker* checker_;
1118
ThreadImpl* threads = new ThreadImpl[onum];
1119
for (size_t i = 0; i < onum; i++) {
1120
int64_t begoff = offs[i];
1121
int64_t endoff = i < onum - 1 ? offs[i+1] : end;
1122
ThreadImpl* thread = threads + i;
1123
thread->init(this, visitor, checker, begoff, endoff);
1126
for (size_t i = 0; i < onum; i++) {
1127
ThreadImpl* thread = threads + i;
1129
if (thread->error() != Error::SUCCESS) {
1130
*error_ = thread->error();
1136
if (checker && !checker->check("scan_parallel", "ending", -1, -1)) {
1137
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
1143
* Synchronize updated contents with the file and the device.
1144
* @param hard true for physical synchronization with the device, or false for logical
1145
* synchronization with the file system.
1146
* @param proc a postprocessor object.
1147
* @param checker a progress checker object.
1148
* @return true on success, or false on failure.
1150
bool synchronize_impl(bool hard, FileProcessor* proc, ProgressChecker* checker) {
1154
if (checker && !checker->check("synchronize", "synchronizing the file", -1, -1)) {
1155
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
1158
if (!file_.synchronize(hard)) {
1159
set_error(_KCCODELINE_, Error::SYSTEM, file_.error());
1164
if (checker && !checker->check("synchronize", "running the post processor", -1, -1)) {
1165
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
1168
if (!proc->process(path_, -1, file_.size())) {
1169
set_error(_KCCODELINE_, Error::LOGIC, "postprocessing failed");
1176
* Disable all cursors.
1178
void disable_cursors() {
1180
if (curs_.empty()) return;
1181
CursorList::const_iterator cit = curs_.begin();
1182
CursorList::const_iterator citend = curs_.end();
1183
while (cit != citend) {
1185
cur->off_ = INT64MAX;
1190
* Write the key pattern into a buffer.
1191
* @param kbuf the destination buffer.
1192
* @param off the offset of the record.
1193
* @return the size of the key pattern.
1195
size_t write_key(char* kbuf, int64_t off) {
// Writes the offset into kbuf as text, two characters per byte of the offset, and returns the
// number of characters written (sizeof(off) * 2).
1196
_assert_(kbuf && off >= 0);
1197
// Visit the bytes of the offset from the most significant one to the least significant one.
for (size_t i = 0; i < sizeof(off); i++) {
1198
uint8_t c = off >> ((sizeof(off) - 1 - i) * 8);
1201
// NOTE(review): h is presumably the upper four bits of c; its definition and the condition
// selecting between the two forms below are not visible here.  Digit form:
*(kbuf++) = '0' + h;
1203
// Letter form, mapping ten and above onto 'A' and the following letters.
*(kbuf++) = 'A' - 10 + h;
1205
// The lower four bits of the byte get the same treatment.
uint8_t l = c & 0xf;
1207
*(kbuf++) = '0' + l;
1209
*(kbuf++) = 'A' - 10 + l;
1212
// Two characters were emitted for every byte of the offset.
return sizeof(off) * 2;
1214
/**
 * Dummy constructor to forbid the use.
 * Only declared here, without a body: database objects are not meant to be copy-constructed.
 */
1215
TextDB(const TextDB&);
1216
/**
 * Dummy operator to forbid the use.
 * Only declared here, without a body: database objects are not meant to be copy-assigned.
 */
1217
TextDB& operator =(const TextDB&);
1218
/** The method lock. */
1220
/** The last happened error. */
1222
/** The internal logger. */
1224
/** The kinds of logged messages. */
1226
/** The internal meta operation trigger. */
1227
MetaTrigger* mtrigger_;
1228
/** The open mode. */
1230
/** The flag for writer. */
1232
/** The flag for auto transaction. */
1234
/** The flag for auto synchronization. */
1236
/** The file for data. */
1238
/** The cursor objects. */
1240
/** The path of the database file. */
1245
} // common namespace
1247
#endif // duplication check