3
# udisks2 integration test suite
5
# Run in udisks built tree to test local built binaries (needs
6
# --localstatedir=/var), or from anywhere else to test system installed
11
# src/tests/integration-test
12
# - Run only a particular class of tests:
13
# src/tests/integration-test Drive
14
# - Run only a single test:
15
# src/tests/integration-test FS.test_ext3
17
# Copyright: (C) 2011 Martin Pitt <martin.pitt@ubuntu.com>
19
# This program is free software; you can redistribute it and/or modify
20
# it under the terms of the GNU General Public License as published by
21
# the Free Software Foundation; either version 2 of the License, or
22
# (at your option) any later version.
24
# This program is distributed in the hope that it will be useful,
25
# but WITHOUT ANY WARRANTY; without even the implied warranty of
26
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27
# GNU General Public License for more details.
30
# - add and test method for changing LUKS passphrase
31
# - test Format with take-ownership
37
srcdir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
38
libdir = os.path.join(srcdir, 'udisks', '.libs')
40
# as we can't change LD_LIBRARY_PATH within a running program, and doing
41
# #!/usr/bin/env LD_LIBRARY_PATH=... python3 does not work either, do this
43
if 'LD_LIBRARY_PATH' not in os.environ and os.path.isdir(libdir):
44
os.environ['LD_LIBRARY_PATH'] = libdir
45
os.environ['GI_TYPELIB_PATH'] = '%s/udisks:%s' % (
47
os.environ.get('GI_TYPELIB_PATH', ''))
48
os.execv(sys.argv[0], sys.argv)
49
assert False, 'not expecting to land here'
61
from gi.repository import GLib, Gio, UDisks
63
# find local test_polkitd.py
64
sys.path.insert(0, os.path.dirname(__file__))
67
#GI_TYPELIB_PATH=udisks LD_LIBRARY_PATH=udisks/.libs
68
# Size of the virtual test device.
VDEV_SIZE = 300 * 1000 * 1000

# File systems that are known to handle permissions incorrectly, in
# particular the executable bit.
BROKEN_PERMISSIONS_FS = ['ntfs']

# Some D-Bus API methods return before the affected properties are up to
# date, so the tests run "udevadm settle" afterwards as a workaround. Those
# methods should eventually be fixed properly, but it is unnerving to have
# the tests fail on them while working on something else. The
# --no-workarounds option controls this flag, so that the syncs can be
# disabled and the underlying race conditions can be fixed.
workaround_syncs = False
82
# Empty 'a{sv}' options dictionary, passed to udisks calls that need no options.
no_options = GLib.Variant('a{sv}', {})
84
# ----------------------------------------------------------------------------
86
class UDisksTestCase(unittest.TestCase):
87
'''Base class for udisks test cases.
89
This provides static functions which are useful for all test cases.
99
def init(klass, logfile=None):
100
'''start daemon and set up test environment'''
102
if os.geteuid() != 0:
103
print('this test suite needs to run as root', file=sys.stderr)
106
# run from local build tree if we are in one, otherwise use system instance
107
daemon_path = os.path.join(srcdir, 'src', 'udisksd')
108
if (os.access (daemon_path, os.X_OK)):
109
print('Testing binaries from local build tree')
110
klass.check_build_tree_config()
112
print('Testing installed system binaries')
114
for l in open('/usr/share/dbus-1/system-services/org.freedesktop.UDisks2.service'):
115
if l.startswith('Exec='):
116
daemon_path = l.split('=', 1)[1].split()[0]
118
assert daemon_path, 'could not determine daemon path from D-BUS .service file'
120
print('daemon path: ' + daemon_path)
122
klass.device = klass.setup_vdev()
124
# start polkit and udisks on a private DBus
125
klass.dbus = Gio.TestDBus()
127
os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = klass.dbus.get_bus_address()
128
# do not try to communicate with the current desktop session; this will
129
# confuse it, as it cannot see this D-BUS instance
131
del os.environ['DISPLAY']
135
klass.daemon_log = open(logfile, 'w')
137
klass.daemon_log = tempfile.TemporaryFile()
138
klass.daemon = subprocess.Popen([daemon_path],
139
stdout=klass.daemon_log, stderr=subprocess.STDOUT)
140
assert klass.daemon.pid, 'daemon failed to start'
142
atexit.register(klass.cleanup)
144
# wait until the daemon has started up
146
while klass.manager is None and timeout > 0:
148
klass.client = UDisks.Client.new_sync(None)
149
assert klass.client != None
150
klass.manager = klass.client.get_manager()
152
assert klass.manager, 'daemon failed to start'
158
'''stop daemon again and clean up test environment'''
160
subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed
162
os.kill(klass.daemon.pid, signal.SIGTERM)
166
klass.teardown_vdev(klass.device)
169
del os.environ['DBUS_SYSTEM_BUS_ADDRESS']
174
'''Wait until pending events finished processing.
176
This should only be called for situations where we genuinely have an
177
asynchronous response, like invoking a CLI program and waiting for
178
udev/udisks to catch up on the change events.
180
subprocess.call(['udevadm', 'settle'])
181
context = GLib.main_context_default()
183
# wait until all GDBus events have been processed
184
while context.pending() and timeout > 0:
185
klass.client.settle()
189
sys.stderr.write('[wait timeout!] ')
191
klass.client.settle()
194
def sync_workaround(klass):
195
'''Wait until pending events finished processing (bug workaround).
197
This should be called for race conditions in the D-BUS API which cause
198
properties to not be up to date yet when a method call finishes. Those
199
should eventually get fixed properly, but it's unnerving to have the
200
tests fail on them when you are working on something else.
202
This sync is not done if running with --no-workarounds.
208
def zero_device(klass):
209
subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device, 'bs=10M'],
210
stderr=subprocess.PIPE)
215
def devname(klass, partition=None):
216
'''Get name of test device or one of its partitions'''
219
if klass.device[-1].isdigit():
220
return klass.device + 'p' + str(partition)
222
return klass.device + str(partition)
227
def udisks_block(klass, partition=None):
228
'''Get UDisksBlock object for test device or partition'''
231
devname = klass.devname(partition)
232
dev_t = os.stat(devname).st_rdev
233
block = klass.client.get_block_for_dev(dev_t)
234
assert block, 'did not find an UDisksBlock object for %s' % devname
238
def udisks_filesystem(klass, partition=None):
    '''Look up the 'filesystem' interface of the test device or a partition.

    The block object is resolved with klass.udisks_block(partition); its
    object path is then used to fetch the matching object from the client.

    Return None if there is no file system on that device.
    '''
    object_path = klass.udisks_block(partition).get_object_path()
    udisks_object = klass.client.get_object(object_path)
    return udisks_object.get_property('filesystem')
247
def blkid(klass, partition=None, device=None):
248
'''Call blkid and return dictionary of results.'''
251
device = klass.devname(partition)
253
cmd = subprocess.Popen(['blkid', '-p', '-o', 'udev', device], stdout=subprocess.PIPE)
255
(key, value) = l.decode('UTF-8').split('=', 1)
256
result[key] = value.strip()
257
assert cmd.wait() == 0
261
def is_mountpoint(klass, path):
    '''Check if given path is a mount point.

    Runs mountpoint(1) on path and returns True if it exits with status 0,
    False otherwise.
    '''
    # Discard the tool's output rather than attaching a pipe that is never
    # read: an unread pipe can block the child once its buffer fills up.
    return subprocess.call(['mountpoint', path],
                           stdout=subprocess.DEVNULL) == 0
267
def mkfs(klass, type, label=None, partition=None):
268
'''Create file system using mkfs.'''
271
assert label is None, 'minix does not support labels'
273
# work around mkswap not properly cleaning up an existing reiserfs
274
# signature (mailed kzak about it)
276
subprocess.check_call(['wipefs', '-a', klass.devname(partition)],
277
stdout=subprocess.PIPE)
279
mkcmd = { 'swap': 'mkswap',
281
label_opt = { 'vfat': '-n',
284
extra_opt = { 'vfat': [ '-I', '-F', '32'],
286
'xfs': ['-f'], # XFS complains if there's an existing FS, so force
287
'ext2': ['-F'], # ext* complains about using entire device, so force
294
cmd = [mkcmd.get(type, 'mkfs.' + type)] + extra_opt.get(type, [])
296
cmd += [label_opt.get(type, '-L'), label]
297
cmd.append(klass.devname(partition))
299
subprocess.check_call(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
301
# kernel/udev generally detect those changes itself, but do not quite
302
# tell us when they are done; so do a little kludge here to know how
303
# long we need to wait
304
subprocess.call(['udevadm', 'trigger', '--action=change',
305
'--sysname-match=' + os.path.basename(klass.devname(partition))])
309
def fs_create(klass, partition, type, options):
    '''Format the test device (or one of its partitions) through udisks.

    type and options are handed unchanged to the synchronous Format call of
    the block object; klass.sync_workaround() is run afterwards.
    '''
    klass.udisks_block(partition).call_format_sync(type, options, None)
    klass.sync_workaround()
317
def retry_busy(klass, fn, *args):
318
'''Call a function until it does not fail with "Busy".'''
324
except GLib.GError as e:
325
if not 'UDisks2.Error.DeviceBusy' in e.message:
327
sys.stderr.write('[busy] ')
332
def check_build_tree_config(klass):
333
'''Check configuration of build tree'''
335
# read make variables
337
var_re = re.compile('^([a-zA-Z_]+) = (.*)$')
338
make = subprocess.Popen(['make', '-p', '/dev/null'],
339
stdout=subprocess.PIPE)
340
for l in make.stdout:
341
l = l.decode('UTF-8')
344
make_vars[m.group(1)] = m.group(2)
347
# expand make variables
348
subst_re = re.compile('\${([a-zA-Z_]+)}')
349
for (k, v) in make_vars.items():
351
m = subst_re.search(v)
353
v = subst_re.sub(make_vars.get(m.group(1), ''), v)
358
# check localstatedir
359
for d in (os.path.join(make_vars['localstatedir'], 'run', 'udisks2'),
360
os.path.join(make_vars['localstatedir'], 'lib', 'udisks2')):
361
if not os.path.exists(d):
362
sys.stderr.write('The directory %s does not exist; please create it before running these tests.\n' % d)
366
def setup_vdev(klass):
367
'''create virtual test device
369
It is zeroed out initially.
371
Return the device path.
373
# ensure that the scsi_debug module is loaded
374
if os.path.isdir('/sys/module/scsi_debug'):
375
sys.stderr.write('The scsi_debug module is already loaded; please remove before running this test.\n')
378
assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % (
379
VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug'
381
# wait until all drives are created
384
dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
386
assert len(dirs) == 1
388
# determine the debug block devices
389
devs = os.listdir(dirs[0])
390
assert len(devs) == 1
391
dev = '/dev/' + devs[0]
392
assert os.path.exists(dev)
394
# let's be 100% sure that we pick a virtual one
395
assert open('/sys/block/%s/device/model' % devs[0]).read().strip() == 'scsi_debug'
397
print('Set up test device: ' + dev)
401
def teardown_vdev(klass, device):
    '''Release and remove the virtual test device.

    The device is removed with klass.remove_device() first, then the
    scsi_debug kernel module is unloaded with rmmod.

    Raises AssertionError if rmmod exits with a non-zero status.
    '''
    klass.remove_device(device)
    # Run rmmod outside of an assert statement: asserts are compiled away
    # under "python -O", which would silently skip unloading the module.
    if subprocess.call(['rmmod', 'scsi_debug']) != 0:
        raise AssertionError('Failure to rmmod scsi_debug')
409
def remove_device(klass, device):
410
'''remove virtual test device'''
412
device = device.split('/')[-1]
413
if os.path.exists('/sys/block/' + device):
414
f = open('/sys/block/%s/device/delete' % device, 'w')
417
while os.path.exists(device):
420
time.sleep(0.5) # TODO
423
def readd_devices(klass):
424
'''re-add virtual test devices after removal'''
426
scan_files = glob('/sys/bus/pseudo/devices/adapter*/host*/scsi_host/host*/scan')
427
assert len(scan_files) > 0
429
open(f, 'w').write('- - -\n')
430
while not os.path.exists(klass.device):
435
# ----------------------------------------------------------------------------
437
class Manager(UDisksTestCase):
438
'''UDisksManager operations'''
440
def test_version(self):
    '''The manager's version property starts with a digit.'''
    version = self.manager.get_property('version')
    self.assertTrue(version[0].isdigit())
445
def test_loop_rw(self):
446
'''loop device R/W'''
448
with tempfile.NamedTemporaryFile() as f:
449
f.truncate(100000000)
450
fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
452
(path, out_fd_list) = self.manager.call_loop_setup_sync(
453
GLib.Variant('h', 0), # fd index
459
obj = self.client.get_object(path)
460
loop = obj.get_property('loop')
461
block = obj.get_property('block')
462
self.assertNotEqual(block, None)
463
self.assertNotEqual(loop, None)
464
self.assertEqual(obj.get_property('filesystem'), None)
467
self.assertEqual(loop.get_property('backing-file'), f.name)
469
options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
470
block.call_format_sync('ext2', options, None)
472
self.assertNotEqual(obj.get_property('filesystem'), None)
474
self.assertEqual(block.get_property('id-label'), 'foo')
475
self.assertEqual(block.get_property('id-usage'), 'filesystem')
476
self.assertEqual(block.get_property('id-type'), 'ext2')
478
loop.call_delete_sync(no_options, None)
480
def test_loop_ro(self):
481
'''loop device R/O'''
483
with tempfile.NamedTemporaryFile() as f:
484
f.truncate(100000000)
485
fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
487
(path, out_fd_list) = self.manager.call_loop_setup_sync(
488
GLib.Variant('h', 0), # fd index
489
GLib.Variant('a{sv}', {'read-only': GLib.Variant('b', True)}),
494
obj = self.client.get_object(path)
495
loop = obj.get_property('loop')
496
block = obj.get_property('block')
497
self.assertNotEqual(block, None)
498
self.assertNotEqual(loop, None)
499
self.assertEqual(obj.get_property('filesystem'), None)
502
self.assertEqual(loop.get_property('backing-file'), f.name)
504
# can't format due to permission error
505
self.assertRaises(GLib.GError, block.call_format_sync, 'ext2',
508
self.assertEqual(block.get_property('id-label'), '')
509
self.assertEqual(block.get_property('id-usage'), '')
510
self.assertEqual(block.get_property('id-type'), '')
513
loop.call_delete_sync(no_options, None)
515
# ----------------------------------------------------------------------------
517
class Drive(UDisksTestCase):
521
self.drive = self.client.get_drive_for_block(self.udisks_block())
522
self.assertNotEqual(self.drive, None)
524
def test_properties(self):
    '''Check the properties exposed by the UDisksDrive object.'''
    drive = self.drive

    # identification strings reported for the test drive
    for name, expected in (('model', 'scsi_debug'), ('vendor', 'Linux')):
        self.assertEqual(drive.get_property(name), expected)

    # the size only has to agree with VDEV_SIZE after scaling by 1e6 and
    # rounding to zero decimal places
    self.assertAlmostEqual(drive.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)

    for name, expected in (('media-available', True), ('optical', False)):
        self.assertEqual(drive.get_property(name), expected)

    # serial and revision merely have to be non-empty
    for name in ('serial', 'revision'):
        self.assertNotEqual(len(drive.get_property(name)), 0)
536
# ----------------------------------------------------------------------------
538
class FS(UDisksTestCase):
539
'''Test detection of all supported file systems'''
542
self.workdir = tempfile.mkdtemp()
543
self.block = self.udisks_block()
544
self.assertNotEqual(self.block, None)
547
if subprocess.call(['umount', self.device], stderr=subprocess.PIPE) == 0:
548
sys.stderr.write('[cleanup unmount] ')
549
shutil.rmtree (self.workdir)
552
'''properties of zeroed out device'''
555
self.assertEqual(self.block.get_property('device'), self.device)
556
self.assertTrue('Linux_scsi_debug' in self.block.get_property('drive'))
557
self.assertEqual(self.block.get_property('hint-system'), True)
558
self.assertEqual(self.block.get_property('id-label'), '')
559
self.assertEqual(self.block.get_property('id-usage'), '')
560
self.assertEqual(self.block.get_property('id-type'), '')
561
self.assertEqual(self.block.get_property('id-uuid'), '')
562
self.assertAlmostEqual(self.block.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
563
obj = self.client.get_object(self.block.get_object_path())
564
self.assertEqual(obj.get_property('filesystem'), None)
565
self.assertEqual(obj.get_property('partition'), None)
566
self.assertEqual(obj.get_property('partition-table'), None)
570
self._do_fs_check('ext2')
574
self._do_fs_check('ext3')
578
self._do_fs_check('ext4')
580
def test_btrfs(self):
    '''Run the generic file system checks for btrfs.'''
    self._do_fs_check(type='btrfs')
584
def test_minix(self):
    '''Run the generic file system checks for minix.'''
    self._do_fs_check(type='minix')
590
self._do_fs_check('xfs')
594
self._do_fs_check('ntfs')
598
self._do_fs_check('vfat')
600
def test_reiserfs(self):
    '''Run the generic file system checks for reiserfs.'''
    self._do_fs_check(type='reiserfs')
606
self._do_fs_check('swap')
608
def test_nilfs2(self):
    '''Run the generic file system checks for nilfs2.'''
    self._do_fs_check(type='nilfs2')
612
def test_empty(self):
    '''Format() with type 'empty' clears an existing file system.'''
    id_props = ('id-usage', 'id-type', 'id-label')

    # start out with a labelled ext4 created by the command line tools
    self.mkfs('ext4', 'foo')
    block = self.udisks_block()
    for prop, expected in zip(id_props, ('filesystem', 'ext4', 'foo')):
        self.assertEqual(block.get_property(prop), expected)
    self.assertNotEqual(self.udisks_filesystem(), None)

    self.fs_create(None, 'empty', no_options)

    # all identification properties are blank again, and the file system
    # interface is gone
    for prop in id_props:
        self.assertEqual(block.get_property(prop), '')
    self.assertEqual(self.udisks_filesystem(), None)
629
def test_create_fs_unknown_type(self):
    '''Format() rejects a file system type it does not know'''
    try:
        self.fs_create(None, 'bogus', no_options)
    except GLib.GError as err:
        # the error has to name both the error domain and the offending type
        for fragment in ('UDisks2.Error.NotSupported', 'type bogus'):
            self.assertTrue(fragment in err.message)
    else:
        self.fail('Expected failure for bogus file system')
639
def test_create_fs_unsupported_label(self):
    '''Format() rejects a label for minix'''
    label_options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
    try:
        self.fs_create(None, 'minix', label_options)
    except GLib.GError as err:
        self.assertTrue('UDisks2.Error.NotSupported' in err.message)
    else:
        self.fail('Expected failure for unsupported label')
649
def test_force_removal(self):
650
'''fs: forced removal'''
652
# create a fs and mount it
653
self.mkfs('ext4', 'udiskstest')
654
fs = self.udisks_filesystem()
655
mount_path = fs.call_mount_sync(no_options, None)
656
self.assertTrue(mount_path.endswith('udiskstest'))
657
self.assertTrue('/media/' in mount_path)
658
self.assertTrue(self.is_mountpoint(mount_path))
660
dev_t = os.stat(self.devname()).st_rdev
662
# removal should clean up mounts
663
self.remove_device(self.device)
664
self.assertFalse(os.path.exists(mount_path))
665
self.assertEqual(self.client.get_block_for_dev(dev_t), None)
667
# after putting it back, it should be mountable again
669
fs = self.udisks_filesystem()
670
self.assertEqual(fs.get_property('mount-points'), [])
672
mount_path = fs.call_mount_sync(no_options, None)
673
self.assertTrue(mount_path.endswith('udiskstest'))
674
self.assertTrue('/media/' in mount_path)
675
self.assertTrue(self.is_mountpoint(mount_path))
677
self.assertEqual(fs.get_property('mount-points'), [mount_path])
679
self.retry_busy(fs.call_unmount_sync, no_options, None)
681
self.assertEqual(fs.get_property('mount-points'), [])
683
def _do_fs_check(self, type):
684
'''Run checks for a particular file system.'''
686
if type != 'swap' and subprocess.call(['which', 'mkfs.' + type],
687
stdout=subprocess.PIPE) != 0:
688
sys.stderr.write('[no mkfs.%s, skip] ' % type)
690
# check correct D-Bus exception
692
self.fs_create(None, type, no_options)
693
self.fail('Expected failure for missing mkfs.' + type)
694
except GLib.GError as e:
695
self.assertTrue('UDisks2.Error.Failed' in e.message)
698
# do checks with command line tools (mkfs/mount/umount)
699
sys.stderr.write('[cli] ')
702
self._do_cli_check(type)
704
self._do_cli_check(type, 'test%stst' % type)
706
# put a different fs here instead of zeroing, so that we verify that
707
# udisks overrides existing FS (e. g. XFS complains then), and does not
708
# leave traces of other FS around
714
# do checks with udisks operations
715
sys.stderr.write('[ud] ')
716
self._do_udisks_check(type)
718
self._do_udisks_check(type, 'test%stst' % type)
719
# also test fs_create with an empty label
720
self._do_udisks_check(type, '')
722
def _do_cli_check(self, type, label=None):
723
'''udisks correctly picks up file system changes from command line tools'''
725
self.mkfs(type, label)
727
block = self.udisks_block()
729
self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
731
self.assertEqual(block.get_property('id-type'), type)
732
self.assertEqual(block.get_property('id-label'), label or '')
733
self.assertEqual(block.get_property('hint-name'), '')
735
self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
737
obj = self.client.get_object(self.block.get_object_path())
738
self.assertEqual(obj.get_property('partition'), None)
739
self.assertEqual(obj.get_property('partition-table'), None)
741
fs = obj.get_property('filesystem')
743
self.assertEqual(fs, None)
745
self.assertNotEqual(fs, None)
751
if type == 'ntfs' and subprocess.call(['which', 'mount.ntfs-3g'],
752
stdout=subprocess.PIPE) == 0:
753
# prefer mount.ntfs-3g if we have it (on Debian; Ubuntu
754
# defaults to ntfs-3g if installed); TODO: check other distros
755
mount_prog = 'mount.ntfs-3g'
758
ret = subprocess.call([mount_prog, self.device, self.workdir])
761
sys.stderr.write('[missing kernel driver, skip] ')
763
self.assertEqual(ret, 0)
767
self.assertEqual(fs.get_property('mount-points'), [self.workdir])
770
subprocess.call(['umount', self.workdir])
772
self.assertEqual(fs.get_property('mount-points'), [])
774
def _do_udisks_check(self, type, label=None):
775
'''udisks API correctly changes file system'''
778
if label is not None:
779
options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', label)})
782
self.fs_create(None, type, options)
786
self.assertEqual(id['ID_FS_USAGE'], type == 'swap' and 'other' or 'filesystem')
787
self.assertEqual(id['ID_FS_TYPE'], type)
788
self.assertEqual(id.get('ID_FS_LABEL', ''), label or '')
790
block = self.udisks_block()
791
self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
792
self.assertEqual(block.get_property('id-type'), type)
793
self.assertEqual(block.get_property('id-label'), label or '')
798
obj = self.client.get_object(self.block.get_object_path())
799
self.assertEqual(obj.get_property('partition'), None)
800
self.assertEqual(obj.get_property('partition-table'), None)
802
fs = self.udisks_filesystem()
803
self.assertNotEqual(fs, None, 'no Filesystem interface for test device')
804
self.assertEqual(fs.get_property('mount-points'), [])
807
mount_path = fs.call_mount_sync(no_options, None)
809
self.assertTrue(mount_path.startswith('/run/media/'), mount_path)
811
self.assertTrue(mount_path.endswith(label))
814
self.assertEqual(fs.get_property('mount-points'), [mount_path])
815
self.assertTrue(self.is_mountpoint(mount_path))
817
# no ownership taken, should be root owned
818
st = os.stat(mount_path)
819
self.assertEqual((st.st_uid, st.st_gid), (0, 0))
821
self._do_file_perms_checks(type, mount_path)
824
self.retry_busy(fs.call_unmount_sync, no_options, None)
825
self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
826
self.assertEqual(fs.get_property('mount-points'), [mount_path])
828
# create fs with taking ownership (daemon:mail == 1:8)
829
#if supports_unix_owners:
830
# options.append('take_ownership_uid=1')
831
# options.append('take_ownership_gid=8')
832
# self.fs_create(None, type, options)
833
# mount_path = iface.FilesystemMount('', [])
834
# st = os.stat(mount_path)
835
# self.assertEqual((st.st_uid, st.st_gid), (1, 8))
836
# self.retry_busy(self.partition_iface().FilesystemUnmount, [])
837
# self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
841
l = 'n"a\m\\"e' + type
843
# VFAT does not support some characters
844
self.assertRaises(GLib.GError, fs.call_set_label_sync, l,
848
fs.call_set_label_sync(l, no_options, None)
849
except GLib.GError as e:
850
if 'UDisks2.Error.NotSupported' in e.message:
851
# these fses are known to not support relabeling
852
self.assertTrue(type in ['minix', 'btrfs'])
858
block = self.udisks_block()
859
blkid_label = self.blkid().get('ID_FS_LABEL_ENC', '').replace('\\x22', '"').replace(
860
'\\x5c', '\\').replace('\\x24', '$')
861
self.sync_workaround()
863
# EXFAIL: often (but not always) the label appears in all upper case
864
self.assertEqual(blkid_label.upper(), l.upper())
865
self.assertEqual(block.get_property('id-label').upper(), l.upper())
867
self.assertEqual(blkid_label, l)
868
self.assertEqual(block.get_property('id-label'), l)
870
# test setting empty label
871
fs.call_set_label_sync('', no_options, None)
872
self.sync_workaround()
873
self.assertEqual(self.blkid().get('ID_FS_LABEL_ENC', ''), '')
874
self.assertEqual(block.get_property('id-label'), '')
876
# check fs - Not implemented in udisks yet
877
#self.assertEqual(iface.FilesystemCheck([]), True)
879
def _do_file_perms_checks(self, type, mount_point):
880
'''Check for permissions for data files and executables.
882
This particularly checks sane and useful permissions on non-Unix file
885
if type in BROKEN_PERMISSIONS_FS:
888
f = os.path.join(mount_point, 'simpledata.txt')
890
self.assertTrue(os.access(f, os.R_OK))
891
self.assertTrue(os.access(f, os.W_OK))
892
self.assertFalse(os.access(f, os.X_OK))
894
f = os.path.join(mount_point, 'simple.exe')
895
shutil.copy('/bin/bash', f)
896
self.assertTrue(os.access(f, os.R_OK))
897
self.assertTrue(os.access(f, os.W_OK))
898
self.assertTrue(os.access(f, os.X_OK))
900
os.mkdir(os.path.join(mount_point, 'subdir'))
901
f = os.path.join(mount_point, 'subdir', 'subdirdata.txt')
903
self.assertTrue(os.access(f, os.R_OK))
904
self.assertTrue(os.access(f, os.W_OK))
905
self.assertFalse(os.access(f, os.X_OK))
907
f = os.path.join(mount_point, 'subdir', 'subdir.exe')
908
shutil.copy('/bin/bash', f)
909
self.assertTrue(os.access(f, os.R_OK))
910
self.assertTrue(os.access(f, os.W_OK))
911
self.assertTrue(os.access(f, os.X_OK))
913
## ----------------------------------------------------------------------------
915
class Smart(UDisksTestCase):
916
'''Check SMART operation.'''
919
'''SMART status of first internal hard disk
921
This is a best-effort readonly test.
925
if not os.path.exists(hd):
926
sys.stderr.write('[skip] ')
929
has_smart = subprocess.call(['skdump', '--can-smart', hd],
930
stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0
932
block = self.client.get_block_for_dev(os.stat(hd).st_rdev)
933
self.assertNotEqual(block, None)
934
drive = self.client.get_drive_for_block(block)
935
ata = self.client.get_object(drive.get_object_path()).get_property('drive-ata')
936
self.assertEqual(ata != None, has_smart)
939
sys.stderr.write('[avail] ')
940
self.assertEqual(ata.get_property('smart-supported'), True)
941
self.assertEqual(ata.get_property('smart-enabled'), True)
943
# wait for SMART data to be read
944
while ata.get_property('smart-updated') == 0:
945
sys.stderr.write('[wait for data] ')
948
# this is of course not truly correct for a test suite, but let's
949
# consider it a courtesy for developers :-)
950
self.assertEqual(ata.get_property('smart-failing'), False)
951
self.assertTrue(ata.get_property('smart-selftest-status') in ['success', 'inprogress'])
953
sys.stderr.write('[N/A] ')
956
# ----------------------------------------------------------------------------
958
class Luks(UDisksTestCase):
962
'''clean up behind failed test cases'''
964
crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
966
encrypted = crypt_obj.get_property('encrypted')
969
encrypted.call_lock_sync(no_options, None)
970
sys.stderr.write('[cleanup lock] ')
974
# needs to run before the other tests
975
def test_0_create_teardown(self):
976
'''LUKS create/teardown'''
978
self.fs_create(None, 'ext4', GLib.Variant('a{sv}', {
979
'encrypt.passphrase': GLib.Variant('s', 's3kr1t'),
980
'label': GLib.Variant('s', 'treasure'),
984
block = self.udisks_block()
985
obj = self.client.get_object(block.get_object_path())
986
self.assertEqual(obj.get_property('filesystem'), None)
987
encrypted = obj.get_property('encrypted')
988
self.assertNotEqual(encrypted, None)
990
# check crypted device info
991
self.assertEqual(block.get_property('id-type'), 'crypto_LUKS')
992
self.assertEqual(block.get_property('id-usage'), 'crypto')
993
self.assertEqual(block.get_property('id-label'), '')
994
self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
995
self.assertEqual(block.get_property('device'), self.devname())
997
# check whether we can lock/unlock; we also need this to get the
999
encrypted.call_lock_sync(no_options, None)
1000
self.assertRaises(GLib.GError, encrypted.call_lock_sync,
1004
self.assertRaises(GLib.GError, encrypted.call_unlock_sync,
1005
'h4ckpassword', no_options, None)
1007
clear_path = encrypted.call_unlock_sync('s3kr1t',
1010
# check cleartext device info
1011
clear_obj = self.client.get_object(clear_path)
1012
self.assertEqual(clear_obj.get_property('encrypted'), None)
1013
clear_block = clear_obj.get_property('block')
1014
self.assertEqual(clear_block.get_property('id-type'), 'ext4')
1015
self.assertEqual(clear_block.get_property('id-usage'), 'filesystem')
1016
self.assertEqual(clear_block.get_property('id-label'), 'treasure')
1017
self.assertNotEqual(clear_block.get_property('crypto-backing-device'), None)
1018
clear_dev = clear_block.get_property('device')
1019
self.assertNotEqual(clear_dev, None)
1020
self.assertEqual(clear_block.get_property('id-uuid'),
1021
self.blkid(device=clear_dev)['ID_FS_UUID'])
1023
clear_fs = clear_obj.get_property('filesystem')
1024
self.assertEqual(clear_fs.get_property('mount-points'), [])
1026
# check that we do not leak key information
1027
udev_dump = subprocess.Popen(['udevadm', 'info', '--export-db'],
1028
stdout=subprocess.PIPE)
1029
out = udev_dump.communicate()[0]
1030
self.assertFalse(b's3kr1t' in out, 'password in udev properties')
1031
self.assertFalse(b'essiv:sha' in out, 'key information in udev properties')
1034
# tear down cleartext device
1035
encrypted.call_lock_sync(no_options, None)
1036
self.assertFalse(os.path.exists(clear_dev))
1038
def test_luks_mount(self):
1039
'''LUKS mount/unmount'''
1041
crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1042
encrypted = crypt_obj.get_property('encrypted')
1044
path = encrypted.call_unlock_sync('s3kr1t',
1046
self.client.settle()
1047
obj = self.client.get_object(path)
1048
fs = obj.get_property('filesystem')
1049
self.assertNotEqual(fs, None)
1052
mount_path = fs.call_mount_sync(no_options, None)
1055
self.assertTrue('/media/' in mount_path)
1056
self.assertTrue(mount_path.endswith('treasure'))
1057
self.assertTrue(self.is_mountpoint(mount_path))
1058
self.client.settle()
1059
self.assertEqual(fs.get_property('mount-points'), [mount_path])
1063
encrypted.call_lock_sync(no_options, None)
1064
self.fail('Lock() unexpectedly succeeded on mounted file system')
1065
except GLib.GError as e:
1066
self.assertTrue('UDisks2.Error.Failed' in e.message)
1069
self.retry_busy(fs.call_unmount_sync, no_options, None)
1070
self.client.settle()
1071
self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
1072
self.assertEqual(fs.get_property('mount-points'), [])
1075
encrypted.call_lock_sync(no_options, None)
1076
self.client.settle()
1077
self.assertEqual(self.client.get_object(path), None)
1079
def test_luks_forced_removal(self):
1080
'''LUKS forced removal'''
1082
# unlock and mount it
1083
crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1084
path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t',
1087
fs = self.client.get_object(path).get_property('filesystem')
1088
mount_path = fs.call_mount_sync(no_options, None)
1089
self.assertTrue('/media/' in mount_path)
1090
self.assertTrue(mount_path.endswith('treasure'))
1092
# removal should clean up mounts
1093
self.remove_device(self.device)
1094
self.assertFalse(os.path.exists(mount_path))
1095
self.assertEqual(self.client.get_object(path), None)
1097
# after putting it back, it should be mountable again
1098
self.readd_devices()
1099
crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1100
path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t',
1102
self.client.settle()
1103
fs = self.client.get_object(path).get_property('filesystem')
1104
mount_path = fs.call_mount_sync(no_options, None)
1105
self.assertTrue('/media/' in mount_path)
1106
self.assertTrue(mount_path.endswith('treasure'))
1109
self.retry_busy(fs.call_unmount_sync, no_options, None)
1110
self.client.settle()
1111
self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
1112
self.assertEqual(fs.get_property('mount-points'), [])
1115
crypt_obj.get_property('encrypted').call_lock_sync(
1117
self.client.settle()
1118
self.assertEqual(self.client.get_object(path), None)
1120
# ----------------------------------------------------------------------------
1122
class Polkit(UDisksTestCase, test_polkitd.PolkitTestCase):
    '''Check Format() against different sets of polkit actions.'''

    def test_internal_fs_forbidden(self):
        '''Create FS on internal drive (forbidden)'''
        # NOTE(review): presumably the list names the actions the test
        # polkitd will authorize -- confirm in test_polkitd
        self.start_polkitd(['org.freedesktop.udisks2.modify-device'])

        label = 'polkitno'
        format_options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', label)})
        with self.assertRaises(GLib.GError) as cm:
            self.fs_create(None, 'ext4', format_options)
        error_text = cm.exception.message
        self.assertTrue('UDisks2.Error.NotAuthorized' in error_text,
                        error_text)

        # the rejected call must not have applied the label
        self.assertNotEqual(self.udisks_block().get_property('id-label'), label)

    def test_internal_fs_allowed(self):
        '''Create FS on internal drive (allowed)'''
        self.start_polkitd(['org.freedesktop.udisks2.modify-device-system',
                            'org.freedesktop.udisks2.modify-device'])

        label = 'polkityes'
        format_options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', label)})
        self.fs_create(None, 'ext4', format_options)

        # the new file system must be visible on the block object
        block = self.udisks_block()
        expectations = (('id-usage', 'filesystem'),
                        ('id-type', 'ext4'),
                        ('id-label', label))
        for prop, expected in expectations:
            self.assertEqual(block.get_property(prop), expected)
1153
# ----------------------------------------------------------------------------
1155
if __name__ == '__main__':
1156
argparser = argparse.ArgumentParser(description='udisks2 integration test suite')
1157
argparser.add_argument('-l', '--log-file', dest='logfile',
1158
help='write daemon log to a file')
1159
argparser.add_argument('-w', '--no-workarounds',
1160
action="store_true", default=False,
1161
help='Disable workarounds for race conditions in the D-BUS API')
1162
argparser.add_argument('testname', nargs='*',
1163
help='name of test class or method (e. g. "Drive", "FS.test_ext2")')
1164
args = argparser.parse_args()
1166
workaround_syncs = not args.no_workarounds
1168
UDisksTestCase.init(logfile=args.logfile)
1170
tests = unittest.TestLoader().loadTestsFromNames(args.testname,
1171
__import__('__main__'))
1173
tests = unittest.TestLoader().loadTestsFromName('__main__')
1174
if unittest.TextTestRunner(verbosity=2).run(tests).wasSuccessful():