14
from gi.repository import GUdev
16
from checkbox.dbus import connect_to_system_bus
17
from checkbox.dbus.udisks2 import UDISKS2_BLOCK_INTERFACE
18
from checkbox.dbus.udisks2 import UDISKS2_DRIVE_INTERFACE
19
from checkbox.dbus.udisks2 import UDISKS2_FILESYSTEM_INTERFACE
20
from checkbox.dbus.udisks2 import UDisks2Model, UDisks2Observer
21
from checkbox.dbus.udisks2 import is_udisks2_supported
22
from checkbox.dbus.udisks2 import lookup_udev_device
23
from checkbox.dbus.udisks2 import map_udisks1_connection_bus
24
from checkbox.heuristics.udisks2 import is_memory_card
25
from checkbox.parsers.udevadm import CARD_READER_RE, GENERIC_RE, FLASH_RE
26
from checkbox.udev import get_interconnect_speed
27
from checkbox.udev import get_udev_block_devices
31
'''Class to implement a simple timer'''
33
self.start = time.time()
36
def __exit__(self, *args):
37
self.stop = time.time()
38
self.interval = self.stop - self.start
42
'''Class to create data files'''
43
def __init__(self, size):
44
self.tfile = tempfile.NamedTemporaryFile(delete=False)
47
self.path, self.name = os.path.split(self.tfile.name)
48
self._write_test_data_file(size)
50
def _generate_test_data(self):
51
seed = "104872948765827105728492766217823438120"
53
Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam
54
nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat
55
volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation
56
ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.
57
Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse
58
molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero
59
eros et accumsan et iusto odio dignissim qui blandit praesent luptatum
60
zzril delenit augue duis dolore te feugait nulla facilisi.
62
words = phrase.replace('\n', '').split()
63
word_deque = collections.deque(words)
64
seed_deque = collections.deque(seed)
66
yield ' '.join(list(word_deque))
67
word_deque.rotate(int(seed_deque[0]))
70
def _write_test_data_file(self, size):
71
data = self._generate_test_data()
72
while os.path.getsize(self.tfile.name) < size:
73
self.tfile.write(next(data).encode('UTF-8'))
77
def md5_hash_file(path):
80
with open(path, 'rb') as stream:
82
data = stream.read(8192)
86
except IOError as exc:
87
logging.error("unable to checksum %s: %s", path, exc)
90
return md5.hexdigest()
94
''' Class to contain various methods for testing removable disks '''
96
def __init__(self, device, memorycard):
97
self.rem_disks = {} # mounted before the script running
98
self.rem_disks_nm = {} # not mounted before the script running
99
self.rem_disks_memory_cards = {}
100
self.rem_disks_memory_cards_nm = {}
101
self.rem_disks_speed = {}
104
self.memorycard = memorycard
107
def read_file(self, source):
108
with open(source, 'rb') as infile:
110
self.data = infile.read()
111
except IOError as exc:
112
logging.error("Unable to read data from %s: %s", source, exc)
117
def write_file(self, data, dest):
119
outfile = open(dest, 'wb', 0)
120
except OSError as exc:
121
logging.error("Unable to open %s for writing.", dest)
122
logging.error(" %s", exc)
126
outfile.write(self.data)
127
except IOError as exc:
128
logging.error("Unable to write data to %s: %s", dest, exc)
132
os.fsync(outfile.fileno())
135
def clean_up(self, target):
138
except OSError as exc:
139
logging.error("Unable to remove tempfile %s", target)
140
logging.error(" %s", exc)
142
def _probe_disks(self):
144
Internal method used to probe for available disks
147
self.rem_disks{,_nm,_memory_cards,_memory_cards_nm,_speed}
149
bus, loop = connect_to_system_bus()
150
if is_udisks2_supported(bus):
151
self._probe_disks_udisks2(bus)
153
self._probe_disks_udisks1(bus)
155
def _probe_disks_udisks2(self, bus):
157
Internal method used to probe / discover available disks using udisks2
158
dbus interface using the provided dbus bus (presumably the system bus)
160
# We'll need udisks2 and udev to get the data we need
161
udisks2_observer = UDisks2Observer()
162
udisks2_model = UDisks2Model(udisks2_observer)
163
udisks2_observer.connect_to_bus(bus)
164
udev_client = GUdev.Client()
165
# Get a collection of all udev devices corresponding to block devices
166
udev_devices = get_udev_block_devices(udev_client)
167
# Get a collection of all udisks2 objects
168
udisks2_objects = udisks2_model.managed_objects
169
# Let's get a helper to simplify the loop below
171
def iter_filesystems_on_block_devices():
173
Generate a collection of UDisks2 object paths that
174
have both the filesystem and block device interfaces
176
for udisks2_object_path, interfaces in udisks2_objects.items():
177
if (UDISKS2_FILESYSTEM_INTERFACE in interfaces
178
and UDISKS2_BLOCK_INTERFACE in interfaces):
179
yield udisks2_object_path
180
# We need to know about all IO candidates,
181
# let's iterate over all the block devices reported by udisks2
182
for udisks2_object_path in iter_filesystems_on_block_devices():
183
# Get interfaces implemented by this object
184
udisks2_object = udisks2_objects[udisks2_object_path]
185
# Find the path of the udisks2 object that represents the drive
186
# this object is a part of
187
drive_object_path = (
188
udisks2_object[UDISKS2_BLOCK_INTERFACE]['Drive'])
189
# Lookup the drive object, if any. This can fail when
191
drive_object = udisks2_objects[drive_object_path]
194
"Unable to locate drive associated with %s",
198
drive_props = drive_object[UDISKS2_DRIVE_INTERFACE]
199
# Get the connection bus property from the drive interface of the
200
# drive object. This is required to filter out the devices we don't
201
# want to look at now.
202
connection_bus = drive_props["ConnectionBus"]
203
desired_connection_buses = set([
204
map_udisks1_connection_bus(device)
205
for device in self.device])
206
# Skip devices that are attached to undesired connection buses
207
if connection_bus not in desired_connection_buses:
209
# Lookup the udev object that corresponds to this object
211
udev_device = lookup_udev_device(udisks2_object, udev_devices)
214
"Unable to locate udev object that corresponds to: %s",
217
# Get the block device pathname,
218
# to avoid the confusion, this is something like /dev/sdbX
219
dev_file = udev_device.get_device_file()
220
# Get the list of mount points of this block device
222
udisks2_object[UDISKS2_FILESYSTEM_INTERFACE]['MountPoints'])
223
# Get the speed of the interconnect that is associated with the
224
# block device we're looking at. This is purely informational but
225
# it is a part of the required API
226
interconnect_speed = get_interconnect_speed(udev_device)
227
if interconnect_speed:
228
self.rem_disks_speed[dev_file] = (
229
interconnect_speed * 10 ** 6)
231
self.rem_disks_speed[dev_file] = None
232
# We need to skip-non memory cards if we look for memory cards and
233
# vice-versa so let's inspect the drive and use heuristics to
234
# detect memory cards (a memory card reader actually) now.
235
if self.memorycard != is_memory_card(drive_props['Vendor'],
236
drive_props['Model'],
237
drive_props['Media']):
239
# The if/else test below simply distributes the mount_point to the
240
# appropriate variable, to keep the API requirements. It is
241
# confusing as _memory_cards is variable is somewhat dummy.
243
# XXX: Arbitrarily pick the first of the mount points
244
mount_point = mount_points[0]
245
self.rem_disks_memory_cards[dev_file] = mount_point
246
self.rem_disks[dev_file] = mount_point
248
self.rem_disks_memory_cards_nm[dev_file] = None
249
self.rem_disks_nm[dev_file] = None
251
def _probe_disks_udisks1(self, bus):
253
Internal method used to probe / discover available disks using udisks1
254
dbus interface using the provided dbus bus (presumably the system bus)
256
ud_manager_obj = bus.get_object("org.freedesktop.UDisks",
257
"/org/freedesktop/UDisks")
258
ud_manager = dbus.Interface(ud_manager_obj, 'org.freedesktop.UDisks')
259
for dev in ud_manager.EnumerateDevices():
260
device_obj = bus.get_object("org.freedesktop.UDisks", dev)
261
device_props = dbus.Interface(device_obj, dbus.PROPERTIES_IFACE)
262
udisks = 'org.freedesktop.UDisks.Device'
263
if not device_props.Get(udisks, "DeviceIsDrive"):
264
dev_bus = device_props.Get(udisks, "DriveConnectionInterface")
265
if dev_bus in self.device:
266
parent_model = parent_vendor = ''
267
if device_props.Get(udisks, "DeviceIsPartition"):
268
parent_obj = bus.get_object(
269
"org.freedesktop.UDisks",
270
device_props.Get(udisks, "PartitionSlave"))
271
parent_props = dbus.Interface(
272
parent_obj, dbus.PROPERTIES_IFACE)
273
parent_model = parent_props.Get(udisks, "DriveModel")
274
parent_vendor = parent_props.Get(udisks, "DriveVendor")
275
parent_media = parent_props.Get(udisks, "DriveMedia")
277
if (dev_bus != 'sdio'
278
and not FLASH_RE.search(parent_media)
279
and not CARD_READER_RE.search(parent_model)
280
and not GENERIC_RE.search(parent_vendor)):
283
if (FLASH_RE.search(parent_media)
284
or CARD_READER_RE.search(parent_model)
285
or GENERIC_RE.search(parent_vendor)):
287
dev_file = str(device_props.Get(udisks, "DeviceFile"))
288
dev_speed = str(device_props.Get(udisks,
289
"DriveConnectionSpeed"))
290
self.rem_disks_speed[dev_file] = dev_speed
291
if len(device_props.Get(udisks, "DeviceMountPaths")) > 0:
292
devPath = str(device_props.Get(udisks,
293
"DeviceMountPaths")[0])
294
self.rem_disks[dev_file] = devPath
295
self.rem_disks_memory_cards[dev_file] = devPath
297
self.rem_disks_nm[dev_file] = None
298
self.rem_disks_memory_cards_nm[dev_file] = None
303
for key in self.rem_disks_nm:
304
temp_dir = tempfile.mkdtemp()
305
if self._mount(key, temp_dir) != 0:
306
logging.error("can't mount %s", key)
308
passed_mount[key] = temp_dir
310
if len(self.rem_disks_nm) == len(passed_mount):
311
self.rem_disks_nm = passed_mount
314
count = len(self.rem_disks_nm) - len(passed_mount)
315
self.rem_disks_nm = passed_mount
318
def _mount(self, dev_file, mount_point):
319
return subprocess.call(['mount', dev_file, mount_point])
323
for disk in self.rem_disks_nm:
324
if not self.rem_disks_nm[disk]:
326
if self._umount(disk) != 0:
328
logging.error("can't umount %s on %s",
329
disk, self.rem_disks_nm[disk])
332
def _umount(self, mount_point):
333
# '-l': lazy umount, dealing problem of unable to umount the device.
334
return subprocess.call(['umount', '-l', mount_point])
336
def clean_tmp_dir(self):
337
for disk in self.rem_disks_nm:
338
if not self.rem_disks_nm[disk]:
340
if not os.path.ismount(self.rem_disks_nm[disk]):
341
os.rmdir(self.rem_disks_nm[disk])
345
parser = argparse.ArgumentParser()
346
parser.add_argument('device',
347
choices=['usb', 'firewire', 'sdio',
348
'scsi', 'ata_serial_esata'],
350
help=("The type of removable media "
351
"(usb, firewire, sdio, scsi or ata_serial_esata)"
353
parser.add_argument('-l', '--list',
356
help="List the removable devices and mounting status")
357
parser.add_argument('-m', '--min-speed',
361
help="Minimum speed a device must support to be "
362
"considered eligible for being tested (bits/s)")
363
parser.add_argument('-p', '--pass-speed',
367
help="Minimum average throughput from all eligible"
368
"devices for the test to pass (MB/s)")
369
parser.add_argument('-i', '--iterations',
373
help=("The number of test cycles to run. One cycle is"
374
"comprised of generating --count data files of "
375
"--size bytes and writing them to each device."))
376
parser.add_argument('-c', '--count',
380
help='The number of random data files to generate')
381
parser.add_argument('-s', '--size',
385
help=("The size of the test data file to use "
386
"in Bytes. Default is %(default)s"))
387
parser.add_argument('-n', '--skip-not-mount',
390
help=("skip the removable devices "
391
"which haven't been mounted before the test."))
392
parser.add_argument('--memorycard', action="store_true",
393
help=("Memory cards devices on bus other than sdio "
394
"require this parameter to identify "
397
args = parser.parse_args()
399
test = DiskTest(args.device, args.memorycard)
402
# If we do have removable drives attached and mounted
403
if len(test.rem_disks) > 0 or len(test.rem_disks_nm) > 0:
404
if args.list: # Simply output a list of drives detected
406
print("Removable devices currently mounted:")
408
if len(test.rem_disks_memory_cards) > 0:
409
for disk, mnt_point in test.rem_disks_memory_cards.items():
410
print("%s : %s" % (disk, mnt_point))
414
print("Removable devices currently not mounted:")
415
if len(test.rem_disks_memory_cards_nm) > 0:
416
for disk in test.rem_disks_memory_cards_nm:
421
if len(test.rem_disks) > 0:
422
for disk, mnt_point in test.rem_disks.items():
423
print("%s : %s" % (disk, mnt_point))
427
print("Removable devices currently not mounted:")
428
if len(test.rem_disks_nm) > 0:
429
for disk in test.rem_disks_nm:
438
else: # Create a file, copy to disk and compare hashes
439
if args.skip_not_mount:
440
disks_all = test.rem_disks
442
# mount those haven't be mounted yet.
443
errors_mount = test.mount()
446
print("There're total %d device(s) failed at mounting."
448
errors += errors_mount
450
disks_all = dict(list(test.rem_disks.items())
451
+ list(test.rem_disks_nm.items()))
453
if len(disks_all) > 0:
454
print("Found the following mounted %s partitions:"
455
% ', '.join(args.device))
457
for disk, mount_point in disks_all.items():
458
supported_speed = test.rem_disks_speed[disk]
459
print(" %s : %s : %s bits/s" %
460
(disk, mount_point, supported_speed),
463
and int(args.min_speed) > int(supported_speed)):
464
print(" (Will not test it, speed is below %s bits/s)" %
465
args.min_speed, end="")
471
disks_eligible = {disk: disks_all[disk] for disk in disks_all
472
if not args.min_speed or
473
int(test.rem_disks_speed[disk])
474
>= int(args.min_speed)}
477
# Generate our data file(s)
478
for count in range(args.count):
479
test_files[count] = RandomData(args.size)
480
write_sizes.append(os.path.getsize(
481
test_files[count].tfile.name))
482
total_write_size = sum(write_sizes)
485
for disk, mount_point in disks_eligible.items():
486
print("%s (Total Data Size / iteration: %0.4f MB):" %
487
(disk, (total_write_size / 1024 / 1024)))
488
iteration_write_size = (
489
total_write_size * args.iterations) / 1024 / 1024
490
iteration_write_times = []
491
for iteration in range(args.iterations):
492
target_file_list = []
494
for file_index in range(args.count):
495
parent_file = test_files[file_index].tfile.name
496
parent_hash = md5_hash_file(parent_file)
498
test_files[file_index].name +
500
target_path = mount_point
501
target_file = os.path.join(target_path,
503
target_file_list.append(target_file)
504
test.read_file(parent_file)
505
with ActionTimer() as timer:
506
if not test.write_file(test.data,
509
"Failed to copy %s to %s",
510
parent_file, target_file)
513
write_times.append(timer.interval)
514
child_hash = md5_hash_file(target_file)
515
if parent_hash != child_hash:
517
"[Iteration %s] Parent and Child"
518
" copy hashes mismatch on %s!",
519
iteration, target_file)
521
"\tParent hash: %s", parent_hash)
523
"\tChild hash: %s", child_hash)
525
for file in target_file_list:
527
total_write_time = sum(write_times)
528
avg_write_time = total_write_time / args.count
531
total_write_size / total_write_time)
533
except ZeroDivisionError:
534
avg_write_speed = 0.00
536
iteration_write_times.append(total_write_time)
537
print("\t[Iteration %s] Average Speed: %0.4f"
538
% (iteration, avg_write_speed))
539
for iteration in range(args.iterations):
540
iteration_write_time = sum(iteration_write_times)
542
print("\t\tTotal Data Attempted: %0.4f MB"
543
% iteration_write_size)
544
print("\t\tTotal Time to write: %0.4f secs"
545
% iteration_write_time)
546
print("\t\tAverage Write Time: %0.4f secs" %
547
(iteration_write_time / args.iterations))
549
avg_write_speed = (iteration_write_size /
550
iteration_write_time)
551
except ZeroDivisionError:
552
avg_write_speed = 0.00
554
print("\t\tAverage Write Speed: %0.4f MB/s" %
557
for key in range(args.count):
558
test.clean_up(test_files[key].tfile.name)
559
if (len(test.rem_disks_nm) > 0):
560
if test.umount() != 0:
566
"Completed %s test iterations, but there were"
567
" errors", args.count)
569
elif len(disks_eligible) == 0:
571
"No %s disks with speed higher than %s bits/s",
572
args.device, args.min_speed)
576
#Pass is not assured!
577
if (not args.pass_speed or
578
avg_write_speed >= args.pass_speed):
581
print("FAIL: Average speed was lower than desired "
582
"pass speed of %s MB/s" % args.pass_speed)
585
logging.error("No device being mounted successfully "
586
"for testing, aborting")
589
else: # If we don't have removable drives attached and mounted
590
logging.error("No removable drives were detected, aborting")
593
if __name__ == '__main__':