~didrocks/ubuntuone-client/use_result_var

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/tritcask.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2011-02-11 16:18:11 UTC
  • mto: This revision was merged to the branch mainline in revision 67.
  • Revision ID: james.westby@ubuntu.com-20110211161811-n18dj9lde7dxqjzr
Tags: upstream-1.5.4
Import upstream version 1.5.4

Show diffs side-by-side

added

removed

Lines of Context:
25
25
import mmap
26
26
import os
27
27
import struct
 
28
import sys
28
29
import time
29
30
import tempfile
30
31
import uuid
48
49
hint_header_struct = struct.Struct(hint_header_fmt)
49
50
 
50
51
TOMBSTONE = str(uuid.uuid5(uuid.NAMESPACE_OID, 'TOMBSTONE'))
 
52
TOMBSTONE_POS = -1
51
53
 
52
54
LIVE = '.live'
53
55
INACTIVE = '.inactive'
57
59
VERSION = 'v1'
58
60
FILE_SUFFIX = '.tritcask-%s.data' % VERSION
59
61
 
 
62
EXTRA_SEEK = False
 
63
if sys.platform == 'win32':
 
64
    EXTRA_SEEK = True
60
65
 
61
66
logger = logging.getLogger('ubuntuone.SyncDaemon.tritcask')
62
67
 
76
81
    """A entry for the hint file."""
77
82
 
78
83
    @classmethod
79
 
    def from_tritcask_entry(cls, entry):
 
84
    def from_tritcask_entry(cls, entry, dead=False):
80
85
        """Return a KeydirEntry from a file_id + TritcaskEntry."""
 
86
        value_pos = TOMBSTONE_POS if dead else entry.value_pos
81
87
        return cls(entry.tstamp, entry.key_sz, entry.row_type,
82
 
                   entry.value_sz, entry.value_pos, entry.key)
 
88
                   entry.value_sz, value_pos, entry.key)
83
89
 
84
90
    @property
85
91
    def header(self):
223
229
        tstamp = time.time()
224
230
        header = header_struct.pack(tstamp, key_sz, value_sz, row_type)
225
231
        crc32 = crc32_struct.pack(zlib.crc32(header+key+value))
 
232
        if EXTRA_SEEK:
 
233
            # seek to end of file even if we are in append mode, but py2.x IO
 
234
            # in win32 is really buggy, see: http://bugs.python.org/issue3207
 
235
            self.fd.seek(0, os.SEEK_END)
226
236
        self.fd.write(crc32+header)
227
237
        self.fd.write(key)
228
238
        value_pos = self.fd.tell()
490
500
        return self._stats[file_id].copy()
491
501
 
492
502
 
 
503
 
493
504
class Tritcask(object):
494
505
    """Implementation of a bitcask-like (row_type,key)/value store.
495
506
 
525
536
    This is based on: http://downloads.basho.com/papers/bitcask-intro.pdf
526
537
    """
527
538
 
528
 
    def __init__(self, path, dead_bytes_threshold=0.5):
529
 
        """Initialize the instance."""
 
539
    def __init__(self, path, auto_merge=True, dead_bytes_threshold=0.5,
 
540
                 max_immutable_files=10):
 
541
        """Initialize the instance.
 
542
 
 
543
        @param auto_merge: disable auto merge/compaction.
 
544
        @param dead_bytes_threshold: the limit factor of dead vs live bytes to
 
545
            trigger a merge and/or live file rotation.
 
546
        @param max_immutable_files: the max number of inactive files to use, once
 
547
            this value is reached a merge is triggered.
 
548
        """
530
549
        logger.info("Initializing Tritcask on: %s", path)
531
550
        self._keydir = Keydir()
532
551
        self.base_path = path
533
552
        self.dead_bytes_threshold = dead_bytes_threshold
 
553
        self.max_immutable_files = max_immutable_files
 
554
        self.auto_merge = auto_merge
534
555
        if not os.path.exists(self.base_path):
535
556
            os.makedirs(self.base_path)
536
557
        elif not os.path.isdir(self.base_path):
542
563
        # now check if we should rotate the live file
543
564
        # and merge immutable ones
544
565
        self._rotate_and_merge()
 
566
        # check if we found a live data file
 
567
        # if not, define one (it will be created later)
 
568
        if self.live_file is None:
 
569
            # it's a clean start, let's create the first file
 
570
            self.live_file = DataFile(self.base_path)
545
571
 
546
572
    def shutdown(self):
547
573
        """Shutdown and close all open files."""
555
581
 
556
582
    def should_rotate(self):
557
583
        """Check if we should rotate the live file."""
 
584
        # if there is no live file, just say no
 
585
        if self.live_file is None:
 
586
            return False
558
587
        # check if the file is marked with a bad crc
559
588
        # as we shouldn't keep adding data to a broken file.
560
589
        if self.live_file.has_bad_crc:
576
605
        live_bytes = sum([self._keydir.get_stats(file_id)['live_bytes'] \
577
606
                          for file_id in file_ids])
578
607
        total_bytes = sum([f.size for f in immutable_files.values()])
579
 
        return (live_bytes / total_bytes) < self.dead_bytes_threshold
 
608
        merge_dead = (live_bytes / total_bytes) < self.dead_bytes_threshold
 
609
        if merge_dead:
 
610
            return True
 
611
        # now check if we should merge using the max_immutable_files setting.
 
612
        if len(self._immutable) > self.max_immutable_files:
 
613
            return True
 
614
        # shouldn't merge.
 
615
        return False
580
616
 
581
617
    def _rotate_and_merge(self):
582
618
        """Check if we need to rotate/merge the data files and do it."""
583
 
        # check if we should rotate the live file
 
619
        rotated = False
584
620
        if self.should_rotate():
585
 
            self.rotate()
586
 
        # check if we need to merge immutable_files
587
 
        if self.should_merge(self._immutable):
588
 
            self.merge(self._immutable)
 
621
            self.rotate(create_file=False)
 
622
            rotated = True
 
623
 
 
624
        if self.auto_merge:
 
625
            # check if we have inactive files with 0 live entries/bytes
 
626
            # and kill them
 
627
            for file_id in self._immutable.keys():
 
628
                stats = self._keydir.get_stats(file_id)
 
629
                if stats['live_bytes'] == 0 and stats['live_entries'] == 0:
 
630
                    # remove from the immutable files list and mark it as dead.
 
631
                    self._immutable.pop(file_id).make_zombie().close()
 
632
 
 
633
            # check if we need to merge immutable_files
 
634
            if self.should_merge(self._immutable):
 
635
                # if the size of the live file
 
636
                if self.live_file and self.live_file.size > 0 and not rotated:
 
637
                    self.rotate(create_file=False)
 
638
                self.merge(self._immutable)
589
639
 
590
640
    def _find_data_files(self):
591
641
        """Collect the files we need to work with."""
605
655
                # a dead file...let's remove it
606
656
                dead_files += 1
607
657
                DeadDataFile(self.base_path, filename).delete()
608
 
        # check if we found a live data file
609
 
        # if not, define one (it will be created later)
610
 
        if self.live_file is None:
611
 
            # it's a clent start, let's create the first file
612
 
            self.live_file = DataFile(self.base_path)
613
658
        # immutable files + live
614
659
        logger.info("found %s data files and %s dead files",
615
660
                    len(self._immutable)+1, dead_files)
616
661
 
617
 
    def rotate(self):
618
 
        # if the file exists and the size is 0, don't rotate it.
619
 
        logger.info("rotating live file: %s", self.live_file.filename)
620
 
        if self.live_file.exists() and self.live_file.size == 0:
621
 
            return
622
 
        # add the current file to the "immutable" list
623
 
        self.live_file.close()
624
 
        data_file = self.live_file.make_immutable()
625
 
        self._immutable[data_file.file_id] = data_file
626
 
        # create a new live data file
627
 
        self.live_file = DataFile(self.base_path)
 
662
    def rotate(self, create_file=True):
 
663
        """Rotate the live file only if it already exsits."""
 
664
        if self.live_file:
 
665
            if self.live_file.exists() and self.live_file.size == 0:
 
666
                return
 
667
            elif not self.live_file.exists():
 
668
                return
 
669
            # add the current file to the "immutable" list
 
670
            logger.info("rotating live file: %s", self.live_file.filename)
 
671
            self.live_file.close()
 
672
            data_file = self.live_file.make_immutable()
 
673
            self._immutable[data_file.file_id] = data_file
 
674
            if create_file:
 
675
                # create a new live data file
 
676
                self.live_file = DataFile(self.base_path)
 
677
            else:
 
678
                self.live_file = None
628
679
 
629
680
    def _build_keydir(self):
630
681
        """Build the keydir."""
631
 
        logger.debug('building the keydir, using: %s',
632
 
                     self._immutable.keys() + [self.live_file.file_id])
 
682
        fileids = self._immutable.keys()
 
683
        if self.live_file:
 
684
            fileids.append(self.live_file.file_id)
 
685
        logger.debug('building the keydir, using: %s', fileids)
633
686
        # sort the files by name, in order to load from older -> newest
634
687
        for data_file in sorted(self._immutable.values(),
635
 
                                        key=attrgetter('filename')):
 
688
                                key=attrgetter('filename')):
636
689
            if data_file.has_hint:
637
690
                # load directly from the hint file.
638
691
                self._load_from_hint(data_file)
639
692
            elif data_file.exists() and data_file.size > 0:
640
693
                self._load_from_data(data_file)
641
 
        if self.live_file.exists() and self.live_file.size > 0:
 
694
        if self.live_file and self.live_file.exists() \
 
695
           and self.live_file.size > 0:
642
696
            self._load_from_data(self.live_file)
643
697
        logger.info('keydir ready! (keys: %d)', len(self._keydir))
644
698
 
647
701
        logger.debug("loading entries from hint of: %s", data_file.filename)
648
702
        hint_file = data_file.get_hint_file()
649
703
        for hint_entry in hint_file.iter_entries():
650
 
            kd_entry = KeydirEntry.from_hint_entry(data_file.file_id,
651
 
                                                   hint_entry)
652
 
            self._keydir[(hint_entry.row_type, hint_entry.key)] = kd_entry
 
704
            if hint_entry.value_pos == TOMBSTONE_POS:
 
705
                self._keydir.remove((hint_entry.row_type, hint_entry.key))
 
706
            else:
 
707
                kd_entry = KeydirEntry.from_hint_entry(data_file.file_id,
 
708
                                                       hint_entry)
 
709
                self._keydir[(hint_entry.row_type, hint_entry.key)] = kd_entry
653
710
 
654
711
    def _load_from_data(self, data_file):
655
712
        """Load keydir info from a data file.
667
724
                # the record is dead, check if need to remove it from
668
725
                # the indexes
669
726
                self._keydir.remove((entry.row_type, entry.key))
670
 
                hint_idx.pop(entry.key, None)
 
727
                # add the tombstone entry to the hint
 
728
                if build_hint:
 
729
                    hint_entry = HintEntry.from_tritcask_entry(entry, dead=True)
 
730
                    hint_idx[hint_entry.key] = hint_entry
671
731
            else:
672
732
                kd_entry = KeydirEntry.from_tritcask_entry(data_file.file_id,
673
733
                                                           entry)
682
742
 
683
743
    def _get_value(self, file_id, value_pos, value_sz):
684
744
        """Get the value for file_id, value_pos."""
685
 
        if file_id == self.live_file.file_id:
 
745
        if self.live_file and file_id == self.live_file.file_id:
686
746
            # it's the current live file
687
747
            return self.live_file[value_pos:value_pos+value_sz]
688
748
        else:
732
792
            file_id = item[1][0]
733
793
            return file_id in immutable_files
734
794
        filtered_keydir = itertools.ifilter(by_file_id, self._keydir.items())
735
 
 
736
795
        dest_file = TempDataFile(self.base_path)
737
796
        hint_file = dest_file.get_hint_file()
738
797
        for keydir_key, kd_entry in sorted(filtered_keydir,