~ubuntu-branches/ubuntu/utopic/hockeypuck/utopic-proposed

« back to all changes in this revision

Viewing changes to build/src/github.com/syndtr/goleveldb/leveldb/session_util.go

  • Committer: Package Import Robot
  • Author(s): Casey Marshall
  • Date: 2014-04-13 20:06:01 UTC
  • Revision ID: package-import@ubuntu.com-20140413200601-oxdlqn1gy0x8m55u
Tags: 1.0~rel20140413+7a1892a~trusty
Hockeypuck 1.0 release

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
 
2
// All rights reserved.
 
3
//
 
4
// Use of this source code is governed by a BSD-style license that can be
 
5
// found in the LICENSE file.
 
6
 
 
7
package leveldb
 
8
 
 
9
import (
 
10
        "fmt"
 
11
        "sync/atomic"
 
12
 
 
13
        "github.com/syndtr/goleveldb/leveldb/journal"
 
14
        "github.com/syndtr/goleveldb/leveldb/storage"
 
15
)
 
16
 
 
17
// logging
 
18
 
 
19
type dropper struct {
 
20
        s    *session
 
21
        file storage.File
 
22
}
 
23
 
 
24
func (d dropper) Drop(err error) {
 
25
        if e, ok := err.(journal.DroppedError); ok {
 
26
                d.s.logf("journal@drop %s-%d S·%s %q", d.file.Type(), d.file.Num(), shortenb(e.Size), e.Reason)
 
27
        } else {
 
28
                d.s.logf("journal@drop %s-%d %q", d.file.Type(), d.file.Num, err)
 
29
        }
 
30
}
 
31
 
 
32
func (s *session) log(v ...interface{}) {
 
33
        s.stor.Log(fmt.Sprint(v...))
 
34
}
 
35
 
 
36
func (s *session) logf(format string, v ...interface{}) {
 
37
        s.stor.Log(fmt.Sprintf(format, v...))
 
38
}
 
39
 
 
40
// file utils
 
41
 
 
42
func (s *session) getJournalFile(num uint64) storage.File {
 
43
        return s.stor.GetFile(num, storage.TypeJournal)
 
44
}
 
45
 
 
46
func (s *session) getTableFile(num uint64) storage.File {
 
47
        return s.stor.GetFile(num, storage.TypeTable)
 
48
}
 
49
 
 
50
func (s *session) getFiles(t storage.FileType) ([]storage.File, error) {
 
51
        return s.stor.GetFiles(t)
 
52
}
 
53
 
 
54
// session state
 
55
 
 
56
// Get current version.
 
57
func (s *session) version() *version {
 
58
        s.vmu.Lock()
 
59
        defer s.vmu.Unlock()
 
60
        s.stVersion.ref++
 
61
        return s.stVersion
 
62
}
 
63
 
 
64
// Get current version; no barrier.
 
65
func (s *session) version_NB() *version {
 
66
        return s.stVersion
 
67
}
 
68
 
 
69
// Set current version to v.
 
70
func (s *session) setVersion(v *version) {
 
71
        s.vmu.Lock()
 
72
        v.ref = 1
 
73
        if old := s.stVersion; old != nil {
 
74
                v.ref++
 
75
                old.next = v
 
76
                old.release_NB()
 
77
        }
 
78
        s.stVersion = v
 
79
        s.vmu.Unlock()
 
80
}
 
81
 
 
82
// Get current unused file number.
 
83
func (s *session) fileNum() uint64 {
 
84
        return atomic.LoadUint64(&s.stFileNum)
 
85
}
 
86
 
 
87
// Get current unused file number to num.
 
88
func (s *session) setFileNum(num uint64) {
 
89
        atomic.StoreUint64(&s.stFileNum, num)
 
90
}
 
91
 
 
92
// Mark file number as used.
 
93
func (s *session) markFileNum(num uint64) {
 
94
        num += 1
 
95
        for {
 
96
                old, x := s.stFileNum, num
 
97
                if old > x {
 
98
                        x = old
 
99
                }
 
100
                if atomic.CompareAndSwapUint64(&s.stFileNum, old, x) {
 
101
                        break
 
102
                }
 
103
        }
 
104
}
 
105
 
 
106
// Allocate a file number.
 
107
func (s *session) allocFileNum() (num uint64) {
 
108
        return atomic.AddUint64(&s.stFileNum, 1) - 1
 
109
}
 
110
 
 
111
// Reuse given file number.
 
112
func (s *session) reuseFileNum(num uint64) {
 
113
        for {
 
114
                old, x := s.stFileNum, num
 
115
                if old != x+1 {
 
116
                        x = old
 
117
                }
 
118
                if atomic.CompareAndSwapUint64(&s.stFileNum, old, x) {
 
119
                        break
 
120
                }
 
121
        }
 
122
}
 
123
 
 
124
// manifest related utils
 
125
 
 
126
// Fill given session record obj with current states; need external
 
127
// synchronization.
 
128
func (s *session) fillRecord(r *sessionRecord, snapshot bool) {
 
129
        r.setNextNum(s.fileNum())
 
130
 
 
131
        if snapshot {
 
132
                if !r.has(recJournalNum) {
 
133
                        r.setJournalNum(s.stJournalNum)
 
134
                }
 
135
 
 
136
                if !r.has(recSeq) {
 
137
                        r.setSeq(s.stSeq)
 
138
                }
 
139
 
 
140
                for level, ik := range s.stCPtrs {
 
141
                        if ik != nil {
 
142
                                r.addCompactionPointer(level, ik)
 
143
                        }
 
144
                }
 
145
 
 
146
                r.setComparer(s.cmp.cmp.Name())
 
147
        }
 
148
}
 
149
 
 
150
// Mark if record has been commited, this will update session state;
 
151
// need external synchronization.
 
152
func (s *session) recordCommited(r *sessionRecord) {
 
153
        if r.has(recJournalNum) {
 
154
                s.stJournalNum = r.journalNum
 
155
        }
 
156
 
 
157
        if r.has(recPrevJournalNum) {
 
158
                s.stPrevJournalNum = r.prevJournalNum
 
159
        }
 
160
 
 
161
        if r.has(recSeq) {
 
162
                s.stSeq = r.seq
 
163
        }
 
164
 
 
165
        for _, p := range r.compactionPointers {
 
166
                s.stCPtrs[p.level] = iKey(p.key)
 
167
        }
 
168
}
 
169
 
 
170
// Create a new manifest file; need external synchronization.
 
171
func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
 
172
        num := s.allocFileNum()
 
173
        file := s.stor.GetFile(num, storage.TypeManifest)
 
174
        writer, err := file.Create()
 
175
        if err != nil {
 
176
                return
 
177
        }
 
178
        jw := journal.NewWriter(writer)
 
179
 
 
180
        if v == nil {
 
181
                v = s.version_NB()
 
182
        }
 
183
        if rec == nil {
 
184
                rec = new(sessionRecord)
 
185
        }
 
186
        s.fillRecord(rec, true)
 
187
        v.fillRecord(rec)
 
188
 
 
189
        defer func() {
 
190
                if err == nil {
 
191
                        s.recordCommited(rec)
 
192
                        if s.manifest != nil {
 
193
                                s.manifest.Close()
 
194
                        }
 
195
                        if s.manifestWriter != nil {
 
196
                                s.manifestWriter.Close()
 
197
                        }
 
198
                        if s.manifestFile != nil {
 
199
                                s.manifestFile.Remove()
 
200
                        }
 
201
                        s.manifestFile = file
 
202
                        s.manifestWriter = writer
 
203
                        s.manifest = jw
 
204
                } else {
 
205
                        writer.Close()
 
206
                        file.Remove()
 
207
                        s.reuseFileNum(num)
 
208
                }
 
209
        }()
 
210
 
 
211
        w, err := jw.Next()
 
212
        if err != nil {
 
213
                return
 
214
        }
 
215
        err = rec.encode(w)
 
216
        if err != nil {
 
217
                return
 
218
        }
 
219
        err = jw.Flush()
 
220
        if err != nil {
 
221
                return
 
222
        }
 
223
        err = s.stor.SetManifest(file)
 
224
        return
 
225
}
 
226
 
 
227
// Flush record to disk.
 
228
func (s *session) flushManifest(rec *sessionRecord) (err error) {
 
229
        s.fillRecord(rec, false)
 
230
        w, err := s.manifest.Next()
 
231
        if err != nil {
 
232
                return
 
233
        }
 
234
        err = rec.encode(w)
 
235
        if err != nil {
 
236
                return
 
237
        }
 
238
        err = s.manifest.Flush()
 
239
        if err != nil {
 
240
                return
 
241
        }
 
242
        err = s.manifestWriter.Sync()
 
243
        if err != nil {
 
244
                return
 
245
        }
 
246
        s.recordCommited(rec)
 
247
        return
 
248
}