1.1.32
by Natalia Bidart (nessita)
Import upstream version 2.99.5 |
1 |
# -*- coding: utf-8 -*-
|
2 |
#
|
|
3 |
# Copyright 2012 Canonical Ltd.
|
|
4 |
#
|
|
5 |
# This program is free software: you can redistribute it and/or modify it
|
|
6 |
# under the terms of the GNU General Public License version 3, as published
|
|
7 |
# by the Free Software Foundation.
|
|
8 |
#
|
|
9 |
# This program is distributed in the hope that it will be useful, but
|
|
10 |
# WITHOUT ANY WARRANTY; without even the implied warranties of
|
|
11 |
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
|
|
12 |
# PURPOSE. See the GNU General Public License for more details.
|
|
13 |
#
|
|
14 |
# You should have received a copy of the GNU General Public License along
|
|
15 |
# with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
16 |
||
17 |
"""Tests for the glib runner helper module."""
|
|
18 |
||
19 |
import subprocess |
|
20 |
||
21 |
# pylint: disable=E0611
|
|
22 |
from gi.repository import GLib |
|
23 |
# pylint: enable=E0611
|
|
24 |
from twisted.internet import defer |
|
25 |
||
26 |
from ubuntu_sso.utils import runner |
|
27 |
from ubuntu_sso.utils.runner import glib |
|
28 |
from ubuntu_sso.utils.runner.tests.test_runner import SpawnProgramTestCase |
|
29 |
||
30 |
||
31 |
class FakedSignal(object):
    """Minimal stand-in for a glib signal.

    Handlers registered through connect() are kept in registration order
    and are all invoked, in that order, every time emit() is called.

    """

    def __init__(self, name):
        self._handlers = []
        self.name = name

    def connect(self, handler):
        """Register 'handler' to be called when this signal is emitted."""
        self._handlers.append(handler)

    def emit(self, *args, **kwargs):
        """Call every connected handler with the given arguments."""
        for callback in self._handlers:
            callback(*args, **kwargs)
|
46 |
||
47 |
||
48 |
class FakedProcess(object):
    """Fake the parts of GLib used to spawn and watch a child process.

    The last argv and flags passed to spawn_async are recorded in '_argv'
    and '_flags' so tests can inspect them. Every spawned child reports
    the fixed pid '_pid' and exits with '_status_code'.

    """

    _argv = _flags = None
    _pid = 123456
    _status_code = 0

    SpawnFlags = GLib.SpawnFlags
    GError = GLib.GError

    def __init__(self):
        self.pid = lambda: self._pid
        # Keep the list of live pids per instance: a class-level list would
        # be shared by every FakedProcess, leaking pids from one fake (and
        # thus from one test) into the next.
        self._pids = []
        self.started = FakedSignal('started')
        self.finished = FakedSignal('finished')
        self.error = FakedSignal('error')

    def spawn_async(self, argv, flags):
        """Spawn a process.

        Run 'argv' synchronously through subprocess.call, remember 'argv'
        and 'flags', and return a (pid, None, None, None) tuple.

        Raise GLib.GError with code 24 if the string 'None' is in 'argv',
        and GLib.GError with code 42 (carrying the original error text as
        message) if running the program fails.

        """
        if 'None' in argv:
            exc = GLib.GError()
            exc.message = str('Can not handle None')
            exc.code = 24
            raise exc

        self._argv = argv
        self._flags = flags

        try:
            subprocess.call(argv)
        except Exception as e:
            # Any failure to run the program is reported as a GError,
            # which is the only error type this fake raises.
            exc = GLib.GError()
            exc.message = str(e)
            exc.code = 42
            raise exc

        self._pids.append(self._pid)
        return (self._pid, None, None, None)

    def child_watch_add(self, pid, child_watch):
        """Add a child watch.

        If 'pid' was spawned and not yet closed, 'child_watch' is called
        right away with the pid and the faked status code; otherwise
        nothing happens.

        """
        if pid in self._pids:
            child_watch(pid, self._status_code)

    def spawn_close_pid(self, pid):
        """Close the 'pid'.

        Raise ValueError if 'pid' is not currently tracked.

        """
        self._pids.remove(pid)
|
95 |
||
96 |
||
97 |
class GLibSpawnProgramTestCase(SpawnProgramTestCase):
    """Run the spawn_program test suite using the GLib runner."""

    use_reactor = False

    @defer.inlineCallbacks
    def setUp(self):
        yield super(GLibSpawnProgramTestCase, self).setUp()
        # The plain glib runner and the gireactor can not be mixed, so the
        # GLib object used by the glib runner is replaced with a fake, and
        # the main loop checks are faked so the glib runner is chosen.
        self.process = FakedProcess()
        self.patch(glib, 'GLib', self.process)
        for check_name in ('is_twisted_reactor_installed',
                           'is_qt4_main_loop_installed'):
            self.patch(runner, check_name, lambda: False)

    # Access to a protected member _flags, _argv of a client class
    # pylint: disable=W0212

    @defer.inlineCallbacks
    def test_flags_are_correct(self):
        """The expected spawn flags are passed to spawn_async."""
        yield self.spawn_fn(self.args)

        expected = (GLib.SpawnFlags.DO_NOT_REAP_CHILD |
                    GLib.SpawnFlags.SEARCH_PATH |
                    GLib.SpawnFlags.STDOUT_TO_DEV_NULL |
                    GLib.SpawnFlags.STDERR_TO_DEV_NULL)
        self.assertEqual(self.process._flags, expected)

    @defer.inlineCallbacks
    def test_argv_is_bytes(self):
        """Every argv item reaches spawn_async encoded as utf-8 bytes."""
        yield self.spawn_fn(self.args)

        expected = [arg.encode('utf-8') for arg in self.args]
        self.assertEqual(self.process._argv, expected)