~ubuntu-branches/ubuntu/utopic/hockeypuck/utopic-proposed

« back to all changes in this revision

Viewing changes to build/src/github.com/cmars/conflux/recon/leveldb/ptree.go

  • Committer: Package Import Robot
  • Author(s): Casey Marshall
  • Date: 2014-04-13 20:06:01 UTC
  • Revision ID: package-import@ubuntu.com-20140413200601-oxdlqn1gy0x8m55u
Tags: 1.0~rel20140413+7a1892a~trusty
Hockeypuck 1.0 release

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
   conflux - Distributed database synchronization library
 
3
        Based on the algorithm described in
 
4
                "Set Reconciliation with Nearly Optimal Communication Complexity",
 
5
                        Yaron Minsky, Ari Trachtenberg, and Richard Zippel, 2004.
 
6
 
 
7
   Copyright (C) 2012  Casey Marshall <casey.marshall@gmail.com>
 
8
 
 
9
   This program is free software: you can redistribute it and/or modify
 
10
   it under the terms of the GNU Affero General Public License as published by
 
11
   the Free Software Foundation, version 3.
 
12
 
 
13
   This program is distributed in the hope that it will be useful,
 
14
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
15
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
16
   GNU Affero General Public License for more details.
 
17
 
 
18
   You should have received a copy of the GNU Affero General Public License
 
19
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
20
*/
 
21
 
 
22
// Package leveldb provides a key-value storage implementation of the
 
23
// recon prefix tree interface.
 
24
package leveldb
 
25
 
 
26
import (
 
27
        "bytes"
 
28
        "encoding/gob"
 
29
        "errors"
 
30
        "fmt"
 
31
        . "github.com/cmars/conflux"
 
32
        "github.com/cmars/conflux/recon"
 
33
        "github.com/syndtr/goleveldb/leveldb"
 
34
        "os"
 
35
)
 
36
 
 
37
type prefixTree struct {
 
38
        *Settings
 
39
        root   *prefixNode
 
40
        db     *leveldb.DB
 
41
        points []*Zp
 
42
}
 
43
 
 
44
type prefixNode struct {
 
45
        *prefixTree
 
46
        NodeKey      []byte
 
47
        NodeSValues  []byte
 
48
        NumElements  int
 
49
        Leaf         bool
 
50
        NodeElements [][]byte
 
51
}
 
52
 
 
53
func mustEncodeBitstring(bs *Bitstring) []byte {
 
54
        w := bytes.NewBuffer(nil)
 
55
        err := recon.WriteBitstring(w, bs)
 
56
        if err != nil {
 
57
                panic(err)
 
58
        }
 
59
        return w.Bytes()
 
60
}
 
61
 
 
62
func mustDecodeBitstring(buf []byte) *Bitstring {
 
63
        bs, err := recon.ReadBitstring(bytes.NewBuffer(buf))
 
64
        if err != nil {
 
65
                panic(err)
 
66
        }
 
67
        return bs
 
68
}
 
69
 
 
70
func mustEncodeZZarray(arr []*Zp) []byte {
 
71
        w := bytes.NewBuffer(nil)
 
72
        err := recon.WriteZZarray(w, arr)
 
73
        if err != nil {
 
74
                panic(err)
 
75
        }
 
76
        return w.Bytes()
 
77
}
 
78
 
 
79
func mustDecodeZZarray(buf []byte) []*Zp {
 
80
        arr, err := recon.ReadZZarray(bytes.NewBuffer(buf))
 
81
        if err != nil {
 
82
                panic(err)
 
83
        }
 
84
        return arr
 
85
}
 
86
 
 
87
const COLLECTION_NAME = "conflux.recon"
 
88
 
 
89
func New(settings *Settings) (ptree recon.PrefixTree, err error) {
 
90
        tree := &prefixTree{
 
91
                Settings: settings,
 
92
                points:   Zpoints(P_SKS, settings.NumSamples())}
 
93
        ptree = tree
 
94
        return
 
95
}
 
96
 
 
97
func (t *prefixTree) Create() (err error) {
 
98
        if t.db, err = leveldb.OpenFile(t.Settings.Path(), nil); err != nil {
 
99
                return
 
100
        }
 
101
        return t.ensureRoot()
 
102
}
 
103
 
 
104
func (t *prefixTree) Drop() error {
 
105
        if t.db != nil {
 
106
                t.db.Close()
 
107
        }
 
108
        return os.Remove(t.Settings.Path())
 
109
}
 
110
 
 
111
func (t *prefixTree) Close() (err error) {
 
112
        return t.db.Close()
 
113
}
 
114
 
 
115
func (t *prefixTree) Init() {
 
116
}
 
117
 
 
118
func (t *prefixTree) ensureRoot() (err error) {
 
119
        _, err = t.Root()
 
120
        if err != recon.PNodeNotFound {
 
121
                return
 
122
        }
 
123
        root := t.newChildNode(nil, 0)
 
124
        return root.upsertNode()
 
125
}
 
126
 
 
127
func (t *prefixTree) Points() []*Zp { return t.points }
 
128
 
 
129
func (t *prefixTree) Root() (recon.PrefixNode, error) {
 
130
        return t.Node(NewBitstring(0))
 
131
}
 
132
 
 
133
func (t *prefixTree) hasKey(key []byte) bool {
 
134
        _, err := t.db.Get(key, nil)
 
135
        return err == nil
 
136
}
 
137
 
 
138
func (t *prefixTree) getNode(key []byte) (node *prefixNode, err error) {
 
139
        var val []byte
 
140
        if val, err = t.db.Get(key, nil); err != nil {
 
141
                if err == leveldb.ErrNotFound {
 
142
                        err = recon.PNodeNotFound
 
143
                }
 
144
                return
 
145
        }
 
146
        if len(val) == 0 {
 
147
                err = recon.PNodeNotFound
 
148
                return
 
149
        }
 
150
        dec := gob.NewDecoder(bytes.NewBuffer(val))
 
151
        node = new(prefixNode)
 
152
        err = dec.Decode(node)
 
153
        node.prefixTree = t
 
154
        return
 
155
}
 
156
 
 
157
func (t *prefixTree) Node(bs *Bitstring) (node recon.PrefixNode, err error) {
 
158
        nbq := t.BitQuantum()
 
159
        key := bs
 
160
        nodeKey := mustEncodeBitstring(key)
 
161
        for {
 
162
                node, err = t.getNode(nodeKey)
 
163
                if err != recon.PNodeNotFound || key.BitLen() == 0 {
 
164
                        break
 
165
                }
 
166
                key = NewBitstring(key.BitLen() - nbq)
 
167
                key.SetBytes(bs.Bytes())
 
168
                nodeKey = mustEncodeBitstring(key)
 
169
        }
 
170
        return node, err
 
171
}
 
172
 
 
173
func (n *prefixNode) insert(z *Zp, marray []*Zp, bs *Bitstring, depth int) error {
 
174
        for {
 
175
                n.updateSvalues(z, marray)
 
176
                n.NumElements++
 
177
                var err error
 
178
                if n.IsLeaf() {
 
179
                        if len(n.NodeElements) > n.SplitThreshold() {
 
180
                                err = n.split(depth)
 
181
                                if err != nil {
 
182
                                        return err
 
183
                                }
 
184
                        } else {
 
185
                                err = n.insertElement(z)
 
186
                                if err != nil {
 
187
                                        return err
 
188
                                }
 
189
                                return n.upsertNode()
 
190
                        }
 
191
                }
 
192
                err = n.upsertNode()
 
193
                if err != nil {
 
194
                        return err
 
195
                }
 
196
                childIndex := recon.NextChild(n, bs, depth)
 
197
                n = n.Children()[childIndex].(*prefixNode)
 
198
                depth++
 
199
        }
 
200
}
 
201
 
 
202
func (n *prefixNode) deleteNode() (err error) {
 
203
        err = n.db.Delete(n.NodeKey, nil)
 
204
        return
 
205
}
 
206
 
 
207
func (n *prefixNode) deleteElements() error {
 
208
        n.NodeElements = nil
 
209
        return n.upsertNode()
 
210
}
 
211
 
 
212
func (n *prefixNode) deleteElement(element *Zp) error {
 
213
        elementBytes := element.Bytes()
 
214
        var elements [][]byte
 
215
        var removed bool
 
216
        for _, element := range n.NodeElements {
 
217
                if bytes.Equal(element, elementBytes) {
 
218
                        removed = true
 
219
                } else {
 
220
                        elements = append(elements, element)
 
221
                }
 
222
        }
 
223
        if !removed {
 
224
                return ErrElementNotFound(element)
 
225
        }
 
226
        n.NodeElements = elements
 
227
        return n.upsertNode()
 
228
}
 
229
 
 
230
func (n *prefixNode) insertElement(element *Zp) error {
 
231
        n.NodeElements = append(n.NodeElements, element.Bytes())
 
232
        return n.upsertNode()
 
233
}
 
234
 
 
235
func (n *prefixNode) split(depth int) (err error) {
 
236
        splitElements := n.NodeElements
 
237
        n.Leaf = false
 
238
        n.NodeElements = nil
 
239
        err = n.upsertNode()
 
240
        if err != nil {
 
241
                return err
 
242
        }
 
243
        // Create child nodes
 
244
        numChildren := 1 << uint(n.BitQuantum())
 
245
        var children []*prefixNode
 
246
        for i := 0; i < numChildren; i++ {
 
247
                // Create new empty child node
 
248
                child := n.newChildNode(n, i)
 
249
                err = child.upsertNode()
 
250
                if err != nil {
 
251
                        return err
 
252
                }
 
253
                children = append(children, child)
 
254
        }
 
255
        // Move elements into child nodes
 
256
        for _, element := range splitElements {
 
257
                z := Zb(P_SKS, element)
 
258
                bs := NewZpBitstring(z)
 
259
                childIndex := recon.NextChild(n, bs, depth)
 
260
                child := children[childIndex]
 
261
                marray, err := recon.AddElementArray(child, z)
 
262
                if err != nil {
 
263
                        return err
 
264
                }
 
265
                err = child.insert(z, marray, bs, depth+1)
 
266
                if err != nil {
 
267
                        return err
 
268
                }
 
269
        }
 
270
        return nil
 
271
}
 
272
 
 
273
func (n *prefixNode) remove(z *Zp, marray []*Zp, bs *Bitstring, depth int) error {
 
274
        var err error
 
275
        for {
 
276
                n.updateSvalues(z, marray)
 
277
                n.NumElements--
 
278
                if n.IsLeaf() {
 
279
                        break
 
280
                } else {
 
281
                        if n.NumElements <= n.JoinThreshold() {
 
282
                                err = n.join()
 
283
                                if err != nil {
 
284
                                        return err
 
285
                                }
 
286
                                break
 
287
                        } else {
 
288
                                err = n.upsertNode()
 
289
                                if err != nil {
 
290
                                        return err
 
291
                                }
 
292
                                childIndex := recon.NextChild(n, bs, depth)
 
293
                                n = n.Children()[childIndex].(*prefixNode)
 
294
                                depth++
 
295
                        }
 
296
                }
 
297
        }
 
298
        err = n.deleteElement(z)
 
299
        if err != nil {
 
300
                return err
 
301
        }
 
302
        return n.upsertNode()
 
303
}
 
304
 
 
305
func (n *prefixNode) join() error {
 
306
        var elements [][]byte
 
307
        for _, child := range n.Children() {
 
308
                elements = append(elements, child.(*prefixNode).NodeElements...)
 
309
                if err := child.(*prefixNode).deleteNode(); err != nil {
 
310
                        return err
 
311
                }
 
312
        }
 
313
        n.NodeElements = elements
 
314
        n.Leaf = true
 
315
        return n.upsertNode()
 
316
}
 
317
 
 
318
func ErrDuplicateElement(z *Zp) error {
 
319
        return errors.New(fmt.Sprintf("Attempt to insert duplicate element %v", z))
 
320
}
 
321
 
 
322
func ErrElementNotFound(z *Zp) error {
 
323
        return errors.New(fmt.Sprintf("Expected element %v was not found", z))
 
324
}
 
325
 
 
326
func (t *prefixTree) Insert(z *Zp) error {
 
327
        _, lookupErr := t.db.Get(z.Bytes(), nil)
 
328
        if lookupErr == nil {
 
329
                return ErrDuplicateElement(z)
 
330
        } else if lookupErr != leveldb.ErrNotFound {
 
331
                return lookupErr
 
332
        }
 
333
        bs := NewZpBitstring(z)
 
334
        root, err := t.Root()
 
335
        if err != nil {
 
336
                return err
 
337
        }
 
338
        marray, err := recon.AddElementArray(t, z)
 
339
        if err != nil {
 
340
                return err
 
341
        }
 
342
        err = root.(*prefixNode).insert(z, marray, bs, 0)
 
343
        if err != nil {
 
344
                return err
 
345
        }
 
346
        return t.db.Put(z.Bytes(), []byte{}, nil)
 
347
}
 
348
 
 
349
func (t *prefixTree) Remove(z *Zp) error {
 
350
        _, lookupErr := t.db.Get(z.Bytes(), nil)
 
351
        if lookupErr != nil {
 
352
                return lookupErr
 
353
        }
 
354
        bs := NewZpBitstring(z)
 
355
        root, err := t.Root()
 
356
        if err != nil {
 
357
                return err
 
358
        }
 
359
        marray := recon.DelElementArray(t, z)
 
360
        err = root.(*prefixNode).remove(z, marray, bs, 0)
 
361
        if err != nil {
 
362
                return err
 
363
        }
 
364
        return t.db.Delete(z.Bytes(), nil)
 
365
}
 
366
 
 
367
func (t *prefixTree) newChildNode(parent *prefixNode, childIndex int) *prefixNode {
 
368
        n := &prefixNode{prefixTree: t, Leaf: true}
 
369
        var key *Bitstring
 
370
        if parent != nil {
 
371
                parentKey := parent.Key()
 
372
                key = NewBitstring(parentKey.BitLen() + t.BitQuantum())
 
373
                key.SetBytes(parentKey.Bytes())
 
374
                for j := 0; j < parent.BitQuantum(); j++ {
 
375
                        if (1<<uint(j))&childIndex == 0 {
 
376
                                key.Unset(parentKey.BitLen() + j)
 
377
                        } else {
 
378
                                key.Set(parentKey.BitLen() + j)
 
379
                        }
 
380
                }
 
381
        } else {
 
382
                key = NewBitstring(0)
 
383
        }
 
384
        n.NodeKey = mustEncodeBitstring(key)
 
385
        svalues := make([]*Zp, t.NumSamples())
 
386
        for i := 0; i < len(svalues); i++ {
 
387
                svalues[i] = Zi(P_SKS, 1)
 
388
        }
 
389
        n.NodeSValues = mustEncodeZZarray(svalues)
 
390
        return n
 
391
}
 
392
 
 
393
func (n *prefixNode) upsertNode() (err error) {
 
394
        var buf bytes.Buffer
 
395
        enc := gob.NewEncoder(&buf)
 
396
        if err = enc.Encode(n); err != nil {
 
397
                return
 
398
        }
 
399
        return n.db.Put(n.NodeKey, buf.Bytes(), nil)
 
400
}
 
401
 
 
402
func (n *prefixNode) IsLeaf() bool {
 
403
        return n.Leaf
 
404
}
 
405
 
 
406
func (n *prefixNode) Children() (result []recon.PrefixNode) {
 
407
        if n.IsLeaf() {
 
408
                return nil
 
409
        }
 
410
        key := n.Key()
 
411
        numChildren := 1 << uint(n.BitQuantum())
 
412
        for i := 0; i < numChildren; i++ {
 
413
                childKey := NewBitstring(key.BitLen() + n.BitQuantum())
 
414
                childKey.SetBytes(key.Bytes())
 
415
                for j := 0; j < n.BitQuantum(); j++ {
 
416
                        if (1<<uint(j))&i == 0 {
 
417
                                childKey.Unset(key.BitLen() + j)
 
418
                        } else {
 
419
                                childKey.Set(key.BitLen() + j)
 
420
                        }
 
421
                }
 
422
                child, err := n.Node(childKey)
 
423
                if err != nil {
 
424
                        panic(fmt.Sprintf("Children failed on child#%v, key=%v: %v", i, childKey, err))
 
425
                }
 
426
                result = append(result, child)
 
427
                //fmt.Println("Node", n.Key(), "Child:", child.Key())
 
428
        }
 
429
        return
 
430
}
 
431
 
 
432
func (n *prefixNode) Elements() (result []*Zp) {
 
433
        if n.IsLeaf() {
 
434
                for _, element := range n.NodeElements {
 
435
                        result = append(result, Zb(P_SKS, element))
 
436
                }
 
437
        } else {
 
438
                // TODO: Eliminate recursion
 
439
                for _, child := range n.Children() {
 
440
                        result = append(result, child.Elements()...)
 
441
                }
 
442
        }
 
443
        return
 
444
}
 
445
 
 
446
func (n *prefixNode) Size() int { return n.NumElements }
 
447
 
 
448
func (n *prefixNode) SValues() []*Zp {
 
449
        return mustDecodeZZarray(n.NodeSValues)
 
450
}
 
451
 
 
452
func (n *prefixNode) Key() *Bitstring {
 
453
        return mustDecodeBitstring(n.NodeKey)
 
454
}
 
455
 
 
456
func (n *prefixNode) Parent() (recon.PrefixNode, bool) {
 
457
        key := n.Key()
 
458
        if key.BitLen() == 0 {
 
459
                return nil, false
 
460
        }
 
461
        parentKey := NewBitstring(key.BitLen() - n.BitQuantum())
 
462
        parentKey.SetBytes(key.Bytes())
 
463
        parent, err := n.Node(parentKey)
 
464
        if err != nil {
 
465
                panic(fmt.Sprintf("Failed to get parent: %v", err))
 
466
        }
 
467
        return parent, true
 
468
}
 
469
 
 
470
func (n *prefixNode) updateSvalues(z *Zp, marray []*Zp) {
 
471
        if len(marray) != len(n.points) {
 
472
                panic("Inconsistent NumSamples size")
 
473
        }
 
474
        svalues := mustDecodeZZarray(n.NodeSValues)
 
475
        for i := 0; i < len(marray); i++ {
 
476
                svalues[i] = Z(z.P).Mul(svalues[i], marray[i])
 
477
        }
 
478
        n.NodeSValues = mustEncodeZZarray(svalues)
 
479
}