~ubuntu-branches/ubuntu/trusty/udisks2/trusty-proposed

« back to all changes in this revision

Viewing changes to debian/local/integration-test

  • Committer: Package Import Robot
  • Author(s): Martin Pitt
  • Date: 2012-07-28 13:35:04 UTC
  • mfrom: (1.1.2)
  • Revision ID: package-import@ubuntu.com-20120728133504-jmxgy789jegi5vmo
Tags: 1.99.0-1
* New upstream release (LP: #1030268)
  - Support Realtek rts5229 SD/MMC card readers. (LP: #1022497)
* Drop 00git_no_polkit_fallback.patch, upstream now.
* Drop debian/local/integration-test, shipped in upstream tarball now.
* debian/tests/upstream-system: Run test suite from upstream source.
* debian/tests/control: Simplify Depends: line using "@".
* debian/tests/control: Drop undefined "no-build-needed" feature.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/python3
2
 
#
3
 
# udisks2 integration test suite
4
 
#
5
 
# Run in udisks built tree to test local built binaries (needs
6
 
# --localstatedir=/var), or from anywhere else to test system installed
7
 
# binaries. 
8
 
#
9
 
# Usage:
10
 
# - Run all tests: 
11
 
#   src/tests/integration-test
12
 
# - Run only a particular class of tests:
13
 
#   src/tests/integration-test Drive
14
 
# - Run only a single test:
15
 
#   src/tests/integration-test FS.test_ext3
16
 
#
17
 
# Copyright: (C) 2011 Martin Pitt <martin.pitt@ubuntu.com>
18
 
#
19
 
# This program is free software; you can redistribute it and/or modify
20
 
# it under the terms of the GNU General Public License as published by
21
 
# the Free Software Foundation; either version 2 of the License, or
22
 
# (at your option) any later version.
23
 
#
24
 
# This program is distributed in the hope that it will be useful,
25
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
26
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27
 
# GNU General Public License for more details.
28
 
 
29
 
# TODO:
30
 
# - add and test method for changing LUKS passphrase
31
 
# - test Format with take-ownership
32
 
 
33
 
import sys
34
 
import os
35
 
import pwd
36
 
 
37
 
srcdir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
38
 
libdir = os.path.join(srcdir, 'udisks', '.libs')
39
 
 
40
 
# as we can't change LD_LIBRARY_PATH within a running program, and doing
41
 
# #!/usr/bin/env LD_LIBRARY_PATH=... python3 does not work either, do this
42
 
# nasty hack
43
 
if 'LD_LIBRARY_PATH' not in os.environ and os.path.isdir(libdir):
44
 
    os.environ['LD_LIBRARY_PATH'] = libdir
45
 
    os.environ['GI_TYPELIB_PATH'] = '%s/udisks:%s' % (
46
 
            srcdir,
47
 
            os.environ.get('GI_TYPELIB_PATH', ''))
48
 
    os.execv(sys.argv[0], sys.argv)
49
 
    assert False, 'not expecting to land here'
50
 
 
51
 
import subprocess
52
 
import unittest
53
 
import tempfile
54
 
import atexit
55
 
import time
56
 
import shutil
57
 
import signal
58
 
import argparse
59
 
import re
60
 
from glob import glob
61
 
from gi.repository import GLib, Gio, UDisks
62
 
 
63
 
#GI_TYPELIB_PATH=udisks LD_LIBRARY_PATH=udisks/.libs
64
 
VDEV_SIZE = 300000000 # size of virtual test device
65
 
 
66
 
# Those file systems are known to have a broken handling of permissions, in
67
 
# particular the executable bit
68
 
BROKEN_PERMISSIONS_FS = ['ntfs']
69
 
 
70
 
# Some D-BUS API methods cause properties to not be up to date yet when a
71
 
# method call finishes, thus we do an udevadm settle as a workaround. Those
72
 
# methods should eventually get fixed properly, but it's unnerving to have
73
 
# the tests fail on them when you are working on something else. This flag
74
 
# gets set by the --no-workarounds option to disable those syncs, so that these
75
 
# race conditions can be fixed.
76
 
workaround_syncs = False
77
 
 
78
 
no_options = GLib.Variant('a{sv}', {})
79
 
 
80
 
# ----------------------------------------------------------------------------
81
 
 
82
 
class UDisksTestCase(unittest.TestCase):
83
 
    '''Base class for udisks test cases.
84
 
    
85
 
    This provides static functions which are useful for all test cases.
86
 
    '''
87
 
    tool_path = None
88
 
    daemon = None
89
 
    daemon_log = None
90
 
    device = None
91
 
 
92
 
    client = None
93
 
    manager = None
94
 
 
95
 
    @classmethod
96
 
    def init(klass, logfile=None):
97
 
        '''start daemon and set up test environment'''
98
 
 
99
 
        if os.geteuid() != 0:
100
 
            print('this test suite needs to run as root', file=sys.stderr)
101
 
            sys.exit(0)
102
 
 
103
 
        # run from local build tree if we are in one, otherwise use system instance
104
 
        daemon_path = os.path.join(srcdir, 'src', 'udisksd')
105
 
        if (os.access (daemon_path, os.X_OK)):
106
 
            klass.tool_path = 'tools/.libs/udisksctl'
107
 
            print('Testing binaries from local build tree')
108
 
            klass.check_build_tree_config()
109
 
        else:
110
 
            print('Testing installed system binaries')
111
 
            daemon_path = None
112
 
            for l in open('/usr/share/dbus-1/system-services/org.freedesktop.UDisks2.service'):
113
 
                if l.startswith('Exec='):
114
 
                    daemon_path = l.split('=', 1)[1].split()[0]
115
 
                    break
116
 
            assert daemon_path, 'could not determine daemon path from D-BUS .service file'
117
 
 
118
 
            klass.tool_path = 'udisksctl'
119
 
        print('daemon path: ' + daemon_path)
120
 
 
121
 
        for l in open('/usr/share/dbus-1/system-services/org.freedesktop.PolicyKit1.service'):
122
 
            if l.startswith('Exec='):
123
 
                klass.polkit_path = l.split('=', 1)[1].split()[0].strip()
124
 
                break
125
 
        assert klass.polkit_path, 'could not determine polkit path from D-BUS .service file'
126
 
        print('polkitd path: ' + klass.polkit_path)
127
 
 
128
 
        klass.device = klass.setup_vdev()
129
 
 
130
 
        # use a D-BUS config which permits root and nobody
131
 
        klass.dbus_conf = tempfile.NamedTemporaryFile()
132
 
        klass.dbus_conf.write(b'''<!DOCTYPE busconfig PUBLIC "-//freedesktop//DTD D-Bus Bus Configuration 1.0//EN"
133
 
 "http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
134
 
<busconfig>
135
 
  <type>test</type>
136
 
  <listen>unix:tmpdir=/tmp</listen>
137
 
  <policy context="default">
138
 
    <allow send_destination="*" eavesdrop="true"/>
139
 
    <allow eavesdrop="true"/>
140
 
    <allow own="*"/>
141
 
    <allow user="root"/>
142
 
    <allow user="nobody"/>
143
 
  </policy>
144
 
</busconfig>
145
 
''')
146
 
        klass.dbus_conf.flush()
147
 
 
148
 
        # start polkit and udisks on a private DBus
149
 
        dbus = subprocess.Popen(['dbus-launch', '--config-file=' + klass.dbus_conf.name], 
150
 
                stdout=subprocess.PIPE, universal_newlines=True)
151
 
        dbus_out = dbus.communicate()[0]
152
 
        for l in dbus_out.splitlines():
153
 
            k, v = l.split('=', 1)
154
 
            if k == 'DBUS_SESSION_BUS_PID':
155
 
                klass.dbus_pid = int(v)
156
 
            if k == 'DBUS_SESSION_BUS_ADDRESS':
157
 
                os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = v
158
 
        # do not try to communicate with the current desktop session; this will
159
 
        # confuse it, as it cannot see this D-BUS instance
160
 
        try:
161
 
            del os.environ['DISPLAY']
162
 
        except KeyError:
163
 
            pass
164
 
        if logfile:
165
 
            klass.daemon_log = open(logfile, 'w')
166
 
        else:
167
 
            klass.daemon_log = tempfile.TemporaryFile()
168
 
        klass.daemon = subprocess.Popen([daemon_path],
169
 
            stdout=klass.daemon_log, stderr=subprocess.STDOUT)
170
 
        assert klass.daemon.pid, 'daemon failed to start'
171
 
 
172
 
        atexit.register(klass.cleanup)
173
 
 
174
 
        # wait until the daemon has started up
175
 
        timeout = 10
176
 
        while klass.manager is None and timeout > 0:
177
 
            time.sleep(0.2)
178
 
            klass.client = UDisks.Client.new_sync(None)
179
 
            assert klass.client != None
180
 
            klass.manager = klass.client.get_manager()
181
 
            timeout -= 1
182
 
        assert klass.manager, 'daemon failed to start'
183
 
 
184
 
        klass.sync()
185
 
 
186
 
    @classmethod
187
 
    def cleanup(klass):
188
 
        '''stop daemon again and clean up test environment'''
189
 
 
190
 
        subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed
191
 
 
192
 
        os.kill(klass.daemon.pid, signal.SIGTERM)
193
 
        os.wait()
194
 
        klass.daemon = None
195
 
 
196
 
        klass.teardown_vdev(klass.device)
197
 
        klass.device = None
198
 
 
199
 
        klass.dbus_conf.close()
200
 
        del os.environ['DBUS_SYSTEM_BUS_ADDRESS']
201
 
        os.kill(klass.dbus_pid, signal.SIGTERM)
202
 
 
203
 
    @classmethod
204
 
    def sync(klass):
205
 
        '''Wait until pending events finished processing.
206
 
        
207
 
        This should only be called for situations where we genuinely have an
208
 
        asynchronous response, like invoking a CLI program and waiting for
209
 
        udev/udisks to catch up on the change events.
210
 
        '''
211
 
        subprocess.call(['udevadm', 'settle'])
212
 
        klass.client.settle()
213
 
 
214
 
    @classmethod
215
 
    def sync_workaround(klass):
216
 
        '''Wait until pending events finished processing (bug workaround).
217
 
        
218
 
        This should be called for race conditions in the D-BUS API which cause
219
 
        properties to not be up to date yet when a method call finishes. Those
220
 
        should eventually get fixed properly, but it's unnerving to have the
221
 
        tests fail on them when you are working on something else.
222
 
 
223
 
        This sync is not done if running with --no-workarounds.
224
 
        '''
225
 
        if workaround_syncs:
226
 
            klass.sync()
227
 
 
228
 
    @classmethod
229
 
    def zero_device(klass):
230
 
        subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device, 'bs=10M'],
231
 
                stderr=subprocess.PIPE)
232
 
        klass.sync()
233
 
 
234
 
    @classmethod
235
 
    def devname(klass, partition=None):
236
 
        '''Get name of test device or one of its partitions'''
237
 
 
238
 
        if partition:
239
 
            if klass.device[-1].isdigit():
240
 
                return klass.device + 'p' + str(partition)
241
 
            else:
242
 
                return klass.device + str(partition)
243
 
        else:
244
 
            return klass.device
245
 
 
246
 
    @classmethod
247
 
    def udisks_block(klass, partition=None):
248
 
        '''Get UDisksBlock object for test device or partition'''
249
 
 
250
 
        assert klass.client
251
 
        devname = klass.devname(partition)
252
 
        dev_t = os.stat(devname).st_rdev
253
 
        block = klass.client.get_block_for_dev(dev_t)
254
 
        assert block, 'did not find an UDisksBlock object for %s' % devname
255
 
        return block
256
 
 
257
 
    @classmethod
258
 
    def udisks_filesystem(klass, partition=None):
259
 
        '''Get UDisksFilesystem object for test device or partition
260
 
        
261
 
        Return None if there is no file system on that device.
262
 
        '''
263
 
        block = klass.udisks_block(partition)
264
 
        return klass.client.get_object(block.get_object_path()).get_property('filesystem')
265
 
 
266
 
    @classmethod
267
 
    def blkid(klass, partition=None, device=None):
268
 
        '''Call blkid and return dictionary of results.'''
269
 
 
270
 
        if not device:
271
 
            device = klass.devname(partition)
272
 
        result = {}
273
 
        cmd = subprocess.Popen(['blkid', '-p', '-o', 'udev', device], stdout=subprocess.PIPE)
274
 
        for l in cmd.stdout:
275
 
            (key, value) = l.decode('UTF-8').split('=', 1)
276
 
            result[key] = value.strip()
277
 
        assert cmd.wait() == 0
278
 
        return result
279
 
 
280
 
    @classmethod
281
 
    def is_mountpoint(klass, path):
282
 
        '''Check if given path is a mount point.'''
283
 
 
284
 
        return subprocess.call(['mountpoint', path], stdout=subprocess.PIPE) == 0
285
 
 
286
 
    @classmethod
287
 
    def mkfs(klass, type, label=None, partition=None):
288
 
        '''Create file system using mkfs.'''
289
 
 
290
 
        if type == 'minix':
291
 
            assert label is None, 'minix does not support labels'
292
 
 
293
 
        # work around mkswap not properly cleaning up an existing reiserfs
294
 
        # signature (mailed kzak about it)
295
 
        if type == 'swap':
296
 
            subprocess.check_call(['wipefs', '-a', klass.devname(partition)],
297
 
                    stdout=subprocess.PIPE)
298
 
 
299
 
        mkcmd =     { 'swap': 'mkswap',
300
 
                    }
301
 
        label_opt = { 'vfat': '-n', 
302
 
                      'reiserfs': '-l',
303
 
                    }
304
 
        extra_opt = { 'vfat': [ '-I', '-F', '32'],
305
 
                      'swap': ['-f'],
306
 
                      'xfs': ['-f'], # XFS complains if there's an existing FS, so force
307
 
                      'ext2': ['-F'], # ext* complains about using entire device, so force
308
 
                      'ext3': ['-F'],
309
 
                      'ext4': ['-F'],
310
 
                      'ntfs': ['-F'],
311
 
                      'reiserfs': ['-ff'],
312
 
                    }
313
 
 
314
 
        cmd = [mkcmd.get(type, 'mkfs.' + type)] + extra_opt.get(type, [])
315
 
        if label:
316
 
            cmd += [label_opt.get(type, '-L'), label]
317
 
        cmd.append(klass.devname(partition))
318
 
 
319
 
        subprocess.check_call(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
320
 
 
321
 
        # kernel/udev generally detect those changes itself, but do not quite
322
 
        # tell us when they are done; so do a little kludge here to know how
323
 
        # long we need to wait
324
 
        subprocess.call(['udevadm', 'trigger', '--action=change',
325
 
            '--sysname-match=' + os.path.basename(klass.devname(partition))])
326
 
        klass.sync()
327
 
 
328
 
    @classmethod
329
 
    def fs_create(klass, partition, type, options):
330
 
        '''Create file system using udisks.'''
331
 
 
332
 
        block = klass.udisks_block(partition)
333
 
        block.call_format_sync(type, options, None)
334
 
        klass.sync_workaround()
335
 
 
336
 
    @classmethod
337
 
    def retry_busy(klass, fn, *args):
338
 
        '''Call a function until it does not fail with "Busy".'''
339
 
 
340
 
        timeout = 10
341
 
        while timeout >= 0:
342
 
            try:
343
 
                return fn(*args)
344
 
            except GLib.GError as e:
345
 
                if not 'UDisks2.Error.DeviceBusy' in e.message:
346
 
                    raise
347
 
                sys.stderr.write('[busy] ')
348
 
                time.sleep(0.3)
349
 
                timeout -= 1
350
 
 
351
 
    @classmethod
352
 
    def check_build_tree_config(klass):
353
 
        '''Check configuration of build tree'''
354
 
 
355
 
        # read make variables
356
 
        make_vars = {}
357
 
        var_re = re.compile('^([a-zA-Z_]+) = (.*)$')
358
 
        make = subprocess.Popen(['make', '-p', '/dev/null'],
359
 
                stdout=subprocess.PIPE)
360
 
        for l in make.stdout:
361
 
            l = l.decode('UTF-8')
362
 
            m = var_re.match(l)
363
 
            if m:
364
 
                make_vars[m.group(1)] = m.group(2)
365
 
        make.wait()
366
 
 
367
 
        # expand make variables
368
 
        subst_re = re.compile('\${([a-zA-Z_]+)}')
369
 
        for (k, v) in make_vars.items():
370
 
            while True:
371
 
                m = subst_re.search(v)
372
 
                if m:
373
 
                    v = subst_re.sub(make_vars.get(m.group(1), ''), v)
374
 
                    make_vars[k] = v
375
 
                else:
376
 
                    break
377
 
 
378
 
        # check localstatedir
379
 
        for d in (os.path.join(make_vars['localstatedir'], 'run', 'udisks2'),
380
 
                os.path.join(make_vars['localstatedir'], 'lib', 'udisks2')):
381
 
            if not os.path.exists(d):
382
 
                sys.stderr.write('The directory %s does not exist; please create it before running these tests.\n' % d)
383
 
                sys.exit(0)
384
 
        
385
 
    @classmethod
386
 
    def setup_vdev(klass):
387
 
        '''create virtual test device
388
 
        
389
 
        It is zeroed out initially.
390
 
 
391
 
        Return the device path.
392
 
        '''
393
 
        # ensure that the scsi_debug module is loaded
394
 
        if os.path.isdir('/sys/module/scsi_debug'):
395
 
            sys.stderr.write('The scsi_debug module is already loaded; please remove before running this test.\n')
396
 
            sys.exit(1)
397
 
 
398
 
        assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % (
399
 
            VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug'
400
 
 
401
 
        # wait until all drives are created
402
 
        dirs = []
403
 
        while len(dirs) < 1:
404
 
            dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
405
 
            time.sleep(0.1)
406
 
        assert len(dirs) == 1
407
 
 
408
 
        # determine the debug block devices
409
 
        devs = os.listdir(dirs[0])
410
 
        assert len(devs) == 1
411
 
        dev = '/dev/' + devs[0]
412
 
        assert os.path.exists(dev)
413
 
 
414
 
        # let's be 100% sure that we pick a virtual one
415
 
        assert open('/sys/block/%s/device/model' % devs[0]).read().strip() == 'scsi_debug'
416
 
 
417
 
        print('Set up test device: ' + dev)
418
 
        return dev
419
 
 
420
 
    @classmethod
421
 
    def teardown_vdev(klass, device):
422
 
        '''release and remove virtual test device'''
423
 
 
424
 
        klass.remove_device(device)
425
 
        assert subprocess.call(['rmmod', 'scsi_debug']) == 0, \
426
 
                'Failure to rmmod scsi_debug'
427
 
 
428
 
    @classmethod
429
 
    def remove_device(klass, device):
430
 
        '''remove virtual test device'''
431
 
 
432
 
        device = device.split('/')[-1]
433
 
        if os.path.exists('/sys/block/' + device):
434
 
            f = open('/sys/block/%s/device/delete' % device, 'w')
435
 
            f.write('1')
436
 
            f.close()
437
 
        while os.path.exists(device):
438
 
            time.sleep(0.1)
439
 
        klass.sync()
440
 
        time.sleep(0.5) # TODO
441
 
 
442
 
    @classmethod
443
 
    def readd_devices(klass):
444
 
        '''re-add virtual test devices after removal'''
445
 
 
446
 
        scan_files = glob('/sys/bus/pseudo/devices/adapter*/host*/scsi_host/host*/scan')
447
 
        assert len(scan_files) > 0
448
 
        for f in scan_files:
449
 
            open(f, 'w').write('- - -\n')
450
 
        while not os.path.exists(klass.device):
451
 
            time.sleep(0.1)
452
 
        time.sleep(0.5)
453
 
        klass.sync()
454
 
 
455
 
# ----------------------------------------------------------------------------
456
 
 
457
 
class Manager(UDisksTestCase):
458
 
    '''UDisksManager operations'''
459
 
 
460
 
    def test_version(self):
461
 
        '''daemon version'''
462
 
 
463
 
        self.assertTrue(self.manager.get_property('version')[0].isdigit())
464
 
 
465
 
    def test_loop_rw(self):
466
 
        '''loop device R/W'''
467
 
 
468
 
        with tempfile.NamedTemporaryFile() as f:
469
 
            f.truncate(100000000)
470
 
            fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
471
 
 
472
 
            (path, out_fd_list) = self.manager.call_loop_setup_sync(
473
 
                GLib.Variant('h', 0), # fd index
474
 
                no_options,
475
 
                fd_list,
476
 
                None)
477
 
            self.client.settle()
478
 
 
479
 
            obj = self.client.get_object(path)
480
 
            loop = obj.get_property('loop')
481
 
            block = obj.get_property('block')
482
 
            self.assertNotEqual(block, None)
483
 
            self.assertNotEqual(loop, None)
484
 
            self.assertEqual(obj.get_property('filesystem'), None)
485
 
 
486
 
            try:
487
 
                self.assertEqual(loop.get_property('backing-file'), f.name)
488
 
 
489
 
                options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
490
 
                block.call_format_sync('ext2', options, None)
491
 
                self.client.settle()
492
 
                self.assertNotEqual(obj.get_property('filesystem'), None)
493
 
 
494
 
                self.assertEqual(block.get_property('id-label'), 'foo')
495
 
                self.assertEqual(block.get_property('id-usage'), 'filesystem')
496
 
                self.assertEqual(block.get_property('id-type'), 'ext2')
497
 
            finally:
498
 
                loop.call_delete_sync(no_options, None)
499
 
 
500
 
    def test_loop_ro(self):
501
 
        '''loop device R/O'''
502
 
 
503
 
        with tempfile.NamedTemporaryFile() as f:
504
 
            f.truncate(100000000)
505
 
            fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
506
 
 
507
 
            (path, out_fd_list) = self.manager.call_loop_setup_sync(
508
 
                GLib.Variant('h', 0), # fd index
509
 
                GLib.Variant('a{sv}', {'read-only': GLib.Variant('b', True)}),
510
 
                fd_list,
511
 
                None)
512
 
            self.client.settle()
513
 
 
514
 
            obj = self.client.get_object(path)
515
 
            loop = obj.get_property('loop')
516
 
            block = obj.get_property('block')
517
 
            self.assertNotEqual(block, None)
518
 
            self.assertNotEqual(loop, None)
519
 
            self.assertEqual(obj.get_property('filesystem'), None)
520
 
 
521
 
            try:
522
 
                self.assertEqual(loop.get_property('backing-file'), f.name)
523
 
 
524
 
                # can't format due to permission error
525
 
                self.assertRaises(GLib.GError, block.call_format_sync, 'ext2',
526
 
                        no_options, None)
527
 
 
528
 
                self.assertEqual(block.get_property('id-label'), '')
529
 
                self.assertEqual(block.get_property('id-usage'), '')
530
 
                self.assertEqual(block.get_property('id-type'), '')
531
 
            finally:
532
 
                self.client.settle()
533
 
                loop.call_delete_sync(no_options, None)
534
 
 
535
 
# ----------------------------------------------------------------------------
536
 
 
537
 
class Drive(UDisksTestCase):
538
 
    '''UDisksDrive'''
539
 
 
540
 
    def setUp(self):
541
 
        self.drive = self.client.get_drive_for_block(self.udisks_block())
542
 
        self.assertNotEqual(self.drive, None)
543
 
 
544
 
    def test_properties(self):
545
 
        '''properties of UDisksDrive object'''
546
 
 
547
 
        self.assertEqual(self.drive.get_property('model'), 'scsi_debug')
548
 
        self.assertEqual(self.drive.get_property('vendor'), 'Linux')
549
 
        self.assertAlmostEqual(self.drive.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
550
 
        self.assertEqual(self.drive.get_property('media-available'), True)
551
 
        self.assertEqual(self.drive.get_property('optical'), False)
552
 
 
553
 
        self.assertNotEqual(len(self.drive.get_property('serial')), 0)
554
 
        self.assertNotEqual(len(self.drive.get_property('revision')), 0)
555
 
 
556
 
# ----------------------------------------------------------------------------
557
 
 
558
 
class FS(UDisksTestCase):
559
 
    '''Test detection of all supported file systems'''
560
 
 
561
 
    def setUp(self):
562
 
        self.workdir = tempfile.mkdtemp()
563
 
        self.block = self.udisks_block()
564
 
        self.assertNotEqual(self.block, None)
565
 
 
566
 
    def tearDown(self):
567
 
        if subprocess.call(['umount', self.device], stderr=subprocess.PIPE) == 0:
568
 
            sys.stderr.write('[cleanup unmount] ')
569
 
        shutil.rmtree (self.workdir)
570
 
 
571
 
    def test_zero(self):
572
 
        '''properties of zeroed out device'''
573
 
 
574
 
        self.zero_device()
575
 
        self.assertEqual(self.block.get_property('device'), self.device)
576
 
        self.assertTrue('Linux_scsi_debug' in self.block.get_property('drive'))
577
 
        self.assertEqual(self.block.get_property('hint-system'), True)
578
 
        self.assertEqual(self.block.get_property('id-label'), '')
579
 
        self.assertEqual(self.block.get_property('id-usage'), '')
580
 
        self.assertEqual(self.block.get_property('id-type'), '')
581
 
        self.assertEqual(self.block.get_property('id-uuid'), '')
582
 
        self.assertAlmostEqual(self.block.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
583
 
        obj = self.client.get_object(self.block.get_object_path())
584
 
        self.assertEqual(obj.get_property('filesystem'), None)
585
 
        self.assertEqual(obj.get_property('partition'), None)
586
 
        self.assertEqual(obj.get_property('partition-table'), None)
587
 
 
588
 
    def test_ext2(self):
589
 
        '''fs: ext2'''
590
 
        self._do_fs_check('ext2')
591
 
 
592
 
    def test_ext3(self):
593
 
        '''fs: ext3'''
594
 
        self._do_fs_check('ext3')
595
 
 
596
 
    def test_ext4(self):
597
 
        '''fs: ext4'''
598
 
        self._do_fs_check('ext4')
599
 
 
600
 
    def test_btrfs(self):
601
 
        '''fs: btrfs'''
602
 
        self._do_fs_check('btrfs')
603
 
 
604
 
    def test_minix(self):
605
 
        '''fs: minix'''
606
 
        self._do_fs_check('minix')
607
 
 
608
 
    def test_xfs(self):
609
 
        '''fs: XFS'''
610
 
        self._do_fs_check('xfs')
611
 
 
612
 
    def test_ntfs(self):
613
 
        '''fs: NTFS'''
614
 
        self._do_fs_check('ntfs')
615
 
 
616
 
    def test_vfat(self):
617
 
        '''fs: FAT'''
618
 
        self._do_fs_check('vfat')
619
 
 
620
 
    def test_reiserfs(self):
621
 
        '''fs: reiserfs'''
622
 
        self._do_fs_check('reiserfs')
623
 
 
624
 
    def test_swap(self):
625
 
        '''fs: swap'''
626
 
        self._do_fs_check('swap')
627
 
 
628
 
    def test_nilfs2(self):
629
 
        '''fs: nilfs2'''
630
 
        self._do_fs_check('nilfs2')
631
 
 
632
 
    def test_empty(self):
633
 
        '''fs: empty'''
634
 
 
635
 
        self.mkfs('ext4', 'foo')
636
 
        block = self.udisks_block()
637
 
        self.assertEqual(block.get_property('id-usage'), 'filesystem')
638
 
        self.assertEqual(block.get_property('id-type'), 'ext4')
639
 
        self.assertEqual(block.get_property('id-label'), 'foo')
640
 
        self.assertNotEqual(self.udisks_filesystem(), None)
641
 
 
642
 
        self.fs_create(None, 'empty', no_options)
643
 
 
644
 
        self.assertEqual(block.get_property('id-usage'), '')
645
 
        self.assertEqual(block.get_property('id-type'), '')
646
 
        self.assertEqual(block.get_property('id-label'), '')
647
 
        self.assertEqual(self.udisks_filesystem(), None)
648
 
 
649
 
    def test_create_fs_unknown_type(self):
650
 
        '''Format() with unknown type'''
651
 
 
652
 
        try:
653
 
            self.fs_create(None, 'bogus', no_options)
654
 
            self.fail('Expected failure for bogus file system')
655
 
        except GLib.GError as e:
656
 
            self.assertTrue('UDisks2.Error.NotSupported' in e.message)
657
 
            self.assertTrue('type bogus' in e.message)
658
 
 
659
 
    def test_create_fs_unsupported_label(self):
660
 
        '''Format() with unsupported label'''
661
 
 
662
 
        options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
663
 
        try:
664
 
            self.fs_create(None, 'minix', options)
665
 
            self.fail('Expected failure for unsupported label')
666
 
        except GLib.GError as e:
667
 
            self.assertTrue('UDisks2.Error.NotSupported' in e.message)
668
 
 
669
 
    def test_force_removal(self):
670
 
        '''fs: forced removal'''
671
 
 
672
 
        # create a fs and mount it
673
 
        self.mkfs('ext4', 'udiskstest')
674
 
        fs = self.udisks_filesystem()
675
 
        mount_path = fs.call_mount_sync(no_options, None)
676
 
        self.assertTrue(mount_path.endswith('udiskstest'))
677
 
        self.assertTrue('/media/' in mount_path)
678
 
        self.assertTrue(self.is_mountpoint(mount_path))
679
 
 
680
 
        dev_t = os.stat(self.devname()).st_rdev
681
 
 
682
 
        # removal should clean up mounts
683
 
        self.remove_device(self.device)
684
 
        self.assertFalse(os.path.exists(mount_path))
685
 
        self.assertEqual(self.client.get_block_for_dev(dev_t), None)
686
 
 
687
 
        # after putting it back, it should be mountable again
688
 
        self.readd_devices()
689
 
        fs = self.udisks_filesystem()
690
 
        self.assertEqual(fs.get_property('mount-points'), [])
691
 
 
692
 
        mount_path = fs.call_mount_sync(no_options, None)
693
 
        self.assertTrue(mount_path.endswith('udiskstest'))
694
 
        self.assertTrue('/media/' in mount_path)
695
 
        self.assertTrue(self.is_mountpoint(mount_path))
696
 
        self.client.settle()
697
 
        self.assertEqual(fs.get_property('mount-points'), [mount_path])
698
 
 
699
 
        self.retry_busy(fs.call_unmount_sync, no_options, None)
700
 
        self.client.settle()
701
 
        self.assertEqual(fs.get_property('mount-points'), [])
702
 
 
703
 
    def _do_fs_check(self, type):
704
 
        '''Run checks for a particular file system.'''
705
 
 
706
 
        if type != 'swap' and subprocess.call(['which', 'mkfs.' + type],
707
 
                stdout=subprocess.PIPE) != 0:
708
 
            sys.stderr.write('[no mkfs.%s, skip] ' % type)
709
 
 
710
 
            # check correct D-Bus exception
711
 
            try:
712
 
                self.fs_create(None, type, no_options)
713
 
                self.fail('Expected failure for missing mkfs.' + type)
714
 
            except GLib.GError as e:
715
 
                self.assertTrue('UDisks2.Error.Failed' in e.message)
716
 
            return
717
 
 
718
 
        # do checks with command line tools (mkfs/mount/umount)
719
 
        sys.stderr.write('[cli] ')
720
 
        sys.stderr.flush()
721
 
 
722
 
        self._do_cli_check(type)
723
 
        if type != 'minix':
724
 
            self._do_cli_check(type, 'test%stst' % type)
725
 
 
726
 
        # put a different fs here instead of zeroing, so that we verify that
727
 
        # udisks overrides existing FS (e. g. XFS complains then), and does not
728
 
        # leave traces of other FS around
729
 
        if type == 'ext3':
730
 
            self.mkfs('swap')
731
 
        else:
732
 
            self.mkfs('ext3')
733
 
 
734
 
        # do checks with udisks operations
735
 
        sys.stderr.write('[ud] ')
736
 
        self._do_udisks_check(type)
737
 
        if type != 'minix':
738
 
            self._do_udisks_check(type, 'test%stst' % type)
739
 
            # also test fs_create with an empty label
740
 
            self._do_udisks_check(type, '')
741
 
 
742
 
    def _do_cli_check(self, type, label=None):
743
 
        '''udisks correctly picks up file system changes from command line tools'''
744
 
 
745
 
        self.mkfs(type, label)
746
 
 
747
 
        block = self.udisks_block()
748
 
 
749
 
        self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
750
 
 
751
 
        self.assertEqual(block.get_property('id-type'), type)
752
 
        self.assertEqual(block.get_property('id-label'), label or '')
753
 
        self.assertEqual(block.get_property('hint-name'), '')
754
 
        if type != 'minix':
755
 
            self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
756
 
 
757
 
        obj = self.client.get_object(self.block.get_object_path())
758
 
        self.assertEqual(obj.get_property('partition'), None)
759
 
        self.assertEqual(obj.get_property('partition-table'), None)
760
 
 
761
 
        fs = obj.get_property('filesystem')
762
 
        if type == 'swap':
763
 
            self.assertEqual(fs, None)
764
 
        else:
765
 
            self.assertNotEqual(fs, None)
766
 
 
767
 
        if type == 'swap':
768
 
            return
769
 
 
770
 
        # mount it
771
 
        if type == 'ntfs' and subprocess.call(['which', 'mount.ntfs-3g'],
772
 
                stdout=subprocess.PIPE) == 0:
773
 
            # prefer mount.ntfs-3g if we have it (on Debian; Ubuntu
774
 
            # defaults to ntfs-3g if installed); TODO: check other distros
775
 
            mount_prog = 'mount.ntfs-3g'
776
 
        else:
777
 
            mount_prog = 'mount'
778
 
        ret = subprocess.call([mount_prog, self.device, self.workdir])
779
 
        if ret == 32:
780
 
            # missing fs driver
781
 
            sys.stderr.write('[missing kernel driver, skip] ')
782
 
            return
783
 
        self.assertEqual(ret, 0)
784
 
 
785
 
        self.sync()
786
 
        self.assertEqual(fs.get_property('mount-points'), [self.workdir])
787
 
 
788
 
        # unmount it
789
 
        subprocess.call(['umount', self.workdir])
790
 
        self.sync()
791
 
        self.assertEqual(fs.get_property('mount-points'), [])
792
 
 
793
 
    def _do_udisks_check(self, type, label=None):
794
 
        '''udisks API correctly changes file system'''
795
 
 
796
 
        # create fs 
797
 
        if label is not None:
798
 
            options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', label)})
799
 
        else:
800
 
            options = no_options
801
 
        self.fs_create(None, type, options)
802
 
 
803
 
        # properties
804
 
        id = self.blkid()
805
 
        self.assertEqual(id['ID_FS_USAGE'], type == 'swap' and 'other' or 'filesystem')
806
 
        self.assertEqual(id['ID_FS_TYPE'], type)
807
 
        self.assertEqual(id.get('ID_FS_LABEL', ''), label or '')
808
 
 
809
 
        block = self.udisks_block()
810
 
        self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
811
 
        self.assertEqual(block.get_property('id-type'), type)
812
 
        self.assertEqual(block.get_property('id-label'), label or '')
813
 
 
814
 
        if type == 'swap':
815
 
            return
816
 
 
817
 
        obj = self.client.get_object(self.block.get_object_path())
818
 
        self.assertEqual(obj.get_property('partition'), None)
819
 
        self.assertEqual(obj.get_property('partition-table'), None)
820
 
 
821
 
        fs = self.udisks_filesystem()
822
 
        self.assertNotEqual(fs, None, 'no Filesystem interface for test device')
823
 
        self.assertEqual(fs.get_property('mount-points'), [])
824
 
 
825
 
        # mount
826
 
        mount_path = fs.call_mount_sync(no_options, None)
827
 
 
828
 
        self.assertTrue(mount_path.startswith('/run/media/'), mount_path)
829
 
        if label:
830
 
            self.assertTrue(mount_path.endswith(label))
831
 
 
832
 
        self.sync()
833
 
        self.assertEqual(fs.get_property('mount-points'), [mount_path])
834
 
        self.assertTrue(self.is_mountpoint(mount_path))
835
 
 
836
 
        # no ownership taken, should be root owned
837
 
        st = os.stat(mount_path)
838
 
        self.assertEqual((st.st_uid, st.st_gid), (0, 0))
839
 
 
840
 
        self._do_file_perms_checks(type, mount_path)
841
 
 
842
 
        # unmount
843
 
        self.retry_busy(fs.call_unmount_sync, no_options, None)
844
 
        self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
845
 
        self.assertEqual(fs.get_property('mount-points'), [mount_path])
846
 
 
847
 
        # create fs with taking ownership (daemon:mail == 1:8)
848
 
        #if supports_unix_owners:
849
 
        #    options.append('take_ownership_uid=1')
850
 
        #    options.append('take_ownership_gid=8')
851
 
        #    self.fs_create(None, type, options)
852
 
        #    mount_path = iface.FilesystemMount('', [])
853
 
        #    st = os.stat(mount_path)
854
 
        #    self.assertEqual((st.st_uid, st.st_gid), (1, 8))
855
 
        #    self.retry_busy(self.partition_iface().FilesystemUnmount, [])
856
 
        #    self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
857
 
 
858
 
        # change label
859
 
        supported = True
860
 
        l = 'n"a\m\\"e' + type
861
 
        if type == 'vfat':
862
 
            # VFAT does not support some characters
863
 
            self.assertRaises(GLib.GError, fs.call_set_label_sync, l, 
864
 
                    no_options, None)
865
 
            l = "n@a$me"
866
 
        try:
867
 
            fs.call_set_label_sync(l, no_options, None)
868
 
        except GLib.GError as e:
869
 
            if 'UDisks2.Error.NotSupported' in e.message:
870
 
                # these fses are known to not support relabeling
871
 
                self.assertTrue(type in ['minix', 'btrfs'])
872
 
                supported = False
873
 
            else:
874
 
                raise
875
 
 
876
 
        if supported:
877
 
            block = self.udisks_block()
878
 
            blkid_label = self.blkid().get('ID_FS_LABEL_ENC', '').replace('\\x22', '"').replace(
879
 
                    '\\x5c', '\\').replace('\\x24', '$')
880
 
            self.sync_workaround()
881
 
            if type == 'vfat':
882
 
                # EXFAIL: often (but not always) the label appears in all upper case
883
 
                self.assertEqual(blkid_label.upper(), l.upper())
884
 
                self.assertEqual(block.get_property('id-label').upper(), l.upper())
885
 
            else:
886
 
                self.assertEqual(blkid_label, l)
887
 
                self.assertEqual(block.get_property('id-label'), l)
888
 
 
889
 
            # test setting empty label
890
 
            fs.call_set_label_sync('', no_options, None)
891
 
            self.sync_workaround()
892
 
            self.assertEqual(self.blkid().get('ID_FS_LABEL_ENC', ''), '')
893
 
            self.assertEqual(block.get_property('id-label'), '')
894
 
 
895
 
        # check fs - Not implemented in udisks yet
896
 
        #self.assertEqual(iface.FilesystemCheck([]), True)
897
 
 
898
 
    def _do_file_perms_checks(self, type, mount_point):
899
 
        '''Check for permissions for data files and executables.
900
 
 
901
 
        This particularly checks sane and useful permissions on non-Unix file
902
 
        systems like vfat.
903
 
        '''
904
 
        if type in BROKEN_PERMISSIONS_FS:
905
 
            return
906
 
 
907
 
        f = os.path.join(mount_point, 'simpledata.txt')
908
 
        open(f, 'w').close()
909
 
        self.assertTrue(os.access(f, os.R_OK))
910
 
        self.assertTrue(os.access(f, os.W_OK))
911
 
        self.assertFalse(os.access(f, os.X_OK))
912
 
 
913
 
        f = os.path.join(mount_point, 'simple.exe')
914
 
        shutil.copy('/bin/bash', f)
915
 
        self.assertTrue(os.access(f, os.R_OK))
916
 
        self.assertTrue(os.access(f, os.W_OK))
917
 
        self.assertTrue(os.access(f, os.X_OK))
918
 
 
919
 
        os.mkdir(os.path.join(mount_point, 'subdir'))
920
 
        f = os.path.join(mount_point, 'subdir', 'subdirdata.txt')
921
 
        open(f, 'w').close()
922
 
        self.assertTrue(os.access(f, os.R_OK))
923
 
        self.assertTrue(os.access(f, os.W_OK))
924
 
        self.assertFalse(os.access(f, os.X_OK))
925
 
 
926
 
        f = os.path.join(mount_point, 'subdir', 'subdir.exe')
927
 
        shutil.copy('/bin/bash', f)
928
 
        self.assertTrue(os.access(f, os.R_OK))
929
 
        self.assertTrue(os.access(f, os.W_OK))
930
 
        self.assertTrue(os.access(f, os.X_OK))
931
 
 
932
 
## ----------------------------------------------------------------------------
933
 
 
934
 
class Smart(UDisksTestCase):
935
 
    '''Check SMART operation.'''
936
 
 
937
 
    def test_sda(self):
938
 
        '''SMART status of first internal hard disk
939
 
        
940
 
        This is a best-effort readonly test.
941
 
        '''
942
 
        hd = '/dev/sda'
943
 
 
944
 
        if not os.path.exists(hd):
945
 
            sys.stderr.write('[skip] ')
946
 
            return
947
 
 
948
 
        has_smart = subprocess.call(['skdump', '--can-smart', hd],
949
 
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0
950
 
 
951
 
        block = self.client.get_block_for_dev(os.stat(hd).st_rdev)
952
 
        self.assertNotEqual(block, None)
953
 
        drive = self.client.get_drive_for_block(block)
954
 
        ata = self.client.get_object(drive.get_object_path()).get_property('drive-ata')
955
 
        self.assertEqual(ata != None, has_smart)
956
 
 
957
 
        if has_smart:
958
 
            sys.stderr.write('[avail] ')
959
 
            self.assertEqual(ata.get_property('smart-supported'), True)
960
 
            self.assertEqual(ata.get_property('smart-enabled'), True)
961
 
 
962
 
            # wait for SMART data to be read
963
 
            while ata.get_property('smart-updated') == 0:
964
 
                sys.stderr.write('[wait for data] ')
965
 
                time.sleep(0.5)
966
 
 
967
 
            # this is of course not truly correct for a test suite, but let's
968
 
            # consider it a courtesy for developers :-)
969
 
            self.assertEqual(ata.get_property('smart-failing'), False)
970
 
            self.assertTrue(ata.get_property('smart-selftest-status') in ['success', 'inprogress'])
971
 
        else:
972
 
            sys.stderr.write('[N/A] ')
973
 
 
974
 
 
975
 
# ----------------------------------------------------------------------------
976
 
 
977
 
class Luks(UDisksTestCase):
978
 
    '''Check LUKS.'''
979
 
 
980
 
    def tearDown(self):
981
 
        '''clean up behind failed test cases'''
982
 
 
983
 
        crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
984
 
        if crypt_obj:
985
 
            encrypted = crypt_obj.get_property('encrypted')
986
 
            if encrypted:
987
 
                try:
988
 
                    encrypted.call_lock_sync(no_options, None)
989
 
                    sys.stderr.write('[cleanup lock] ')
990
 
                except GLib.GError:
991
 
                    pass
992
 
 
993
 
    # needs to run before the other tests
994
 
    def test_0_create_teardown(self):
995
 
        '''LUKS create/teardown'''
996
 
 
997
 
        self.fs_create(None, 'ext4', GLib.Variant('a{sv}', {
998
 
            'encrypt.passphrase': GLib.Variant('s', 's3kr1t'),
999
 
            'label': GLib.Variant('s', 'treasure'),
1000
 
            }))
1001
 
 
1002
 
        try:
1003
 
            block = self.udisks_block()
1004
 
            obj = self.client.get_object(block.get_object_path())
1005
 
            self.assertEqual(obj.get_property('filesystem'), None)
1006
 
            encrypted = obj.get_property('encrypted')
1007
 
            self.assertNotEqual(encrypted, None)
1008
 
 
1009
 
            # check crypted device info
1010
 
            self.assertEqual(block.get_property('id-type'), 'crypto_LUKS')
1011
 
            self.assertEqual(block.get_property('id-usage'), 'crypto')
1012
 
            self.assertEqual(block.get_property('id-label'), '')
1013
 
            self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
1014
 
            self.assertEqual(block.get_property('device'), self.devname())
1015
 
 
1016
 
            # check whether we can lock/unlock; we also need this to get the
1017
 
            # cleartext device
1018
 
            encrypted.call_lock_sync(no_options, None)
1019
 
            self.assertRaises(GLib.GError, encrypted.call_lock_sync, 
1020
 
                    no_options, None)
1021
 
            
1022
 
            # wrong password
1023
 
            self.assertRaises(GLib.GError, encrypted.call_unlock_sync, 
1024
 
                    'h4ckpassword', no_options, None)
1025
 
            # right password
1026
 
            clear_path = encrypted.call_unlock_sync('s3kr1t', 
1027
 
                    no_options, None)
1028
 
 
1029
 
            # check cleartext device info
1030
 
            clear_obj = self.client.get_object(clear_path)
1031
 
            self.assertEqual(clear_obj.get_property('encrypted'), None)
1032
 
            clear_block = clear_obj.get_property('block')
1033
 
            self.assertEqual(clear_block.get_property('id-type'), 'ext4')
1034
 
            self.assertEqual(clear_block.get_property('id-usage'), 'filesystem')
1035
 
            self.assertEqual(clear_block.get_property('id-label'), 'treasure')
1036
 
            self.assertNotEqual(clear_block.get_property('crypto-backing-device'), None)
1037
 
            clear_dev = clear_block.get_property('device')
1038
 
            self.assertNotEqual(clear_dev, None)
1039
 
            self.assertEqual(clear_block.get_property('id-uuid'),
1040
 
                    self.blkid(device=clear_dev)['ID_FS_UUID'])
1041
 
 
1042
 
            clear_fs = clear_obj.get_property('filesystem')
1043
 
            self.assertEqual(clear_fs.get_property('mount-points'), [])
1044
 
 
1045
 
            # check that we do not leak key information
1046
 
            udev_dump = subprocess.Popen(['udevadm', 'info', '--export-db'],
1047
 
                    stdout=subprocess.PIPE)
1048
 
            out = udev_dump.communicate()[0]
1049
 
            self.assertFalse(b's3kr1t' in out, 'password in udev properties')
1050
 
            self.assertFalse(b'essiv:sha' in out, 'key information in udev properties')
1051
 
 
1052
 
        finally:
1053
 
            # tear down cleartext device
1054
 
            encrypted.call_lock_sync(no_options, None)
1055
 
            self.assertFalse(os.path.exists(clear_dev))
1056
 
 
1057
 
    def test_luks_mount(self):
1058
 
        '''LUKS mount/unmount'''
1059
 
 
1060
 
        crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1061
 
        encrypted = crypt_obj.get_property('encrypted')
1062
 
 
1063
 
        path = encrypted.call_unlock_sync('s3kr1t', 
1064
 
                no_options, None)
1065
 
        self.client.settle()
1066
 
        obj = self.client.get_object(path)
1067
 
        fs = obj.get_property('filesystem')
1068
 
        self.assertNotEqual(fs, None)
1069
 
 
1070
 
        # mount
1071
 
        mount_path = fs.call_mount_sync(no_options, None)
1072
 
 
1073
 
        try:
1074
 
            self.assertTrue('/media/' in mount_path)
1075
 
            self.assertTrue(mount_path.endswith('treasure'))
1076
 
            self.assertTrue(self.is_mountpoint(mount_path))
1077
 
            self.client.settle()
1078
 
            self.assertEqual(fs.get_property('mount-points'), [mount_path])
1079
 
 
1080
 
            # can't lock, busy
1081
 
            try:
1082
 
                encrypted.call_lock_sync(no_options, None)
1083
 
                self.fail('Lock() unexpectedly succeeded on mounted file system')
1084
 
            except GLib.GError as e:
1085
 
                self.assertTrue('UDisks2.Error.Failed' in e.message)
1086
 
        finally:
1087
 
            # umount
1088
 
            self.retry_busy(fs.call_unmount_sync, no_options, None)
1089
 
            self.client.settle()
1090
 
            self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
1091
 
            self.assertEqual(fs.get_property('mount-points'), [])
1092
 
 
1093
 
            # lock
1094
 
            encrypted.call_lock_sync(no_options, None)
1095
 
            self.client.settle()
1096
 
            self.assertEqual(self.client.get_object(path), None)
1097
 
 
1098
 
    def test_luks_forced_removal(self):
1099
 
        '''LUKS forced removal'''
1100
 
 
1101
 
        # unlock and mount it
1102
 
        crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1103
 
        path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t', 
1104
 
                no_options, None)
1105
 
        try:
1106
 
            fs = self.client.get_object(path).get_property('filesystem')
1107
 
            mount_path = fs.call_mount_sync(no_options, None)
1108
 
            self.assertTrue('/media/' in mount_path)
1109
 
            self.assertTrue(mount_path.endswith('treasure'))
1110
 
 
1111
 
            # removal should clean up mounts
1112
 
            self.remove_device(self.device)
1113
 
            self.assertFalse(os.path.exists(mount_path))
1114
 
            self.assertEqual(self.client.get_object(path), None)
1115
 
 
1116
 
            # after putting it back, it should be mountable again
1117
 
            self.readd_devices()
1118
 
            crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1119
 
            path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t', 
1120
 
                    no_options, None)
1121
 
            self.client.settle()
1122
 
            fs = self.client.get_object(path).get_property('filesystem')
1123
 
            mount_path = fs.call_mount_sync(no_options, None)
1124
 
            self.assertTrue('/media/' in mount_path)
1125
 
            self.assertTrue(mount_path.endswith('treasure'))
1126
 
 
1127
 
            # umount
1128
 
            self.retry_busy(fs.call_unmount_sync, no_options, None)
1129
 
            self.client.settle()
1130
 
            self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
1131
 
            self.assertEqual(fs.get_property('mount-points'), [])
1132
 
        finally:
1133
 
            # lock
1134
 
            crypt_obj.get_property('encrypted').call_lock_sync(
1135
 
                    no_options, None)
1136
 
            self.client.settle()
1137
 
            self.assertEqual(self.client.get_object(path), None)
1138
 
 
1139
 
# ----------------------------------------------------------------------------
1140
 
 
1141
 
class Polkit(UDisksTestCase):
1142
 
    '''Check operation with polkit.'''
1143
 
 
1144
 
    def setUp(self):
1145
 
        env = os.environ.copy()
1146
 
        env['POLKIT_DEBUG'] = '1'
1147
 
        self.polkit = subprocess.Popen([self.polkit_path],
1148
 
                stdout=self.daemon_log, stderr=subprocess.STDOUT, env=env)
1149
 
        assert self.polkit.pid, 'polkitd failed to start'
1150
 
 
1151
 
    def tearDown(self):
1152
 
        os.kill(self.polkit.pid, signal.SIGTERM)
1153
 
        os.waitpid(self.polkit.pid, 0)
1154
 
 
1155
 
    def test_internal_fs_root(self):
1156
 
        '''Create FS on internal drive as root'''
1157
 
 
1158
 
        options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'polkitext4')})
1159
 
        self.fs_create(None, 'ext4', options)
1160
 
        block = self.udisks_block()
1161
 
        self.assertEqual(block.get_property('id-usage'), 'filesystem')
1162
 
        self.assertEqual(block.get_property('id-type'), 'ext4')
1163
 
        self.assertEqual(block.get_property('id-label'), 'polkitext4')
1164
 
 
1165
 
    def test_internal_fs_nobody(self):
1166
 
        '''Try to create FS on internal drive as nobody'''
1167
 
 
1168
 
        # ensure we have a mountable file system
1169
 
        self.fs_create(None, 'ext4', no_options)
1170
 
 
1171
 
        nobody = pwd.getpwnam('nobody')
1172
 
        assert nobody.pw_uid > 0
1173
 
 
1174
 
        # we cannot just change euid and do the call, as polkit will remember
1175
 
        # our process which started out as root; so call the external tool
1176
 
        tool_mount = subprocess.Popen([self.tool_path, 'mount',
1177
 
            '--no-user-interaction', '-b', self.device],
1178
 
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
1179
 
            universal_newlines=True,
1180
 
            preexec_fn=lambda: (os.setgid(nobody.pw_gid), os.setuid(nobody.pw_uid)))
1181
 
        out, err = tool_mount.communicate()
1182
 
        self.assertTrue('Error.NotAuthorized' in err, err)
1183
 
 
1184
 
# ----------------------------------------------------------------------------
1185
 
 
1186
 
if __name__ == '__main__':
1187
 
    argparser = argparse.ArgumentParser(description='udisks2 integration test suite')
1188
 
    argparser.add_argument('-l', '--log-file', dest='logfile',
1189
 
            help='write daemon log to a file')
1190
 
    argparser.add_argument('-w', '--no-workarounds',
1191
 
            action="store_true", default=False,
1192
 
            help='Disable workarounds for race conditions in the D-BUS API')
1193
 
    argparser.add_argument('testname', nargs='*',
1194
 
            help='name of test class or method (e. g. "Drive", "FS.test_ext2")')
1195
 
    args = argparser.parse_args()
1196
 
 
1197
 
    workaround_syncs = not args.no_workarounds
1198
 
 
1199
 
    UDisksTestCase.init(logfile=args.logfile)
1200
 
    if args.testname:
1201
 
        tests = unittest.TestLoader().loadTestsFromNames(args.testname,
1202
 
                __import__('__main__'))
1203
 
    else:
1204
 
        tests = unittest.TestLoader().loadTestsFromName('__main__')
1205
 
    if unittest.TextTestRunner(verbosity=2).run(tests).wasSuccessful():
1206
 
        sys.exit(0)
1207
 
    else:
1208
 
        sys.exit(1)
1209