1
// launchpad.net/juju/state
3
// Copyright (c) 2011 Canonical Ltd.
7
// --------------------
9
// --------------------
13
"launchpad.net/goyaml"
14
"launchpad.net/gozk/zookeeper"
18
// --------------------
20
// --------------------
26
// --------------------
28
// --------------------
30
// systemTopology is the global topology that will be
31
// initialized with the first call of retrieveTopology().
32
// Then it will be reused and updated automatically after
33
// signals via the watch.
//
// NOTE(review): retrieveTopology tests this variable for nil and then
// assigns it; no locking around that check is visible here — confirm
// that concurrent first calls cannot race.
34
var systemTopology *topology
36
// --------------------
38
// --------------------
40
// topologyNodes represents a set of nodes of the
41
// topology information. Keys and values are untyped (interface{});
// find treats a value of type map[interface{}]interface{} as a nested
// node set, and pathSearch asserts every key to be a string.
42
type topologyNodes map[interface{}]interface{}
44
// newTopologyNodes creates a node set whose "version" key is
// already set to Version.
45
func newTopologyNodes() topologyNodes {
46
tn := make(topologyNodes)
48
tn["version"] = Version
53
// find looks for a value (a topologyNode or its end value) by
54
// a given path. This path is a slice of strings, the end criteria
55
// for this recursive call is an empty path. Here the node will
57
func (tn topologyNodes) find(path []string) (interface{}, error) {
62
// Look up the first path element in this node set.
value, ok := tn[path[0]]
66
return nil, errors.New("topology nodes: node '" + path[0] + "' not found")
69
// A nested map is a child node set: convert it to topologyNodes and
// continue the lookup with the remaining path elements.
if v, ok := value.(map[interface{}]interface{}); ok {
71
vtn := topologyNodes(v)
73
return vtn.find(path[1:])
79
// getString retrieves the string value of a path. If it's a node
80
// an error will be returned. The path will be passed as a string
81
// with slashes as separators.
82
func (tn topologyNodes) getString(path string) (string, error) {
83
// Split the path with pathToSlice and resolve it with find.
value, err := tn.find(pathToSlice(path))
89
if v, ok := value.(string); ok {
90
// It's a string, yeah.
94
// The value found is not a string, so the path is reported as
// leading to topology nodes.
return "", errors.New("topology nodes: path '" + path + "' leads to topology nodes, no string")
97
// getNodes retrieves the nodes value of a path. If it's a string
98
// an error will be returned. The path will be passed as a string
99
// with slashes as separators.
100
func (tn topologyNodes) getNodes(path string) (topologyNodes, error) {
101
// Split the path with pathToSlice and resolve it with find.
value, err := tn.find(pathToSlice(path))
107
// NOTE(review): this assertion only succeeds for a value whose dynamic
// type is exactly topologyNodes. find descends through values of type
// map[interface{}]interface{}; a value still held as that plain map type
// would fail here and be reported as "a string" — confirm what find
// returns for a node.
if v, ok := value.(topologyNodes); ok {
108
// It's a topologyNodes, got it.
112
return nil, errors.New("topology nodes: path '" + path + "' leads to a string, no topology nodes")
115
// searchFunc defines the signature of a function for searches inside the topology nodes.
116
// The arguments are the current path and the value. It has to return true if the search
118
//
// search passes the searchFunc it is given on to pathSearch.
type searchFunc func(path []string, value interface{}) bool
120
// search executes a search function recursively on the topology nodes. If this
121
// function returns true the full path and its value will be returned.
// When there is no match the error "topology nodes: search has no results"
// is returned together with a nil path and nil value.
122
func (tn topologyNodes) search(sf searchFunc) ([]string, interface{}, error) {
123
// Start the recursive search at the root with an empty path.
path, value := tn.pathSearch([]string{}, sf)
127
return nil, nil, errors.New("topology nodes: search has no results")
131
return path, value, nil
134
// pathSearch is used by search and has the current path as argument.
// It ranges over the entries of tn, extends the path by each key and
// recurses into values that are nested maps. An empty slice and a nil
// value are returned at the end of the function.
135
func (tn topologyNodes) pathSearch(path []string, sf searchFunc) ([]string, interface{}) {
136
for key, value := range tn {
137
// NOTE(review): key.(string) is an unchecked type assertion and panics
// for any non-string key; the map's key type is interface{}.
// NOTE(review): append may reuse path's backing array when it has spare
// capacity, so p from different iterations can alias — confirm that no
// caller or searchFunc keeps p beyond the call.
p := append(path, key.(string))
144
if v, ok := value.(map[interface{}]interface{}); ok {
145
// Search not yet ok, but value is a topology node.
146
vtn := topologyNodes(v)
147
dp, dv := vtn.pathSearch(p, sf)
156
return []string{}, nil
159
// pathToSlice converts a path string into a slice of strings.
// The path is split on "/" and segments are collected into a new
// slice, which is returned.
160
func pathToSlice(path string) []string {
161
pathSlice := strings.Split(path, "/")
162
cleanPathSlice := []string{}
164
for _, ps := range pathSlice {
166
cleanPathSlice = append(cleanPathSlice, ps)
170
return cleanPathSlice
173
// --------------------
175
// --------------------
177
// commandFunc is a function that will be sent to the backend
178
// to analyze the topology nodes. It can return any result.
// topology.execute wraps a commandFunc into a command via newCommand.
179
type commandFunc func(topologyNodes) (interface{}, error)
181
// commandResult encapsulates the result and an error for
182
// transportation over a channel. command.perform builds one from the
// two return values of the task and sends it on the command's resultChan.
183
type commandResult struct {
188
// command will be sent to the backend goroutine to perform
189
// the task and return the answer.
190
type command struct {
192
// resultChan is where perform sends the *commandResult and where
// topology.execute receives it.
resultChan chan *commandResult
195
// newCommand creates a new command with the given task
196
// and a new channel for the result.
197
func newCommand(cf commandFunc) *command {
198
// The result channel is created without a buffer, so the send in
// perform blocks until the result is received.
return &command{cf, make(chan *commandResult)}
201
// perform performs the task and returns the result
// by sending a *commandResult on c.resultChan.
203
func (c *command) perform(tn topologyNodes) {
204
// Run the task against the given topology nodes.
result, err := c.task(tn)
206
// Both the result and the error travel to the receiver in one value.
c.resultChan <- &commandResult{result, err}
209
// --------------------
211
// --------------------
213
// topology is an internal helper handling the topology information
214
// inside of ZooKeeper.
215
type topology struct {
216
// zkConn is the ZooKeeper connection; reload reads "/topology" through it.
zkConn *zookeeper.Conn
217
// zkEventChan is the receive-only event channel that retrieveTopology
// obtains from GetW and that backend receives from.
zkEventChan <-chan zookeeper.Event
218
// commandChan is the channel on which backend receives commands.
commandChan chan *command
222
// retrieveTopology connects ZooKeeper, retrieves the data as YAML,
223
// parses and stores it.
// A topology is created only while systemTopology is still nil; the
// value returned is the package-level systemTopology.
224
func retrieveTopology(zkc *zookeeper.Conn) (*topology, error) {
225
if systemTopology == nil {
226
systemTopology = &topology{
228
commandChan: make(chan *command),
229
nodes: newTopologyNodes(),
232
// GetW returns, besides the data, the channel stored below as
// zkEventChan (named session here).
data, _, session, err := zkc.GetW("/topology")
238
// Decode the YAML into the node set created above.
if err = goyaml.Unmarshal([]byte(data), systemTopology.nodes); err != nil {
242
// Keep the channel so that backend can receive events from it.
systemTopology.zkEventChan = session
244
// NOTE(review): no way to stop or wait for this goroutine is visible here.
go systemTopology.backend()
247
return systemTopology, nil
250
// getString retrieves the string value of a path. If it's a node
251
// an error will be returned. The path will be passed as a string
252
// with slashes as separators.
253
func (t *topology) getString(path string) (string, error) {
254
// The lookup itself runs via execute, which hands the closure the
// topology nodes to query.
value, err := t.execute(func(tn topologyNodes) (interface{}, error) {
255
return tn.getString(path)
258
// NOTE(review): value.(string) is an unchecked type assertion and is
// evaluated regardless of err; it panics if execute ever yields a
// non-string (for example nil) result — confirm that cannot happen.
return value.(string), err
261
// execute sends a command function to the backend for execution. It returns
262
// the result received from the backend. This way all requests are serialized.
263
func (t *topology) execute(cf commandFunc) (interface{}, error) {
264
// Wrap the function into a command carrying its own result channel.
cmd := newCommand(cf)
268
// Block until the result arrives on the command's channel.
result := <-cmd.resultChan
270
return result.result, result.err
273
// backend manages the topology as a goroutine.
// It receives ZooKeeper events from t.zkEventChan and commands from
// t.commandChan. retrieveTopology starts it with a go statement.
274
func (t *topology) backend() {
277
case evt := <-t.zkEventChan:
278
// Change happened inside the topology.
280
// TODO: Error handling, logging!
284
case zookeeper.EVENT_CHANGED:
287
case cmd := <-t.commandChan:
288
// Perform the given command on the
295
// reload retrieves and parses the topology from ZooKeeper.
// It reads "/topology" through t.zkConn and decodes the YAML into t.nodes.
296
func (t *topology) reload() error {
297
// NOTE(review): this uses Get, whereas retrieveTopology uses GetW to
// obtain its event channel, and no new channel is stored here — confirm
// whether the watch has to be re-established after an event.
data, _, err := t.zkConn.Get("/topology")
300
// TODO: Error handling, logging!
304
// The data is decoded into the existing t.nodes value, not a fresh one.
// NOTE(review): confirm that entries removed in ZooKeeper do not linger.
if err = goyaml.Unmarshal([]byte(data), t.nodes); err != nil {
305
// TODO: Error handling, logging!
312
// reset empties the topology by replacing t.nodes with a fresh node
// set from newTopologyNodes (which still carries the "version" entry).
313
func (t *topology) reset() {
314
t.nodes = newTopologyNodes()
317
// dump returns the current topology as byte slice,
// produced by goyaml.Marshal from t.nodes; its error is passed through.
318
func (t *topology) dump() ([]byte, error) {
319
return goyaml.Marshal(t.nodes)