1
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2
// All rights reserved.
4
// Use of this source code is governed by a BSD-style license that can be
5
// found in the LICENSE file.
13
"github.com/syndtr/goleveldb/leveldb/journal"
14
"github.com/syndtr/goleveldb/leveldb/storage"
24
func (d dropper) Drop(err error) {
25
if e, ok := err.(journal.DroppedError); ok {
26
d.s.logf("journal@drop %s-%d S·%s %q", d.file.Type(), d.file.Num(), shortenb(e.Size), e.Reason)
28
d.s.logf("journal@drop %s-%d %q", d.file.Type(), d.file.Num, err)
32
func (s *session) log(v ...interface{}) {
33
s.stor.Log(fmt.Sprint(v...))
36
func (s *session) logf(format string, v ...interface{}) {
37
s.stor.Log(fmt.Sprintf(format, v...))
42
func (s *session) getJournalFile(num uint64) storage.File {
43
return s.stor.GetFile(num, storage.TypeJournal)
46
func (s *session) getTableFile(num uint64) storage.File {
47
return s.stor.GetFile(num, storage.TypeTable)
50
func (s *session) getFiles(t storage.FileType) ([]storage.File, error) {
51
return s.stor.GetFiles(t)
56
// Get current version.
57
func (s *session) version() *version {
64
// Get current version; no barrier.
65
func (s *session) version_NB() *version {
69
// Set current version to v.
70
func (s *session) setVersion(v *version) {
73
if old := s.stVersion; old != nil {
82
// Get current unused file number.
83
func (s *session) fileNum() uint64 {
84
return atomic.LoadUint64(&s.stFileNum)
87
// Get current unused file number to num.
88
func (s *session) setFileNum(num uint64) {
89
atomic.StoreUint64(&s.stFileNum, num)
92
// Mark file number as used.
93
func (s *session) markFileNum(num uint64) {
96
old, x := s.stFileNum, num
100
if atomic.CompareAndSwapUint64(&s.stFileNum, old, x) {
106
// Allocate a file number.
107
func (s *session) allocFileNum() (num uint64) {
108
return atomic.AddUint64(&s.stFileNum, 1) - 1
111
// Reuse given file number.
112
func (s *session) reuseFileNum(num uint64) {
114
old, x := s.stFileNum, num
118
if atomic.CompareAndSwapUint64(&s.stFileNum, old, x) {
124
// manifest related utils
126
// Fill given session record obj with current states; need external
128
func (s *session) fillRecord(r *sessionRecord, snapshot bool) {
129
r.setNextNum(s.fileNum())
132
if !r.has(recJournalNum) {
133
r.setJournalNum(s.stJournalNum)
140
for level, ik := range s.stCPtrs {
142
r.addCompactionPointer(level, ik)
146
r.setComparer(s.cmp.cmp.Name())
150
// Mark if record has been commited, this will update session state;
151
// need external synchronization.
152
func (s *session) recordCommited(r *sessionRecord) {
153
if r.has(recJournalNum) {
154
s.stJournalNum = r.journalNum
157
if r.has(recPrevJournalNum) {
158
s.stPrevJournalNum = r.prevJournalNum
165
for _, p := range r.compactionPointers {
166
s.stCPtrs[p.level] = iKey(p.key)
170
// Create a new manifest file; need external synchronization.
171
func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
172
num := s.allocFileNum()
173
file := s.stor.GetFile(num, storage.TypeManifest)
174
writer, err := file.Create()
178
jw := journal.NewWriter(writer)
184
rec = new(sessionRecord)
186
s.fillRecord(rec, true)
191
s.recordCommited(rec)
192
if s.manifest != nil {
195
if s.manifestWriter != nil {
196
s.manifestWriter.Close()
198
if s.manifestFile != nil {
199
s.manifestFile.Remove()
201
s.manifestFile = file
202
s.manifestWriter = writer
223
err = s.stor.SetManifest(file)
227
// Flush record to disk.
228
func (s *session) flushManifest(rec *sessionRecord) (err error) {
229
s.fillRecord(rec, false)
230
w, err := s.manifest.Next()
238
err = s.manifest.Flush()
242
err = s.manifestWriter.Sync()
246
s.recordCommited(rec)