~veebers/autopilot/fix_1178014

« back to all changes in this revision

Viewing changes to autopilot/tests/__init__.py

  • Committer: Thomi Richards
  • Date: 2012-05-06 22:45:27 UTC
  • Revision ID: thomi.richards@canonical.com-20120506224527-xh6wixqiw0rarkmh
Imported code from unity.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
Autopilot tests for Unity.
 
3
"""
 
4
 
 
5
 
 
6
from compizconfig import Setting, Plugin
 
7
from dbus import DBusException
 
8
import logging
 
9
import os
 
10
from StringIO import StringIO
 
11
from subprocess import call, check_output, Popen, PIPE, STDOUT
 
12
from tempfile import mktemp
 
13
from testscenarios import TestWithScenarios
 
14
from testtools import TestCase
 
15
from testtools.content import text_content
 
16
from testtools.matchers import Equals
 
17
import time
 
18
 
 
19
from autopilot.emulators.bamf import Bamf
 
20
from autopilot.emulators.unity import (
 
21
    set_log_severity,
 
22
    start_log_to_file,
 
23
    reset_logging,
 
24
    )
 
25
from autopilot.emulators.unity.dash import Dash
 
26
from autopilot.emulators.unity.hud import Hud
 
27
from autopilot.emulators.unity.launcher import LauncherController
 
28
from autopilot.emulators.unity.panel import PanelController
 
29
from autopilot.emulators.unity.switcher import Switcher
 
30
from autopilot.emulators.unity.window_manager import WindowManager
 
31
from autopilot.emulators.unity.workspace import WorkspaceManager
 
32
from autopilot.emulators.X11 import ScreenGeometry, Keyboard, Mouse, reset_display
 
33
from autopilot.glibrunner import GlibRunner
 
34
from autopilot.globals import (global_context,
 
35
    video_recording_enabled,
 
36
    video_record_directory,
 
37
    )
 
38
from autopilot.keybindings import KeybindingsHelper
 
39
 
 
40
 
 
41
logger = logging.getLogger(__name__)
 
42
 
 
43
 
 
44
try:
 
45
    from testscenarios.scenarios import multiply_scenarios
 
46
except ImportError:
 
47
    from itertools import product
 
48
    def multiply_scenarios(*scenarios):
 
49
        """Multiply two or more iterables of scenarios.
 
50
 
 
51
        It is safe to pass scenario generators or iterators.
 
52
 
 
53
        :returns: A list of compound scenarios: the cross-product of all
 
54
            scenarios, with the names concatenated and the parameters
 
55
            merged together.
 
56
        """
 
57
        result = []
 
58
        scenario_lists = map(list, scenarios)
 
59
        for combination in product(*scenario_lists):
 
60
            names, parameters = zip(*combination)
 
61
            scenario_name = ','.join(names)
 
62
            scenario_parameters = {}
 
63
            for parameter in parameters:
 
64
                scenario_parameters.update(parameter)
 
65
            result.append((scenario_name, scenario_parameters))
 
66
        return result
 
67
 
 
68
 
 
69
class LoggedTestCase(TestWithScenarios, TestCase):
 
70
    """Initialize the logging for the test case."""
 
71
 
 
72
    def setUp(self):
 
73
        self._setUpTestLogging()
 
74
        self._setUpUnityLogging()
 
75
        # The reason that the super setup is done here is due to making sure
 
76
        # that the logging is properly set up prior to calling it.
 
77
        super(LoggedTestCase, self).setUp()
 
78
 
 
79
    def _setUpTestLogging(self):
 
80
        class MyFormatter(logging.Formatter):
 
81
 
 
82
            def formatTime(self, record, datefmt=None):
 
83
                ct = self.converter(record.created)
 
84
                if datefmt:
 
85
                    s = time.strftime(datefmt, ct)
 
86
                else:
 
87
                    t = time.strftime("%H:%M:%S", ct)
 
88
                    s = "%s.%03d" % (t, record.msecs)
 
89
                return s
 
90
 
 
91
        self._log_buffer = StringIO()
 
92
        root_logger = logging.getLogger()
 
93
        root_logger.setLevel(logging.DEBUG)
 
94
        handler = logging.StreamHandler(stream=self._log_buffer)
 
95
        log_format = "%(asctime)s %(levelname)s %(module)s:%(lineno)d - %(message)s"
 
96
        handler.setFormatter(MyFormatter(log_format))
 
97
        root_logger.addHandler(handler)
 
98
        #Tear down logging in a cleanUp handler, so it's done after all other
 
99
        # tearDown() calls and cleanup handlers.
 
100
        self.addCleanup(self._tearDownLogging)
 
101
 
 
102
    def _tearDownLogging(self):
 
103
        logger = logging.getLogger()
 
104
        for handler in logger.handlers:
 
105
            handler.flush()
 
106
            self._log_buffer.seek(0)
 
107
            self.addDetail('test-log', text_content(self._log_buffer.getvalue()))
 
108
            logger.removeHandler(handler)
 
109
        # Calling del to remove the handler and flush the buffer.  We are
 
110
        # abusing the log handlers here a little.
 
111
        del self._log_buffer
 
112
 
 
113
    def _setUpUnityLogging(self):
 
114
        self._unity_log_file_name = mktemp(prefix=self.shortDescription())
 
115
        start_log_to_file(self._unity_log_file_name)
 
116
        self.addCleanup(self._tearDownUnityLogging)
 
117
 
 
118
    def _tearDownUnityLogging(self):
 
119
        # If unity dies, our dbus interface has gone, and reset_logging will fail
 
120
        # but we still want our log, so we ignore any errors.
 
121
        try:
 
122
            reset_logging()
 
123
        except DBusException:
 
124
            pass
 
125
        with open(self._unity_log_file_name) as unity_log:
 
126
            self.addDetail('unity-log', text_content(unity_log.read()))
 
127
        os.remove(self._unity_log_file_name)
 
128
        self._unity_log_file_name = ""
 
129
 
 
130
    def set_unity_log_level(self, component, level):
 
131
        """Set the unity log level for 'component' to 'level'.
 
132
 
 
133
        Valid levels are: TRACE, DEBUG, INFO, WARNING and ERROR.
 
134
 
 
135
        Components are dotted unity component names. The empty string specifies
 
136
        the root logging component.
 
137
        """
 
138
        set_log_severity(component, level)
 
139
 
 
140
 
 
141
class VideoCapturedTestCase(LoggedTestCase):
 
142
    """Video capture autopilot tests, saving the results if the test failed."""
 
143
 
 
144
    _recording_app = '/usr/bin/recordmydesktop'
 
145
    _recording_opts = ['--no-sound', '--no-frame', '-o',]
 
146
 
 
147
    def setUp(self):
 
148
        super(VideoCapturedTestCase, self).setUp()
 
149
        global video_recording_enabled
 
150
        if video_recording_enabled and not self._have_recording_app():
 
151
            video_recording_enabled = False
 
152
            logger.warning("Disabling video capture since '%s' is not present", self._recording_app)
 
153
 
 
154
        if video_recording_enabled:
 
155
            self._test_passed = True
 
156
            self.addOnException(self._on_test_failed)
 
157
            self.addCleanup(self._stop_video_capture)
 
158
            self._start_video_capture()
 
159
 
 
160
    def _have_recording_app(self):
 
161
        return os.path.exists(self._recording_app)
 
162
 
 
163
    def _start_video_capture(self):
 
164
        args = self._get_capture_command_line()
 
165
        self._capture_file = self._get_capture_output_file()
 
166
        self._ensure_directory_exists_but_not_file(self._capture_file)
 
167
        args.append(self._capture_file)
 
168
        logger.debug("Starting: %r", args)
 
169
        self._capture_process = Popen(args, stdout=PIPE, stderr=STDOUT)
 
170
 
 
171
    def _stop_video_capture(self):
 
172
        """Stop the video capture. If the test failed, save the resulting file."""
 
173
 
 
174
        if self._test_passed:
 
175
            # We use kill here because we don't want the recording app to start
 
176
            # encoding the video file (since we're removing it anyway.)
 
177
            self._capture_process.kill()
 
178
            self._capture_process.wait()
 
179
        else:
 
180
            self._capture_process.terminate()
 
181
            self._capture_process.wait()
 
182
            if self._capture_process.returncode != 0:
 
183
                self.addDetail('video capture log', text_content(self._capture_process.stdout.read()))
 
184
        self._capture_process = None
 
185
 
 
186
    def _get_capture_command_line(self):
 
187
        return [self._recording_app] + self._recording_opts
 
188
 
 
189
    def _get_capture_output_file(self):
 
190
        return os.path.join(video_record_directory, '%s.ogv' % (self.shortDescription()))
 
191
 
 
192
    def _ensure_directory_exists_but_not_file(self, file_path):
 
193
        dirpath = os.path.dirname(file_path)
 
194
        if not os.path.exists(dirpath):
 
195
            os.makedirs(dirpath)
 
196
        elif os.path.exists(file_path):
 
197
            logger.warning("Video capture file '%s' already exists, deleting.", file_path)
 
198
            os.remove(file_path)
 
199
 
 
200
    def _on_test_failed(self, ex_info):
 
201
        """Called when a test fails."""
 
202
        self._test_passed = False
 
203
 
 
204
 
 
205
class AutopilotTestCase(VideoCapturedTestCase, KeybindingsHelper):
 
206
    """Wrapper around testtools.TestCase that takes care of some cleaning."""
 
207
 
 
208
    run_test_with = GlibRunner
 
209
 
 
210
    KNOWN_APPS = {
 
211
        'Character Map' : {
 
212
            'desktop-file': 'gucharmap.desktop',
 
213
            'process-name': 'gucharmap',
 
214
            },
 
215
        'Calculator' : {
 
216
            'desktop-file': 'gcalctool.desktop',
 
217
            'process-name': 'gcalctool',
 
218
            },
 
219
        'Mahjongg' : {
 
220
            'desktop-file': 'mahjongg.desktop',
 
221
            'process-name': 'mahjongg',
 
222
            },
 
223
        'Remmina' : {
 
224
            'desktop-file': 'remmina.desktop',
 
225
            'process-name': 'remmina',
 
226
            },
 
227
        'System Settings' : {
 
228
            'desktop-file': 'gnome-control-center.desktop',
 
229
            'process-name': 'gnome-control-center',
 
230
            },
 
231
        'Text Editor' : {
 
232
            'desktop-file': 'gedit.desktop',
 
233
            'process-name': 'gedit',
 
234
            },
 
235
        }
 
236
 
 
237
    def setUp(self):
 
238
        super(AutopilotTestCase, self).setUp()
 
239
        self.bamf = Bamf()
 
240
        self.keyboard = Keyboard()
 
241
        self.mouse = Mouse()
 
242
        self.dash = Dash()
 
243
        self.hud = Hud()
 
244
        self.launcher = self._get_launcher_controller()
 
245
        self.panels = self._get_panel_controller()
 
246
        self.switcher = Switcher()
 
247
        self.window_manager = self._get_window_manager()
 
248
        self.workspace = WorkspaceManager()
 
249
        self.screen_geo = ScreenGeometry()
 
250
        self.addCleanup(self.workspace.switch_to, self.workspace.current_workspace)
 
251
        self.addCleanup(Keyboard.cleanup)
 
252
        self.addCleanup(Mouse.cleanup)
 
253
 
 
254
    def start_app(self, app_name, files=[], locale=None):
 
255
        """Start one of the known apps, and kill it on tear down.
 
256
 
 
257
        If files is specified, start the application with the specified files.
 
258
        If locale is specified, the locale will be set when the application is launched.
 
259
 
 
260
        The method returns the BamfApplication instance.
 
261
 
 
262
        """
 
263
        if locale:
 
264
            os.putenv("LC_ALL", locale)
 
265
            self.addCleanup(os.unsetenv, "LC_ALL")
 
266
            logger.info("Starting application '%s' with files %r in locale %s", app_name, files, locale)
 
267
        else:
 
268
            logger.info("Starting application '%s' with files %r", app_name, files)
 
269
 
 
270
        app = self.KNOWN_APPS[app_name]
 
271
        self.bamf.launch_application(app['desktop-file'], files)
 
272
        apps = self.bamf.get_running_applications_by_desktop_file(app['desktop-file'])
 
273
        self.addCleanup(call, "kill `pidof %s`" % (app['process-name']), shell=True)
 
274
        self.assertThat(len(apps), Equals(1))
 
275
        return apps[0]
 
276
 
 
277
    def close_all_app(self, app_name):
 
278
        """Close all instances of the app_name."""
 
279
        app = self.KNOWN_APPS[app_name]
 
280
        self.addCleanup(call, "kill `pidof %s`" % (app['process-name']), shell=True)
 
281
        super(LoggedTestCase, self).tearDown()
 
282
 
 
283
    def get_app_instances(self, app_name):
 
284
        """Get BamfApplication instances for app_name."""
 
285
        desktop_file = self.KNOWN_APPS[app_name]['desktop-file']
 
286
        return self.bamf.get_running_applications_by_desktop_file(desktop_file)
 
287
 
 
288
    def app_is_running(self, app_name):
 
289
        """Returns true if an instance of the application is running."""
 
290
        apps = self.get_app_instances(app_name)
 
291
        return len(apps) > 0
 
292
 
 
293
    def call_gsettings_cmd(self, command, schema, *args):
 
294
        """Set a desktop wide gsettings option
 
295
 
 
296
        Using the gsettings command because there's a bug with importing
 
297
        from gobject introspection and pygtk2 simultaneously, and the Xlib
 
298
        keyboard layout bits are very unweildy. This seems like the best
 
299
        solution, even a little bit brutish.
 
300
        """
 
301
        cmd = ['gsettings', command, schema] + list(args)
 
302
        # strip to remove the trailing \n.
 
303
        ret = check_output(cmd).strip()
 
304
        time.sleep(5)
 
305
        reset_display()
 
306
        return ret
 
307
 
 
308
    def set_unity_option(self, option_name, option_value):
 
309
        """Set an option in the unity compiz plugin options.
 
310
 
 
311
        The value will be set for the current test only.
 
312
 
 
313
        """
 
314
        self.set_compiz_option("unityshell", option_name, option_value)
 
315
 
 
316
    def set_compiz_option(self, plugin_name, setting_name, setting_value):
 
317
        """Set setting `setting_name` in compiz plugin `plugin_name` to value `setting_value`
 
318
        for one test only.
 
319
        """
 
320
        old_value = self._set_compiz_option(plugin_name, setting_name, setting_value)
 
321
        self.addCleanup(self._set_compiz_option, plugin_name, setting_name, old_value)
 
322
        # Allow unity time to respond to the new setting.
 
323
        time.sleep(0.5)
 
324
 
 
325
    def _set_compiz_option(self, plugin_name, option_name, option_value):
 
326
        logger.info("Setting compiz option '%s' in plugin '%s' to %r",
 
327
            option_name, plugin_name, option_value)
 
328
        plugin = Plugin(global_context, plugin_name)
 
329
        setting = Setting(plugin, option_name)
 
330
        old_value = setting.Value
 
331
        setting.Value = option_value
 
332
        global_context.Write()
 
333
        return old_value
 
334
 
 
335
    def _get_launcher_controller(self):
 
336
        controllers = LauncherController.get_all_instances()
 
337
        self.assertThat(len(controllers), Equals(1))
 
338
        return controllers[0]
 
339
 
 
340
    def _get_panel_controller(self):
 
341
        controllers = PanelController.get_all_instances()
 
342
        self.assertThat(len(controllers), Equals(1))
 
343
        return controllers[0]
 
344
 
 
345
    def _get_window_manager(self):
 
346
        managers = WindowManager.get_all_instances()
 
347
        self.assertThat(len(managers), Equals(1))
 
348
        return managers[0]
 
349
 
 
350
    def assertVisibleWindowStack(self, stack_start):
 
351
        """Check that the visible window stack starts with the windows passed in.
 
352
 
 
353
        The start_stack is an iterable of BamfWindow objects.
 
354
        Minimised windows are skipped.
 
355
 
 
356
        """
 
357
        stack = [win for win in self.bamf.get_open_windows() if not win.is_hidden]
 
358
        for pos, win in enumerate(stack_start):
 
359
            self.assertThat(stack[pos].x_id, Equals(win.x_id),
 
360
                            "%r at %d does not equal %r" % (stack[pos], pos, win))