1
# This file is part of the Juju GUI, which lets users view and manage Juju
2
# environments within a graphical interface (https://launchpad.net/juju-gui).
3
# Copyright (C) 2013 Canonical Ltd.
5
# This program is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU Affero General Public License version 3, as published by
7
# the Free Software Foundation.
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranties of MERCHANTABILITY,
11
# SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12
# Affero General Public License for more details.
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
"""Juju GUI server test utilities."""
19
from contextlib import contextmanager
21
import multiprocessing
27
from tornado import websocket
30
from guiserver import auth
31
from guiserver.bundles import base
34
class EchoWebSocketHandler(websocket.WebSocketHandler):
35
"""A WebSocket server echoing back messages."""
37
def initialize(self, close_future, io_loop):
38
"""Echo WebSocket server initializer.
40
The handler receives a close Future and the current Tornado IO loop.
41
The close Future is fired when the connection is closed.
42
The close Future can also be used to force a connection termination by
45
self._closed_future = close_future
46
self._connected = True
47
io_loop.add_future(close_future, self.force_close)
49
def force_close(self, future):
50
"""Close the connection to the client."""
54
def on_message(self, message):
55
"""Echo back the received message."""
56
self.write_message(message, isinstance(message, bytes))
59
"""Fire the _closed_future if not already done."""
60
self._connected = False
61
if not self._closed_future.done():
62
self._closed_future.set_result(None)
65
class GoAPITestMixin(object):
66
"""Add helper methods for testing the Go API implementation."""
68
def get_auth_backend(self):
69
"""Return an authentication backend suitable for the Go API."""
70
return auth.get_backend('go')
72
def make_login_request(
73
self, request_id=42, username='user', password='passwd',
75
"""Create and return a login request message.
77
If encoded is set to True, the returned message will be JSON encoded.
80
'RequestId': request_id,
83
'Params': {'AuthTag': username, 'Password': password},
85
return json.dumps(data) if encoded else data
87
def make_login_response(
88
self, request_id=42, successful=True, encoded=False):
89
"""Create and return a login response message.
91
If encoded is set to True, the returned message will be JSON encoded.
92
By default, a successful response is returned. Set successful to False
93
to return an authentication failure.
95
data = {'RequestId': request_id, 'Response': {}}
97
data['Error'] = 'invalid entity name or password'
98
return json.dumps(data) if encoded else data
101
class PythonAPITestMixin(object):
102
"""Add helper methods for testing the Python API implementation."""
104
def get_auth_backend(self):
105
"""Return an authentication backend suitable for the Python API."""
106
return auth.get_backend('python')
108
def make_login_request(
109
self, request_id=42, username='user', password='passwd',
111
"""Create and return a login request message.
113
If encoded is set to True, the returned message will be JSON encoded.
116
'request_id': request_id,
119
'password': password,
121
return json.dumps(data) if encoded else data
123
def make_login_response(
124
self, request_id=42, successful=True, encoded=False):
125
"""Create and return a login response message.
127
If encoded is set to True, the returned message will be JSON encoded.
128
By default, a successful response is returned. Set successful to False
129
to return an authentication failure.
131
data = {'request_id': request_id, 'op': 'login'}
133
data['result'] = True
136
return json.dumps(data) if encoded else data
139
class BundlesTestMixin(object):
140
"""Add helper methods for testing the GUI server bundles support."""
142
apiurl = 'wss://api.example.com:17070'
148
charm: "cs:precise/wordpress-15"
159
charm: "cs:precise/mysql-26"
162
"binlog-format": MIXED
164
"dataset-size": "80%"
167
"ha-mcastport": "5411"
168
"max-connections": "-1"
169
"preferred-storage-engine": InnoDB
170
"query-cache-size": "-1"
171
"query-cache-type": "OFF"
173
"tuning-level": safest
185
def get_name_and_bundle(self):
186
"""Return a tuple (bundle name, contents) parsing self.bundle."""
187
all_contents = yaml.load(self.bundle)
188
return all_contents.items()[0]
190
def make_deployer(self, apiversion=base.SUPPORTED_API_VERSIONS[0]):
191
"""Create and return a Deployer instance."""
192
return base.Deployer(self.apiurl, apiversion)
194
def make_view_request(self, params=None, is_authenticated=True):
195
"""Create and return a mock request to be passed to bundle views.
197
The resulting request contains the given parameters and a
198
guiserver.auth.User instance.
199
If is_authenticated is True, the user in the request is logged in.
204
username='user', password='passwd',
205
is_authenticated=is_authenticated)
206
return mock.Mock(params=params, user=user)
208
def make_deployment_request(
209
self, request, request_id=42, params=None, encoded=False):
210
"""Create and return a deployment request message.
212
If encoded is set to True, the returned message will be JSON encoded.
215
'Import': {'Name': 'bundle', 'YAML': 'bundle: contents'},
216
'Watch': {'DeploymentId': 0},
217
'Next': {'WatcherId': 0},
221
params = defaults[request]
223
'RequestId': request_id,
228
return json.dumps(data) if encoded else data
230
def make_deployment_response(
231
self, request_id=42, response=None, error=None, encoded=False):
232
"""Create and return a deployment response message.
234
If encoded is set to True, the returned message will be JSON encoded.
238
data = {'RequestId': request_id, 'Response': response}
239
if error is not None:
240
data['Error'] = error
241
return json.dumps(data) if encoded else data
243
def patch_validate(self, side_effect=None):
244
"""Mock the blocking validate function."""
245
mock_validate = MultiProcessMock(side_effect=side_effect)
246
return mock.patch('guiserver.bundles.blocking.validate', mock_validate)
248
def patch_import_bundle(self, side_effect=None):
249
"""Mock the blocking import_bundle function."""
250
mock_import_bundle = MultiProcessMock(side_effect=side_effect)
251
import_bundle_path = 'guiserver.bundles.blocking.import_bundle'
252
return mock.patch(import_bundle_path, mock_import_bundle)
255
class WSSTestMixin(object):
256
"""Add some helper methods for testing secure WebSocket handlers."""
258
def get_wss_url(self, path):
259
"""Return an absolute secure WebSocket url for the given path."""
260
return 'wss://localhost:{}{}'.format(self.get_http_port(), path)
263
class MultiProcessMock(object):
264
"""Return a callable mock object to be used across multiple processes.
266
In a multiprocess context the usual mock.Mock() does not work as expected:
267
see <https://code.google.com/p/mock/issues/detail?id=139>.
269
Help sharing call info between separate processes, and ensuring that the
270
callable is called in a separate process.
271
Note that only self.__call__() must be executed in a separate process: all
272
the other methods are supposed to be called in the main process.
275
def __init__(self, side_effect=None):
276
"""Initialize the mock object.
278
Calling this object will return side_effect if it is not an exception.
279
If otherwise side_effect is an exception, that error will be raised.
281
# When testing across multiple processes, a SIGPIPE can intermittently
282
# generate a broken pipe IOError. In order to avoid that, restore the
283
# default handler for the SIGPIPE signal when initializing this mock.
284
# See <http://bugs.python.org/issue1652>.
285
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
286
self.side_effect = side_effect
287
manager = multiprocessing.Manager()
288
self.queue = manager.Queue()
292
def __call__(self, *args, **kwargs):
293
"""Return or raise self.side_effect.
295
This method is supposed to be called in a separate process.
297
side_effect = self.side_effect
298
self.queue.put((os.getpid(), args, kwargs))
299
if isinstance(side_effect, Exception):
303
def _consume_queue(self):
304
"""Collect info about how this mock has been called."""
305
while not self.queue.empty():
306
pid, args, kwargs = self.queue.get()
307
self._call_pids.append(pid)
308
self._call_args.append((args, kwargs))
312
"""Return a list of (args, kwargs) tuples.
314
Each pair in the list represents the arguments of a single call.
316
self._consume_queue()
317
return list(self._call_args)
320
def call_count(self):
321
"""Return the number of times this mock has been called."""
322
return len(self.call_args)
324
def assert_called_once_with(self, *args, **kwargs):
325
"""Ensure this mock has been called once with the given arguments."""
327
call_count = self.call_count
328
if self.call_count != 1:
329
error = 'Expected to be called once. Called {} times.'
330
raise AssertionError(error.format(call_count))
332
expected = (args, kwargs)
333
obtained = self._call_args[0]
334
if expected != obtained:
336
'Called with different arguments.\n'
337
'Expected: {}\nObtained: {}'.format(expected, obtained)
339
raise AssertionError(error)
341
def assert_called_in_a_separate_process(self):
342
"""Ensure this object was called in a separate process."""
343
assert self.call_count, 'Not even called.'
344
pid = self._call_pids[-1]
345
assert pid != os.getpid(), 'Called in the same process: {}'.format(pid)
348
class TestMultiProcessMock(unittest.TestCase):
350
def call(self, function, *args, **kwargs):
351
"""Execute the given callable in a separate process.
353
Pass the given args and kwargs to the callable.
355
process = multiprocessing.Process(
356
target=function, args=args, kwargs=kwargs)
361
def assert_error(self, error):
362
"""Ensure an AssertionError is raised in the context block.
364
Also check the error message is the expected one.
366
with self.assertRaises(AssertionError) as context_manager:
368
self.assertEqual(error, str(context_manager.exception))
370
def test_not_called(self):
371
# If the mock object has not been called, both assertions fail.
372
mock_callable = MultiProcessMock()
373
# The assert_called_once_with assertion fails.
374
with self.assert_error('Expected to be called once. Called 0 times.'):
375
mock_callable.assert_called_once_with()
376
# The assert_called_in_a_separate_process assertion fails.
377
with self.assert_error('Not even called.'):
378
mock_callable.assert_called_in_a_separate_process()
381
# The mock object can be called in a separate process.
382
mock_callable = MultiProcessMock()
383
self.call(mock_callable)
384
mock_callable.assert_called_once_with()
385
mock_callable.assert_called_in_a_separate_process()
387
def test_call_same_process(self):
388
# The mock object knows if it has been called in the main process.
389
mock_callable = MultiProcessMock()
391
mock_callable.assert_called_once_with()
393
with self.assert_error('Called in the same process: {}'.format(pid)):
394
mock_callable.assert_called_in_a_separate_process()
396
def test_multiple_calls(self):
397
# The assert_called_once_with assertion fails if the mock object has
398
# been called multiple times.
399
mock_callable = MultiProcessMock()
402
with self.assert_error('Expected to be called once. Called 2 times.'):
403
mock_callable.assert_called_once_with()
405
def test_call_args(self):
406
# The mock object call arguments can be inspected.
407
mock_callable = MultiProcessMock()
408
self.call(mock_callable, 1, 2, foo='bar')
409
self.assertEqual([((1, 2), {'foo': 'bar'})], mock_callable.call_args)
411
def test_multiple_call_args(self):
412
# Call arguments are collected for each call.
413
mock_callable = MultiProcessMock()
414
self.call(mock_callable, 1)
415
self.call(mock_callable, 2, foo=None)
418
((2,), {'foo': None})
420
self.assertEqual(expected, mock_callable.call_args)
422
def test_call_count(self):
423
# The number of calls are correctly tracked.
424
mock_callable = MultiProcessMock()
425
self.call(mock_callable)
426
self.assertEqual(1, mock_callable.call_count)
427
self.call(mock_callable, 1, 2)
428
self.assertEqual(2, mock_callable.call_count)
430
self.assertEqual(3, mock_callable.call_count)
432
def test_default_return_value(self):
433
# The mock object returns None by default.
434
mock_callable = MultiProcessMock()
435
self.assertIsNone(mock_callable())
437
def test_customized_return_value(self):
438
# The mock object can be configured to return a customized value.
439
mock_callable = MultiProcessMock(side_effect='my-value')
440
self.assertEqual('my-value', mock_callable())
442
def test_raise_error(self):
443
# The mock object can be configured to raise an exception.
444
mock_callable = MultiProcessMock(side_effect=ValueError())
445
self.assertRaises(ValueError, mock_callable)