~brendan-donegan/checkbox/bug1064425

« back to all changes in this revision

Viewing changes to scripts/removable_storage_test

merge udisks2-test-script branch:

* scripts: add udisks2 implementation to removable_storage_test
* scripts: use connect_to_system_bus() to connect to dbus
* scripts: re-factor DiskTest._probe_disks() for udisks2 support
* scripts: remove unused instance variables

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
import tempfile
12
12
import time
13
13
 
 
14
from gi.repository import GUdev
 
15
 
 
16
from checkbox.dbus import connect_to_system_bus
 
17
from checkbox.dbus.udisks2 import UDISKS2_BLOCK_INTERFACE
 
18
from checkbox.dbus.udisks2 import UDISKS2_DRIVE_INTERFACE
 
19
from checkbox.dbus.udisks2 import UDISKS2_FILESYSTEM_INTERFACE
 
20
from checkbox.dbus.udisks2 import UDisks2Model, UDisks2Observer
 
21
from checkbox.dbus.udisks2 import is_udisks2_supported
 
22
from checkbox.dbus.udisks2 import lookup_udev_device
 
23
from checkbox.dbus.udisks2 import map_udisks1_connection_bus
 
24
from checkbox.heuristics.udisks2 import is_memory_card
14
25
from checkbox.parsers.udevadm import CARD_READER_RE, GENERIC_RE, FLASH_RE
 
26
from checkbox.udev import get_interconnect_speed
 
27
from checkbox.udev import get_udev_block_devices
15
28
 
16
29
 
17
30
class ActionTimer():
47
60
        zzril delenit augue duis dolore te feugait nulla facilisi.
48
61
        '''
49
62
        words = phrase.replace('\n', '').split()
50
 
 
51
63
        word_deque = collections.deque(words)
52
64
        seed_deque = collections.deque(seed)
53
 
 
54
65
        while True:
55
66
            yield ' '.join(list(word_deque))
56
67
            word_deque.rotate(int(seed_deque[0]))
58
69
 
59
70
    def _write_test_data_file(self, size):
60
71
        data = self._generate_test_data()
61
 
 
62
72
        while os.path.getsize(self.tfile.name) < size:
63
73
            self.tfile.write(next(data).encode('UTF-8'))
64
74
        return self
84
94
    ''' Class to contain various methods for testing removable disks '''
85
95
 
86
96
    def __init__(self, device, memorycard):
87
 
        self.process = None
88
 
        self.cmd = None
89
 
        self.timeout = 3
90
 
        self.returnCode = None
91
97
        self.rem_disks = {}     # mounted before the script running
92
98
        self.rem_disks_nm = {}  # not mounted before the script running
93
99
        self.rem_disks_memory_cards = {}
127
133
            logging.error("Unable to remove tempfile %s: %s", target, exc)
128
134
 
129
135
    def _probe_disks(self):
130
 
        ''' Probes dbus to find any attached/mounted devices'''
131
 
        bus = dbus.SystemBus()
 
136
        """
 
137
        Internal method used to probe for available disks
 
138
 
 
139
        Indirectly sets:
 
140
            self.rem_disks{,_nm,_memory_cards,_memory_cards_nm,_speed}
 
141
        """
 
142
        bus, loop = connect_to_system_bus()
 
143
        if is_udisks2_supported(bus):
 
144
            self._probe_disks_udisks2(bus)
 
145
        else:
 
146
            self._probe_disks_udisks1(bus)
 
147
 
 
148
    def _probe_disks_udisks2(self, bus):
 
149
        """
 
150
        Internal method used to probe / discover available disks using udisks2
 
151
        dbus interface using the provided dbus bus (presumably the system bus)
 
152
        """
 
153
        # We'll need udisks2 and udev to get the data we need
 
154
        udisks2_observer = UDisks2Observer()
 
155
        udisks2_model = UDisks2Model(udisks2_observer)
 
156
        udisks2_observer.connect_to_bus(bus)
 
157
        udev_client = GUdev.Client()
 
158
        # Get a collection of all udev devices corresponding to block devices
 
159
        udev_devices = get_udev_block_devices(udev_client)
 
160
        # Get a collection of all udisks2 objects
 
161
        udisks2_objects = udisks2_model.managed_objects
 
162
        # Let's get a helper to simplify the loop below
 
163
 
 
164
        def iter_filesystems_on_block_devices():
 
165
            """
 
166
            Generate a collection of UDisks2 object paths that
 
167
            have both the filesystem and block device interfaces
 
168
            """
 
169
            for udisks2_object_path, interfaces in udisks2_objects.items():
 
170
                if (UDISKS2_FILESYSTEM_INTERFACE in interfaces
 
171
                    and UDISKS2_BLOCK_INTERFACE in interfaces):
 
172
                    yield udisks2_object_path
 
173
        # We need to know about all IO candidates,
 
174
        # let's iterate over all the block devices reported by udisks2
 
175
        for udisks2_object_path in iter_filesystems_on_block_devices():
 
176
            # Get interfaces implemented by this object
 
177
            udisks2_object = udisks2_objects[udisks2_object_path]
 
178
            # Find the path of the udisks2 object that represents the drive
 
179
            # this object is a part of
 
180
            drive_object_path = (
 
181
                udisks2_object[UDISKS2_BLOCK_INTERFACE]['Drive'])
 
182
            # Lookup the drive object, if any. This can fail when
 
183
            try:
 
184
                drive_object = udisks2_objects[drive_object_path]
 
185
            except KeyError:
 
186
                logging.error(
 
187
                    "Unable to locate drive associated with %s",
 
188
                    udisks2_object_path)
 
189
                continue
 
190
            else:
 
191
                drive_props = drive_object[UDISKS2_DRIVE_INTERFACE]
 
192
            # Get the connection bus property from the drive interface of the
 
193
            # drive object. This is required to filter out the devices we don't
 
194
            # want to look at now.
 
195
            connection_bus = drive_props["ConnectionBus"]
 
196
            desired_connection_buses = set([
 
197
                map_udisks1_connection_bus(device)
 
198
                for device in self.device])
 
199
            # Skip devices that are attached to undesired connection buses
 
200
            if connection_bus not in desired_connection_buses:
 
201
                continue
 
202
            # Lookup the udev object that corresponds to this object
 
203
            try:
 
204
                udev_device = lookup_udev_device(udisks2_object, udev_devices)
 
205
            except LookupError:
 
206
                logging.error(
 
207
                    "Unable to locate udev object that corresponds to: %s",
 
208
                    udisks2_object_path)
 
209
                continue
 
210
            # Get the block device pathname,
 
211
            # to avoid the confusion, this is something like /dev/sdbX
 
212
            dev_file = udev_device.get_device_file()
 
213
            # Get the list of mount points of this block device
 
214
            mount_points = (
 
215
                udisks2_object[UDISKS2_FILESYSTEM_INTERFACE]['MountPoints'])
 
216
            # Get the speed of the interconnect that is associated with the
 
217
            # block device we're looking at. This is purely informational but
 
218
            # it is a part of the required API
 
219
            interconnect_speed = get_interconnect_speed(udev_device)
 
220
            if interconnect_speed:
 
221
                self.rem_disks_speed[dev_file] = (
 
222
                    interconnect_speed * 10 ** 6)
 
223
            else:
 
224
                self.rem_disks_speed[dev_file] = None
 
225
            # We need to skip-non memory cards if we look for memory cards and
 
226
            # vice-versa so let's inspect the drive and use heuristics to
 
227
            # detect memory cards (a memory card reader actually) now.
 
228
            if self.memorycard != is_memory_card(drive_props['Vendor'],
 
229
                                                 drive_props['Model'],
 
230
                                                 drive_props['Media']):
 
231
                continue
 
232
            # The if/else test below simply distributes the mount_point to the
 
233
            # appropriate variable, to keep the API requirements. It is
 
234
            # confusing as _memory_cards is variable is somewhat dummy.
 
235
            if mount_points:
 
236
                # XXX: Arbitrarily pick the first of the mount points
 
237
                mount_point = mount_points[0]
 
238
                self.rem_disks_memory_cards[dev_file] = mount_point
 
239
                self.rem_disks[dev_file] = mount_point
 
240
            else:
 
241
                self.rem_disks_memory_cards_nm[dev_file] = None
 
242
                self.rem_disks_nm[dev_file] = None
 
243
 
 
244
    def _probe_disks_udisks1(self, bus):
 
245
        """
 
246
        Internal method used to probe / discover available disks using udisks1
 
247
        dbus interface using the provided dbus bus (presumably the system bus)
 
248
        """
132
249
        ud_manager_obj = bus.get_object("org.freedesktop.UDisks",
133
250
                                        "/org/freedesktop/UDisks")
134
251
        ud_manager = dbus.Interface(ud_manager_obj, 'org.freedesktop.UDisks')