3
# udisks2 integration test suite
5
# Run in udisks built tree to test local built binaries (needs
6
# --localstatedir=/var), or from anywhere else to test system installed
11
# src/tests/integration-test
12
# - Run only a particular class of tests:
13
# src/tests/integration-test Drive
14
# - Run only a single test:
15
# src/tests/integration-test FS.test_ext3
17
# Copyright: (C) 2011 Martin Pitt <martin.pitt@ubuntu.com>
19
# This program is free software; you can redistribute it and/or modify
20
# it under the terms of the GNU General Public License as published by
21
# the Free Software Foundation; either version 2 of the License, or
22
# (at your option) any later version.
24
# This program is distributed in the hope that it will be useful,
25
# but WITHOUT ANY WARRANTY; without even the implied warranty of
26
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27
# GNU General Public License for more details.
30
# - add and test method for changing LUKS passphrase
31
# - test Format with take-ownership
37
srcdir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
38
libdir = os.path.join(srcdir, 'udisks', '.libs')
40
# as we can't change LD_LIBRARY_PATH within a running program, and doing
41
# #!/usr/bin/env LD_LIBRARY_PATH=... python3 does not work either, do this
43
if 'LD_LIBRARY_PATH' not in os.environ and os.path.isdir(libdir):
44
os.environ['LD_LIBRARY_PATH'] = libdir
45
os.environ['GI_TYPELIB_PATH'] = '%s/udisks:%s' % (
47
os.environ.get('GI_TYPELIB_PATH', ''))
48
os.execv(sys.argv[0], sys.argv)
49
assert False, 'not expecting to land here'
61
from gi.repository import GLib, Gio, UDisks
63
#GI_TYPELIB_PATH=udisks LD_LIBRARY_PATH=udisks/.libs
64
VDEV_SIZE = 300000000 # size of virtual test device
66
# These file systems are known to handle permissions incorrectly, in
67
# particular the executable bit
68
BROKEN_PERMISSIONS_FS = ['ntfs']
70
# Some D-BUS API methods cause properties to not be up to date yet when a
71
# method call finishes, thus we do a udevadm settle as a workaround. Those
72
# methods should eventually get fixed properly, but it's unnerving to have
73
# the tests fail on them when you are working on something else. This flag
74
# gets set by the --no-workarounds option to disable those syncs, so that these
75
# race conditions can be fixed.
76
workaround_syncs = False
78
no_options = GLib.Variant('a{sv}', {})
80
# ----------------------------------------------------------------------------
82
class UDisksTestCase(unittest.TestCase):
83
'''Base class for udisks test cases.
85
This provides static functions which are useful for all test cases.
96
def init(klass, logfile=None):
97
'''start daemon and set up test environment'''
100
print('this test suite needs to run as root', file=sys.stderr)
103
# run from local build tree if we are in one, otherwise use system instance
104
daemon_path = os.path.join(srcdir, 'src', 'udisksd')
105
if (os.access (daemon_path, os.X_OK)):
106
klass.tool_path = 'tools/.libs/udisksctl'
107
print('Testing binaries from local build tree')
108
klass.check_build_tree_config()
110
print('Testing installed system binaries')
112
for l in open('/usr/share/dbus-1/system-services/org.freedesktop.UDisks2.service'):
113
if l.startswith('Exec='):
114
daemon_path = l.split('=', 1)[1].split()[0]
116
assert daemon_path, 'could not determine daemon path from D-BUS .service file'
118
klass.tool_path = 'udisksctl'
119
print('daemon path: ' + daemon_path)
121
for l in open('/usr/share/dbus-1/system-services/org.freedesktop.PolicyKit1.service'):
122
if l.startswith('Exec='):
123
klass.polkit_path = l.split('=', 1)[1].split()[0].strip()
125
assert klass.polkit_path, 'could not determine polkit path from D-BUS .service file'
126
print('polkitd path: ' + klass.polkit_path)
128
klass.device = klass.setup_vdev()
130
# use a D-BUS config which permits root and nobody
131
klass.dbus_conf = tempfile.NamedTemporaryFile()
132
klass.dbus_conf.write(b'''<!DOCTYPE busconfig PUBLIC "-//freedesktop//DTD D-Bus Bus Configuration 1.0//EN"
133
"http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
136
<listen>unix:tmpdir=/tmp</listen>
137
<policy context="default">
138
<allow send_destination="*" eavesdrop="true"/>
139
<allow eavesdrop="true"/>
142
<allow user="nobody"/>
146
klass.dbus_conf.flush()
148
# start polkit and udisks on a private DBus
149
dbus = subprocess.Popen(['dbus-launch', '--config-file=' + klass.dbus_conf.name],
150
stdout=subprocess.PIPE, universal_newlines=True)
151
dbus_out = dbus.communicate()[0]
152
for l in dbus_out.splitlines():
153
k, v = l.split('=', 1)
154
if k == 'DBUS_SESSION_BUS_PID':
155
klass.dbus_pid = int(v)
156
if k == 'DBUS_SESSION_BUS_ADDRESS':
157
os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = v
158
# do not try to communicate with the current desktop session; this will
159
# confuse it, as it cannot see this D-BUS instance
161
del os.environ['DISPLAY']
165
klass.daemon_log = open(logfile, 'w')
167
klass.daemon_log = tempfile.TemporaryFile()
168
klass.daemon = subprocess.Popen([daemon_path],
169
stdout=klass.daemon_log, stderr=subprocess.STDOUT)
170
assert klass.daemon.pid, 'daemon failed to start'
172
atexit.register(klass.cleanup)
174
# wait until the daemon has started up
176
while klass.manager is None and timeout > 0:
178
klass.client = UDisks.Client.new_sync(None)
179
assert klass.client != None
180
klass.manager = klass.client.get_manager()
182
assert klass.manager, 'daemon failed to start'
188
'''stop daemon again and clean up test environment'''
190
subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed
192
os.kill(klass.daemon.pid, signal.SIGTERM)
196
klass.teardown_vdev(klass.device)
199
klass.dbus_conf.close()
200
del os.environ['DBUS_SYSTEM_BUS_ADDRESS']
201
os.kill(klass.dbus_pid, signal.SIGTERM)
205
'''Wait until pending events finished processing.
207
This should only be called for situations where we genuinely have an
208
asynchronous response, like invoking a CLI program and waiting for
209
udev/udisks to catch up on the change events.
211
subprocess.call(['udevadm', 'settle'])
212
klass.client.settle()
215
def sync_workaround(klass):
216
'''Wait until pending events finished processing (bug workaround).
218
This should be called for race conditions in the D-BUS API which cause
219
properties to not be up to date yet when a method call finishes. Those
220
should eventually get fixed properly, but it's unnerving to have the
221
tests fail on them when you are working on something else.
223
This sync is not done if running with --no-workarounds.
229
def zero_device(klass):
230
subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device, 'bs=10M'],
231
stderr=subprocess.PIPE)
235
def devname(klass, partition=None):
236
'''Get name of test device or one of its partitions'''
239
if klass.device[-1].isdigit():
240
return klass.device + 'p' + str(partition)
242
return klass.device + str(partition)
247
def udisks_block(klass, partition=None):
248
'''Get UDisksBlock object for test device or partition'''
251
devname = klass.devname(partition)
252
dev_t = os.stat(devname).st_rdev
253
block = klass.client.get_block_for_dev(dev_t)
254
assert block, 'did not find an UDisksBlock object for %s' % devname
258
def udisks_filesystem(klass, partition=None):
    '''Look up the UDisksFilesystem object for the test device or a partition.

    partition is passed unchanged to udisks_block() to select the block
    device.

    Return the 'filesystem' property of the matching UDisks object; this is
    None if there is no file system on that device.
    '''
    object_path = klass.udisks_block(partition).get_object_path()
    udisks_object = klass.client.get_object(object_path)
    return udisks_object.get_property('filesystem')
267
def blkid(klass, partition=None, device=None):
268
'''Call blkid and return dictionary of results.'''
271
device = klass.devname(partition)
273
cmd = subprocess.Popen(['blkid', '-p', '-o', 'udev', device], stdout=subprocess.PIPE)
275
(key, value) = l.decode('UTF-8').split('=', 1)
276
result[key] = value.strip()
277
assert cmd.wait() == 0
281
def is_mountpoint(klass, path):
    '''Check if given path is a mount point.

    Runs the external "mountpoint" tool on path. Return True if the tool
    exits with status 0, False for any other exit status.
    '''
    # Discard the tool's stdout rather than attaching a pipe that is never
    # read; an unread pipe can block the child once its buffer fills up.
    status = subprocess.call(['mountpoint', path], stdout=subprocess.DEVNULL)
    return status == 0
287
def mkfs(klass, type, label=None, partition=None):
288
'''Create file system using mkfs.'''
291
assert label is None, 'minix does not support labels'
293
# work around mkswap not properly cleaning up an existing reiserfs
294
# signature (mailed kzak about it)
296
subprocess.check_call(['wipefs', '-a', klass.devname(partition)],
297
stdout=subprocess.PIPE)
299
mkcmd = { 'swap': 'mkswap',
301
label_opt = { 'vfat': '-n',
304
extra_opt = { 'vfat': [ '-I', '-F', '32'],
306
'xfs': ['-f'], # XFS complains if there's an existing FS, so force
307
'ext2': ['-F'], # ext* complains about using entire device, so force
314
cmd = [mkcmd.get(type, 'mkfs.' + type)] + extra_opt.get(type, [])
316
cmd += [label_opt.get(type, '-L'), label]
317
cmd.append(klass.devname(partition))
319
subprocess.check_call(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
321
# kernel/udev generally detect those changes itself, but do not quite
322
# tell us when they are done; so do a little kludge here to know how
323
# long we need to wait
324
subprocess.call(['udevadm', 'trigger', '--action=change',
325
'--sysname-match=' + os.path.basename(klass.devname(partition))])
329
def fs_create(klass, partition, type, options):
    '''Create a file system through the udisks API.

    Looks up the UDisksBlock for partition via udisks_block(), calls its
    synchronous Format method with the given type and options (no
    cancellable), and then runs sync_workaround().
    '''
    klass.udisks_block(partition).call_format_sync(type, options, None)
    klass.sync_workaround()
337
def retry_busy(klass, fn, *args):
338
'''Call a function until it does not fail with "Busy".'''
344
except GLib.GError as e:
345
if not 'UDisks2.Error.DeviceBusy' in e.message:
347
sys.stderr.write('[busy] ')
352
def check_build_tree_config(klass):
353
'''Check configuration of build tree'''
355
# read make variables
357
var_re = re.compile('^([a-zA-Z_]+) = (.*)$')
358
make = subprocess.Popen(['make', '-p', '/dev/null'],
359
stdout=subprocess.PIPE)
360
for l in make.stdout:
361
l = l.decode('UTF-8')
364
make_vars[m.group(1)] = m.group(2)
367
# expand make variables
368
subst_re = re.compile('\${([a-zA-Z_]+)}')
369
for (k, v) in make_vars.items():
371
m = subst_re.search(v)
373
v = subst_re.sub(make_vars.get(m.group(1), ''), v)
378
# check localstatedir
379
for d in (os.path.join(make_vars['localstatedir'], 'run', 'udisks2'),
380
os.path.join(make_vars['localstatedir'], 'lib', 'udisks2')):
381
if not os.path.exists(d):
382
sys.stderr.write('The directory %s does not exist; please create it before running these tests.\n' % d)
386
def setup_vdev(klass):
387
'''create virtual test device
389
It is zeroed out initially.
391
Return the device path.
393
# ensure that the scsi_debug module is loaded
394
if os.path.isdir('/sys/module/scsi_debug'):
395
sys.stderr.write('The scsi_debug module is already loaded; please remove before running this test.\n')
398
assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % (
399
VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug'
401
# wait until all drives are created
404
dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
406
assert len(dirs) == 1
408
# determine the debug block devices
409
devs = os.listdir(dirs[0])
410
assert len(devs) == 1
411
dev = '/dev/' + devs[0]
412
assert os.path.exists(dev)
414
# let's be 100% sure that we pick a virtual one
415
assert open('/sys/block/%s/device/model' % devs[0]).read().strip() == 'scsi_debug'
417
print('Set up test device: ' + dev)
421
def teardown_vdev(klass, device):
    '''release and remove virtual test device

    Removes device via remove_device() and then unloads the scsi_debug
    kernel module with rmmod.

    Raises AssertionError if rmmod exits with a non-zero status.
    '''
    klass.remove_device(device)
    # Run rmmod outside of an assert statement: assert expressions are not
    # evaluated under "python -O", which would silently skip the unload.
    if subprocess.call(['rmmod', 'scsi_debug']) != 0:
        raise AssertionError('Failure to rmmod scsi_debug')
429
def remove_device(klass, device):
430
'''remove virtual test device'''
432
device = device.split('/')[-1]
433
if os.path.exists('/sys/block/' + device):
434
f = open('/sys/block/%s/device/delete' % device, 'w')
437
while os.path.exists(device):
440
time.sleep(0.5) # TODO
443
def readd_devices(klass):
444
'''re-add virtual test devices after removal'''
446
scan_files = glob('/sys/bus/pseudo/devices/adapter*/host*/scsi_host/host*/scan')
447
assert len(scan_files) > 0
449
open(f, 'w').write('- - -\n')
450
while not os.path.exists(klass.device):
455
# ----------------------------------------------------------------------------
457
class Manager(UDisksTestCase):
458
'''UDisksManager operations'''
460
def test_version(self):
463
self.assertTrue(self.manager.get_property('version')[0].isdigit())
465
def test_loop_rw(self):
466
'''loop device R/W'''
468
with tempfile.NamedTemporaryFile() as f:
469
f.truncate(100000000)
470
fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
472
(path, out_fd_list) = self.manager.call_loop_setup_sync(
473
GLib.Variant('h', 0), # fd index
479
obj = self.client.get_object(path)
480
loop = obj.get_property('loop')
481
block = obj.get_property('block')
482
self.assertNotEqual(block, None)
483
self.assertNotEqual(loop, None)
484
self.assertEqual(obj.get_property('filesystem'), None)
487
self.assertEqual(loop.get_property('backing-file'), f.name)
489
options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
490
block.call_format_sync('ext2', options, None)
492
self.assertNotEqual(obj.get_property('filesystem'), None)
494
self.assertEqual(block.get_property('id-label'), 'foo')
495
self.assertEqual(block.get_property('id-usage'), 'filesystem')
496
self.assertEqual(block.get_property('id-type'), 'ext2')
498
loop.call_delete_sync(no_options, None)
500
def test_loop_ro(self):
501
'''loop device R/O'''
503
with tempfile.NamedTemporaryFile() as f:
504
f.truncate(100000000)
505
fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
507
(path, out_fd_list) = self.manager.call_loop_setup_sync(
508
GLib.Variant('h', 0), # fd index
509
GLib.Variant('a{sv}', {'read-only': GLib.Variant('b', True)}),
514
obj = self.client.get_object(path)
515
loop = obj.get_property('loop')
516
block = obj.get_property('block')
517
self.assertNotEqual(block, None)
518
self.assertNotEqual(loop, None)
519
self.assertEqual(obj.get_property('filesystem'), None)
522
self.assertEqual(loop.get_property('backing-file'), f.name)
524
# can't format due to permission error
525
self.assertRaises(GLib.GError, block.call_format_sync, 'ext2',
528
self.assertEqual(block.get_property('id-label'), '')
529
self.assertEqual(block.get_property('id-usage'), '')
530
self.assertEqual(block.get_property('id-type'), '')
533
loop.call_delete_sync(no_options, None)
535
# ----------------------------------------------------------------------------
537
class Drive(UDisksTestCase):
541
self.drive = self.client.get_drive_for_block(self.udisks_block())
542
self.assertNotEqual(self.drive, None)
544
def test_properties(self):
545
'''properties of UDisksDrive object'''
547
self.assertEqual(self.drive.get_property('model'), 'scsi_debug')
548
self.assertEqual(self.drive.get_property('vendor'), 'Linux')
549
self.assertAlmostEqual(self.drive.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
550
self.assertEqual(self.drive.get_property('media-available'), True)
551
self.assertEqual(self.drive.get_property('optical'), False)
553
self.assertNotEqual(len(self.drive.get_property('serial')), 0)
554
self.assertNotEqual(len(self.drive.get_property('revision')), 0)
556
# ----------------------------------------------------------------------------
558
class FS(UDisksTestCase):
559
'''Test detection of all supported file systems'''
562
self.workdir = tempfile.mkdtemp()
563
self.block = self.udisks_block()
564
self.assertNotEqual(self.block, None)
567
if subprocess.call(['umount', self.device], stderr=subprocess.PIPE) == 0:
568
sys.stderr.write('[cleanup unmount] ')
569
shutil.rmtree (self.workdir)
572
'''properties of zeroed out device'''
575
self.assertEqual(self.block.get_property('device'), self.device)
576
self.assertTrue('Linux_scsi_debug' in self.block.get_property('drive'))
577
self.assertEqual(self.block.get_property('hint-system'), True)
578
self.assertEqual(self.block.get_property('id-label'), '')
579
self.assertEqual(self.block.get_property('id-usage'), '')
580
self.assertEqual(self.block.get_property('id-type'), '')
581
self.assertEqual(self.block.get_property('id-uuid'), '')
582
self.assertAlmostEqual(self.block.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
583
obj = self.client.get_object(self.block.get_object_path())
584
self.assertEqual(obj.get_property('filesystem'), None)
585
self.assertEqual(obj.get_property('partition'), None)
586
self.assertEqual(obj.get_property('partition-table'), None)
590
self._do_fs_check('ext2')
594
self._do_fs_check('ext3')
598
self._do_fs_check('ext4')
600
def test_btrfs(self):
602
self._do_fs_check('btrfs')
604
def test_minix(self):
606
self._do_fs_check('minix')
610
self._do_fs_check('xfs')
614
self._do_fs_check('ntfs')
618
self._do_fs_check('vfat')
620
def test_reiserfs(self):
622
self._do_fs_check('reiserfs')
626
self._do_fs_check('swap')
628
def test_nilfs2(self):
630
self._do_fs_check('nilfs2')
632
def test_empty(self):
635
self.mkfs('ext4', 'foo')
636
block = self.udisks_block()
637
self.assertEqual(block.get_property('id-usage'), 'filesystem')
638
self.assertEqual(block.get_property('id-type'), 'ext4')
639
self.assertEqual(block.get_property('id-label'), 'foo')
640
self.assertNotEqual(self.udisks_filesystem(), None)
642
self.fs_create(None, 'empty', no_options)
644
self.assertEqual(block.get_property('id-usage'), '')
645
self.assertEqual(block.get_property('id-type'), '')
646
self.assertEqual(block.get_property('id-label'), '')
647
self.assertEqual(self.udisks_filesystem(), None)
649
def test_create_fs_unknown_type(self):
    '''Format() with unknown type'''

    try:
        self.fs_create(None, 'bogus', no_options)
    except GLib.GError as err:
        # the error must name both the error domain and the rejected type
        self.assertTrue('UDisks2.Error.NotSupported' in err.message)
        self.assertTrue('type bogus' in err.message)
    else:
        self.fail('Expected failure for bogus file system')
659
def test_create_fs_unsupported_label(self):
    '''Format() with unsupported label'''

    label_options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
    try:
        # formatting minix with a label is expected to be refused
        self.fs_create(None, 'minix', label_options)
    except GLib.GError as err:
        self.assertTrue('UDisks2.Error.NotSupported' in err.message)
    else:
        self.fail('Expected failure for unsupported label')
669
def test_force_removal(self):
670
'''fs: forced removal'''
672
# create a fs and mount it
673
self.mkfs('ext4', 'udiskstest')
674
fs = self.udisks_filesystem()
675
mount_path = fs.call_mount_sync(no_options, None)
676
self.assertTrue(mount_path.endswith('udiskstest'))
677
self.assertTrue('/media/' in mount_path)
678
self.assertTrue(self.is_mountpoint(mount_path))
680
dev_t = os.stat(self.devname()).st_rdev
682
# removal should clean up mounts
683
self.remove_device(self.device)
684
self.assertFalse(os.path.exists(mount_path))
685
self.assertEqual(self.client.get_block_for_dev(dev_t), None)
687
# after putting it back, it should be mountable again
689
fs = self.udisks_filesystem()
690
self.assertEqual(fs.get_property('mount-points'), [])
692
mount_path = fs.call_mount_sync(no_options, None)
693
self.assertTrue(mount_path.endswith('udiskstest'))
694
self.assertTrue('/media/' in mount_path)
695
self.assertTrue(self.is_mountpoint(mount_path))
697
self.assertEqual(fs.get_property('mount-points'), [mount_path])
699
self.retry_busy(fs.call_unmount_sync, no_options, None)
701
self.assertEqual(fs.get_property('mount-points'), [])
703
def _do_fs_check(self, type):
704
'''Run checks for a particular file system.'''
706
if type != 'swap' and subprocess.call(['which', 'mkfs.' + type],
707
stdout=subprocess.PIPE) != 0:
708
sys.stderr.write('[no mkfs.%s, skip] ' % type)
710
# check correct D-Bus exception
712
self.fs_create(None, type, no_options)
713
self.fail('Expected failure for missing mkfs.' + type)
714
except GLib.GError as e:
715
self.assertTrue('UDisks2.Error.Failed' in e.message)
718
# do checks with command line tools (mkfs/mount/umount)
719
sys.stderr.write('[cli] ')
722
self._do_cli_check(type)
724
self._do_cli_check(type, 'test%stst' % type)
726
# put a different fs here instead of zeroing, so that we verify that
727
# udisks overrides existing FS (e. g. XFS complains then), and does not
728
# leave traces of other FS around
734
# do checks with udisks operations
735
sys.stderr.write('[ud] ')
736
self._do_udisks_check(type)
738
self._do_udisks_check(type, 'test%stst' % type)
739
# also test fs_create with an empty label
740
self._do_udisks_check(type, '')
742
def _do_cli_check(self, type, label=None):
743
'''udisks correctly picks up file system changes from command line tools'''
745
self.mkfs(type, label)
747
block = self.udisks_block()
749
self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
751
self.assertEqual(block.get_property('id-type'), type)
752
self.assertEqual(block.get_property('id-label'), label or '')
753
self.assertEqual(block.get_property('hint-name'), '')
755
self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
757
obj = self.client.get_object(self.block.get_object_path())
758
self.assertEqual(obj.get_property('partition'), None)
759
self.assertEqual(obj.get_property('partition-table'), None)
761
fs = obj.get_property('filesystem')
763
self.assertEqual(fs, None)
765
self.assertNotEqual(fs, None)
771
if type == 'ntfs' and subprocess.call(['which', 'mount.ntfs-3g'],
772
stdout=subprocess.PIPE) == 0:
773
# prefer mount.ntfs-3g if we have it (on Debian; Ubuntu
774
# defaults to ntfs-3g if installed); TODO: check other distros
775
mount_prog = 'mount.ntfs-3g'
778
ret = subprocess.call([mount_prog, self.device, self.workdir])
781
sys.stderr.write('[missing kernel driver, skip] ')
783
self.assertEqual(ret, 0)
786
self.assertEqual(fs.get_property('mount-points'), [self.workdir])
789
subprocess.call(['umount', self.workdir])
791
self.assertEqual(fs.get_property('mount-points'), [])
793
def _do_udisks_check(self, type, label=None):
794
'''udisks API correctly changes file system'''
797
if label is not None:
798
options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', label)})
801
self.fs_create(None, type, options)
805
self.assertEqual(id['ID_FS_USAGE'], type == 'swap' and 'other' or 'filesystem')
806
self.assertEqual(id['ID_FS_TYPE'], type)
807
self.assertEqual(id.get('ID_FS_LABEL', ''), label or '')
809
block = self.udisks_block()
810
self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
811
self.assertEqual(block.get_property('id-type'), type)
812
self.assertEqual(block.get_property('id-label'), label or '')
817
obj = self.client.get_object(self.block.get_object_path())
818
self.assertEqual(obj.get_property('partition'), None)
819
self.assertEqual(obj.get_property('partition-table'), None)
821
fs = self.udisks_filesystem()
822
self.assertNotEqual(fs, None, 'no Filesystem interface for test device')
823
self.assertEqual(fs.get_property('mount-points'), [])
826
mount_path = fs.call_mount_sync(no_options, None)
828
self.assertTrue(mount_path.startswith('/run/media/'), mount_path)
830
self.assertTrue(mount_path.endswith(label))
833
self.assertEqual(fs.get_property('mount-points'), [mount_path])
834
self.assertTrue(self.is_mountpoint(mount_path))
836
# no ownership taken, should be root owned
837
st = os.stat(mount_path)
838
self.assertEqual((st.st_uid, st.st_gid), (0, 0))
840
self._do_file_perms_checks(type, mount_path)
843
self.retry_busy(fs.call_unmount_sync, no_options, None)
844
self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
845
self.assertEqual(fs.get_property('mount-points'), [mount_path])
847
# create fs with taking ownership (daemon:mail == 1:8)
848
#if supports_unix_owners:
849
# options.append('take_ownership_uid=1')
850
# options.append('take_ownership_gid=8')
851
# self.fs_create(None, type, options)
852
# mount_path = iface.FilesystemMount('', [])
853
# st = os.stat(mount_path)
854
# self.assertEqual((st.st_uid, st.st_gid), (1, 8))
855
# self.retry_busy(self.partition_iface().FilesystemUnmount, [])
856
# self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
860
l = 'n"a\m\\"e' + type
862
# VFAT does not support some characters
863
self.assertRaises(GLib.GError, fs.call_set_label_sync, l,
867
fs.call_set_label_sync(l, no_options, None)
868
except GLib.GError as e:
869
if 'UDisks2.Error.NotSupported' in e.message:
870
# these fses are known to not support relabeling
871
self.assertTrue(type in ['minix', 'btrfs'])
877
block = self.udisks_block()
878
blkid_label = self.blkid().get('ID_FS_LABEL_ENC', '').replace('\\x22', '"').replace(
879
'\\x5c', '\\').replace('\\x24', '$')
880
self.sync_workaround()
882
# EXFAIL: often (but not always) the label appears in all upper case
883
self.assertEqual(blkid_label.upper(), l.upper())
884
self.assertEqual(block.get_property('id-label').upper(), l.upper())
886
self.assertEqual(blkid_label, l)
887
self.assertEqual(block.get_property('id-label'), l)
889
# test setting empty label
890
fs.call_set_label_sync('', no_options, None)
891
self.sync_workaround()
892
self.assertEqual(self.blkid().get('ID_FS_LABEL_ENC', ''), '')
893
self.assertEqual(block.get_property('id-label'), '')
895
# check fs - Not implemented in udisks yet
896
#self.assertEqual(iface.FilesystemCheck([]), True)
898
def _do_file_perms_checks(self, type, mount_point):
899
'''Check for permissions for data files and executables.
901
This particularly checks sane and useful permissions on non-Unix file
904
if type in BROKEN_PERMISSIONS_FS:
907
f = os.path.join(mount_point, 'simpledata.txt')
909
self.assertTrue(os.access(f, os.R_OK))
910
self.assertTrue(os.access(f, os.W_OK))
911
self.assertFalse(os.access(f, os.X_OK))
913
f = os.path.join(mount_point, 'simple.exe')
914
shutil.copy('/bin/bash', f)
915
self.assertTrue(os.access(f, os.R_OK))
916
self.assertTrue(os.access(f, os.W_OK))
917
self.assertTrue(os.access(f, os.X_OK))
919
os.mkdir(os.path.join(mount_point, 'subdir'))
920
f = os.path.join(mount_point, 'subdir', 'subdirdata.txt')
922
self.assertTrue(os.access(f, os.R_OK))
923
self.assertTrue(os.access(f, os.W_OK))
924
self.assertFalse(os.access(f, os.X_OK))
926
f = os.path.join(mount_point, 'subdir', 'subdir.exe')
927
shutil.copy('/bin/bash', f)
928
self.assertTrue(os.access(f, os.R_OK))
929
self.assertTrue(os.access(f, os.W_OK))
930
self.assertTrue(os.access(f, os.X_OK))
932
## ----------------------------------------------------------------------------
934
class Smart(UDisksTestCase):
935
'''Check SMART operation.'''
938
'''SMART status of first internal hard disk
940
This is a best-effort readonly test.
944
if not os.path.exists(hd):
945
sys.stderr.write('[skip] ')
948
has_smart = subprocess.call(['skdump', '--can-smart', hd],
949
stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0
951
block = self.client.get_block_for_dev(os.stat(hd).st_rdev)
952
self.assertNotEqual(block, None)
953
drive = self.client.get_drive_for_block(block)
954
ata = self.client.get_object(drive.get_object_path()).get_property('drive-ata')
955
self.assertEqual(ata != None, has_smart)
958
sys.stderr.write('[avail] ')
959
self.assertEqual(ata.get_property('smart-supported'), True)
960
self.assertEqual(ata.get_property('smart-enabled'), True)
962
# wait for SMART data to be read
963
while ata.get_property('smart-updated') == 0:
964
sys.stderr.write('[wait for data] ')
967
# this is of course not truly correct for a test suite, but let's
968
# consider it a courtesy for developers :-)
969
self.assertEqual(ata.get_property('smart-failing'), False)
970
self.assertTrue(ata.get_property('smart-selftest-status') in ['success', 'inprogress'])
972
sys.stderr.write('[N/A] ')
975
# ----------------------------------------------------------------------------
977
class Luks(UDisksTestCase):
981
'''clean up behind failed test cases'''
983
crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
985
encrypted = crypt_obj.get_property('encrypted')
988
encrypted.call_lock_sync(no_options, None)
989
sys.stderr.write('[cleanup lock] ')
993
# needs to run before the other tests
994
def test_0_create_teardown(self):
995
'''LUKS create/teardown'''
997
self.fs_create(None, 'ext4', GLib.Variant('a{sv}', {
998
'encrypt.passphrase': GLib.Variant('s', 's3kr1t'),
999
'label': GLib.Variant('s', 'treasure'),
1003
block = self.udisks_block()
1004
obj = self.client.get_object(block.get_object_path())
1005
self.assertEqual(obj.get_property('filesystem'), None)
1006
encrypted = obj.get_property('encrypted')
1007
self.assertNotEqual(encrypted, None)
1009
# check crypted device info
1010
self.assertEqual(block.get_property('id-type'), 'crypto_LUKS')
1011
self.assertEqual(block.get_property('id-usage'), 'crypto')
1012
self.assertEqual(block.get_property('id-label'), '')
1013
self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
1014
self.assertEqual(block.get_property('device'), self.devname())
1016
# check whether we can lock/unlock; we also need this to get the
1018
encrypted.call_lock_sync(no_options, None)
1019
self.assertRaises(GLib.GError, encrypted.call_lock_sync,
1023
self.assertRaises(GLib.GError, encrypted.call_unlock_sync,
1024
'h4ckpassword', no_options, None)
1026
clear_path = encrypted.call_unlock_sync('s3kr1t',
1029
# check cleartext device info
1030
clear_obj = self.client.get_object(clear_path)
1031
self.assertEqual(clear_obj.get_property('encrypted'), None)
1032
clear_block = clear_obj.get_property('block')
1033
self.assertEqual(clear_block.get_property('id-type'), 'ext4')
1034
self.assertEqual(clear_block.get_property('id-usage'), 'filesystem')
1035
self.assertEqual(clear_block.get_property('id-label'), 'treasure')
1036
self.assertNotEqual(clear_block.get_property('crypto-backing-device'), None)
1037
clear_dev = clear_block.get_property('device')
1038
self.assertNotEqual(clear_dev, None)
1039
self.assertEqual(clear_block.get_property('id-uuid'),
1040
self.blkid(device=clear_dev)['ID_FS_UUID'])
1042
clear_fs = clear_obj.get_property('filesystem')
1043
self.assertEqual(clear_fs.get_property('mount-points'), [])
1045
# check that we do not leak key information
1046
udev_dump = subprocess.Popen(['udevadm', 'info', '--export-db'],
1047
stdout=subprocess.PIPE)
1048
out = udev_dump.communicate()[0]
1049
self.assertFalse(b's3kr1t' in out, 'password in udev properties')
1050
self.assertFalse(b'essiv:sha' in out, 'key information in udev properties')
1053
# tear down cleartext device
1054
encrypted.call_lock_sync(no_options, None)
1055
self.assertFalse(os.path.exists(clear_dev))
1057
def test_luks_mount(self):
1058
'''LUKS mount/unmount'''
1060
crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1061
encrypted = crypt_obj.get_property('encrypted')
1063
path = encrypted.call_unlock_sync('s3kr1t',
1065
self.client.settle()
1066
obj = self.client.get_object(path)
1067
fs = obj.get_property('filesystem')
1068
self.assertNotEqual(fs, None)
1071
mount_path = fs.call_mount_sync(no_options, None)
1074
self.assertTrue('/media/' in mount_path)
1075
self.assertTrue(mount_path.endswith('treasure'))
1076
self.assertTrue(self.is_mountpoint(mount_path))
1077
self.client.settle()
1078
self.assertEqual(fs.get_property('mount-points'), [mount_path])
1082
encrypted.call_lock_sync(no_options, None)
1083
self.fail('Lock() unexpectedly succeeded on mounted file system')
1084
except GLib.GError as e:
1085
self.assertTrue('UDisks2.Error.Failed' in e.message)
1088
self.retry_busy(fs.call_unmount_sync, no_options, None)
1089
self.client.settle()
1090
self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
1091
self.assertEqual(fs.get_property('mount-points'), [])
1094
encrypted.call_lock_sync(no_options, None)
1095
self.client.settle()
1096
self.assertEqual(self.client.get_object(path), None)
1098
def test_luks_forced_removal(self):
1099
'''LUKS forced removal'''
1101
# unlock and mount it
1102
crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1103
path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t',
1106
fs = self.client.get_object(path).get_property('filesystem')
1107
mount_path = fs.call_mount_sync(no_options, None)
1108
self.assertTrue('/media/' in mount_path)
1109
self.assertTrue(mount_path.endswith('treasure'))
1111
# removal should clean up mounts
1112
self.remove_device(self.device)
1113
self.assertFalse(os.path.exists(mount_path))
1114
self.assertEqual(self.client.get_object(path), None)
1116
# after putting it back, it should be mountable again
1117
self.readd_devices()
1118
crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1119
path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t',
1121
self.client.settle()
1122
fs = self.client.get_object(path).get_property('filesystem')
1123
mount_path = fs.call_mount_sync(no_options, None)
1124
self.assertTrue('/media/' in mount_path)
1125
self.assertTrue(mount_path.endswith('treasure'))
1128
self.retry_busy(fs.call_unmount_sync, no_options, None)
1129
self.client.settle()
1130
self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
1131
self.assertEqual(fs.get_property('mount-points'), [])
1134
crypt_obj.get_property('encrypted').call_lock_sync(
1136
self.client.settle()
1137
self.assertEqual(self.client.get_object(path), None)
1139
# ----------------------------------------------------------------------------
1141
class Polkit(UDisksTestCase):
1142
'''Check operation with polkit.'''
1145
env = os.environ.copy()
1146
env['POLKIT_DEBUG'] = '1'
1147
self.polkit = subprocess.Popen([self.polkit_path],
1148
stdout=self.daemon_log, stderr=subprocess.STDOUT, env=env)
1149
assert self.polkit.pid, 'polkitd failed to start'
1152
os.kill(self.polkit.pid, signal.SIGTERM)
1153
os.waitpid(self.polkit.pid, 0)
1155
def test_internal_fs_root(self):
    '''Create FS on internal drive as root'''

    label = 'polkitext4'
    format_options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', label)})
    self.fs_create(None, 'ext4', format_options)

    # the freshly formatted device must report the new file system
    block = self.udisks_block()
    expected = (
        ('id-usage', 'filesystem'),
        ('id-type', 'ext4'),
        ('id-label', label),
    )
    for prop, value in expected:
        self.assertEqual(block.get_property(prop), value)
1165
def test_internal_fs_nobody(self):
    '''Mounting the internal drive as user nobody is not authorized

    Formats the test device as ext4, then runs the udisksctl tool
    (self.tool_path) as user "nobody" to mount it and checks that its stderr
    reports Error.NotAuthorized.
    '''

    # ensure we have a mountable file system
    self.fs_create(None, 'ext4', no_options)

    nobody = pwd.getpwnam('nobody')
    assert nobody.pw_uid > 0

    def become_nobody():
        # runs in the child before exec: switch group, then user
        os.setgid(nobody.pw_gid)
        os.setuid(nobody.pw_uid)

    # we cannot just change euid and do the call, as polkit will remember
    # our process which started out as root; so call the external tool
    argv = [self.tool_path, 'mount', '--no-user-interaction', '-b', self.device]
    tool_mount = subprocess.Popen(argv,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE,
                                  universal_newlines=True,
                                  preexec_fn=become_nobody)
    _out, err = tool_mount.communicate()
    self.assertTrue('Error.NotAuthorized' in err, err)
1184
# ----------------------------------------------------------------------------
1186
if __name__ == '__main__':
1187
argparser = argparse.ArgumentParser(description='udisks2 integration test suite')
1188
argparser.add_argument('-l', '--log-file', dest='logfile',
1189
help='write daemon log to a file')
1190
argparser.add_argument('-w', '--no-workarounds',
1191
action="store_true", default=False,
1192
help='Disable workarounds for race conditions in the D-BUS API')
1193
argparser.add_argument('testname', nargs='*',
1194
help='name of test class or method (e. g. "Drive", "FS.test_ext2")')
1195
args = argparser.parse_args()
1197
workaround_syncs = not args.no_workarounds
1199
UDisksTestCase.init(logfile=args.logfile)
1201
tests = unittest.TestLoader().loadTestsFromNames(args.testname,
1202
__import__('__main__'))
1204
tests = unittest.TestLoader().loadTestsFromName('__main__')
1205
if unittest.TextTestRunner(verbosity=2).run(tests).wasSuccessful():