~ps-jenkins/ubuntu-push/ubuntu-vivid-proposed

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
/*
 Copyright 2013-2014 Canonical Ltd.

 This program is free software: you can redistribute it and/or modify it
 under the terms of the GNU General Public License version 3, as published
 by the Free Software Foundation.

 This program is distributed in the hope that it will be useful, but
 WITHOUT ANY WARRANTY; without even the implied warranties of
 MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 PURPOSE.  See the GNU General Public License for more details.

 You should have received a copy of the GNU General Public License along
 with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

package bus

// Here we define the Endpoint, which represents the DBus connection itself.

import (
	"encoding/base64"
	"errors"
	"fmt"

	"launchpad.net/go-dbus/v1"

	"launchpad.net/ubuntu-push/logger"
)

/*****************************************************************
 *    Endpoint (and its implementation)
 */

type BusMethod func(string, []interface{}, []interface{}) ([]interface{}, error)
type DispatchMap map[string]BusMethod

// bus.Endpoint represents the DBus connection itself.
type Endpoint interface {
	GrabName(allowReplacement bool) <-chan error
	WatchSignal(member string, f func(...interface{}), d func()) error
	WatchMethod(DispatchMap, string, ...interface{})
	Signal(string, string, []interface{}) error
	Call(member string, args []interface{}, rvs ...interface{}) error
	GetProperty(property string) (interface{}, error)
	SetProperty(property string, suffix string, value interface{}) error
	Dial() error
	Close()
	String() string
}

type endpoint struct {
	busT  Bus
	bus   *dbus.Connection
	proxy *dbus.ObjectProxy
	addr  Address
	log   logger.Logger
}

// constructor
func newEndpoint(bus Bus, addr Address, log logger.Logger) *endpoint {
	return &endpoint{busT: bus, addr: addr, log: log}
}

// ensure endpoint implements Endpoint
var _ Endpoint = (*endpoint)(nil)

/*
   public methods

XXX:   these are almost entirely untested, as that would need
XXX:   integration tests we are currently missing.
*/

// Dial() (re)establishes the connection with dbus
//
// XXX: mostly untested
func (endp *endpoint) Dial() error {
	bus, err := dbus.Connect(endp.busT.(concreteBus).dbusType())
	if err != nil {
		return err
	}
	d := dbus.BusDaemon{bus.Object(dbus.BUS_DAEMON_NAME, dbus.BUS_DAEMON_PATH)}
	name := endp.addr.Name
	hasOwner, err := d.NameHasOwner(name)
	if err != nil {
		endp.log.Debugf("Unable to determine ownership of %#v: %v", name, err)
		bus.Close()
		return err
	}
	if !hasOwner {
		// maybe it's waiting to be activated?
		names, err := d.ListActivatableNames()
		if err != nil {
			endp.log.Debugf("%#v has no owner, and when listing activatable: %v", name, err)
			bus.Close()
			return err
		}
		found := false
		for _, name := range names {
			if name == name {
				found = true
				break
			}
		}
		if !found {
			msg := fmt.Sprintf("%#v has no owner, and not in activatables", name)
			endp.log.Debugf(msg)
			bus.Close()
			return errors.New(msg)
		}
	}
	endp.log.Infof("%#v dialed in.", name)
	endp.bus = bus
	endp.proxy = bus.Object(name, dbus.ObjectPath(endp.addr.Path))
	return nil
}

// WatchSignal() takes a member name, sets up a watch for it (on the name,
// path and interface provided when creating the endpoint), and then calls f()
// with the unpacked value. If it's unable to set up the watch it returns an
// error. If the watch fails once established, d() is called. Typically f()
// sends the values over a channel, and d() would close the channel.
//
// XXX: untested
func (endp *endpoint) WatchSignal(member string, f func(...interface{}), d func()) error {
	watch, err := endp.proxy.WatchSignal(endp.addr.Interface, member)
	if err != nil {
		endp.log.Debugf("Failed to set up the watch: %s", err)
		return err
	}

	go endp.unpackMessages(watch, f, d, member)

	return nil
}

// Call() invokes the provided member method (on the name, path and
// interface provided when creating the endpoint). args can be built
// using bus.Args(...). The return value is unpacked into rvs before being
// returned.
//
// XXX: untested
func (endp *endpoint) Call(member string, args []interface{}, rvs ...interface{}) error {
	msg, err := endp.proxy.Call(endp.addr.Interface, member, args...)
	if err != nil {
		return err
	}
	err = msg.Args(rvs...)
	if err != nil {
		return err
	}
	return nil
}

// GetProperty uses the org.freedesktop.DBus.Properties interface's Get method
// to read a given property on the name, path and interface provided when
// creating the endpoint. The return value is unpacked into a dbus.Variant,
// and its value returned.
//
// XXX: untested
func (endp *endpoint) GetProperty(property string) (interface{}, error) {
	msg, err := endp.proxy.Call("org.freedesktop.DBus.Properties", "Get", endp.addr.Interface, property)
	if err != nil {
		return nil, err
	}
	variantvs := endp.unpackOneMsg(msg, property)
	switch len(variantvs) {
	default:
		return nil, fmt.Errorf("Too many values in Properties.Get response: %d", len(variantvs))
	case 0:
		return nil, fmt.Errorf("Not enough values in Properties.Get response: %d", len(variantvs))
	case 1:
		// carry on
	}
	variant, ok := variantvs[0].(*dbus.Variant)
	if !ok {
		return nil, fmt.Errorf("Response from Properties.Get wasn't a *dbus.Variant")
	}
	return variant.Value, nil
}

// SetProperty calls org.freedesktop.DBus.Properties's Set method
//
// XXX: untested
func (endp *endpoint) SetProperty(property string, suffix string, value interface{}) error {
	// can't use the pre-existing ObjectProxy for this one
	proxy := endp.bus.Object(endp.addr.Name, dbus.ObjectPath(endp.addr.Path+suffix))
	_, err := proxy.Call("org.freedesktop.DBus.Properties", "Set", endp.addr.Interface, property, value)
	return err
}

// Close the connection to dbus.
//
// XXX: untested
func (endp *endpoint) Close() {
	if endp.bus != nil {
		endp.bus.Close()
		endp.bus = nil
		endp.proxy = nil
	}
}

// String() performs advanced endpoint stringification
//
// XXX: untested
func (endp *endpoint) String() string {
	return fmt.Sprintf("<Connection to %s %#v>", endp.bus, endp.addr)
}

// GrabName() takes over the name on the bus, reporting errors over the
// returned channel.
//
// While the first result will be nil on success, successive results would
// typically indicate another process trying to take over the name.
//
// XXX: untested
func (endp *endpoint) GrabName(allowReplacement bool) <-chan error {
	flags := dbus.NameFlagAllowReplacement | dbus.NameFlagReplaceExisting
	if !allowReplacement {
		flags = 0
	}
	return endp.bus.RequestName(endp.addr.Name, flags).C
}

// Signal() sends out a signal called <member> containing <args>.
//
// XXX: untested
func (endp *endpoint) Signal(member string, suffix string, args []interface{}) error {
	path := dbus.ObjectPath(endp.addr.Path + suffix)
	msg := dbus.NewSignalMessage(path, endp.addr.Interface, member)
	if args != nil {
		err := msg.AppendArgs(args...)
		if err != nil {
			endp.log.Errorf("unable to build dbus signal message: %v", err)
			return err
		}
	}
	err := endp.bus.Send(msg)
	if err != nil {
		endp.log.Errorf("unable to send dbus signal: %v", err)
	} else {
		endp.log.Debugf("sent dbus signal %s(%#v)", member, args)
	}
	return nil
}

// WatchMethod() uses the given DispatchMap to answer incoming method
// calls.
//
// XXX: untested
func (endp *endpoint) WatchMethod(dispatch DispatchMap, suffix string, extra ...interface{}) {
	ch := make(chan *dbus.Message)
	go func() {
		var reply *dbus.Message

		err_iface := endp.addr.Interface + ".Error"

		for msg := range ch {
			meth, ok := dispatch[msg.Member]
			if !ok || msg.Interface != endp.addr.Interface {
				reply = dbus.NewErrorMessage(msg,
					"org.freedesktop.DBus.Error.UnknownMethod", "Unknown method")
				endp.log.Errorf("WatchMethod: unknown method %s", msg.Member)
			} else {
				args := msg.AllArgs()
				rvals, err := meth(string(msg.Path), args, extra)
				if err != nil {
					reply = dbus.NewErrorMessage(msg, err_iface, err.Error())
					endp.log.Errorf("WatchMethod: %s(%v, %#v, %#v) failure: %#v", msg.Member, msg.Path, args, extra, err)
				} else {
					var san_rvals []string
					for _, element := range rvals {
						sane := fmt.Sprintf("%v", element)
						_, err := base64.StdEncoding.DecodeString(sane)
						if err == nil {
							sane = "LooksLikeAToken=="
						}
						san_rvals = append(san_rvals, sane)
					}
					endp.log.Debugf("WatchMethod: %s(%v, %#v, %#v) success: %#v", msg.Member, msg.Path, args, extra, san_rvals)
					reply = dbus.NewMethodReturnMessage(msg)
					err = reply.AppendArgs(rvals...)
					if err != nil {
						endp.log.Errorf("WatchMethod: unable to build dbus response message: %v", err)
						reply = dbus.NewErrorMessage(msg, err_iface, err.Error())
					}
				}
			}
			err := endp.bus.Send(reply)
			if err != nil {
				endp.log.Errorf("WatchMethod: unable to send reply: %v", err)
			}

		}
	}()
	path := dbus.ObjectPath(endp.addr.Path + suffix)
	endp.bus.RegisterObjectPath(path, ch)
}

/*
   private methods
*/

// unpackOneMsg unpacks the value from the response msg
//
// XXX: untested
func (endp *endpoint) unpackOneMsg(msg *dbus.Message, member string) []interface{} {
	var varmap map[string]dbus.Variant
	if err := msg.Args(&varmap); err != nil {
		return msg.AllArgs()
	}
	return []interface{}{varmap}
}

// unpackMessages unpacks the value from the watch
//
// XXX: untested
func (endp *endpoint) unpackMessages(watch *dbus.SignalWatch, f func(...interface{}), d func(), member string) {
	for {
		msg, ok := <-watch.C
		if !ok {
			break
		}
		f(endp.unpackOneMsg(msg, member)...)
	}
	endp.log.Errorf("Got not-OK from %s watch", member)
	d()
}