3
# udisks2 integration test suite
5
# Run in udisks built tree to test local built binaries (needs
6
# --localstatedir=/var), or from anywhere else to test system installed
11
# src/tests/integration-test
12
# - Run only a particular class of tests:
13
# src/tests/integration-test Drive
14
# - Run only a single test:
15
# src/tests/integration-test FS.test_ext3
17
# Copyright: (C) 2011 Martin Pitt <martin.pitt@ubuntu.com>
19
# This program is free software; you can redistribute it and/or modify
20
# it under the terms of the GNU General Public License as published by
21
# the Free Software Foundation; either version 2 of the License, or
22
# (at your option) any later version.
24
# This program is distributed in the hope that it will be useful,
25
# but WITHOUT ANY WARRANTY; without even the implied warranty of
26
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27
# GNU General Public License for more details.
30
# - add and test method for changing LUKS passphrase
31
# - test Format with take-ownership
37
srcdir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
38
libdir = os.path.join(srcdir, 'udisks', '.libs')
40
# as we can't change LD_LIBRARY_PATH within a running program, and doing
41
# #!/usr/bin/env LD_LIBRARY_PATH=... python3 does not work either, do this
43
if 'LD_LIBRARY_PATH' not in os.environ and os.path.isdir(libdir):
44
os.environ['LD_LIBRARY_PATH'] = libdir
45
os.environ['GI_TYPELIB_PATH'] = '%s/udisks:%s' % (
47
os.environ.get('GI_TYPELIB_PATH', ''))
48
os.execv(sys.argv[0], sys.argv)
49
assert False, 'not expecting to land here'
61
from gi.repository import GLib, Gio, UDisks
63
# find local test_polkit.py
64
sys.path.insert(0, os.path.dirname(__file__))
67
#GI_TYPELIB_PATH=udisks LD_LIBRARY_PATH=udisks/.libs
68
VDEV_SIZE = 300000000 # size of virtual test device
70
# These file systems are known to have broken handling of permissions, in
71
# particular the executable bit
72
BROKEN_PERMISSIONS_FS = ['ntfs']
74
# Some D-BUS API methods cause properties to not be up to date yet when a
75
# method call finishes, thus we do a udevadm settle as a workaround. Those
76
# methods should eventually get fixed properly, but it's unnerving to have
77
# the tests fail on them when you are working on something else. This flag
78
# gets set by the --no-workarounds option to disable those syncs, so that these
79
# race conditions can be fixed.
80
workaround_syncs = False
82
no_options = GLib.Variant('a{sv}', {})
84
# ----------------------------------------------------------------------------
86
class UDisksTestCase(unittest.TestCase):
87
'''Base class for udisks test cases.
89
This provides static functions which are useful for all test cases.
100
def init(klass, logfile=None):
101
'''start daemon and set up test environment'''
103
if os.geteuid() != 0:
104
print('this test suite needs to run as root', file=sys.stderr)
107
# run from local build tree if we are in one, otherwise use system instance
108
klass.daemon_path = os.path.join(srcdir, 'src', 'udisksd')
109
if (os.access (klass.daemon_path, os.X_OK)):
110
print('Testing binaries from local build tree')
111
klass.check_build_tree_config()
113
print('Testing installed system binaries')
114
klass.daemon_path = None
115
for l in open('/usr/share/dbus-1/system-services/org.freedesktop.UDisks2.service'):
116
if l.startswith('Exec='):
117
klass.daemon_path = l.split('=', 1)[1].split()[0]
119
assert klass.daemon_path, 'could not determine daemon path from D-BUS .service file'
121
print('daemon path: ' + klass.daemon_path)
123
(klass.device, klass.cd_device) = klass.setup_vdev()
125
# start polkit and udisks on a private DBus
126
klass.dbus = Gio.TestDBus()
128
os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = klass.dbus.get_bus_address()
129
# do not try to communicate with the current desktop session; this will
130
# confuse it, as it cannot see this D-BUS instance
132
del os.environ['DISPLAY']
136
klass.daemon_log = open(logfile, 'w')
138
klass.daemon_log = tempfile.TemporaryFile()
139
atexit.register(klass.cleanup)
145
'''stop daemon again and clean up test environment'''
147
subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed
151
klass.teardown_vdev(klass.device)
154
del os.environ['DBUS_SYSTEM_BUS_ADDRESS']
158
def start_daemon(klass):
159
assert klass.daemon == None
160
klass.daemon = subprocess.Popen([klass.daemon_path, '--replace'],
161
stdout=klass.daemon_log, stderr=subprocess.STDOUT)
162
assert klass.daemon.pid, 'daemon failed to start'
164
# wait until the daemon has started up
167
while klass.manager is None and timeout > 0:
169
klass.client = UDisks.Client.new_sync(None)
170
assert klass.client != None
171
klass.manager = klass.client.get_manager()
173
assert klass.manager, 'daemon failed to start'
174
assert klass.daemon.pid, 'daemon failed to start'
179
def stop_daemon(klass):
181
os.kill(klass.daemon.pid, signal.SIGTERM)
187
'''Wait until pending events finished processing.
189
This should only be called for situations where we genuinely have an
190
asynchronous response, like invoking a CLI program and waiting for
191
udev/udisks to catch up on the change events.
193
subprocess.call(['udevadm', 'settle'])
194
context = GLib.main_context_default()
196
# wait until all GDBus events have been processed
197
while context.pending() and timeout > 0:
198
klass.client.settle()
202
sys.stderr.write('[wait timeout!] ')
204
klass.client.settle()
207
def sync_workaround(klass):
208
'''Wait until pending events finished processing (bug workaround).
210
This should be called for race conditions in the D-BUS API which cause
211
properties to not be up to date yet when a method call finishes. Those
212
should eventually get fixed properly, but it's unnerving to have the
213
tests fail on them when you are working on something else.
215
This sync is not done if running with --no-workarounds.
221
def zero_device(klass):
222
subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device, 'bs=10M'],
223
stderr=subprocess.PIPE)
228
def devname(klass, partition=None, cd=False):
229
'''Get name of test device or one of its partitions
231
If cd is True, return the CD device, otherwise the hard disk device.
234
dev = klass.cd_device
238
if dev[-1].isdigit():
239
return dev + 'p' + str(partition)
241
return dev + str(partition)
246
def udisks_block(klass, partition=None, cd=False):
    '''Get UDisksBlock object for test device or partition

    If cd is True, use the CD device, otherwise the hard disk device.

    Raises AssertionError if the client has no block object for the device.
    '''
    devname = klass.devname(partition, cd)
    # the client looks block objects up by device number, not by path
    dev_t = os.stat(devname).st_rdev
    block = klass.client.get_block_for_dev(dev_t)
    assert block, 'did not find an UDisksBlock object for %s' % devname
    # callers use the result directly (e. g. block.get_object_path()), so
    # the block object has to be handed back
    return block
259
def udisks_filesystem(klass, partition=None, cd=False):
    '''Get UDisksFilesystem object for test device or partition

    The block object is looked up first; the file system interface is then
    taken from the UDisks object at the block's object path.

    Return None if there is no file system on that device.

    If cd is True, return the CD device, otherwise the hard disk device.
    '''
    object_path = klass.udisks_block(partition, cd).get_object_path()
    udisks_object = klass.client.get_object(object_path)
    return udisks_object.get_filesystem()
270
def blkid(klass, partition=None, device=None):
271
'''Call blkid and return dictionary of results.'''
274
device = klass.devname(partition)
276
cmd = subprocess.Popen(['blkid', '-p', '-o', 'udev', device], stdout=subprocess.PIPE)
278
(key, value) = l.decode('UTF-8').split('=', 1)
279
result[key] = value.strip()
280
assert cmd.wait() == 0
284
def is_mountpoint(klass, path):
    '''Check if given path is a mount point.

    Runs the external "mountpoint" tool on path and returns True if it exits
    with status 0, False otherwise.
    '''
    # The tool's output is of no interest; discard it instead of attaching a
    # pipe that nobody reads (the subprocess documentation advises against
    # using PIPE together with call()).
    return subprocess.call(['mountpoint', path],
                           stdout=subprocess.DEVNULL) == 0
290
def mkfs(klass, type, label=None, partition=None):
291
'''Create file system using mkfs.'''
294
assert label is None, 'minix does not support labels'
296
# work around mkswap not properly cleaning up an existing reiserfs
297
# signature (mailed kzak about it)
299
subprocess.check_call(['wipefs', '-a', klass.devname(partition)],
300
stdout=subprocess.PIPE)
302
mkcmd = { 'swap': 'mkswap',
305
label_opt = { 'vfat': '-n',
308
extra_opt = { 'vfat': [ '-I', '-F', '32'],
310
'xfs': ['-f'], # XFS complains if there's an existing FS, so force
311
'ext2': ['-F'], # ext* complains about using entire device, so force
318
cmd = [mkcmd.get(type, 'mkfs.' + type)] + extra_opt.get(type, [])
320
cmd += [label_opt.get(type, '-L'), label]
321
cmd.append(klass.devname(partition))
323
subprocess.check_call(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
325
# kernel/udev generally detect those changes themselves, but do not quite
326
# tell us when they are done; so do a little kludge here to know how
327
# long we need to wait
328
subprocess.call(['udevadm', 'trigger', '--action=change',
329
'--sysname-match=' + os.path.basename(klass.devname(partition))])
333
def fs_create(klass, partition, type, options):
    '''Create file system using udisks.

    Looks up the UDisksBlock object for partition, formats it synchronously
    with the given file system type and options, and finally runs the sync
    workaround.
    '''
    klass.udisks_block(partition).call_format_sync(type, options, None)
    klass.sync_workaround()
341
def retry_busy(klass, fn, *args):
342
'''Call a function until it does not fail with "Busy".'''
348
except GLib.GError as e:
349
if not 'UDisks2.Error.DeviceBusy' in e.message:
351
sys.stderr.write('[busy] ')
356
def check_build_tree_config(klass):
357
'''Check configuration of build tree'''
359
# read make variables
361
var_re = re.compile('^([a-zA-Z_]+) = (.*)$')
362
make = subprocess.Popen(['make', '-p', '/dev/null'],
363
stdout=subprocess.PIPE)
364
for l in make.stdout:
365
l = l.decode('UTF-8')
368
make_vars[m.group(1)] = m.group(2)
371
# expand make variables
372
subst_re = re.compile('\${([a-zA-Z_]+)}')
373
for (k, v) in make_vars.items():
375
m = subst_re.search(v)
377
v = subst_re.sub(make_vars.get(m.group(1), ''), v)
382
# check localstatedir
383
for d in (os.path.join(make_vars['localstatedir'], 'run', 'udisks2'),
384
os.path.join(make_vars['localstatedir'], 'lib', 'udisks2')):
385
if not os.path.exists(d):
386
sys.stderr.write('The directory %s does not exist; please create it before running these tests.\n' % d)
390
def setup_vdev(klass):
391
'''create virtual test devices
393
It is zeroed out initially.
395
Return a pair (writable HD device path, readonly CD device path).
397
# ensure that the scsi_debug module is loaded
398
if os.path.isdir('/sys/module/scsi_debug'):
399
sys.stderr.write('The scsi_debug module is already loaded; please remove before running this test.\n')
402
# work around scsi_debug not implementing CD-ROM SCSI commands, so that
403
# udev's cdrom_id does not recognize tracks
404
scsi_debug_rules = '/run/udev/rules.d/60-persistent-storage-scsi_debug.rules'
405
if os.path.isdir('/run/udev/rules.d') and not os.path.exists(scsi_debug_rules):
406
with open(scsi_debug_rules, 'w') as f:
407
f.write('''KERNEL=="sr*", ENV{DISK_EJECT_REQUEST}!="?*", ATTRS{model}=="scsi_debug*", ENV{ID_CDROM_MEDIA}=="?*", IMPORT{program}="/sbin/blkid -o udev -p -u noraid $tempnode"
410
subprocess.call('sync; pkill --signal HUP udevd || pkill --signal HUP systemd-udevd',
413
# create a fake SCSI hard drive
414
assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % (
415
VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug'
417
# wait until drive got created
419
while len(rw_dirs) < 1:
420
rw_dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
422
assert len(rw_dirs) == 1
424
# create a fake CD-ROM, too
425
with open('/sys/bus/pseudo/drivers/scsi_debug/ptype', 'w') as f:
426
f.write('5') # henceforth, created devices will be CD drives
427
with open('/sys/bus/pseudo/drivers/scsi_debug/add_host', 'w') as f:
428
f.write('1') # generate a new drive
429
subprocess.call(['udevadm', 'settle'])
430
with open('/sys/bus/pseudo/drivers/scsi_debug/ptype', 'w') as f:
434
while len(ro_dirs) < 2:
435
ro_dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
437
ro_dirs.remove(rw_dirs[0])
438
assert len(ro_dirs) == 1
440
# determine the debug block devices
441
devs = os.listdir(ro_dirs[0])
442
assert len(devs) == 1
443
ro_dev = '/dev/' + devs[0]
444
devs = os.listdir(rw_dirs[0])
445
assert len(devs) == 1
446
rw_dev = '/dev/' + devs[0]
447
assert os.path.exists(ro_dev)
448
assert os.path.exists(rw_dev)
450
# let's be 100% sure that we pick a virtual one
451
assert open('/sys/block/%s/device/model' % os.path.basename(rw_dev)).read().strip() == 'scsi_debug'
453
print('Set up test device: r/w: %s, r/o: %s' % (rw_dev, ro_dev))
454
return (rw_dev, ro_dev)
457
def teardown_vdev(klass, device):
    '''release and remove virtual test device

    Removes device through remove_device() and then unloads the scsi_debug
    kernel module with rmmod.

    Raises AssertionError if rmmod exits with a non-zero status.
    '''
    klass.remove_device(device)
    # Run rmmod outside of an assert statement: asserts are stripped under
    # "python -O", which would silently skip unloading the module.
    if subprocess.call(['rmmod', 'scsi_debug']) != 0:
        raise AssertionError('Failure to rmmod scsi_debug')
465
def remove_device(klass, device):
466
'''remove virtual test device'''
468
device = device.split('/')[-1]
469
if os.path.exists('/sys/block/' + device):
470
f = open('/sys/block/%s/device/delete' % device, 'w')
473
while os.path.exists(device):
476
time.sleep(0.5) # TODO
479
def readd_devices(klass):
480
'''re-add virtual test devices after removal'''
482
scan_files = glob('/sys/bus/pseudo/devices/adapter*/host*/scsi_host/host*/scan')
483
assert len(scan_files) > 0
485
open(f, 'w').write('- - -\n')
486
while not os.path.exists(klass.device):
491
# ----------------------------------------------------------------------------
493
class Manager(UDisksTestCase):
494
'''UDisksManager operations'''
496
def test_version(self):
499
self.assertTrue(self.manager.get_property('version')[0].isdigit())
501
def test_loop_rw(self):
502
'''loop device R/W'''
504
with tempfile.NamedTemporaryFile() as f:
505
f.truncate(100000000)
506
fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
508
(path, out_fd_list) = self.manager.call_loop_setup_sync(
509
GLib.Variant('h', 0), # fd index
515
obj = self.client.get_object(path)
516
loop = obj.get_property('loop')
517
block = obj.get_property('block')
518
self.assertNotEqual(block, None)
519
self.assertNotEqual(loop, None)
520
self.assertEqual(obj.get_property('filesystem'), None)
523
self.assertEqual(loop.get_property('backing-file'), f.name)
525
options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
526
block.call_format_sync('ext2', options, None)
528
self.assertNotEqual(obj.get_property('filesystem'), None)
530
self.assertEqual(block.get_property('id-label'), 'foo')
531
self.assertEqual(block.get_property('id-usage'), 'filesystem')
532
self.assertEqual(block.get_property('id-type'), 'ext2')
534
loop.call_delete_sync(no_options, None)
536
def test_loop_ro(self):
537
'''loop device R/O'''
539
with tempfile.NamedTemporaryFile() as f:
540
f.truncate(100000000)
541
fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
543
(path, out_fd_list) = self.manager.call_loop_setup_sync(
544
GLib.Variant('h', 0), # fd index
545
GLib.Variant('a{sv}', {'read-only': GLib.Variant('b', True)}),
550
obj = self.client.get_object(path)
551
loop = obj.get_property('loop')
552
block = obj.get_property('block')
553
self.assertNotEqual(block, None)
554
self.assertNotEqual(loop, None)
555
self.assertEqual(obj.get_property('filesystem'), None)
558
self.assertEqual(loop.get_property('backing-file'), f.name)
560
# can't format due to permission error
561
self.assertRaises(GLib.GError, block.call_format_sync, 'ext2',
564
self.assertEqual(block.get_property('id-label'), '')
565
self.assertEqual(block.get_property('id-usage'), '')
566
self.assertEqual(block.get_property('id-type'), '')
569
loop.call_delete_sync(no_options, None)
571
# ----------------------------------------------------------------------------
573
class Drive(UDisksTestCase):
577
self.drive = self.client.get_drive_for_block(self.udisks_block())
578
self.assertNotEqual(self.drive, None)
580
def test_properties(self):
    '''properties of UDisksDrive object'''
    prop = self.drive.get_property

    # fixed identification strings expected for the test drive
    for name, expected in (('model', 'scsi_debug'), ('vendor', 'Linux')):
        self.assertEqual(prop(name), expected)

    # compare sizes scaled down by 1e6, rounded to zero decimal places
    self.assertAlmostEqual(prop('size')/1.e6, VDEV_SIZE/1.e6, 0)

    for name, expected in (('media-available', True), ('optical', False)):
        self.assertEqual(prop(name), expected)

    # serial and revision only need to be non-empty
    for name in ('serial', 'revision'):
        self.assertNotEqual(len(prop(name)), 0)
592
# ----------------------------------------------------------------------------
594
class FS(UDisksTestCase):
595
'''Test detection of all supported file systems'''
598
self.workdir = tempfile.mkdtemp()
599
self.block = self.udisks_block()
600
self.assertNotEqual(self.block, None)
603
if subprocess.call(['umount', self.device], stderr=subprocess.PIPE) == 0:
604
sys.stderr.write('[cleanup unmount] ')
605
shutil.rmtree (self.workdir)
608
'''properties of zeroed out device'''
611
self.assertEqual(self.block.get_property('device'), self.device)
612
self.assertTrue('Linux_scsi_debug' in self.block.get_property('drive'))
613
self.assertEqual(self.block.get_property('hint-system'), True)
614
self.assertEqual(self.block.get_property('id-label'), '')
615
self.assertEqual(self.block.get_property('id-usage'), '')
616
self.assertEqual(self.block.get_property('id-type'), '')
617
self.assertEqual(self.block.get_property('id-uuid'), '')
618
self.assertAlmostEqual(self.block.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
619
obj = self.client.get_object(self.block.get_object_path())
620
self.assertEqual(obj.get_property('filesystem'), None)
621
self.assertEqual(obj.get_property('partition'), None)
622
self.assertEqual(obj.get_property('partition-table'), None)
626
self._do_fs_check('ext2')
630
self._do_fs_check('ext3')
634
self._do_fs_check('ext4')
636
def test_btrfs(self):
638
self._do_fs_check('btrfs')
640
def test_minix(self):
642
self._do_fs_check('minix')
646
self._do_fs_check('xfs')
650
self._do_fs_check('ntfs')
654
self._do_fs_check('vfat')
656
def test_reiserfs(self):
658
self._do_fs_check('reiserfs')
662
self._do_fs_check('swap')
664
def test_nilfs2(self):
666
self._do_fs_check('nilfs2')
668
def test_empty(self):
671
self.mkfs('ext4', 'foo')
672
block = self.udisks_block()
673
self.assertEqual(block.get_property('id-usage'), 'filesystem')
674
self.assertEqual(block.get_property('id-type'), 'ext4')
675
self.assertEqual(block.get_property('id-label'), 'foo')
676
self.assertNotEqual(self.udisks_filesystem(), None)
678
self.fs_create(None, 'empty', no_options)
680
self.assertEqual(block.get_property('id-usage'), '')
681
self.assertEqual(block.get_property('id-type'), '')
682
self.assertEqual(block.get_property('id-label'), '')
683
self.assertEqual(self.udisks_filesystem(), None)
685
def test_create_fs_unknown_type(self):
    '''Format() with unknown type'''
    try:
        self.fs_create(None, 'bogus', no_options)
    except GLib.GError as error:
        # the error must name both the error class and the rejected type
        message = error.message
        self.assertTrue('UDisks2.Error.NotSupported' in message)
        self.assertTrue('type bogus' in message)
    else:
        # formatting with a nonexistent type must not succeed
        self.fail('Expected failure for bogus file system')
695
def test_create_fs_unsupported_label(self):
    '''Format() with unsupported label'''
    # ask for a label on minix, which is expected to be rejected
    label_options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
    try:
        self.fs_create(None, 'minix', label_options)
    except GLib.GError as error:
        self.assertTrue('UDisks2.Error.NotSupported' in error.message)
    else:
        self.fail('Expected failure for unsupported label')
705
def test_force_removal(self):
706
'''fs: forced removal'''
708
# create a fs and mount it
709
self.mkfs('ext4', 'udiskstest')
710
fs = self.udisks_filesystem()
711
mount_path = fs.call_mount_sync(no_options, None)
712
self.assertTrue(mount_path.endswith('udiskstest'))
713
self.assertTrue('/media/' in mount_path)
714
self.assertTrue(self.is_mountpoint(mount_path))
716
dev_t = os.stat(self.devname()).st_rdev
718
# removal should clean up mounts
719
self.remove_device(self.device)
720
self.assertFalse(os.path.exists(mount_path))
721
self.assertEqual(self.client.get_block_for_dev(dev_t), None)
723
# after putting it back, it should be mountable again
725
fs = self.udisks_filesystem()
726
self.assertEqual(fs.get_property('mount-points'), [])
728
mount_path = fs.call_mount_sync(no_options, None)
729
self.assertTrue(mount_path.endswith('udiskstest'))
730
self.assertTrue('/media/' in mount_path)
731
self.assertTrue(self.is_mountpoint(mount_path))
733
self.assertEqual(fs.get_property('mount-points'), [mount_path])
735
self.retry_busy(fs.call_unmount_sync, no_options, None)
737
self.assertEqual(fs.get_property('mount-points'), [])
739
def test_existing_manual_mount_point(self):
740
'''fs: does not reuse existing manual mount point'''
742
self.mkfs('ext4', 'udiskstest')
743
fs = self.udisks_filesystem()
745
# mount it, determine mount path, and unmount again
746
mount_path = fs.call_mount_sync(no_options, None)
747
self.assertTrue(mount_path.endswith('udiskstest'))
749
self.retry_busy(fs.call_unmount_sync, no_options, None)
751
self.assertEqual(fs.get_property('mount-points'), [])
753
# cleans up mountpoint
754
self.assertFalse(os.path.exists(mount_path))
756
# now manually create the mount point
759
# now this should use mount_path + '1'
761
new_mount_path = fs.call_mount_sync(no_options, None)
762
self.retry_busy(fs.call_unmount_sync, no_options, None)
764
self.assertEqual(fs.get_property('mount-points'), [])
765
self.assertEqual(new_mount_path, mount_path + '1')
769
def test_existing_udisks_mount_point(self):
770
'''fs: reuses existing udisks mount point'''
772
self.mkfs('ext4', 'udiskstest')
773
fs = self.udisks_filesystem()
775
# mount it, determine mount path
776
mount_path = fs.call_mount_sync(no_options, None)
777
self.assertTrue(mount_path.endswith('udiskstest'))
779
# stop the daemon (happens during a package upgrade)
780
UDisksTestCase.stop_daemon()
782
# mount should still be there; unmount it manually
783
self.assertTrue(self.is_mountpoint(mount_path))
784
subprocess.check_call(['umount', mount_path])
786
# restart daemon, mount again; this should use the same mount point as
788
UDisksTestCase.start_daemon()
789
fs = self.udisks_filesystem()
790
new_mount_path = fs.call_mount_sync(no_options, None)
791
self.retry_busy(fs.call_unmount_sync, no_options, None)
793
self.assertEqual(fs.get_property('mount-points'), [])
794
self.assertEqual(new_mount_path, mount_path)
796
def _do_fs_check(self, type):
797
'''Run checks for a particular file system.'''
801
mkfs = 'mkfs.' + type
803
if type != 'swap' and subprocess.call(['which', mkfs],
804
stdout=subprocess.PIPE) != 0:
805
sys.stderr.write('[no %s, skip] ' % mkfs)
807
# check correct D-Bus exception
809
self.fs_create(None, type, no_options)
810
self.fail('Expected failure for missing mkfs.' + type)
811
except GLib.GError as e:
812
self.assertTrue('UDisks2.Error.Failed' in e.message)
815
# do checks with command line tools (mkfs/mount/umount)
816
sys.stderr.write('[cli] ')
819
self._do_cli_check(type)
821
self._do_cli_check(type, 'test%stst' % type)
823
# put a different fs here instead of zeroing, so that we verify that
824
# udisks overrides existing FS (e. g. XFS complains then), and does not
825
# leave traces of other FS around
831
# do checks with udisks operations
832
sys.stderr.write('[ud] ')
833
self._do_udisks_check(type)
835
self._do_udisks_check(type, 'test%stst' % type)
836
# also test fs_create with an empty label
837
self._do_udisks_check(type, '')
839
def _do_cli_check(self, type, label=None):
840
'''udisks correctly picks up file system changes from command line tools'''
842
self.mkfs(type, label)
844
block = self.udisks_block()
846
self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
848
self.assertEqual(block.get_property('id-type'), type)
849
self.assertEqual(block.get_property('id-label'), label or '')
850
self.assertEqual(block.get_property('hint-name'), '')
852
self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
854
obj = self.client.get_object(self.block.get_object_path())
855
self.assertEqual(obj.get_property('partition'), None)
856
self.assertEqual(obj.get_property('partition-table'), None)
858
fs = obj.get_property('filesystem')
860
self.assertEqual(fs, None)
862
self.assertNotEqual(fs, None)
868
if type == 'ntfs' and subprocess.call(['which', 'mount.ntfs-3g'],
869
stdout=subprocess.PIPE) == 0:
870
# prefer mount.ntfs-3g if we have it (on Debian; Ubuntu
871
# defaults to ntfs-3g if installed); TODO: check other distros
872
mount_prog = 'mount.ntfs-3g'
875
ret = subprocess.call([mount_prog, self.device, self.workdir])
878
sys.stderr.write('[missing kernel driver, skip] ')
880
self.assertEqual(ret, 0)
884
self.assertEqual(fs.get_property('mount-points'), [self.workdir])
887
subprocess.call(['umount', self.workdir])
889
self.assertEqual(fs.get_property('mount-points'), [])
891
def _do_udisks_check(self, type, label=None):
892
'''udisks API correctly changes file system'''
895
if label is not None:
896
options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', label)})
899
self.fs_create(None, type, options)
903
self.assertEqual(id['ID_FS_USAGE'], type == 'swap' and 'other' or 'filesystem')
904
self.assertEqual(id['ID_FS_TYPE'], type)
905
self.assertEqual(id.get('ID_FS_LABEL', ''), label or '')
907
block = self.udisks_block()
908
self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
909
self.assertEqual(block.get_property('id-type'), type)
910
self.assertEqual(block.get_property('id-label'), label or '')
915
obj = self.client.get_object(self.block.get_object_path())
916
self.assertEqual(obj.get_property('partition'), None)
917
self.assertEqual(obj.get_property('partition-table'), None)
919
fs = self.udisks_filesystem()
920
self.assertNotEqual(fs, None, 'no Filesystem interface for test device')
921
self.assertEqual(fs.get_property('mount-points'), [])
924
mount_path = fs.call_mount_sync(no_options, None)
926
self.assertTrue('/media/' in mount_path)
928
self.assertTrue(mount_path.endswith(label))
931
self.assertEqual(fs.get_property('mount-points'), [mount_path])
932
self.assertTrue(self.is_mountpoint(mount_path))
934
# no ownership taken, should be root owned
935
st = os.stat(mount_path)
936
self.assertEqual((st.st_uid, st.st_gid), (0, 0))
938
self._do_file_perms_checks(type, mount_path)
941
self.retry_busy(fs.call_unmount_sync, no_options, None)
942
self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
943
self.assertEqual(fs.get_property('mount-points'), [mount_path])
945
# create fs with taking ownership (daemon:mail == 1:8)
946
#if supports_unix_owners:
947
# options.append('take_ownership_uid=1')
948
# options.append('take_ownership_gid=8')
949
# self.fs_create(None, type, options)
950
# mount_path = iface.FilesystemMount('', [])
951
# st = os.stat(mount_path)
952
# self.assertEqual((st.st_uid, st.st_gid), (1, 8))
953
# self.retry_busy(self.partition_iface().FilesystemUnmount, [])
954
# self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
958
l = 'n"a\m\\"e' + type
960
# VFAT does not support some characters
961
self.assertRaises(GLib.GError, fs.call_set_label_sync, l,
965
fs.call_set_label_sync(l, no_options, None)
966
except GLib.GError as e:
967
if 'UDisks2.Error.NotSupported' in e.message:
968
# these fses are known to not support relabeling
969
self.assertTrue(type in ['minix', 'btrfs'])
975
block = self.udisks_block()
976
blkid_label = self.blkid().get('ID_FS_LABEL_ENC', '').replace('\\x22', '"').replace(
977
'\\x5c', '\\').replace('\\x24', '$')
978
self.sync_workaround()
980
# EXFAIL: often (but not always) the label appears in all upper case
981
self.assertEqual(blkid_label.upper(), l.upper())
982
self.assertEqual(block.get_property('id-label').upper(), l.upper())
984
self.assertEqual(blkid_label, l)
985
self.assertEqual(block.get_property('id-label'), l)
987
# test setting empty label
988
fs.call_set_label_sync('', no_options, None)
989
self.sync_workaround()
990
self.assertEqual(self.blkid().get('ID_FS_LABEL_ENC', ''), '')
991
self.assertEqual(block.get_property('id-label'), '')
993
# check fs - Not implemented in udisks yet
994
#self.assertEqual(iface.FilesystemCheck([]), True)
996
# check mounting of a read-only device
997
# this is known-broken for reiserfs and xfs right now:
998
# https://github.com/karelzak/util-linux/issues/17
999
# https://github.com/karelzak/util-linux/issues/18
1000
if type not in ['reiserfs', 'xfs']:
1001
# the scsi_debug CD drive content is the same as for the HD drive, but
1002
# udev does not know about this; so give it a nudge to re-probe
1003
subprocess.call(['udevadm', 'trigger', '--action=change',
1004
'--sysname-match=' + os.path.basename(self.cd_device)])
1007
cd_fs = self.udisks_filesystem(cd=True)
1009
mount_path = cd_fs.call_mount_sync(no_options, None)
1011
self.assertTrue('/media/' in mount_path)
1013
self.assertEqual(cd_fs.get_property('mount-points'), [mount_path])
1014
self.assertTrue(self.is_mountpoint(mount_path))
1016
self.assertEqual(self.udisks_block(cd=True).get_property('read-only'), True)
1018
self.retry_busy(cd_fs.call_unmount_sync, no_options, None)
1019
self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
1020
self.assertEqual(cd_fs.get_property('mount-points'), [mount_path])
1022
def _do_file_perms_checks(self, type, mount_point):
    '''Check for permissions for data files and executables.

    In mount_point itself and in a newly created "subdir" below it, an
    empty data file must be readable and writable but not executable, and a
    copy of /bin/bash must be readable, writable and executable.

    Nothing is checked for file systems listed in BROKEN_PERMISSIONS_FS.
    '''
    if type in BROKEN_PERMISSIONS_FS:
        return

    def check_access(path, executable):
        # every file must be readable and writable; the x bit depends on kind
        self.assertTrue(os.access(path, os.R_OK))
        self.assertTrue(os.access(path, os.W_OK))
        if executable:
            self.assertTrue(os.access(path, os.X_OK))
        else:
            self.assertFalse(os.access(path, os.X_OK))

    def check_directory(directory, data_name, exe_name):
        # plain data file: create it empty
        data_file = os.path.join(directory, data_name)
        open(data_file, 'w').close()
        check_access(data_file, False)

        # executable: copy an existing binary
        exe_file = os.path.join(directory, exe_name)
        shutil.copy('/bin/bash', exe_file)
        check_access(exe_file, True)

    check_directory(mount_point, 'simpledata.txt', 'simple.exe')

    subdir = os.path.join(mount_point, 'subdir')
    os.mkdir(subdir)
    check_directory(subdir, 'subdirdata.txt', 'subdir.exe')
1056
## ----------------------------------------------------------------------------
1058
class Smart(UDisksTestCase):
1059
'''Check SMART operation.'''
1062
'''SMART status of first internal hard disk
1064
This is a best-effort readonly test.
1068
if not os.path.exists(hd):
1069
sys.stderr.write('[skip] ')
1072
has_smart = subprocess.call(['skdump', '--can-smart', hd],
1073
stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0
1075
block = self.client.get_block_for_dev(os.stat(hd).st_rdev)
1076
self.assertNotEqual(block, None)
1077
drive = self.client.get_drive_for_block(block)
1078
ata = self.client.get_object(drive.get_object_path()).get_property('drive-ata')
1079
self.assertEqual(ata != None, has_smart)
1082
sys.stderr.write('[avail] ')
1083
self.assertEqual(ata.get_property('smart-supported'), True)
1084
self.assertEqual(ata.get_property('smart-enabled'), True)
1086
# wait for SMART data to be read
1087
while ata.get_property('smart-updated') == 0:
1088
sys.stderr.write('[wait for data] ')
1091
# this is of course not truly correct for a test suite, but let's
1092
# consider it a courtesy for developers :-)
1093
self.assertEqual(ata.get_property('smart-failing'), False)
1094
self.assertTrue(ata.get_property('smart-selftest-status') in ['success', 'inprogress'])
1096
sys.stderr.write('[N/A] ')
1099
# ----------------------------------------------------------------------------
1101
class Luks(UDisksTestCase):
1105
'''clean up behind failed test cases'''
1107
crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1109
encrypted = crypt_obj.get_property('encrypted')
1112
encrypted.call_lock_sync(no_options, None)
1113
sys.stderr.write('[cleanup lock] ')
1117
# needs to run before the other tests
1118
# Creates an encrypted ext4 file system, checks the properties of the
# encrypted and the cleartext device, and locks the device again.
# NOTE(review): this region is a damaged listing. The bare integer lines look
# like line numbers of the original file, indentation has been stripped, and
# wherever those numbers skip, source lines are absent (several multi-line
# calls below have lost their closing lines). Restore from upstream before use.
def test_0_create_teardown(self):
1119
'''LUKS create/teardown'''
1121
# Call fs_create() for ext4 with an 'encrypt.passphrase' and a 'label' option.
self.fs_create(None, 'ext4', GLib.Variant('a{sv}', {
1122
'encrypt.passphrase': GLib.Variant('s', 's3kr1t'),
1123
'label': GLib.Variant('s', 'treasure'),
# NOTE(review): numbers 1124-1126 are skipped; the closing brackets of the call
# above are presumably there -- confirm upstream.
1127
# The block object must expose 'encrypted' and no 'filesystem' property.
block = self.udisks_block()
1128
obj = self.client.get_object(block.get_object_path())
1129
self.assertEqual(obj.get_property('filesystem'), None)
1130
encrypted = obj.get_property('encrypted')
1131
self.assertNotEqual(encrypted, None)
1133
# check crypted device info
1134
self.assertEqual(block.get_property('id-type'), 'crypto_LUKS')
1135
self.assertEqual(block.get_property('id-usage'), 'crypto')
1136
self.assertEqual(block.get_property('id-label'), '')
1137
self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
1138
self.assertEqual(block.get_property('device'), self.devname())
1140
# check whether we can lock/unlock; we also need this to get the
# NOTE(review): number 1141 is skipped; the comment above presumably continues
# on that line -- confirm upstream.
1142
# Lock once, then expect GLib.GError from a further call_lock_sync.
encrypted.call_lock_sync(no_options, None)
1143
self.assertRaises(GLib.GError, encrypted.call_lock_sync,
# NOTE(review): numbers 1144-1146 are skipped; the remaining arguments of the
# assertRaises call above are presumably there -- confirm upstream.
1147
# Unlocking with the passphrase 'h4ckpassword' must raise GLib.GError.
self.assertRaises(GLib.GError, encrypted.call_unlock_sync,
1148
'h4ckpassword', no_options, None)
1150
# Unlock with 's3kr1t'; the result is used below as the cleartext object path.
clear_path = encrypted.call_unlock_sync('s3kr1t',
# NOTE(review): numbers 1151-1152 are skipped; the rest of the unlock call is
# presumably there -- confirm upstream.
1153
# check cleartext device info
1154
clear_obj = self.client.get_object(clear_path)
1155
self.assertEqual(clear_obj.get_property('encrypted'), None)
1156
clear_block = clear_obj.get_property('block')
1157
self.assertEqual(clear_block.get_property('id-type'), 'ext4')
1158
self.assertEqual(clear_block.get_property('id-usage'), 'filesystem')
1159
self.assertEqual(clear_block.get_property('id-label'), 'treasure')
1160
self.assertNotEqual(clear_block.get_property('crypto-backing-device'), None)
1161
clear_dev = clear_block.get_property('device')
1162
self.assertNotEqual(clear_dev, None)
1163
self.assertEqual(clear_block.get_property('id-uuid'),
1164
self.blkid(device=clear_dev)['ID_FS_UUID'])
1166
# The cleartext file system must not be mounted at this point.
clear_fs = clear_obj.get_property('filesystem')
1167
self.assertEqual(clear_fs.get_property('mount-points'), [])
1169
# check that we do not leak key information
1170
# Dump the udev database and search the raw bytes for the passphrase and for
# the substring 'essiv:sha'.
udev_dump = subprocess.Popen(['udevadm', 'info', '--export-db'],
1171
stdout=subprocess.PIPE)
1172
out = udev_dump.communicate()[0]
1173
self.assertFalse(b's3kr1t' in out, 'password in udev properties')
1174
self.assertFalse(b'essiv:sha' in out, 'key information in udev properties')
1177
# tear down cleartext device
1178
encrypted.call_lock_sync(no_options, None)
1179
# After locking, the cleartext device node must no longer exist.
self.assertFalse(os.path.exists(clear_dev))
1181
# Unlocks the encrypted device with 's3kr1t', mounts and unmounts the
# cleartext file system, and locks the device again.
# NOTE(review): this region is a damaged listing. The bare integer lines look
# like line numbers of the original file, indentation has been stripped, and
# wherever those numbers skip, source lines are absent. Restore from upstream
# before use.
def test_luks_mount(self):
1182
'''LUKS mount/unmount'''
1184
crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1185
encrypted = crypt_obj.get_property('encrypted')
1187
path = encrypted.call_unlock_sync('s3kr1t',
# NOTE(review): number 1188 is skipped; the rest of the unlock call is
# presumably there -- confirm upstream.
1189
self.client.settle()
1190
# The unlocked object must expose a 'filesystem' property.
obj = self.client.get_object(path)
1191
fs = obj.get_property('filesystem')
1192
self.assertNotEqual(fs, None)
1195
# Mount and check the returned path: it contains '/media/', ends with the label
# 'treasure', is reported as a mount point, and is the only listed mount point.
mount_path = fs.call_mount_sync(no_options, None)
1198
self.assertTrue('/media/' in mount_path)
1199
self.assertTrue(mount_path.endswith('treasure'))
1200
self.assertTrue(self.is_mountpoint(mount_path))
1201
self.client.settle()
1202
self.assertEqual(fs.get_property('mount-points'), [mount_path])
# NOTE(review): numbers 1203-1205 are skipped; the `except` below implies a
# `try:` line among them -- confirm upstream.
1206
# Locking while the file system is mounted must fail with a GLib.GError whose
# message contains 'UDisks2.Error.Failed'.
encrypted.call_lock_sync(no_options, None)
1207
self.fail('Lock() unexpectedly succeeded on mounted file system')
1208
except GLib.GError as e:
1209
self.assertTrue('UDisks2.Error.Failed' in e.message)
1212
# Unmount through retry_busy(), then check that the mount point directory is
# gone and that no mount points are listed.
self.retry_busy(fs.call_unmount_sync, no_options, None)
1213
self.client.settle()
1214
self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
1215
self.assertEqual(fs.get_property('mount-points'), [])
1218
# Lock again; the client must then return None for the cleartext object path.
encrypted.call_lock_sync(no_options, None)
1219
self.client.settle()
1220
self.assertEqual(self.client.get_object(path), None)
1222
# Unlocks and mounts the LUKS device, removes the underlying device while it
# is mounted, re-adds the devices, then repeats unlock/mount/unmount/lock.
# NOTE(review): this region is a damaged listing. The bare integer lines look
# like line numbers of the original file, indentation has been stripped, and
# wherever those numbers skip, source lines are absent. Restore from upstream
# before use.
def test_luks_forced_removal(self):
1223
'''LUKS forced removal'''
1225
# unlock and mount it
1226
crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1227
path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t',
# NOTE(review): numbers 1228-1229 are skipped; the rest of the unlock call is
# presumably there -- confirm upstream.
1230
fs = self.client.get_object(path).get_property('filesystem')
1231
mount_path = fs.call_mount_sync(no_options, None)
1232
self.assertTrue('/media/' in mount_path)
1233
self.assertTrue(mount_path.endswith('treasure'))
1235
# removal should clean up mounts
1237
self.remove_device(self.device)
1238
# The mount point directory must be gone after the device was removed.
self.assertFalse(os.path.exists(mount_path))
# NOTE(review): numbers 1239-1240, 1242-1243 and 1248 are skipped; `timeout` is
# asserted on below but never assigned in the visible text, so a countdown
# loop around the check below is presumably there -- confirm upstream.
1241
if self.client.get_object(path) is None:
1244
# we do not have a main loop, and cannot currently use
1245
# g_main_context_get_default() from introspection, so
1246
# instead of refreshing self.client, get a new one
1247
self.client = UDisks.Client.new_sync(None)
1249
# Fails with the given message unless `timeout` is still greater than 0.
self.assertGreater(timeout, 0, 'timeout waiting for object path %s to disappear' % path)
1251
self.readd_devices()
1253
# after putting it back, it should be mountable again
1254
crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1255
path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t',
# NOTE(review): number 1256 is skipped; the rest of the unlock call is
# presumably there -- confirm upstream.
1257
self.client.settle()
1258
fs = self.client.get_object(path).get_property('filesystem')
1259
mount_path = fs.call_mount_sync(no_options, None)
1260
self.assertTrue('/media/' in mount_path)
1261
self.assertTrue(mount_path.endswith('treasure'))
1264
# Unmount through retry_busy(), then check that the mount point directory is
# gone and that no mount points are listed.
self.retry_busy(fs.call_unmount_sync, no_options, None)
1265
self.client.settle()
1266
self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
1267
self.assertEqual(fs.get_property('mount-points'), [])
1270
crypt_obj.get_property('encrypted').call_lock_sync(
# NOTE(review): number 1271 is skipped; the arguments and closing parenthesis
# of the lock call above are presumably there -- confirm upstream.
1272
self.client.settle()
1273
# After locking, the client must return None for the cleartext object path.
self.assertEqual(self.client.get_object(path), None)
1275
# ----------------------------------------------------------------------------
1277
class Polkit(UDisksTestCase, test_polkitd.PolkitTestCase):
    '''Check operation with polkit.

    Every test first calls start_polkitd() with a list of polkit action IDs
    and then checks whether udisks permits or rejects the operation.
    NOTE(review): the list is presumably the set of actions the test polkitd
    authorizes -- confirm against test_polkitd.
    '''

    def test_internal_fs_forbidden(self):
        '''Create FS on internal drive (forbidden)'''

        self.start_polkitd(['org.freedesktop.udisks2.modify-device'])

        options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'polkitno')})
        with self.assertRaises(GLib.GError) as cm:
            self.fs_create(None, 'ext4', options)
        # assertIn reports both the expected substring and the actual message
        # on failure, so no separate failure message is needed
        self.assertIn('UDisks2.Error.NotAuthorized', cm.exception.message)

        # did not actually do anything
        block = self.udisks_block()
        self.assertNotEqual(block.get_property('id-label'), 'polkitno')

    def test_internal_fs_allowed(self):
        '''Create FS on internal drive (allowed)'''

        self.start_polkitd(['org.freedesktop.udisks2.modify-device-system',
                            'org.freedesktop.udisks2.modify-device'])

        options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'polkityes')})
        self.fs_create(None, 'ext4', options)

        # the new file system must be visible with the requested label
        block = self.udisks_block()
        self.assertEqual(block.get_property('id-usage'), 'filesystem')
        self.assertEqual(block.get_property('id-type'), 'ext4')
        self.assertEqual(block.get_property('id-label'), 'polkityes')

    def test_removable_fs(self):
        '''Create FS on removable drive (allowed)'''

        self.start_polkitd(['org.freedesktop.udisks2.filesystem-mount'])

        # the scsi_debug CD drive content is the same as for the HD drive, but
        # udev does not know about this; so give it a nudge to re-probe
        subprocess.call(['udevadm', 'trigger', '--action=change',
                         '--sysname-match=' + os.path.basename(self.cd_device)])
        # NOTE(review): the listing this class was restored from skips three
        # line numbers (1317-1319) at this point; confirm against upstream
        # that no statement (e.g. a wait for udev) belongs here.

        fs = self.udisks_filesystem(cd=True)
        self.assertIsNotNone(fs)
        mount_path = fs.call_mount_sync(no_options, None)
        self.assertIn('/media/', mount_path)

        # unmount again through retry_busy(), then let the client settle
        self.retry_busy(fs.call_unmount_sync, no_options, None)
        self.client.settle()
1328
# ----------------------------------------------------------------------------
1330
if __name__ == '__main__':
1331
argparser = argparse.ArgumentParser(description='udisks2 integration test suite')
1332
argparser.add_argument('-l', '--log-file', dest='logfile',
1333
help='write daemon log to a file')
1334
argparser.add_argument('-w', '--no-workarounds',
1335
action="store_true", default=False,
1336
help='Disable workarounds for race conditions in the D-BUS API')
1337
argparser.add_argument('testname', nargs='*',
1338
help='name of test class or method (e. g. "Drive", "FS.test_ext2")')
1339
args = argparser.parse_args()
1341
workaround_syncs = not args.no_workarounds
1343
UDisksTestCase.init(logfile=args.logfile)
1345
tests = unittest.TestLoader().loadTestsFromNames(args.testname,
1346
__import__('__main__'))
1348
tests = unittest.TestLoader().loadTestsFromName('__main__')
1349
if unittest.TextTestRunner(verbosity=2).run(tests).wasSuccessful():