~ubuntu-branches/ubuntu/saucy/autopilot/saucy-proposed

« back to all changes in this revision

Viewing changes to autopilot/introspection/__init__.py

  • Committer: Package Import Robot
  • Author(s): Didier Roche
  • Date: 2013-06-07 13:33:46 UTC
  • mfrom: (57.1.1 saucy-proposed)
  • Revision ID: package-import@ubuntu.com-20130607133346-42zvbl1h2k1v54ac
Tags: 1.3daily13.06.05-0ubuntu2
autopilot-touch only suggests python-ubuntu-platform-api for now.
It's not in distro and we need that requirement to be fulfilled to
have unity 7, 100 scopes and the touch stack to distro.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# -*- Mode: Python; coding: utf-8; indent-tabs-mode: nil; tab-width: 4 -*-
2
 
# Copyright 2012 Canonical
3
 
# Author: Thomi Richards
4
 
#
5
 
# This program is free software: you can redistribute it and/or modify it
6
 
# under the terms of the GNU General Public License version 3, as published
7
 
# by the Free Software Foundation.
8
 
#
9
 
 
10
 
"""Package for introspection support."""
 
2
#
 
3
# Autopilot Functional Test Tool
 
4
# Copyright (C) 2012-2013 Canonical
 
5
#
 
6
# This program is free software: you can redistribute it and/or modify
 
7
# it under the terms of the GNU General Public License as published by
 
8
# the Free Software Foundation, either version 3 of the License, or
 
9
# (at your option) any later version.
 
10
#
 
11
# This program is distributed in the hope that it will be useful,
 
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
# GNU General Public License for more details.
 
15
#
 
16
# You should have received a copy of the GNU General Public License
 
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
#
 
19
 
 
20
 
 
21
"""Package for introspection support.
 
22
 
 
23
This package contains the internal implementation of the autopilot introspection
 
24
mechanism, and probably isn't useful to most test authors.
 
25
 
 
26
"""
 
27
from __future__ import absolute_import
11
28
 
12
29
import dbus
13
 
from gi.repository import Gio
14
30
import logging
15
31
import subprocess
16
 
from testtools.content import text_content
17
32
from time import sleep
18
33
import os
19
 
import signal
20
 
 
21
 
 
 
34
 
 
35
 
 
36
from autopilot.introspection.backends import DBusAddress
22
37
from autopilot.introspection.constants import (
23
38
    AUTOPILOT_PATH,
24
39
    QT_AUTOPILOT_IFACE,
25
40
    AP_INTROSPECTION_IFACE,
26
 
    DBUS_INTROSPECTION_IFACE,
27
41
    )
28
42
from autopilot.introspection.dbus import (
29
 
    clear_object_registry,
 
43
    CustomEmulatorBase,
30
44
    DBusIntrospectionObject,
31
 
    object_passes_filters,
32
 
    get_session_bus,
 
45
    get_classname_from_path,
33
46
    )
 
47
from autopilot.dbus_handler import get_session_bus
34
48
from autopilot.utilities import get_debug_logger
35
49
 
36
50
 
37
51
logger = logging.getLogger(__name__)
38
52
 
39
53
 
40
 
class ApplicationIntrospectionTestMixin(object):
41
 
    """A mix-in class to make launching applications for introsection easier.
42
 
 
43
 
    .. important:: You should not instantiate this class directly. Instead, use
44
 
     one of the derived classes.
45
 
 
46
 
    """
47
 
 
48
 
    def launch_test_application(self, application, *arguments, **kwargs):
49
 
        """Launch *application* and retrieve a proxy object for the application.
50
 
 
51
 
        Use this method to launch a supported application and start testing it.
52
 
        The application can be specified as:
53
 
 
54
 
         * A Desktop file, either with or without a path component.
55
 
         * An executable file, either with a path, or one that is in the $PATH.
56
 
 
57
 
        This method supports the following keyword arguments:
58
 
 
59
 
         * *launch_dir*. If set to a directory that exists the process will be
60
 
           launched from that directory.
61
 
 
62
 
         * *capture_output*. If set to True (the default), the process output
63
 
           will be captured and attached to the test as test detail.
64
 
 
65
 
        :raises: **ValueError** if unknown keyword arguments are passed.
66
 
        :return: A proxy object that represents the application. Introspection
67
 
         data is retrievable via this object.
68
 
 
69
 
         """
70
 
        if not isinstance(application, basestring):
71
 
            raise TypeError("'application' parameter must be a string.")
72
 
        cwd = kwargs.pop('launch_dir', None)
73
 
        capture_output = kwargs.pop('capture_output', True)
74
 
        if kwargs:
75
 
            raise ValueError("Unknown keyword arguments: %s." %
76
 
                (', '.join( repr(k) for k in kwargs.keys())))
77
 
 
78
 
        if application.endswith('.desktop'):
79
 
            proc = Gio.DesktopAppInfo.new(application)
80
 
            application = proc.get_executable()
81
 
 
82
 
        path, args = self.prepare_environment(application, list(arguments))
83
 
 
84
 
        process = launch_autopilot_enabled_process(path,
85
 
                                                    args,
86
 
                                                    capture_output,
87
 
                                                    cwd=cwd)
88
 
        self.addCleanup(self._kill_process_and_attach_logs, process)
89
 
        return get_autopilot_proxy_object_for_process(process)
 
54
def get_application_launcher(app_path):
 
55
    """Return an instance of :class:`ApplicationLauncher` that knows how to launch
 
56
    the application at 'app_path'.
 
57
    """
 
58
    # TODO: this is a teeny bit hacky - we call ldd to check whether this application
 
59
    # links to certain library. We're assuming that linking to libQt* or libGtk*
 
60
    # means the application is introspectable. This excludes any non-dynamically
 
61
    # linked executables, which we may need to fix further down the line.
 
62
    try:
 
63
        ldd_output = subprocess.check_output(["ldd", app_path]).strip().lower()
 
64
    except subprocess.CalledProcessError as e:
 
65
        raise RuntimeError(e)
 
66
    if 'libqtcore' in ldd_output or 'libqt5core' in ldd_output:
 
67
        from autopilot.introspection.qt import QtApplicationLauncher
 
68
        return QtApplicationLauncher()
 
69
    elif 'libgtk' in ldd_output:
 
70
        from autopilot.introspection.gtk import GtkApplicationLauncher
 
71
        return GtkApplicationLauncher()
 
72
    return None
 
73
 
 
74
 
 
75
def get_application_launcher_from_string_hint(hint):
 
76
    """Return in instance of :class:`ApplicationLauncher` given a string hint."""
 
77
    from autopilot.introspection.qt import QtApplicationLauncher
 
78
    from autopilot.introspection.gtk import GtkApplicationLauncher
 
79
 
 
80
    hint = hint.lower()
 
81
    if hint == 'qt':
 
82
        return QtApplicationLauncher()
 
83
    elif hint == 'gtk':
 
84
        return GtkApplicationLauncher()
 
85
    return None
 
86
 
 
87
 
 
88
def launch_application(launcher, application, *arguments, **kwargs):
 
89
    """Launch an application, and return a process object.
 
90
 
 
91
    :param launcher: An instance of the :class:`ApplicationLauncher` class to
 
92
        prepare the environment before launching the application itself.
 
93
    """
 
94
 
 
95
    if not isinstance(application, basestring):
 
96
        raise TypeError("'application' parameter must be a string.")
 
97
    cwd = kwargs.pop('launch_dir', None)
 
98
    capture_output = kwargs.pop('capture_output', True)
 
99
    if kwargs:
 
100
        raise ValueError("Unknown keyword arguments: %s." %
 
101
            (', '.join( repr(k) for k in kwargs.keys())))
 
102
 
 
103
    path, args = launcher.prepare_environment(application, list(arguments))
 
104
 
 
105
    process = launch_process(path,
 
106
        args,
 
107
        capture_output,
 
108
        cwd=cwd
 
109
        )
 
110
    return process
 
111
 
 
112
 
 
113
class ApplicationLauncher(object):
 
114
    """A class that knows how to launch an application with a certain type of
 
115
    introspection enabled.
 
116
 
 
117
    """
90
118
 
91
119
    def prepare_environment(self, app_path, arguments):
92
120
        """Prepare the application, or environment to launch with autopilot-support.
99
127
        """
100
128
        raise NotImplementedError("Sub-classes must implement this method.")
101
129
 
102
 
    def _kill_process_and_attach_logs(self, process):
103
 
        process.kill()
104
 
        logger.info("waiting for process to exit.")
105
 
        for i in range(10):
106
 
            if process.returncode is not None:
107
 
                break
108
 
            if i == 9:
109
 
                logger.info("Terminating process group, since it hasn't exited after 10 seconds.")
110
 
                os.killpg(process.pid, signal.SIGTERM)
111
 
            sleep(1)
112
 
        stdout, stderr = process.communicate()
113
 
        self.addDetail('process-stdout', text_content(stdout))
114
 
        self.addDetail('process-stderr', text_content(stderr))
115
 
 
116
 
 
117
 
def launch_autopilot_enabled_process(application, args, capture_output, **kwargs):
118
 
    """Launch an autopilot-enabled process and return the proxy object."""
 
130
 
 
131
 
 
132
def launch_process(application, args, capture_output, **kwargs):
 
133
    """Launch an autopilot-enabled process and return the process object."""
119
134
    commandline = [application]
120
135
    commandline.extend(args)
121
136
    logger.info("Launching process: %r", commandline)
132
147
    return process
133
148
 
134
149
 
135
 
def get_child_pids(pid):
136
 
    """Get a list of all child process Ids, for the given parent.
137
 
 
138
 
    """
139
 
    def get_children(pid):
140
 
        command = ['ps', '-o', 'pid', '--ppid', str(pid), '--noheaders']
141
 
        try:
142
 
            raw_output = subprocess.check_output(command)
143
 
        except subprocess.CalledProcessError:
144
 
            return []
145
 
        return [int(p) for p in raw_output.split()]
146
 
 
147
 
    result = [pid]
148
 
    data = get_children(pid)
149
 
    while data:
150
 
        pid = data.pop(0)
151
 
        result.append(pid)
152
 
        data.extend(get_children(pid))
153
 
 
154
 
    return result
155
 
 
156
 
 
157
 
def get_autopilot_proxy_object_for_process(process):
 
150
def get_autopilot_proxy_object_for_process(process, emulator_base):
158
151
    """Return the autopilot proxy object for the given *process*.
159
152
 
160
153
    :raises: **RuntimeError** if no autopilot interface was found.
173
166
    bus_object = session_bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
174
167
    bus_iface = dbus.Interface(bus_object, 'org.freedesktop.DBus')
175
168
 
176
 
    # clear the object registry, since it's specific to the dbus service, and we
177
 
    # have just started a new service. We don't want the old types hanging around
178
 
    # in the registry. We need a better method for this however.
179
 
    clear_object_registry()
180
 
 
181
169
    logger.info("Looking for autopilot interface for PID %d (and children)", pid)
182
170
    # We give the process 10 seconds grace time to get the dbus interface up...
183
171
    for i in range(10):
190
178
                if name_pid in eligible_pids:
191
179
                    # We've found at least one connection to the session bus from
192
180
                    # this PID. Might not be the one we want however...
193
 
                    proxy = make_proxy_object_from_service_name(name, AUTOPILOT_PATH)
 
181
                    dbus_address_instance = get_dbus_address_object(name, AUTOPILOT_PATH)
 
182
                    proxy = make_proxy_object(dbus_address_instance, emulator_base)
194
183
                    proxy.set_process(process)
195
184
                    return proxy
196
185
            except Exception as e:
199
188
    raise RuntimeError("Unable to find Autopilot interface.")
200
189
 
201
190
 
202
 
def make_proxy_object_from_service_name(service_name, obj_path):
203
 
    """Returns a root proxy object given a DBus service name."""
 
191
def get_child_pids(pid):
 
192
    """Get a list of all child process Ids, for the given parent.
 
193
 
 
194
    """
 
195
    def get_children(pid):
 
196
        command = ['ps', '-o', 'pid', '--ppid', str(pid), '--noheaders']
 
197
        try:
 
198
            raw_output = subprocess.check_output(command)
 
199
        except subprocess.CalledProcessError:
 
200
            return []
 
201
        return [int(p) for p in raw_output.split()]
 
202
 
 
203
    result = [pid]
 
204
    data = get_children(pid)
 
205
    while data:
 
206
        pid = data.pop(0)
 
207
        result.append(pid)
 
208
        data.extend(get_children(pid))
 
209
 
 
210
    return result
 
211
 
 
212
 
 
213
def get_dbus_address_object(service_name, obj_path, dbus_addr=None):
204
214
    # parameters can sometimes be dbus.String instances, sometimes QString instances.
205
215
    # it's easier to convert them here than at the calling sites.
206
216
    service_name = str(service_name)
207
217
    obj_path = str(obj_path)
208
218
 
209
 
    proxy_bases = get_proxy_object_base_clases(service_name, obj_path)
210
 
    cls_name, cls_state = get_proxy_object_class_name_and_state(service_name, obj_path)
 
219
    if dbus_addr is not None:
 
220
        be = DBusAddress.CustomBus(dbus_addr, service_name, obj_path)
 
221
    else:
 
222
        be = DBusAddress.SessionBus(service_name, obj_path)
 
223
    return be
 
224
 
 
225
 
 
226
def make_proxy_object(data_source, emulator_base):
 
227
    """Returns a root proxy object given a DBus service name."""
 
228
 
 
229
    proxy_bases = get_proxy_object_base_clases(data_source)
 
230
    if emulator_base is None:
 
231
        emulator_base = type('DefaultEmulatorBase', (CustomEmulatorBase,), {})
 
232
    proxy_bases = proxy_bases + (emulator_base, )
 
233
    cls_name, cls_state = get_proxy_object_class_name_and_state(data_source)
211
234
 
212
235
    clsobj = type(str(cls_name),
213
236
        proxy_bases,
214
 
        dict(DBUS_SERVICE=service_name,
215
 
            DBUS_OBJECT=obj_path
 
237
        dict(_Backend = data_source
216
238
            )
217
239
        )
 
240
 
218
241
    proxy = clsobj.get_root_instance()
219
242
    return proxy
220
243
 
221
244
 
222
 
def get_proxy_object_base_clases(service_name, obj_path):
 
245
def get_proxy_object_base_clases(backend):
223
246
    """Return  tuple of the base classes to use when creating a proxy object
224
247
    for the given service name & path.
225
248
 
227
250
 
228
251
    """
229
252
 
230
 
    bases = [ApplicationProxyObect]
 
253
    bases = [ApplicationProxyObject]
231
254
 
232
 
    dbus_object = get_session_bus().get_object(service_name, obj_path)
233
 
    introspection_iface = dbus.Interface(dbus_object, DBUS_INTROSPECTION_IFACE)
234
 
    intro_xml = introspection_iface.Introspect()
 
255
    intro_xml = backend.dbus_introspection_iface.Introspect()
235
256
    if AP_INTROSPECTION_IFACE not in intro_xml:
236
 
        raise RuntimeError("Could not find Autopilot interface on service name '%s'" % service_name)
 
257
        raise RuntimeError("Could not find Autopilot interface on DBus backend '%s'" % backend)
237
258
 
238
259
    if QT_AUTOPILOT_IFACE in intro_xml:
239
260
        from autopilot.introspection.qt import QtObjectProxyMixin
242
263
    return tuple(bases)
243
264
 
244
265
 
245
 
def get_proxy_object_class_name_and_state(service_name, obj_path):
 
266
def get_proxy_object_class_name_and_state(backend):
246
267
    """Return the class name and root state dictionary."""
247
 
    dbus_object = get_session_bus().get_object(service_name, obj_path)
248
 
    dbus_iface = dbus.Interface(dbus_object, AP_INTROSPECTION_IFACE)
249
 
    return dbus_iface.GetState("/")[0]
250
 
 
251
 
 
252
 
class ApplicationProxyObect(DBusIntrospectionObject):
 
268
    object_path, object_state = backend.introspection_iface.GetState("/")[0]
 
269
    return get_classname_from_path(object_path), object_state
 
270
 
 
271
 
 
272
class ApplicationProxyObject(DBusIntrospectionObject):
253
273
    """A class that better supports query data from an application."""
254
274
 
255
 
    def __init__(self, state, path_info=None):
256
 
        super(ApplicationProxyObect, self).__init__(state, path_info)
 
275
    def __init__(self, state, path):
 
276
        super(ApplicationProxyObject, self).__init__(state, path)
257
277
        self._process = None
258
278
 
259
 
    def select_single(self, type_name='*', **kwargs):
260
 
        """Get a single node from the introspection tree, with type equal to
261
 
        *type_name* and (optionally) matching the keyword filters present in
262
 
        *kwargs*.
263
 
 
264
 
        For example:
265
 
 
266
 
        >>> app.select_single('QPushButton', objectName='clickme')
267
 
        ... returns a QPushButton whose 'objectName' property is 'clickme'.
268
 
 
269
 
        If nothing is returned from the query, this method returns None.
270
 
 
271
 
        :raises: **ValueError** if the query returns more than one item. *If you
272
 
         want more than one item, use select_many instead*.
273
 
 
274
 
        """
275
 
        instances = self.select_many(type_name, **kwargs)
276
 
        if len(instances) > 1:
277
 
            raise ValueError("More than one item was returned for query")
278
 
        if not instances:
279
 
            return None
280
 
        return instances[0]
281
 
 
282
 
    def select_many(self, type_name='*', **kwargs):
283
 
        """Get a list of nodes from the introspection tree, with type equal to
284
 
        *type_name* and (optionally) matching the keyword filters present in
285
 
        *kwargs*.
286
 
 
287
 
        For example:
288
 
 
289
 
        >>> app.select_many('QPushButton', enabled=True)
290
 
        ... returns a list of QPushButtons that are enabled.
291
 
 
292
 
        If you only want to get one item, use select_single instead.
293
 
 
294
 
        """
295
 
        logger.debug("Selecting objects of %s with attributes: %r",
296
 
            'any type' if type_name == '*' else 'type ' + type_name,
297
 
            kwargs)
298
 
 
299
 
        path = "//%s" % type_name
300
 
        state_dicts = self.get_state_by_path(path)
301
 
        instances = [self.make_introspection_object(i) for i in state_dicts]
302
 
        return filter(lambda i: object_passes_filters(i, **kwargs), instances)
303
 
 
304
279
    def set_process(self, process):
305
280
        """Set the subprocess.Popen object of the process that this is a proxy for.
306
281
 
320
295
    def kill_application(self):
321
296
        """Kill the running process that this is a proxy for using 'kill `pid`'."""
322
297
        subprocess.call(["kill", "%d" % self._process.pid])
 
298