76
81
"""A entry for the hint file."""
79
def from_tritcask_entry(cls, entry):
84
def from_tritcask_entry(cls, entry, dead=False):
80
85
"""Return a KeydirEntry from a file_id + TritcaskEntry."""
86
value_pos = TOMBSTONE_POS if dead else entry.value_pos
81
87
return cls(entry.tstamp, entry.key_sz, entry.row_type,
82
entry.value_sz, entry.value_pos, entry.key)
88
entry.value_sz, value_pos, entry.key)
223
229
tstamp = time.time()
224
230
header = header_struct.pack(tstamp, key_sz, value_sz, row_type)
225
231
crc32 = crc32_struct.pack(zlib.crc32(header+key+value))
233
# seek to end of file even if we are in append mode, because py2.x IO
234
# in win32 is really buggy, see: http://bugs.python.org/issue3207
235
self.fd.seek(0, os.SEEK_END)
226
236
self.fd.write(crc32+header)
227
237
self.fd.write(key)
228
238
value_pos = self.fd.tell()
525
536
This is based on: http://downloads.basho.com/papers/bitcask-intro.pdf
528
def __init__(self, path, dead_bytes_threshold=0.5):
529
"""Initialize the instance."""
539
def __init__(self, path, auto_merge=True, dead_bytes_threshold=0.5,
540
max_immutable_files=10):
541
"""Initialize the instance.
543
@param auto_merge: set to False to disable auto merge/compaction.
544
@param dead_bytes_threshold: the limit factor of dead vs live bytes to
545
trigger a merge and/or live file rotation.
546
@param max_immutable_files: the max number of inactive files to use, once
547
this value is reached a merge is triggered.
530
549
logger.info("Initializing Tritcask on: %s", path)
531
550
self._keydir = Keydir()
532
551
self.base_path = path
533
552
self.dead_bytes_threshold = dead_bytes_threshold
553
self.max_immutable_files = max_immutable_files
554
self.auto_merge = auto_merge
534
555
if not os.path.exists(self.base_path):
535
556
os.makedirs(self.base_path)
536
557
elif not os.path.isdir(self.base_path):
542
563
# now check if we should rotate the live file
543
564
# and merge immutable ones
544
565
self._rotate_and_merge()
566
# check if we found a live data file
567
# if not, define one (it will be created later)
568
if self.live_file is None:
569
# it's a clean start, let's create the first file
570
self.live_file = DataFile(self.base_path)
546
572
def shutdown(self):
547
573
"""Shutdown and close all open files."""
576
605
live_bytes = sum([self._keydir.get_stats(file_id)['live_bytes'] \
577
606
for file_id in file_ids])
578
607
total_bytes = sum([f.size for f in immutable_files.values()])
579
return (live_bytes / total_bytes) < self.dead_bytes_threshold
608
merge_dead = (live_bytes / total_bytes) < self.dead_bytes_threshold
611
# now check if we should merge using the max_immutable_files setting.
612
if len(self._immutable) > self.max_immutable_files:
581
617
def _rotate_and_merge(self):
582
618
"""Check if we need to rotate/merge the data files and do it."""
583
# check if we should rotate the live file
584
620
if self.should_rotate():
586
# check if we need to merge immutable_files
587
if self.should_merge(self._immutable):
588
self.merge(self._immutable)
621
self.rotate(create_file=False)
625
# check if we have inactive files with 0 live entries/bytes
627
for file_id in self._immutable.keys():
628
stats = self._keydir.get_stats(file_id)
629
if stats['live_bytes'] == 0 and stats['live_entries'] == 0:
630
# remove from the immutable files list and mark it as dead.
631
self._immutable.pop(file_id).make_zombie().close()
633
# check if we need to merge immutable_files
634
if self.should_merge(self._immutable):
635
# if the live file has data and wasn't rotated yet, rotate it
636
if self.live_file and self.live_file.size > 0 and not rotated:
637
self.rotate(create_file=False)
638
self.merge(self._immutable)
590
640
def _find_data_files(self):
591
641
"""Collect the files we need to work with."""
605
655
# a dead file...let's remove it
607
657
DeadDataFile(self.base_path, filename).delete()
608
# check if we found a live data file
609
# if not, define one (it will be created later)
610
if self.live_file is None:
611
# it's a clean start, let's create the first file
612
self.live_file = DataFile(self.base_path)
613
658
# immutable files + live
614
659
logger.info("found %s data files and %s dead files",
615
660
len(self._immutable)+1, dead_files)
618
# if the file exists and the size is 0, don't rotate it.
619
logger.info("rotating live file: %s", self.live_file.filename)
620
if self.live_file.exists() and self.live_file.size == 0:
622
# add the current file to the "immutable" list
623
self.live_file.close()
624
data_file = self.live_file.make_immutable()
625
self._immutable[data_file.file_id] = data_file
626
# create a new live data file
627
self.live_file = DataFile(self.base_path)
662
def rotate(self, create_file=True):
663
"""Rotate the live file only if it already exsits."""
665
if self.live_file.exists() and self.live_file.size == 0:
667
elif not self.live_file.exists():
669
# add the current file to the "immutable" list
670
logger.info("rotating live file: %s", self.live_file.filename)
671
self.live_file.close()
672
data_file = self.live_file.make_immutable()
673
self._immutable[data_file.file_id] = data_file
675
# create a new live data file
676
self.live_file = DataFile(self.base_path)
678
self.live_file = None
629
680
def _build_keydir(self):
630
681
"""Build the keydir."""
631
logger.debug('building the keydir, using: %s',
632
self._immutable.keys() + [self.live_file.file_id])
682
fileids = self._immutable.keys()
684
fileids.append(self.live_file.file_id)
685
logger.debug('building the keydir, using: %s', fileids)
633
686
# sort the files by name, in order to load from older -> newest
634
687
for data_file in sorted(self._immutable.values(),
635
key=attrgetter('filename')):
688
key=attrgetter('filename')):
636
689
if data_file.has_hint:
637
690
# load directly from the hint file.
638
691
self._load_from_hint(data_file)
639
692
elif data_file.exists() and data_file.size > 0:
640
693
self._load_from_data(data_file)
641
if self.live_file.exists() and self.live_file.size > 0:
694
if self.live_file and self.live_file.exists() \
695
and self.live_file.size > 0:
642
696
self._load_from_data(self.live_file)
643
697
logger.info('keydir ready! (keys: %d)', len(self._keydir))
647
701
logger.debug("loading entries from hint of: %s", data_file.filename)
648
702
hint_file = data_file.get_hint_file()
649
703
for hint_entry in hint_file.iter_entries():
650
kd_entry = KeydirEntry.from_hint_entry(data_file.file_id,
652
self._keydir[(hint_entry.row_type, hint_entry.key)] = kd_entry
704
if hint_entry.value_pos == TOMBSTONE_POS:
705
self._keydir.remove((hint_entry.row_type, hint_entry.key))
707
kd_entry = KeydirEntry.from_hint_entry(data_file.file_id,
709
self._keydir[(hint_entry.row_type, hint_entry.key)] = kd_entry
654
711
def _load_from_data(self, data_file):
655
712
"""Load keydir info from a data file.
667
724
# the record is dead, check if need to remove it from
669
726
self._keydir.remove((entry.row_type, entry.key))
670
hint_idx.pop(entry.key, None)
727
# add the tombstone entry to the hint
729
hint_entry = HintEntry.from_tritcask_entry(entry, dead=True)
730
hint_idx[hint_entry.key] = hint_entry
672
732
kd_entry = KeydirEntry.from_tritcask_entry(data_file.file_id,
683
743
def _get_value(self, file_id, value_pos, value_sz):
684
744
"""Get the value for file_id, value_pos."""
685
if file_id == self.live_file.file_id:
745
if self.live_file and file_id == self.live_file.file_id:
686
746
# it's the current live file
687
747
return self.live_file[value_pos:value_pos+value_sz]