2
conflux - Distributed database synchronization library
3
Based on the algorithm described in
4
"Set Reconciliation with Nearly Optimal Communication Complexity",
5
Yaron Minsky, Ari Trachtenberg, and Richard Zippel, 2004.
7
Copyright (C) 2012 Casey Marshall <casey.marshall@gmail.com>
9
This program is free software: you can redistribute it and/or modify
10
it under the terms of the GNU Affero General Public License as published by
11
the Free Software Foundation, version 3.
13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
GNU Affero General Public License for more details.
18
You should have received a copy of the GNU Affero General Public License
19
along with this program. If not, see <http://www.gnu.org/licenses/>.
22
// Package leveldb provides a key-value storage implementation of the
23
// recon prefix tree interface.
31
. "github.com/cmars/conflux"
32
"github.com/cmars/conflux/recon"
33
"github.com/syndtr/goleveldb/leveldb"
37
// prefixTree is the tree type on which the tree-level methods in this file
// (Create, Drop, Root, Node, Insert, Remove, ...) are defined. Its field
// declarations are not visible in this excerpt; the methods below reference
// t.db, t.points and t.Settings.
type prefixTree struct {
44
// prefixNode is a single tree node. Nodes are gob-encoded by upsertNode and
// gob-decoded by getNode. Its field declarations are not visible in this
// excerpt; the methods below reference NodeKey, NodeSValues, NodeElements and
// NumElements, and newChildNode sets the prefixTree and Leaf fields.
type prefixNode struct {
53
// mustEncodeBitstring serializes bs with recon.WriteBitstring into a fresh
// in-memory buffer and returns a []byte.
// NOTE(review): the handling of err is on lines not visible here; the "must"
// prefix suggests it panics on failure -- confirm.
func mustEncodeBitstring(bs *Bitstring) []byte {
54
w := bytes.NewBuffer(nil)
55
err := recon.WriteBitstring(w, bs)
62
// mustDecodeBitstring parses buf with recon.ReadBitstring and returns the
// resulting *Bitstring.
// NOTE(review): the handling of err is on lines not visible here; the "must"
// prefix suggests it panics on failure -- confirm.
func mustDecodeBitstring(buf []byte) *Bitstring {
63
bs, err := recon.ReadBitstring(bytes.NewBuffer(buf))
70
// mustEncodeZZarray serializes arr with recon.WriteZZarray into a fresh
// in-memory buffer and returns a []byte.
// NOTE(review): the handling of err is on lines not visible here; the "must"
// prefix suggests it panics on failure -- confirm.
func mustEncodeZZarray(arr []*Zp) []byte {
71
w := bytes.NewBuffer(nil)
72
err := recon.WriteZZarray(w, arr)
79
// mustDecodeZZarray parses buf with recon.ReadZZarray and returns the
// resulting []*Zp.
// NOTE(review): the handling of err is on lines not visible here; the "must"
// prefix suggests it panics on failure -- confirm.
func mustDecodeZZarray(buf []byte) []*Zp {
80
arr, err := recon.ReadZZarray(bytes.NewBuffer(buf))
87
// COLLECTION_NAME is the string "conflux.recon". It is not referenced by any
// line visible in this excerpt.
const COLLECTION_NAME = "conflux.recon"
89
// New builds a recon.PrefixTree from settings. Most of the body is not
// visible in this excerpt; the visible part initialises the points field
// with Zpoints(P_SKS, settings.NumSamples()).
func New(settings *Settings) (ptree recon.PrefixTree, err error) {
92
points: Zpoints(P_SKS, settings.NumSamples())}
97
// Create opens the LevelDB database at t.Settings.Path() (default options),
// stores the handle in t.db, and then calls ensureRoot.
// The error branch of the open is on lines not visible here.
func (t *prefixTree) Create() (err error) {
98
if t.db, err = leveldb.OpenFile(t.Settings.Path(), nil); err != nil {
101
return t.ensureRoot()
104
// Drop removes the path returned by t.Settings.Path() with os.Remove.
// Statements before the return are not visible in this excerpt.
// NOTE(review): os.Remove only deletes a file or an empty directory; if the
// LevelDB store at this path is a populated directory this call will fail --
// confirm whether os.RemoveAll is intended.
func (t *prefixTree) Drop() error {
108
return os.Remove(t.Settings.Path())
111
// Close returns an error result; its body is not visible in this excerpt.
// NOTE(review): presumably closes t.db -- confirm.
func (t *prefixTree) Close() (err error) {
115
// Init takes no arguments and returns nothing; its body is not visible in
// this excerpt.
func (t *prefixTree) Init() {
118
// ensureRoot creates and persists a root node when one is missing.
// The lookup that produces err is on a line not visible here. When err is
// anything other than recon.PNodeNotFound the if-branch is taken (its body is
// not visible -- presumably it returns err). Otherwise a new node is built
// with newChildNode(nil, 0) and written with upsertNode.
func (t *prefixTree) ensureRoot() (err error) {
120
if err != recon.PNodeNotFound {
123
root := t.newChildNode(nil, 0)
124
return root.upsertNode()
127
// Points returns the tree's points slice as stored in t.points.
func (t *prefixTree) Points() []*Zp {
	pts := t.points
	return pts
}
129
// Root returns the node looked up under a zero-length bitstring key, i.e.
// the result of t.Node(NewBitstring(0)).
func (t *prefixTree) Root() (recon.PrefixNode, error) {
130
return t.Node(NewBitstring(0))
133
// hasKey performs a t.db.Get for key and derives a bool from the resulting
// error. The line computing the return value is not visible in this excerpt.
func (t *prefixTree) hasKey(key []byte) bool {
134
_, err := t.db.Get(key, nil)
138
// getNode loads the raw value stored under key and gob-decodes it into a new
// prefixNode.
//
// A leveldb.ErrNotFound from the database is translated to
// recon.PNodeNotFound. Several lines (declaration of val, the returns, and
// the condition guarding the second PNodeNotFound assignment) are not visible
// in this excerpt.
func (t *prefixTree) getNode(key []byte) (node *prefixNode, err error) {
140
if val, err = t.db.Get(key, nil); err != nil {
141
if err == leveldb.ErrNotFound {
142
err = recon.PNodeNotFound
147
// NOTE(review): the condition for this branch is not shown; presumably an
// empty value is also treated as "not found" -- confirm.
err = recon.PNodeNotFound
150
dec := gob.NewDecoder(bytes.NewBuffer(val))
151
node = new(prefixNode)
152
err = dec.Decode(node)
157
// Node looks up the stored node for bs, retrying with shorter prefixes.
//
// The key is encoded with mustEncodeBitstring and fetched with getNode. The
// search stops when the result is anything other than recon.PNodeNotFound or
// when the key has zero bits; otherwise the key is shortened by BitQuantum
// bits, refilled from bs.Bytes(), and re-encoded. The initial assignment of
// key and the enclosing loop are on lines not visible in this excerpt.
func (t *prefixTree) Node(bs *Bitstring) (node recon.PrefixNode, err error) {
158
nbq := t.BitQuantum()
160
nodeKey := mustEncodeBitstring(key)
162
node, err = t.getNode(nodeKey)
163
if err != recon.PNodeNotFound || key.BitLen() == 0 {
166
// Drop one quantum of bits from the key and try the shorter prefix.
key = NewBitstring(key.BitLen() - nbq)
167
key.SetBytes(bs.Bytes())
168
nodeKey = mustEncodeBitstring(key)
173
// insert adds z to the subtree at n, using bs and depth to pick children.
//
// Visible steps: the node's s-values are updated with marray; the element
// count is compared against SplitThreshold(); the element is appended with
// insertElement and the node written with upsertNode; and n is replaced by
// the child chosen by recon.NextChild. The loop and branch structure joining
// these steps, and the declaration of err, are on lines not visible here.
func (n *prefixNode) insert(z *Zp, marray []*Zp, bs *Bitstring, depth int) error {
175
n.updateSvalues(z, marray)
179
if len(n.NodeElements) > n.SplitThreshold() {
185
err = n.insertElement(z)
189
return n.upsertNode()
196
// Descend into the child selected for bs at this depth.
childIndex := recon.NextChild(n, bs, depth)
197
n = n.Children()[childIndex].(*prefixNode)
202
// deleteNode deletes the database entry stored under n.NodeKey and captures
// the result in err. The return is on a line not visible in this excerpt.
func (n *prefixNode) deleteNode() (err error) {
203
err = n.db.Delete(n.NodeKey, nil)
207
// deleteElements writes the node back with upsertNode.
// NOTE(review): the statement before the return is not visible; presumably
// it clears n.NodeElements -- confirm.
func (n *prefixNode) deleteElements() error {
209
return n.upsertNode()
212
// deleteElement removes element from n.NodeElements and persists the node.
//
// It rebuilds the element list, comparing each stored entry with
// element.Bytes() via bytes.Equal and appending entries to the new list.
// ErrElementNotFound(element) is returned under a condition that is not
// visible in this excerpt (presumably when no entry matched). On success the
// filtered list replaces n.NodeElements and the node is written with
// upsertNode.
func (n *prefixNode) deleteElement(element *Zp) error {
213
elementBytes := element.Bytes()
214
var elements [][]byte
216
// Note: the loop variable shadows the element parameter inside the loop.
for _, element := range n.NodeElements {
217
if bytes.Equal(element, elementBytes) {
220
elements = append(elements, element)
224
return ErrElementNotFound(element)
226
n.NodeElements = elements
227
return n.upsertNode()
230
// insertElement appends element.Bytes() to n.NodeElements and writes the node
// back with upsertNode, returning that call's error.
func (n *prefixNode) insertElement(element *Zp) error {
231
n.NodeElements = append(n.NodeElements, element.Bytes())
232
return n.upsertNode()
235
// split distributes n's elements over newly created child nodes.
//
// It snapshots n.NodeElements, creates 1<<BitQuantum() children with
// newChildNode (persisting each with upsertNode), then for every snapshot
// element rebuilds the Zp value and its bitstring, selects the target child
// with recon.NextChild, computes marray with recon.AddElementArray and
// inserts into that child at depth+1. Error checks and any updates to n
// itself are on lines not visible in this excerpt.
func (n *prefixNode) split(depth int) (err error) {
236
splitElements := n.NodeElements
243
// Create child nodes
244
numChildren := 1 << uint(n.BitQuantum())
245
var children []*prefixNode
246
for i := 0; i < numChildren; i++ {
247
// Create new empty child node
248
child := n.newChildNode(n, i)
249
err = child.upsertNode()
253
children = append(children, child)
255
// Move elements into child nodes
256
for _, element := range splitElements {
257
z := Zb(P_SKS, element)
258
bs := NewZpBitstring(z)
259
childIndex := recon.NextChild(n, bs, depth)
260
child := children[childIndex]
261
// Note: := declares a new err scoped to this loop body.
marray, err := recon.AddElementArray(child, z)
265
err = child.insert(z, marray, bs, depth+1)
273
// remove deletes z from the subtree at n, using bs and depth to pick
// children.
//
// Visible steps: the node's s-values are updated with marray; NumElements is
// compared against JoinThreshold(); n is replaced by the child chosen by
// recon.NextChild; the element is removed with deleteElement; and the node is
// written with upsertNode. The loop and branch structure joining these steps
// is on lines not visible in this excerpt.
func (n *prefixNode) remove(z *Zp, marray []*Zp, bs *Bitstring, depth int) error {
276
n.updateSvalues(z, marray)
281
if n.NumElements <= n.JoinThreshold() {
292
// Descend into the child selected for bs at this depth.
childIndex := recon.NextChild(n, bs, depth)
293
n = n.Children()[childIndex].(*prefixNode)
298
err = n.deleteElement(z)
302
return n.upsertNode()
305
// join pulls the elements of all children up into n.
//
// For every child returned by Children() it appends the child's NodeElements
// to a combined list and deletes the child's stored node. The combined list
// then replaces n.NodeElements and n is written with upsertNode. The body of
// the deleteNode error branch and any other field updates are on lines not
// visible in this excerpt.
func (n *prefixNode) join() error {
306
var elements [][]byte
307
for _, child := range n.Children() {
308
elements = append(elements, child.(*prefixNode).NodeElements...)
309
if err := child.(*prefixNode).deleteNode(); err != nil {
313
n.NodeElements = elements
315
return n.upsertNode()
318
// ErrDuplicateElement builds the error reported when z is already present;
// the message embeds z formatted with %v.
// NOTE(review): errors.New(fmt.Sprintf(...)) is equivalent to fmt.Errorf(...).
func ErrDuplicateElement(z *Zp) error {
319
return errors.New(fmt.Sprintf("Attempt to insert duplicate element %v", z))
322
// ErrElementNotFound builds the error reported when an expected element z is
// missing; the message embeds z formatted with %v.
// NOTE(review): errors.New(fmt.Sprintf(...)) is equivalent to fmt.Errorf(...).
func ErrElementNotFound(z *Zp) error {
323
return errors.New(fmt.Sprintf("Expected element %v was not found", z))
326
// Insert adds z to the tree.
//
// A database entry keyed by z.Bytes() is looked up first: if it exists,
// ErrDuplicateElement(z) is returned; a lookup error other than
// leveldb.ErrNotFound takes a branch whose body is not visible here. The
// element is then inserted starting at the root (depth 0) using the marray
// from recon.AddElementArray, and finally an entry with key z.Bytes() and an
// empty value is written. Error checks between these steps are on lines not
// visible in this excerpt.
func (t *prefixTree) Insert(z *Zp) error {
327
_, lookupErr := t.db.Get(z.Bytes(), nil)
328
if lookupErr == nil {
329
return ErrDuplicateElement(z)
330
} else if lookupErr != leveldb.ErrNotFound {
333
bs := NewZpBitstring(z)
334
root, err := t.Root()
338
marray, err := recon.AddElementArray(t, z)
342
err = root.(*prefixNode).insert(z, marray, bs, 0)
346
// Record the element itself under its own key with an empty value.
return t.db.Put(z.Bytes(), []byte{}, nil)
349
// Remove deletes z from the tree.
//
// A database entry keyed by z.Bytes() is looked up first; a non-nil lookup
// error takes a branch whose body is not visible here. The element is then
// removed starting at the root (depth 0) using the marray from
// recon.DelElementArray, and finally the entry keyed by z.Bytes() is
// deleted. Error checks between these steps are on lines not visible in this
// excerpt.
func (t *prefixTree) Remove(z *Zp) error {
350
_, lookupErr := t.db.Get(z.Bytes(), nil)
351
if lookupErr != nil {
354
bs := NewZpBitstring(z)
355
root, err := t.Root()
359
marray := recon.DelElementArray(t, z)
360
err = root.(*prefixNode).remove(z, marray, bs, 0)
364
return t.db.Delete(z.Bytes(), nil)
367
// newChildNode builds (but does not persist) a new leaf node.
//
// The node key is either the parent's key extended by BitQuantum bits, where
// added bit j is set exactly when bit j of childIndex is set, or a
// zero-length bitstring. The condition choosing between the two is not
// visible in this excerpt (presumably parent != nil; ensureRoot passes a nil
// parent). NodeSValues is initialised to NumSamples() entries of
// Zi(P_SKS, 1). The declaration of key and the return are also not visible.
func (t *prefixTree) newChildNode(parent *prefixNode, childIndex int) *prefixNode {
368
n := &prefixNode{prefixTree: t, Leaf: true}
371
parentKey := parent.Key()
372
key = NewBitstring(parentKey.BitLen() + t.BitQuantum())
373
key.SetBytes(parentKey.Bytes())
374
// Write the bits of childIndex into the newly added key positions.
for j := 0; j < parent.BitQuantum(); j++ {
375
if (1<<uint(j))&childIndex == 0 {
376
key.Unset(parentKey.BitLen() + j)
378
key.Set(parentKey.BitLen() + j)
382
key = NewBitstring(0)
384
n.NodeKey = mustEncodeBitstring(key)
385
svalues := make([]*Zp, t.NumSamples())
386
for i := 0; i < len(svalues); i++ {
387
svalues[i] = Zi(P_SKS, 1)
389
n.NodeSValues = mustEncodeZZarray(svalues)
393
// upsertNode gob-encodes n into buf and stores the bytes in the database
// under n.NodeKey, returning the Put error. The declaration of buf and the
// body of the encode-error branch are on lines not visible in this excerpt.
func (n *prefixNode) upsertNode() (err error) {
395
enc := gob.NewEncoder(&buf)
396
if err = enc.Encode(n); err != nil {
399
return n.db.Put(n.NodeKey, buf.Bytes(), nil)
402
// IsLeaf returns a bool; its body is not visible in this excerpt.
// NOTE(review): presumably reports the node's Leaf field (set in
// newChildNode) -- confirm.
func (n *prefixNode) IsLeaf() bool {
406
// Children loads the 1<<BitQuantum() child nodes of n.
//
// For child index i the lookup key is key extended by BitQuantum bits, where
// added bit j is set exactly when bit j of i is set. Each child is fetched
// with n.Node and appended to result. The function panics with a message
// naming the child index and key; the guarding condition is not visible here
// (presumably err != nil). The definition of key and any early return are
// also on lines not visible in this excerpt.
func (n *prefixNode) Children() (result []recon.PrefixNode) {
411
numChildren := 1 << uint(n.BitQuantum())
412
for i := 0; i < numChildren; i++ {
413
childKey := NewBitstring(key.BitLen() + n.BitQuantum())
414
childKey.SetBytes(key.Bytes())
415
for j := 0; j < n.BitQuantum(); j++ {
416
if (1<<uint(j))&i == 0 {
417
childKey.Unset(key.BitLen() + j)
419
childKey.Set(key.BitLen() + j)
422
child, err := n.Node(childKey)
424
panic(fmt.Sprintf("Children failed on child#%v, key=%v: %v", i, childKey, err))
426
result = append(result, child)
427
//fmt.Println("Node", n.Key(), "Child:", child.Key())
432
// Elements returns elements as []*Zp.
//
// Two loops are visible: one converts each entry of n.NodeElements with
// Zb(P_SKS, element), the other appends the Elements() of every child
// (recursively). The condition choosing between them is not visible in this
// excerpt (presumably leaf versus non-leaf -- confirm).
func (n *prefixNode) Elements() (result []*Zp) {
434
for _, element := range n.NodeElements {
435
result = append(result, Zb(P_SKS, element))
438
// TODO: Eliminate recursion
439
for _, child := range n.Children() {
440
result = append(result, child.Elements()...)
446
// Size reports the node's element count as recorded in n.NumElements.
func (n *prefixNode) Size() int {
	count := n.NumElements
	return count
}
448
// SValues decodes n.NodeSValues with mustDecodeZZarray and returns the
// resulting []*Zp. The stored bytes are decoded on every call.
func (n *prefixNode) SValues() []*Zp {
449
return mustDecodeZZarray(n.NodeSValues)
452
// Key decodes n.NodeKey with mustDecodeBitstring and returns the resulting
// *Bitstring. The stored bytes are decoded on every call.
func (n *prefixNode) Key() *Bitstring {
453
return mustDecodeBitstring(n.NodeKey)
456
// Parent looks up the node whose key is this node's key shortened by
// BitQuantum bits.
//
// A zero-length key takes an early branch whose body is not visible here
// (presumably "no parent"). The function panics with "Failed to get parent"
// under a condition not visible in this excerpt (presumably err != nil). The
// definition of key and the return statements are also not visible.
func (n *prefixNode) Parent() (recon.PrefixNode, bool) {
458
if key.BitLen() == 0 {
461
parentKey := NewBitstring(key.BitLen() - n.BitQuantum())
462
parentKey.SetBytes(key.Bytes())
463
parent, err := n.Node(parentKey)
465
panic(fmt.Sprintf("Failed to get parent: %v", err))
470
// updateSvalues multiplies each stored s-value by the matching entry of
// marray and re-encodes the result into n.NodeSValues (in memory only; no
// database write happens in the visible lines).
//
// It panics with "Inconsistent NumSamples size" when len(marray) differs from
// len(n.points). Any statements after the re-encode are not visible in this
// excerpt.
func (n *prefixNode) updateSvalues(z *Zp, marray []*Zp) {
471
if len(marray) != len(n.points) {
472
panic("Inconsistent NumSamples size")
474
svalues := mustDecodeZZarray(n.NodeSValues)
475
for i := 0; i < len(marray); i++ {
476
svalues[i] = Z(z.P).Mul(svalues[i], marray[i])
478
n.NodeSValues = mustEncodeZZarray(svalues)