~themue/pyjuju/go-state-auth

« back to all changes in this revision

Viewing changes to state/topology.go

  • Committer: Frank Mueller
  • Date: 2011-12-20 10:52:07 UTC
  • mto: This revision was merged to the branch mainline in revision 33.
  • Revision ID: frank.mueller@canonical.com-20111220105207-4trmfrtjxzqu3nq5
Initial adding of state for review.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// launchpad.net/juju/state
 
2
//
 
3
// Copyright (c) 2011 Canonical Ltd.
 
4
 
 
5
package state
 
6
 
 
7
// --------------------
 
8
// IMPORT
 
9
// --------------------
 
10
 
 
11
import (
 
12
        "errors"
 
13
        "launchpad.net/goyaml"
 
14
        "launchpad.net/gozk/zookeeper"
 
15
        "strings"
 
16
)
 
17
 
 
18
// --------------------
 
19
// CONST
 
20
// --------------------
 
21
 
 
22
const (
 
23
        Version = 1
 
24
)
 
25
 
 
26
// --------------------
 
27
// GLOBAL VARIABLES
 
28
// --------------------
 
29
 
 
30
// systemTopology is the global topology that will be
 
31
// initialized with the first call of retrieveTopology().
 
32
// Then it will be reused and updated automatically after
 
33
// signals via the watch.
 
34
var systemTopology *topology
 
35
 
 
36
// --------------------
 
37
// TOPOLOGY NODES
 
38
// --------------------
 
39
 
 
40
// topologyNodes represents a set of nodes of the
 
41
// topology informations.
 
42
type topologyNodes map[interface{}]interface{}
 
43
 
 
44
// newTopologyNodes creates node set with the version set.
 
45
func newTopologyNodes() topologyNodes {
 
46
        tn := make(topologyNodes)
 
47
 
 
48
        tn["version"] = Version
 
49
 
 
50
        return tn
 
51
}
 
52
 
 
53
// find looks for a value (a topologyNode or its end value) by
 
54
// a given path. This path is a slice of strings, the end criteria
 
55
// for this recursive call is an empty path. Here the node will
 
56
// be returned.
 
57
func (tn topologyNodes) find(path []string) (interface{}, error) {
 
58
        if len(path) == 0 {
 
59
                return tn, nil
 
60
        }
 
61
 
 
62
        value, ok := tn[path[0]]
 
63
 
 
64
        if !ok {
 
65
                // Not found!
 
66
                return nil, errors.New("topology nodes: node '" + path[0] + "' not found")
 
67
        }
 
68
 
 
69
        if v, ok := value.(map[interface{}]interface{}); ok {
 
70
                // More nodes.
 
71
                vtn := topologyNodes(v)
 
72
 
 
73
                return vtn.find(path[1:])
 
74
        }
 
75
 
 
76
        return value, nil
 
77
}
 
78
 
 
79
// getString retrieves the string value of a path. If it's a node
 
80
// an error will be returned. The path will be passed as a string
 
81
// with slashes as separators.
 
82
func (tn topologyNodes) getString(path string) (string, error) {
 
83
        value, err := tn.find(pathToSlice(path))
 
84
 
 
85
        if err != nil {
 
86
                return "", err
 
87
        }
 
88
 
 
89
        if v, ok := value.(string); ok {
 
90
                // It's a string, yeah.
 
91
                return v, nil
 
92
        }
 
93
 
 
94
        return "", errors.New("topology nodes: path '" + path + "' leads to topology nodes, no string")
 
95
}
 
96
 
 
97
// getNodes retrieves the nodes value of a path. If it's a string
 
98
// an error will be returned. The path will be passed as a string
 
99
// with slashes as separators.
 
100
func (tn topologyNodes) getNodes(path string) (topologyNodes, error) {
 
101
        value, err := tn.find(pathToSlice(path))
 
102
 
 
103
        if err != nil {
 
104
                return nil, err
 
105
        }
 
106
 
 
107
        if v, ok := value.(topologyNodes); ok {
 
108
                // It's a topologyNodes, got it.
 
109
                return v, nil
 
110
        }
 
111
 
 
112
        return nil, errors.New("topology nodes: path '" + path + "' leads to a string, no topology nodes")
 
113
}
 
114
 
 
115
// searchFunc defines the signature of a function for searches inside the topology nodes.
 
116
// The arguments are the current path and the value. It has to return true if the search
 
117
// matches.
 
118
type searchFunc func(path []string, value interface{}) bool
 
119
 
 
120
// search executes a search function recursively on the topology nodes. If this
 
121
// function returns true the full path and its value will be returned.
 
122
func (tn topologyNodes) search(sf searchFunc) ([]string, interface{}, error) {
 
123
        path, value := tn.pathSearch([]string{}, sf)
 
124
 
 
125
        if len(path) == 0 {
 
126
                // Nothing found!
 
127
                return nil, nil, errors.New("topology nodes: search has no results")
 
128
        }
 
129
 
 
130
        // Success, yay!
 
131
        return path, value, nil
 
132
}
 
133
 
 
134
// pathSearch is used by search and has the current path as argument.
 
135
func (tn topologyNodes) pathSearch(path []string, sf searchFunc) ([]string, interface{}) {
 
136
        for key, value := range tn {
 
137
                p := append(path, key.(string))
 
138
 
 
139
                if sf(p, value) {
 
140
                        // Found it.
 
141
                        return p, value
 
142
                }
 
143
 
 
144
                if v, ok := value.(map[interface{}]interface{}); ok {
 
145
                        // Search not yet ok, but value is a topology node.
 
146
                        vtn := topologyNodes(v)
 
147
                        dp, dv := vtn.pathSearch(p, sf)
 
148
 
 
149
                        if len(dp) > 0 {
 
150
                                return dp, dv
 
151
                        }
 
152
                }
 
153
        }
 
154
 
 
155
        // Found nothing.
 
156
        return []string{}, nil
 
157
}
 
158
 
 
159
// pathToSlice converts a path string into a slice of strings.
 
160
func pathToSlice(path string) []string {
 
161
        pathSlice := strings.Split(path, "/")
 
162
        cleanPathSlice := []string{}
 
163
 
 
164
        for _, ps := range pathSlice {
 
165
                if len(ps) > 0 {
 
166
                        cleanPathSlice = append(cleanPathSlice, ps)
 
167
                }
 
168
        }
 
169
 
 
170
        return cleanPathSlice
 
171
}
 
172
 
 
173
// --------------------
 
174
// COMMAND
 
175
// --------------------
 
176
 
 
177
// commandFunc is a function that will be sent to the backend
 
178
// to analyze the topology nodes. It can return any result.
 
179
type commandFunc func(topologyNodes) (interface{}, error)
 
180
 
 
181
// commandResult encapsulates the result and an error for
 
182
// transportation over a channel.
 
183
type commandResult struct {
 
184
        result interface{}
 
185
        err    error
 
186
}
 
187
 
 
188
// command will be sent to the backend goroutine to perform
 
189
// the task and return the answer.
 
190
type command struct {
 
191
        task       commandFunc
 
192
        resultChan chan *commandResult
 
193
}
 
194
 
 
195
// newCommand creates a new command with the given task
 
196
// and a new channel for the result.
 
197
func newCommand(cf commandFunc) *command {
 
198
        return &command{cf, make(chan *commandResult)}
 
199
}
 
200
 
 
201
// perform performs the task and returns the result
 
202
// via the channel.
 
203
func (c *command) perform(tn topologyNodes) {
 
204
        result, err := c.task(tn)
 
205
 
 
206
        c.resultChan <- &commandResult{result, err}
 
207
}
 
208
 
 
209
// --------------------
 
210
// TOPOLOGY
 
211
// --------------------
 
212
 
 
213
// topology is an internal helper handling the topology informations
 
214
// inside of ZooKeeper.
 
215
type topology struct {
 
216
        zkConn      *zookeeper.Conn
 
217
        zkEventChan <-chan zookeeper.Event
 
218
        commandChan chan *command
 
219
        nodes       topologyNodes
 
220
}
 
221
 
 
222
// retrieveTopology connects ZooKeeper, retrieves the data as YAML,
 
223
// parses and stores it.
 
224
func retrieveTopology(zkc *zookeeper.Conn) (*topology, error) {
 
225
        if systemTopology == nil {
 
226
                systemTopology = &topology{
 
227
                        zkConn:      zkc,
 
228
                        commandChan: make(chan *command),
 
229
                        nodes:       newTopologyNodes(),
 
230
                }
 
231
 
 
232
                data, _, session, err := zkc.GetW("/topology")
 
233
 
 
234
                if err != nil {
 
235
                        return nil, err
 
236
                }
 
237
 
 
238
                if err = goyaml.Unmarshal([]byte(data), systemTopology.nodes); err != nil {
 
239
                        return nil, err
 
240
                }
 
241
 
 
242
                systemTopology.zkEventChan = session
 
243
 
 
244
                go systemTopology.backend()
 
245
        }
 
246
 
 
247
        return systemTopology, nil
 
248
}
 
249
 
 
250
// getString retrieves the string value of a path. If it's a node
 
251
// an error will be returned. The path will be passed as a string
 
252
// with slashes as separators.
 
253
func (t *topology) getString(path string) (string, error) {
 
254
        value, err := t.execute(func(tn topologyNodes) (interface{}, error) {
 
255
                return tn.getString(path)
 
256
        })
 
257
 
 
258
        return value.(string), err
 
259
}
 
260
 
 
261
// execute sends a command function to the backend for execution. It returns 
 
262
// the result received from the backend. This way all requests are searialized.
 
263
func (t *topology) execute(cf commandFunc) (interface{}, error) {
 
264
        cmd := newCommand(cf)
 
265
 
 
266
        t.commandChan <- cmd
 
267
 
 
268
        result := <-cmd.resultChan
 
269
 
 
270
        return result.result, result.err
 
271
}
 
272
 
 
273
// backend manages the topology as a goroutine.
 
274
func (t *topology) backend() {
 
275
        for {
 
276
                select {
 
277
                case evt := <-t.zkEventChan:
 
278
                        // Change happened inside the topology.
 
279
                        if !evt.Ok() {
 
280
                                // TODO: Error handling, logging!
 
281
                        }
 
282
 
 
283
                        switch evt.Type {
 
284
                        case zookeeper.EVENT_CHANGED:
 
285
                                t.reload()
 
286
                        }
 
287
                case cmd := <-t.commandChan:
 
288
                        // Perform the given command on the
 
289
                        // topology nodes.
 
290
                        cmd.perform(t.nodes)
 
291
                }
 
292
        }
 
293
}
 
294
 
 
295
// reload retrieves and parses the topology from ZooKeeper.
 
296
func (t *topology) reload() error {
 
297
        data, _, err := t.zkConn.Get("/topology")
 
298
 
 
299
        if err != nil {
 
300
                // TODO: Error handling, logging!
 
301
                return err
 
302
        }
 
303
 
 
304
        if err = goyaml.Unmarshal([]byte(data), t.nodes); err != nil {
 
305
                // TODO: Error handling, logging!
 
306
                return err
 
307
        }
 
308
 
 
309
        return nil
 
310
}
 
311
 
 
312
// reset empties the topology.
 
313
func (t *topology) reset() {
 
314
        t.nodes = newTopologyNodes()
 
315
}
 
316
 
 
317
// dump returns the current topology as byte slice
 
318
func (t *topology) dump() ([]byte, error) {
 
319
        return goyaml.Marshal(t.nodes)
 
320
}
 
321
 
 
322
// EOF