// Copyright 2014 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package instancepoller

import (
	"sync"
	"sync/atomic"
	"time"

	"github.com/juju/errors"
	jc "github.com/juju/testing/checkers"
	gc "gopkg.in/check.v1"

	"github.com/juju/juju/environs"
	"github.com/juju/juju/instance"
	"github.com/juju/juju/network"
	"github.com/juju/juju/status"
	"github.com/juju/juju/testing"
	"github.com/juju/juju/worker/workertest"
)

type aggregateSuite struct {
	testing.BaseSuite
}

var _ = gc.Suite(&aggregateSuite{})

type testInstance struct {
	instance.Instance
	id        instance.Id
	addresses []network.Address
	status    string
	err       error
}

// Compile-time check that testInstance satisfies instance.Instance.
var _ instance.Instance = (*testInstance)(nil)

func (t *testInstance) Id() instance.Id {
	return t.id
}

func (t *testInstance) Addresses() ([]network.Address, error) {
	if t.err != nil {
		return nil, t.err
	}
	return t.addresses, nil
}

func (t *testInstance) Status() instance.InstanceStatus {
	return instance.InstanceStatus{Status: status.StatusUnknown, Message: t.status}
}

// testInstanceGetter is a fake instance source that records the ids it is
// asked about and returns canned results.
type testInstanceGetter struct {
	// ids is set when the Instances method is called.
	ids     []instance.Id
	results map[instance.Id]instance.Instance
	err     error
	counter int32
}

func (tig *testInstanceGetter) Instances(ids []instance.Id) (result []instance.Instance, err error) {
	tig.ids = ids
	atomic.AddInt32(&tig.counter, 1)
	results := make([]instance.Instance, len(ids))
	for i, id := range ids {
		// We don't check 'ok' here, because we want the Instance{nil}
		// response for any missing instances.
		results[i] = tig.results[id]
	}
	return results, tig.err
}

// newTestInstance creates a testInstance with the given id, status and
// addresses, and registers it in the getter's results map.
func (tig *testInstanceGetter) newTestInstance(id instance.Id, status string, addresses []string) *testInstance {
	if tig.results == nil {
		tig.results = make(map[instance.Id]instance.Instance)
	}
	thisInstance := &testInstance{
		id:        id,
		status:    status,
		addresses: network.NewAddresses(addresses...),
	}
	tig.results[thisInstance.Id()] = thisInstance
	return thisInstance
}
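
// NOTE: the tests below exercise newAggregator and aggregatorConfig, which
// are defined alongside this file in aggregate.go. The config fields used
// in the struct literals below (Clock, Delay, Environ) are assumed from
// that definition: the clock that drives the batching timer, the batching
// delay, and the instance source that gets polled.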

// Test that one request gets sent after a suitable delay.
func (s *aggregateSuite) TestSingleRequest(c *gc.C) {
	// We set up a couple of variables here so that we can use them locally
	// without type assertions. Then we use them in the aggregatorConfig.
	testGetter := new(testInstanceGetter)
	clock := testing.NewClock(time.Now())
	delay := time.Minute // arbitrary; the testing clock is advanced manually
	cfg := aggregatorConfig{
		Clock:   clock,
		Delay:   delay,
		Environ: testGetter,
	}

	// Add a new test instance.
	testGetter.newTestInstance("foo", "foobar", []string{"127.0.0.1", "192.168.1.1"})

	aggregator, err := newAggregator(cfg)
	c.Check(err, jc.ErrorIsNil)

	// Ensure the worker is killed and cleaned up if the test exits early.
	defer workertest.CleanKill(c, aggregator)

	// Run the check in a goroutine and make sure we wait for it to finish.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		info, err := aggregator.instanceInfo("foo")
		c.Check(err, jc.ErrorIsNil)
		c.Check(info.status.Message, gc.DeepEquals, "foobar")
	}()

	// Unwind the test clock.
	waitAlarms(c, clock, 1)
	clock.Advance(delay)

	wg.Wait()

	// Ensure we kill the worker before looking at our testInstanceGetter to
	// ensure there's no possibility of a race.
	workertest.CleanKill(c, aggregator)

	ids := testGetter.ids
	c.Assert(ids, gc.DeepEquals, []instance.Id{"foo"})
}

// Test that several requests in a short space of time get batched.
func (s *aggregateSuite) TestMultipleResponseHandling(c *gc.C) {
	// We set up a couple of variables here so that we can use them locally
	// without type assertions. Then we use them in the aggregatorConfig.
	testGetter := new(testInstanceGetter)
	clock := testing.NewClock(time.Now())
	delay := time.Minute
	cfg := aggregatorConfig{
		Clock:   clock,
		Delay:   delay,
		Environ: testGetter,
	}

	// Set up multiple instances to batch.
	testGetter.newTestInstance("foo", "foobar", []string{"127.0.0.1", "192.168.1.1"})
	testGetter.newTestInstance("foo2", "not foobar", []string{"192.168.1.2"})
	testGetter.newTestInstance("foo3", "ok-ish", []string{"192.168.1.3"})

	aggregator, err := newAggregator(cfg)
	c.Check(err, jc.ErrorIsNil)

	// Ensure the worker is killed and cleaned up if the test exits early.
	defer workertest.CleanKill(c, aggregator)

	// Create a closure for tests we can launch in goroutines.
	var wg sync.WaitGroup
	checkInfo := func(id instance.Id, expectStatus string) {
		defer wg.Done()
		info, err := aggregator.instanceInfo(id)
		c.Check(err, jc.ErrorIsNil)
		c.Check(info.status.Message, gc.Equals, expectStatus)
	}

	// Launch and wait for these.
	wg.Add(2)
	go checkInfo("foo2", "not foobar")
	go checkInfo("foo3", "ok-ish")

	// Unwind the testing clock to let our requests through.
	waitAlarms(c, clock, 2)
	clock.Advance(delay)

	// Check we're still alive.
	workertest.CheckAlive(c, aggregator)

	// Wait until the tests pass.
	wg.Wait()

	// Ensure we kill the worker before looking at our testInstanceGetter to
	// ensure there's no possibility of a race.
	workertest.CleanKill(c, aggregator)

	// Ensure we got our list back with the expected contents.
	c.Assert(testGetter.ids, jc.SameContents, []instance.Id{"foo2", "foo3"})

	// Ensure we called Instances once and have no errors there.
	c.Assert(testGetter.err, jc.ErrorIsNil)
	c.Assert(testGetter.counter, gc.DeepEquals, int32(1))
}

// Test that advancing the clock by delay - time.Nanosecond and then killing
// the worker causes all pending requests to fail.
func (s *aggregateSuite) TestKillingWorkerKillsPendingReqs(c *gc.C) {
	// Set up local variables.
	testGetter := new(testInstanceGetter)
	clock := testing.NewClock(time.Now())
	delay := time.Minute
	cfg := aggregatorConfig{
		Clock:   clock,
		Delay:   delay,
		Environ: testGetter,
	}

	testGetter.newTestInstance("foo", "foobar", []string{"127.0.0.1", "192.168.1.1"})
	testGetter.newTestInstance("foo2", "not foobar", []string{"192.168.1.2"})
	testGetter.newTestInstance("foo3", "ok-ish", []string{"192.168.1.3"})

	aggregator, err := newAggregator(cfg)
	c.Check(err, jc.ErrorIsNil)

	defer workertest.CleanKill(c, aggregator)

	// Set up a couple of checks we can launch.
	var wg sync.WaitGroup
	checkInfo := func(id instance.Id) {
		defer wg.Done()
		info, err := aggregator.instanceInfo(id)
		c.Check(err.Error(), gc.Equals, "instanceInfo call aborted")
		c.Check(info.status.Message, gc.Equals, "")
	}

	// Launch a couple of requests.
	wg.Add(2)
	go checkInfo("foo2")
	go checkInfo("foo3")

	// Advance the clock and kill the worker.
	waitAlarms(c, clock, 1)
	clock.Advance(delay - time.Nanosecond)
	aggregator.Kill()

	wg.Wait()

	// Make sure we're dead.
	workertest.CheckKilled(c, aggregator)

	// Make sure we have no ids, since we're dead.
	c.Assert(len(testGetter.ids), gc.DeepEquals, 0)

	// Ensure we never called Instances and have no errors there.
	c.Assert(testGetter.err, jc.ErrorIsNil)
	c.Assert(testGetter.counter, gc.DeepEquals, int32(0))
}

// Test that having sent/advanced/received one batch, you can
// send/advance/receive again and that works, too.
func (s *aggregateSuite) TestMultipleBatches(c *gc.C) {
	// Set up some local variables.
	testGetter := new(testInstanceGetter)
	clock := testing.NewClock(time.Now())
	delay := time.Minute
	cfg := aggregatorConfig{
		Clock:   clock,
		Delay:   delay,
		Environ: testGetter,
	}

	testGetter.newTestInstance("foo2", "not foobar", []string{"192.168.1.2"})
	testGetter.newTestInstance("foo3", "ok-ish", []string{"192.168.1.3"})

	aggregator, err := newAggregator(cfg)
	c.Check(err, jc.ErrorIsNil)

	// Ensure the worker is killed and cleaned up if the test exits early.
	defer workertest.CleanKill(c, aggregator)

	// Create a checker we can launch as goroutines.
	var wg sync.WaitGroup
	checkInfo := func(id instance.Id, expectStatus string) {
		defer wg.Done()
		info, err := aggregator.instanceInfo(id)
		c.Check(err, jc.ErrorIsNil)
		c.Check(info.status.Message, gc.Equals, expectStatus)
	}

	// Launch and wait for these.
	wg.Add(2)
	go checkInfo("foo2", "not foobar")
	go checkInfo("foo3", "ok-ish")

	// Unwind the testing clock to let our requests through.
	waitAlarms(c, clock, 2)
	clock.Advance(delay)

	// Check we're still alive.
	workertest.CheckAlive(c, aggregator)

	// Wait until the checkers pass.
	// TODO(redir): These could block forever, we should make the effort to be
	// robust here per http://reviews.vapour.ws/r/4885/
	wg.Wait()

	// Ensure we got our list back with the expected length.
	c.Assert(len(testGetter.ids), gc.DeepEquals, 2)

	// And then a second batch.
	testGetter.newTestInstance("foo4", "spam", []string{"192.168.1.4"})
	testGetter.newTestInstance("foo5", "eggs", []string{"192.168.1.5"})

	// Launch and wait for this second batch.
	wg.Add(2)
	go checkInfo("foo4", "spam")
	go checkInfo("foo5", "eggs")

	for i := 0; i < 2; i++ {
		// Unwind again to let our next batch through.
		<-clock.Alarms()
	}

	// Advance the clock again.
	clock.Advance(delay)

	// Check we're still alive.
	workertest.CheckAlive(c, aggregator)

	// Wait until the checkers pass.
	wg.Wait()

	// Shutdown the worker.
	workertest.CleanKill(c, aggregator)

	// Ensure we got our list back with the correct length.
	c.Assert(len(testGetter.ids), gc.DeepEquals, 2)

	// Ensure we called Instances twice and have no errors there.
	c.Assert(testGetter.err, jc.ErrorIsNil)
	c.Assert(testGetter.counter, gc.Equals, int32(2))
}

// Test that things behave as expected when env.Instances errors.
func (s *aggregateSuite) TestInstancesErrors(c *gc.C) {
	// Set up local variables.
	testGetter := new(testInstanceGetter)
	clock := testing.NewClock(time.Now())
	delay := time.Millisecond
	cfg := aggregatorConfig{
		Clock:   clock,
		Delay:   delay,
		Environ: testGetter,
	}

	testGetter.newTestInstance("foo", "foobar", []string{"192.168.1.2"})
	testGetter.err = environs.ErrNoInstances
	aggregator, err := newAggregator(cfg)
	c.Check(err, jc.ErrorIsNil)

	defer workertest.CleanKill(c, aggregator)

	// Launch the test in a goroutine and wait for it.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		_, err := aggregator.instanceInfo("foo")
		c.Assert(err, gc.Equals, environs.ErrNoInstances)
	}()

	// Unwind to let our request through.
	waitAlarms(c, clock, 1)
	clock.Advance(delay)

	wg.Wait()

	// Kill the worker so we know there is no race checking testGetter.
	workertest.CleanKill(c, aggregator)

	c.Assert(testGetter.err, gc.Equals, environs.ErrNoInstances)
	c.Assert(testGetter.counter, gc.Equals, int32(1))
}

// Test that a partial result still resolves the instances that were found
// when Instances returns ErrPartialInstances.
func (s *aggregateSuite) TestPartialInstanceErrors(c *gc.C) {
	testGetter := new(testInstanceGetter)
	clock := testing.NewClock(time.Now())
	delay := time.Minute

	cfg := aggregatorConfig{
		Clock:   clock,
		Delay:   delay,
		Environ: testGetter,
	}

	testGetter.err = environs.ErrPartialInstances
	testGetter.newTestInstance("foo", "not foobar", []string{"192.168.1.2"})

	aggregator, err := newAggregator(cfg)
	c.Check(err, jc.ErrorIsNil)

	// Ensure the worker is killed and cleaned up if the test exits early.
	defer workertest.CleanKill(c, aggregator)

	// Create a checker we can launch as goroutines.
	var wg sync.WaitGroup
	checkInfo := func(id instance.Id, expectStatus string, expectedError error) {
		defer wg.Done()
		info, err := aggregator.instanceInfo(id)
		if expectedError == nil {
			c.Check(err, jc.ErrorIsNil)
		} else {
			c.Check(err.Error(), gc.Equals, expectedError.Error())
		}
		c.Check(info.status.Message, gc.Equals, expectStatus)
	}

	// Launch and wait for these.
	wg.Add(2)
	go checkInfo("foo", "not foobar", nil)
	go checkInfo("foo2", "", errors.New("instance foo2 not found"))

	// Unwind the testing clock to let our requests through.
	waitAlarms(c, clock, 2)
	clock.Advance(delay)

	// Check we're still alive.
	workertest.CheckAlive(c, aggregator)

	// Wait until the checkers pass.
	wg.Wait()

	// Now kill the worker so we don't risk a race in the following assertions.
	workertest.CleanKill(c, aggregator)

	// Ensure we got our list back with the correct length.
	c.Assert(len(testGetter.ids), gc.Equals, 2)

	// Ensure we called Instances once.
	// TODO(redir): all this stuff is really crying out to be, e.g.
	// testGetter.CheckOneCall(c, "foo", "foo2") per
	// http://reviews.vapour.ws/r/4885/
	c.Assert(testGetter.counter, gc.Equals, int32(1))
}

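// waitAlarms waits until the given testing.Clock has had count timers set
// on it. The clock sends a value on Alarms() for each After or AfterFunc
// call, so blocking here before calling clock.Advance ensures the worker
// is actually waiting on the clock rather than racing with the test.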
func waitAlarms(c *gc.C, clock *testing.Clock, count int) {
	timeout := time.After(testing.LongWait)
	for i := 0; i < count; i++ {
		select {
		case <-clock.Alarms():
		case <-timeout:
			c.Fatalf("timed out waiting for alarm %d to be set", i)
		}
	}
}