1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
#!/usr/bin/python
# Copyright (C) 2009 Canonical Ltd.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3,
# as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from twisted.trial import unittest
#from twisted.internet.defer import Deferred
import dbus
from dbus.mainloop.glib import DBusGMainLoop
DBusGMainLoop(set_as_default=True)
from usbcreator.backends.udisks import UDisksBackend
import subprocess, os
class TestUDisksBackend(unittest.TestCase):
    """Smoke tests for UDisksBackend's device-added handling.

    Every test runs against the ``fake_udisks.py`` helper script that lives
    next to this file.  The helper is started as a child process in setUp
    and stopped again through a registered cleanup.
    """

    def setUp(self):
        """Start the fake UDisks helper and build a backend on the session bus."""
        fake_udisks = os.path.join(os.path.dirname(__file__),
                                   'fake_udisks.py')
        self.proc = subprocess.Popen(['python', fake_udisks])
        # Register the shutdown immediately: tearDown is not run when setUp
        # raises, so without this a failure below (for example no session
        # bus being available) would leave the helper process running.
        self.addCleanup(self._stop_fake_udisks)
        import time
        # Fixed delay before connecting; presumably this gives the helper
        # time to come up on the bus -- see the XXX note further down.
        time.sleep(1)
        #self._startDBus()
        self.bus = dbus.SessionBus()
        self.backend = UDisksBackend(bus=self.bus)

    def tearDown(self):
        """Stop the fake UDisks helper process."""
        self._stop_fake_udisks()
        #self._stopDBus()

    def _stop_fake_udisks(self):
        """Terminate and reap the helper process; safe to call repeatedly."""
        # Only signal a process that is still running, so a second call
        # (tearDown followed by the registered cleanup) is harmless.
        if self.proc.poll() is None:
            self.proc.terminate()
        self.proc.wait()

    # XXX evand 2009-06-03: Uninteresting bit of code to test, as we test our
    # fake EnumerateDevices and signal handling elsewhere.
    #def test_detect_devices(self):
    #    d = Deferred()
    #    def handle_reply(res):
    #        print(res)
    #        d.callback(res)
    #    self.backend.detect_devices(cb=handle_reply)
    #    return d

    def test_disk_added(self):
        """Pass the sdb object path to _device_added; passes if nothing raises."""
        device_path = '/org/freedesktop/UDisks/devices/sdb'
        self.backend._device_added(device_path)

    def test_partition_added(self):
        """Pass the sdb1 object path to _device_added; passes if nothing raises."""
        device_path = '/org/freedesktop/UDisks/devices/sdb1'
        self.backend._device_added(device_path)

    def test_cd_added(self):
        """Pass the scd0 object path to _device_added; passes if nothing raises."""
        device_path = '/org/freedesktop/UDisks/devices/scd0'
        self.backend._device_added(device_path)

    # XXX evand 2009-06-03: Just use the 3 second delay for now, then come back
    # to this. Getting the tests done and the core code written is more
    # important.
    # NOTE(review): setUp currently sleeps for 1 second, not 3 -- confirm
    # which value is intended.
    #def _startDBus(self):
    #    """Start our own session bus daemon for testing."""
    #    config_file = os.path.join(os.path.dirname(os.getcwd()), "tests",
    #                               "dbus-session.conf")
    #    dbus_args = ["--fork",
    #                 "--config-file=" + config_file,
    #                 "--print-address"]
    #    p = subprocess.Popen(['/bin/dbus-daemon'] + dbus_args,
    #                         bufsize=4096, stdout=subprocess.PIPE,
    #                         universal_newlines=True)
    #    data = p.stdout
    #    self.dbus_address = "".join(data.readlines()).strip()
    #    self.dbus_pid = p.pid + 1
    #    if self.dbus_address != "":
    #        os.environ["DBUS_SESSION_BUS_ADDRESS"] = self.dbus_address
    #    else:
    #        os.kill(self.dbus_pid, signal.SIGKILL)
    #        raise DBusLaunchError("There was a problem launching dbus-daemon.")
    #def _stopDBus(self):
    #    """Stop our DBus session bus daemon."""
    #    del os.environ["DBUS_SESSION_BUS_ADDRESS"]
    #    os.kill(self.dbus_pid, signal.SIGKILL)
|