~ubuntu-branches/ubuntu/precise/ubuntuone-client/precise

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/tritcask.py

  • Committer: Package Import Robot
  • Author(s): Rodney Dawes
  • Date: 2011-12-21 15:46:25 UTC
  • mfrom: (1.1.56)
  • Revision ID: package-import@ubuntu.com-20111221154625-ujvunri4frsecj2k
Tags: 2.99.0-0ubuntu1
* New upstream release.
  - Verify timestamp to avoid invalid auth failures (LP: #692597)
  - Files in new UDFs not uploaded due to filtering (LP: #869920)
* debian/patches:
  - Remove upstreamed patches

Show diffs side-by-side

added added

removed removed

Lines of Context:
82
82
_HintEntry = namedtuple('_HintEntry', ['tstamp', 'key_sz', 'row_type',
83
83
                                       'value_sz', 'value_pos', 'key'])
84
84
 
 
85
 
85
86
class HintEntry(_HintEntry):
86
87
    """A entry for the hint file."""
87
88
 
101
102
_KeydirEntry = namedtuple('_KeydirEntry',
102
103
                          ['file_id', 'tstamp', 'value_sz', 'value_pos'])
103
104
 
 
105
 
104
106
class KeydirEntry(_KeydirEntry):
105
107
    """A entry for the Keydir."""
106
108
 
260
262
        """__getitem__ to support slicing and *only* slicing."""
261
263
        if isinstance(item, slice):
262
264
            self.fd.seek(item.start)
263
 
            return self.fd.read(item.stop-item.start)
 
265
            return self.fd.read(item.stop - item.start)
264
266
        else:
265
267
            raise ValueError('Only slice is supported')
266
268
 
270
272
        value_sz = len(value)
271
273
        tstamp = timestamp()
272
274
        header = header_struct.pack(tstamp, key_sz, value_sz, row_type)
273
 
        crc32 = crc32_struct.pack(zlib.crc32(header+key+value))
 
275
        crc32 = crc32_struct.pack(zlib.crc32(header + key + value))
274
276
        if EXTRA_SEEK:
275
277
            # seek to end of file even if we are in append mode, but py2.x IO
276
278
            # in win32 is really buggy, see: http://bugs.python.org/issue3207
277
279
            self.fd.seek(0, os.SEEK_END)
278
 
        self.fd.write(crc32+header)
 
280
        self.fd.write(crc32 + header)
279
281
        self.fd.write(key)
280
282
        value_pos = self.fd.tell()
281
283
        self.fd.write(value)
284
286
 
285
287
    def read(self, fmmap, current_pos):
286
288
        """Read a single entry from the current position."""
287
 
        crc32_bytes = fmmap[current_pos:current_pos+crc32_size]
 
289
        crc32_bytes = fmmap[current_pos:current_pos + crc32_size]
288
290
        current_pos += crc32_size
289
 
        header = fmmap[current_pos:current_pos+header_size]
 
291
        header = fmmap[current_pos:current_pos + header_size]
290
292
        current_pos += header_size
291
293
        if header == '' or crc32_bytes == '':
292
294
            # reached EOF
296
298
            tstamp, key_sz, value_sz, row_type = header_struct.unpack(header)
297
299
        except struct.error, e:
298
300
            raise BadHeader(e)
299
 
        key = fmmap[current_pos:current_pos+key_sz]
 
301
        key = fmmap[current_pos:current_pos + key_sz]
300
302
        current_pos += key_sz
301
303
        value_pos = current_pos
302
 
        value = fmmap[current_pos:current_pos+value_sz]
 
304
        value = fmmap[current_pos:current_pos + value_sz]
303
305
        current_pos += value_sz
304
306
        # verify the crc32 of the data
305
 
        if zlib.crc32(header+key+value) == crc32:
 
307
        if zlib.crc32(header + key + value) == crc32:
306
308
            return TritcaskEntry(crc32, tstamp, key_sz, value_sz, row_type,
307
309
                                 key, value, value_pos), current_pos
308
310
        else:
309
 
            raise BadCrc(crc32, zlib.crc32(header+key+value))
 
311
            raise BadCrc(crc32, zlib.crc32(header + key + value))
310
312
 
311
313
    def get_hint_file(self):
312
314
        """Open and return the hint file."""
414
416
        new_name = self.filename.replace(self.temp_name, INACTIVE)
415
417
        os.rename(self.filename, new_name)
416
418
        if self.has_hint:
417
 
            new_hint_name = self.hint_filename.replace(self.temp_name, INACTIVE)
 
419
            new_hint_name = self.hint_filename.replace(self.temp_name,
 
420
                INACTIVE)
418
421
            os.rename(self.hint_filename, new_hint_name)
419
422
        return ImmutableDataFile(*os.path.split(new_name))
420
423
 
447
450
        with contextlib.closing(fmap):
448
451
            current_pos = 0
449
452
            while True:
450
 
                header = fmap[current_pos:current_pos+hint_header_size]
 
453
                header = fmap[current_pos:current_pos + hint_header_size]
451
454
                current_pos += hint_header_size
452
455
                if header == '':
453
456
                    raise StopIteration
454
457
                tstamp, key_sz, row_type, value_sz, value_pos = \
455
458
                        hint_header_struct.unpack(header)
456
 
                key = fmap[current_pos:current_pos+key_sz]
 
459
                key = fmap[current_pos:current_pos + key_sz]
457
460
                current_pos += key_sz
458
461
                yield HintEntry(tstamp, key_sz, row_type,
459
462
                                value_sz, value_pos, key)
593
596
        @param auto_merge: disable auto merge/compaction.
594
597
        @param dead_bytes_threshold: the limit factor of dead vs live bytes to
595
598
            trigger a merge and/or live file rotation.
596
 
        @param max_immutable_files: the max number of inactive files to use, once
597
 
            this value is reached a merge is triggered.
 
599
        @param max_immutable_files: the max number of inactive files to use,
 
600
            once this value is reached a merge is triggered.
598
601
        """
599
602
        logger.info("Initializing Tritcask on: %s", path)
600
603
        self._keydir = Keydir()
726
729
                os.rename(orig, dest)
727
730
        # immutable files + live
728
731
        logger.info("found %s data files, %s dead and %s broken files",
729
 
                    len(self._immutable)+1, dead_files, broken_files)
 
732
                    len(self._immutable) + 1, dead_files, broken_files)
730
733
 
731
734
    def rotate(self, create_file=True):
732
735
        """Rotate the live file only if it already exsits."""
799
802
                self._keydir.remove((entry.row_type, entry.key))
800
803
                # add the tombstone entry to the hint
801
804
                if build_hint:
802
 
                    hint_entry = HintEntry.from_tritcask_entry(entry, dead=True)
 
805
                    hint_entry = HintEntry.from_tritcask_entry(entry,
 
806
                        dead=True)
803
807
                    hint_idx[hint_entry.key] = hint_entry
804
808
            else:
805
809
                kd_entry = KeydirEntry.from_tritcask_entry(data_file.file_id,
817
821
        """Get the value for file_id, value_pos."""
818
822
        if self.live_file and file_id == self.live_file.file_id:
819
823
            # it's the current live file
820
 
            return self.live_file[value_pos:value_pos+value_sz]
 
824
            return self.live_file[value_pos:value_pos + value_sz]
821
825
        else:
822
 
            return self._immutable[file_id][value_pos:value_pos+value_sz]
 
826
            return self._immutable[file_id][value_pos:value_pos + value_sz]
823
827
 
824
828
    def put(self, row_type, key, value):
825
829
        """Put key/value in the store."""
828
832
            raise ValueError('key must be a str instance.')
829
833
        if not isinstance(value, str):
830
834
            raise ValueError('value must be a str instance.')
831
 
        tstamp, value_pos, value_sz = self.live_file.write(row_type, key, value)
 
835
        tstamp, value_pos, value_sz = self.live_file.write(row_type,
 
836
            key, value)
832
837
        if value != TOMBSTONE:
833
838
            kd_entry = KeydirEntry(self.live_file.file_id, tstamp,
834
839
                                   value_sz, value_pos)
861
866
    def merge(self, immutable_files):
862
867
        """Merge a set of immutable files into a single one."""
863
868
        logger.info("Starting merge of %s", immutable_files.keys())
 
869
 
864
870
        def by_file_id(item):
865
871
            file_id = item[1][0]
866
872
            return file_id in immutable_files